This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 596eb9d9349b1f8459e1e7a25503227155016173 Author: Yufan Sheng <[email protected]> AuthorDate: Tue Sep 6 00:02:04 2022 +0800 [FLINK-28934][Connector/pulsar] Drop checkpoint id for unordered source. --- .../fetcher/PulsarUnorderedFetcherManager.java | 8 ++-- .../reader/source/PulsarUnorderedSourceReader.java | 5 +- .../split/PulsarUnorderedPartitionSplitReader.java | 2 +- .../pulsar/source/PulsarSourceITCase.java | 15 ++++-- .../testutils/runtime/PulsarRuntimeOperator.java | 14 +----- .../runtime/container/PulsarContainerRuntime.java | 56 ++++++++++++++++------ .../testutils/runtime/mock/PulsarMockRuntime.java | 5 +- 7 files changed, 64 insertions(+), 41 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java index d2662f06b0a..1523b9a0fca 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java @@ -53,19 +53,19 @@ public class PulsarUnorderedFetcherManager<T> extends PulsarFetcherManagerBase<T super(elementsQueue, splitReaderSupplier); } - public List<PulsarPartitionSplit> snapshotState(long checkpointId) { + public List<PulsarPartitionSplit> snapshotState() { return fetchers.values().stream() .map(SplitFetcher::getSplitReader) - .map(splitReader -> snapshotReader(checkpointId, splitReader)) + .map(this::snapshotReader) .filter(Optional::isPresent) .map(Optional::get) .collect(toCollection(() -> new ArrayList<>(fetchers.size()))); } private Optional<PulsarPartitionSplit> snapshotReader( - long checkpointId, SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) { + SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) { return ((PulsarUnorderedPartitionSplitReader<T>) splitReader) - .snapshotState(checkpointId) + .snapshotState() .map(PulsarPartitionSplitState::toPulsarPartitionSplit); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java index 77366c2c842..29f01afd959 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java @@ -141,11 +141,10 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT public List<PulsarPartitionSplit> snapshotState(long checkpointId) { LOG.debug("Trigger the new transaction for downstream readers."); List<PulsarPartitionSplit> splits = - ((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager) - .snapshotState(checkpointId); + ((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager).snapshotState(); if (coordinatorClient != null) { - // Snapshot the transaction status and commit it after checkpoint finished. + // Snapshot the transaction status and commit it after checkpoint finishing. List<TxnID> txnIDs = transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList<>()); for (PulsarPartitionSplit split : splits) { diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java index daf721c78db..cd55fa88076 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java @@ -138,7 +138,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl } } - public Optional<PulsarPartitionSplitState> snapshotState(long checkpointId) { + public Optional<PulsarPartitionSplitState> snapshotState() { if (registeredSplit == null) { return Optional.empty(); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java index 8cc2e0afe32..75d8d58da43 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java @@ -28,9 +28,16 @@ import org.apache.flink.connectors.test.common.junit.annotations.ExternalContext import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase; +import org.apache.flink.testutils.junit.FailsOnJava11; -/** Unite test class for {@link PulsarSource}. */ -@SuppressWarnings("unused") +import org.apache.pulsar.client.api.SubscriptionType; +import org.junit.experimental.categories.Category; + +/** + * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Exclusive} + * subscription. + */ +@Category(value = {FailsOnJava11.class}) class PulsarSourceITCase extends SourceTestSuiteBase<String> { // Defines test environment on Flink MiniCluster @@ -39,8 +46,8 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> { // Defines pulsar running environment @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock()); - // Defines a external context Factories, - // so test cases will be invoked using this external contexts. + // Defines an external context Factories, + // so test cases will be invoked using these external contexts. @ExternalContextFactory PulsarTestContextFactory<String, SingleTopicConsumingContext> singleTopic = new PulsarTestContextFactory<>(pulsar, SingleTopicConsumingContext::new); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index 467eb82d141..fb01f7be557 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -292,8 +292,7 @@ public class PulsarRuntimeOperator implements Closeable { */ public <T> List<MessageId> sendMessages( String topic, Schema<T> schema, String key, Collection<T> messages) { - Producer<T> producer = createProducer(topic, schema); - try { + try (Producer<T> producer = createProducer(topic, schema)) { List<MessageId> messageIds = new ArrayList<>(messages.size()); for (T message : messages) { @@ -304,20 +303,11 @@ public class PulsarRuntimeOperator implements Closeable { MessageId messageId = builder.send(); messageIds.add(messageId); } - + producer.flush(); return messageIds; } catch (PulsarClientException e) { sneakyThrow(e); return emptyList(); - } finally { - try { - // Waiting for all the pending messages be sent to the Pulsar. - producer.flush(); - // Directly close without the flush will drop all the pending messages. - producer.close(); - } catch (PulsarClientException e) { - // Just ignore the exception here. - } } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java index 55607678b2f..0b4c90d037b 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java @@ -28,16 +28,17 @@ import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.PulsarContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.utility.DockerImageName; -import java.io.IOException; import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.DockerImageVersions.PULSAR; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT; import static org.testcontainers.containers.PulsarContainer.BROKER_PORT; +import static org.testcontainers.containers.wait.strategy.Wait.forHttp; /** * {@link PulsarRuntime} implementation, use the TestContainers as the backend. We would start a @@ -45,8 +46,9 @@ import static org.testcontainers.containers.PulsarContainer.BROKER_PORT; */ public class PulsarContainerRuntime implements PulsarRuntime { private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerRuntime.class); - private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar"; + // The default host for connecting in docker environment. + private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar"; // This url is used on the container side. public static final String PULSAR_SERVICE_URL = String.format("pulsar://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_PORT); @@ -60,50 +62,74 @@ public class PulsarContainerRuntime implements PulsarRuntime { */ private final PulsarContainer container = new PulsarContainer(DockerImageName.parse(PULSAR)); + private final AtomicBoolean started = new AtomicBoolean(false); + + private boolean boundFlink = false; private PulsarRuntimeOperator operator; public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer<?> flinkContainer) { + checkArgument( + !started.get(), + "This Pulsar container has been started, we can't bind it to a Flink container."); + this.container .withNetworkAliases(PULSAR_INTERNAL_HOSTNAME) .dependsOn(flinkContainer) .withNetwork(flinkContainer.getNetwork()); + this.boundFlink = true; return this; } @Override public void startUp() { - // Prepare Pulsar Container. + boolean havenStartedBefore = started.compareAndSet(false, true); + if (!havenStartedBefore) { + LOG.warn("You have started the Pulsar Container. We will skip this execution."); + return; + } + + // Override the default configuration in container for enabling the Pulsar transaction. container.withClasspathResourceMapping( "containers/txnStandalone.conf", "/pulsar/conf/standalone.conf", BindMode.READ_ONLY); - container.addExposedPort(2181); + // Waiting for the Pulsar border is ready. container.waitingFor( - new HttpWaitStrategy() + forHttp("/admin/v2/namespaces/public/default") .forPort(BROKER_HTTP_PORT) .forStatusCode(200) - .forPath("/admin/v2/namespaces/public/default") .withStartupTimeout(Duration.ofMinutes(5))); - // Start the Pulsar Container. container.start(); + // Append the output to this runtime logger. Used for local debug purpose. container.followOutput(new Slf4jLogConsumer(LOG).withSeparateOutputStreams()); // Create the operator. - this.operator = - new PulsarRuntimeOperator( - container.getPulsarBrokerUrl(), container.getHttpServiceUrl()); + if (boundFlink) { + this.operator = + new PulsarRuntimeOperator( + container.getPulsarBrokerUrl(), + PULSAR_SERVICE_URL, + container.getHttpServiceUrl(), + PULSAR_ADMIN_URL); + } else { + this.operator = + new PulsarRuntimeOperator( + container.getPulsarBrokerUrl(), container.getHttpServiceUrl()); + } } @Override public void tearDown() { try { - operator.close(); - this.operator = null; - } catch (IOException e) { + if (operator != null) { + operator.close(); + } + container.stop(); + started.compareAndSet(true, false); + } catch (Exception e) { throw new IllegalStateException(e); } - container.stop(); } @Override diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java index bd5dd997c56..71384585c63 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java @@ -75,8 +75,9 @@ public class PulsarMockRuntime implements PulsarRuntime { public void tearDown() { try { pulsarService.close(); - operator.close(); - this.operator = null; + if (operator != null) { + operator.close(); + } } catch (Exception e) { throw new IllegalStateException(e); }
