This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 596eb9d9349b1f8459e1e7a25503227155016173
Author: Yufan Sheng <[email protected]>
AuthorDate: Tue Sep 6 00:02:04 2022 +0800

    [FLINK-28934][Connector/pulsar] Drop checkpoint id for unordered source.
---
 .../fetcher/PulsarUnorderedFetcherManager.java     |  8 ++--
 .../reader/source/PulsarUnorderedSourceReader.java |  5 +-
 .../split/PulsarUnorderedPartitionSplitReader.java |  2 +-
 .../pulsar/source/PulsarSourceITCase.java          | 15 ++++--
 .../testutils/runtime/PulsarRuntimeOperator.java   | 14 +-----
 .../runtime/container/PulsarContainerRuntime.java  | 56 ++++++++++++++++------
 .../testutils/runtime/mock/PulsarMockRuntime.java  |  5 +-
 7 files changed, 64 insertions(+), 41 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
index d2662f06b0a..1523b9a0fca 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
@@ -53,19 +53,19 @@ public class PulsarUnorderedFetcherManager<T> extends 
PulsarFetcherManagerBase<T
         super(elementsQueue, splitReaderSupplier);
     }
 
-    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
+    public List<PulsarPartitionSplit> snapshotState() {
         return fetchers.values().stream()
                 .map(SplitFetcher::getSplitReader)
-                .map(splitReader -> snapshotReader(checkpointId, splitReader))
+                .map(this::snapshotReader)
                 .filter(Optional::isPresent)
                 .map(Optional::get)
                 .collect(toCollection(() -> new ArrayList<>(fetchers.size())));
     }
 
     private Optional<PulsarPartitionSplit> snapshotReader(
-            long checkpointId, SplitReader<PulsarMessage<T>, 
PulsarPartitionSplit> splitReader) {
+            SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) {
         return ((PulsarUnorderedPartitionSplitReader<T>) splitReader)
-                .snapshotState(checkpointId)
+                .snapshotState()
                 .map(PulsarPartitionSplitState::toPulsarPartitionSplit);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
index 77366c2c842..29f01afd959 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
@@ -141,11 +141,10 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
     public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
         LOG.debug("Trigger the new transaction for downstream readers.");
         List<PulsarPartitionSplit> splits =
-                ((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager)
-                        .snapshotState(checkpointId);
+                ((PulsarUnorderedFetcherManager<OUT>) 
splitFetcherManager).snapshotState();
 
         if (coordinatorClient != null) {
-            // Snapshot the transaction status and commit it after checkpoint 
finished.
+            // Snapshot the transaction status and commit it after checkpoint 
finishing.
             List<TxnID> txnIDs =
                     transactionsToCommit.computeIfAbsent(checkpointId, id -> 
new ArrayList<>());
             for (PulsarPartitionSplit split : splits) {
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index daf721c78db..cd55fa88076 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -138,7 +138,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
         }
     }
 
-    public Optional<PulsarPartitionSplitState> snapshotState(long 
checkpointId) {
+    public Optional<PulsarPartitionSplitState> snapshotState() {
         if (registeredSplit == null) {
             return Optional.empty();
         }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 8cc2e0afe32..75d8d58da43 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -28,9 +28,16 @@ import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalContext
 import 
org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
 import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
 import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.testutils.junit.FailsOnJava11;
 
-/** Unite test class for {@link PulsarSource}. */
-@SuppressWarnings("unused")
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Unit test class for {@link PulsarSource}. Used for {@link 
SubscriptionType#Exclusive}
+ * subscription.
+ */
+@Category(value = {FailsOnJava11.class})
 class PulsarSourceITCase extends SourceTestSuiteBase<String> {
 
     // Defines test environment on Flink MiniCluster
@@ -39,8 +46,8 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     // Defines pulsar running environment
     @ExternalSystem PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.mock());
 
-    // Defines a external context Factories,
-    // so test cases will be invoked using this external contexts.
+    // Defines the external context factories,
+    // so test cases will be invoked using these external contexts.
     @ExternalContextFactory
     PulsarTestContextFactory<String, SingleTopicConsumingContext> singleTopic =
             new PulsarTestContextFactory<>(pulsar, 
SingleTopicConsumingContext::new);
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index 467eb82d141..fb01f7be557 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -292,8 +292,7 @@ public class PulsarRuntimeOperator implements Closeable {
      */
     public <T> List<MessageId> sendMessages(
             String topic, Schema<T> schema, String key, Collection<T> 
messages) {
-        Producer<T> producer = createProducer(topic, schema);
-        try {
+        try (Producer<T> producer = createProducer(topic, schema)) {
             List<MessageId> messageIds = new ArrayList<>(messages.size());
 
             for (T message : messages) {
@@ -304,20 +303,11 @@ public class PulsarRuntimeOperator implements Closeable {
                 MessageId messageId = builder.send();
                 messageIds.add(messageId);
             }
-
+            producer.flush();
             return messageIds;
         } catch (PulsarClientException e) {
             sneakyThrow(e);
             return emptyList();
-        } finally {
-            try {
-                // Waiting for all the pending messages be sent to the Pulsar.
-                producer.flush();
-                // Directly close without the flush will drop all the pending 
messages.
-                producer.close();
-            } catch (PulsarClientException e) {
-                // Just ignore the exception here.
-            }
         }
     }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 55607678b2f..0b4c90d037b 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -28,16 +28,17 @@ import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PulsarContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 import org.testcontainers.utility.DockerImageName;
 
-import java.io.IOException;
 import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.DockerImageVersions.PULSAR;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT;
 import static org.testcontainers.containers.PulsarContainer.BROKER_PORT;
+import static org.testcontainers.containers.wait.strategy.Wait.forHttp;
 
 /**
  * {@link PulsarRuntime} implementation, use the TestContainers as the 
backend. We would start a
@@ -45,8 +46,9 @@ import static 
org.testcontainers.containers.PulsarContainer.BROKER_PORT;
  */
 public class PulsarContainerRuntime implements PulsarRuntime {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarContainerRuntime.class);
-    private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar";
 
+    // The default host for connecting in docker environment.
+    private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar";
     // This url is used on the container side.
     public static final String PULSAR_SERVICE_URL =
             String.format("pulsar://%s:%d", PULSAR_INTERNAL_HOSTNAME, 
BROKER_PORT);
@@ -60,50 +62,74 @@ public class PulsarContainerRuntime implements 
PulsarRuntime {
      */
     private final PulsarContainer container = new 
PulsarContainer(DockerImageName.parse(PULSAR));
 
+    private final AtomicBoolean started = new AtomicBoolean(false);
+
+    private boolean boundFlink = false;
     private PulsarRuntimeOperator operator;
 
     public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer<?> 
flinkContainer) {
+        checkArgument(
+                !started.get(),
+                "This Pulsar container has been started, we can't bind it to a 
Flink container.");
+
         this.container
                 .withNetworkAliases(PULSAR_INTERNAL_HOSTNAME)
                 .dependsOn(flinkContainer)
                 .withNetwork(flinkContainer.getNetwork());
+        this.boundFlink = true;
         return this;
     }
 
     @Override
     public void startUp() {
-        // Prepare Pulsar Container.
+        boolean havenStartedBefore = started.compareAndSet(false, true);
+        if (!havenStartedBefore) {
+            LOG.warn("You have started the Pulsar Container. We will skip this 
execution.");
+            return;
+        }
+
+        // Override the default configuration in container for enabling the 
Pulsar transaction.
         container.withClasspathResourceMapping(
                 "containers/txnStandalone.conf",
                 "/pulsar/conf/standalone.conf",
                 BindMode.READ_ONLY);
-        container.addExposedPort(2181);
+        // Wait until the Pulsar broker is ready.
         container.waitingFor(
-                new HttpWaitStrategy()
+                forHttp("/admin/v2/namespaces/public/default")
                         .forPort(BROKER_HTTP_PORT)
                         .forStatusCode(200)
-                        .forPath("/admin/v2/namespaces/public/default")
                         .withStartupTimeout(Duration.ofMinutes(5)));
-
         // Start the Pulsar Container.
         container.start();
+        // Append the output to this runtime logger. Used for local debug 
purpose.
         container.followOutput(new 
Slf4jLogConsumer(LOG).withSeparateOutputStreams());
 
         // Create the operator.
-        this.operator =
-                new PulsarRuntimeOperator(
-                        container.getPulsarBrokerUrl(), 
container.getHttpServiceUrl());
+        if (boundFlink) {
+            this.operator =
+                    new PulsarRuntimeOperator(
+                            container.getPulsarBrokerUrl(),
+                            PULSAR_SERVICE_URL,
+                            container.getHttpServiceUrl(),
+                            PULSAR_ADMIN_URL);
+        } else {
+            this.operator =
+                    new PulsarRuntimeOperator(
+                            container.getPulsarBrokerUrl(), 
container.getHttpServiceUrl());
+        }
     }
 
     @Override
     public void tearDown() {
         try {
-            operator.close();
-            this.operator = null;
-        } catch (IOException e) {
+            if (operator != null) {
+                operator.close();
+            }
+            container.stop();
+            started.compareAndSet(true, false);
+        } catch (Exception e) {
             throw new IllegalStateException(e);
         }
-        container.stop();
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
index bd5dd997c56..71384585c63 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -75,8 +75,9 @@ public class PulsarMockRuntime implements PulsarRuntime {
     public void tearDown() {
         try {
             pulsarService.close();
-            operator.close();
-            this.operator = null;
+            if (operator != null) {
+                operator.close();
+            }
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }

Reply via email to