This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a207e5e0f6fb0b33ea840d52054fdb1aaf1f8c8
Author: Yufan Sheng <[email protected]>
AuthorDate: Tue Sep 6 00:02:04 2022 +0800

    [FLINK-28934][Connector/pulsar] Support connector testing tools for Pulsar 
unordered source.
---
 .../f4d91193-72ba-4ce4-ad83-98f780dce581           |   6 +
 .../fetcher/PulsarUnorderedFetcherManager.java     |   8 +-
 .../reader/source/PulsarUnorderedSourceReader.java |   5 +-
 .../split/PulsarUnorderedPartitionSplitReader.java |   2 +-
 .../pulsar/source/PulsarSourceITCase.java          |  14 ++-
 .../pulsar/source/PulsarUnorderedSourceITCase.java | 103 +++++++++++++++
 .../cases/SharedSubscriptionConsumingContext.java  |  58 +++++++++
 .../testutils/runtime/PulsarRuntimeOperator.java   |  14 +--
 .../runtime/container/PulsarContainerRuntime.java  |   1 -
 .../runtime/embedded/PulsarEmbeddedRuntime.java    |   1 -
 .../testutils/runtime/mock/PulsarMockRuntime.java  |   5 +-
 .../testframe/testsuites/SourceTestSuiteBase.java  |  43 +++----
 .../testframe/utils/CollectIteratorAssertions.java |  11 +-
 .../utils/UnorderedCollectIteratorAssert.java      | 140 +++++++++++++++++++++
 14 files changed, 359 insertions(+), 52 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
 
b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
index 452f99f423a..03d0ae583af 100644
--- 
a/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
+++ 
b/flink-connectors/flink-connector-pulsar/archunit-violations/f4d91193-72ba-4ce4-ad83-98f780dce581
@@ -2,5 +2,11 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase 
does not satisfy: on
 * reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension\
 * reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
+ or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
+org.apache.flink.connector.pulsar.source.PulsarUnorderedSourceITCase does not 
satisfy: only one of the following predicates match:\
+* reside in a package 'org.apache.flink.runtime.*' and contain any fields that 
are static, final, and of type InternalMiniClusterExtension and annotated with 
@RegisterExtension\
+* reside outside of package 'org.apache.flink.runtime.*' and contain any 
fields that are static, final, and of type MiniClusterExtension and annotated 
with @RegisterExtension\
+* reside in a package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class InternalMiniClusterExtension\
 * reside outside of package 'org.apache.flink.runtime.*' and is annotated with 
@ExtendWith with class MiniClusterExtension\
  or contain any fields that are public, static, and of type 
MiniClusterWithClientResource and final and annotated with @ClassRule or 
contain any fields that is of type MiniClusterWithClientResource and public and 
final and not static and annotated with @Rule
\ No newline at end of file
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
index d2662f06b0a..1523b9a0fca 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java
@@ -53,19 +53,19 @@ public class PulsarUnorderedFetcherManager<T> extends 
PulsarFetcherManagerBase<T
         super(elementsQueue, splitReaderSupplier);
     }
 
-    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
+    public List<PulsarPartitionSplit> snapshotState() {
         return fetchers.values().stream()
                 .map(SplitFetcher::getSplitReader)
-                .map(splitReader -> snapshotReader(checkpointId, splitReader))
+                .map(this::snapshotReader)
                 .filter(Optional::isPresent)
                 .map(Optional::get)
                 .collect(toCollection(() -> new ArrayList<>(fetchers.size())));
     }
 
     private Optional<PulsarPartitionSplit> snapshotReader(
-            long checkpointId, SplitReader<PulsarMessage<T>, 
PulsarPartitionSplit> splitReader) {
+            SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) {
         return ((PulsarUnorderedPartitionSplitReader<T>) splitReader)
-                .snapshotState(checkpointId)
+                .snapshotState()
                 .map(PulsarPartitionSplitState::toPulsarPartitionSplit);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
index b20b117652d..0d52b5e4bb4 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java
@@ -138,11 +138,10 @@ public class PulsarUnorderedSourceReader<OUT> extends 
PulsarSourceReaderBase<OUT
     public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
         LOG.debug("Trigger the new transaction for downstream readers.");
         List<PulsarPartitionSplit> splits =
-                ((PulsarUnorderedFetcherManager<OUT>) splitFetcherManager)
-                        .snapshotState(checkpointId);
+                ((PulsarUnorderedFetcherManager<OUT>) 
splitFetcherManager).snapshotState();
 
         if (coordinatorClient != null) {
-            // Snapshot the transaction status and commit it after checkpoint 
finished.
+            // Snapshot the transaction status and commit it after the checkpoint 
finishes.
             List<TxnID> txnIDs =
                     transactionsToCommit.computeIfAbsent(checkpointId, id -> 
new ArrayList<>());
             for (PulsarPartitionSplit split : splits) {
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
index 73223acc9af..b5809ef9b7b 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -136,7 +136,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> 
extends PulsarPartitionSpl
         }
     }
 
-    public Optional<PulsarPartitionSplitState> snapshotState(long 
checkpointId) {
+    public Optional<PulsarPartitionSplitState> snapshotState() {
         if (registeredSplit == null) {
             return Optional.empty();
         }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index b28e449c678..54fc4d71dd5 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -30,11 +30,17 @@ import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem
 import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.testutils.junit.FailsOnJava11;
 
-/** Unite test class for {@link PulsarSource}. */
-@SuppressWarnings("unused")
-class PulsarSourceITCase extends SourceTestSuiteBase<String> {
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.junit.experimental.categories.Category;
 
+/**
+ * Unit test class for {@link PulsarSource}. Used for {@link 
SubscriptionType#Exclusive}
+ * subscription.
+ */
+@Category(value = {FailsOnJava11.class})
+class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     // Defines test environment on Flink MiniCluster
     @TestEnv MiniClusterTestEnvironment flink = new 
MiniClusterTestEnvironment();
 
@@ -46,7 +52,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
 
     // Defines an external context Factories,
-    // so test cases will be invoked using this external contexts.
+    // so test cases will be invoked using these external contexts.
     @TestContext
     PulsarTestContextFactory<String, SingleTopicConsumingContext> singleTopic =
             new PulsarTestContextFactory<>(pulsar, 
SingleTopicConsumingContext::new);
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
new file mode 100644
index 00000000000..6bad7220886
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarUnorderedSourceITCase.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import 
org.apache.flink.connector.pulsar.testutils.cases.SharedSubscriptionConsumingContext;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
+import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestContext;
+import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.testutils.junit.FailsOnJava11;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Disabled;
+
+import java.util.List;
+
+import static java.util.concurrent.CompletableFuture.runAsync;
+import static 
org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertUnordered;
+import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Unit test class for {@link PulsarSource}. Used for {@link 
SubscriptionType#Shared} subscription.
+ */
+@Category(value = {FailsOnJava11.class})
+public class PulsarUnorderedSourceITCase extends SourceTestSuiteBase<String> {
+    // Defines test environment on Flink MiniCluster
+    @TestEnv MiniClusterTestEnvironment flink = new 
MiniClusterTestEnvironment();
+
+    // Defines pulsar running environment
+    @TestExternalSystem
+    PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.mock());
+
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @TestContext
+    PulsarTestContextFactory<String, SharedSubscriptionConsumingContext> 
singleTopic =
+            new PulsarTestContextFactory<>(pulsar, 
SharedSubscriptionConsumingContext::new);
+
+    @Override
+    protected void checkResultWithSemantic(
+            CloseableIterator<String> resultIterator,
+            List<List<String>> testData,
+            CheckpointingMode semantic,
+            Integer limit) {
+        Runnable runnable =
+                () ->
+                        assertUnordered(resultIterator)
+                                .withNumRecordsLimit(getExpectedSize(testData, 
limit))
+                                .matchesRecordsFromSource(testData, semantic);
+
+        
assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+    }
+
+    /**
+     * A shared subscription has multiple readers on the same partition, which 
makes it hard to
+     * stop automatically like a bounded source.
+     */
+    private static int getExpectedSize(List<List<String>> testData, Integer 
limit) {
+        if (limit == null) {
+            return testData.stream().mapToInt(List::size).sum();
+        } else {
+            return limit;
+        }
+    }
+
+    @Override
+    @Disabled("We don't have any idle readers in Pulsar's shared 
subscription.")
+    public void testIdleReader(
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<String> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        super.testIdleReader(testEnv, externalContext, semantic);
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java
new file mode 100644
index 00000000000..8001b6e7300
--- /dev/null
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SharedSubscriptionConsumingContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A consuming context with {@link SubscriptionType#Shared}, it's almost the 
same as {@link
+ * MultipleTopicConsumingContext}.
+ */
+public class SharedSubscriptionConsumingContext extends 
MultipleTopicTemplateContext {
+
+    public SharedSubscriptionConsumingContext(PulsarTestEnvironment 
environment) {
+        this(environment, Collections.emptyList());
+    }
+
+    public SharedSubscriptionConsumingContext(
+            PulsarTestEnvironment environment, List<URL> connectorJarPaths) {
+        super(environment, connectorJarPaths);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message with shared subscription";
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "flink-shared-subscription-test";
+    }
+
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Shared;
+    }
+}
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index 1fe98128e3d..9787a8ceeb9 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -299,8 +299,7 @@ public class PulsarRuntimeOperator implements Closeable {
      */
     public <T> List<MessageId> sendMessages(
             String topic, Schema<T> schema, String key, Collection<T> 
messages) {
-        Producer<T> producer = createProducer(topic, schema);
-        try {
+        try (Producer<T> producer = createProducer(topic, schema)) {
             List<MessageId> messageIds = new ArrayList<>(messages.size());
 
             for (T message : messages) {
@@ -311,20 +310,11 @@ public class PulsarRuntimeOperator implements Closeable {
                 MessageId messageId = builder.send();
                 messageIds.add(messageId);
             }
-
+            producer.flush();
             return messageIds;
         } catch (PulsarClientException e) {
             sneakyThrow(e);
             return emptyList();
-        } finally {
-            try {
-                // Waiting for all the pending messages be sent to the Pulsar.
-                producer.flush();
-                // Directly close without the flush will drop all the pending 
messages.
-                producer.close();
-            } catch (PulsarClientException e) {
-                // Just ignore the exception here.
-            }
         }
     }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 3d66728fded..10bd5120a46 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -124,7 +124,6 @@ public class PulsarContainerRuntime implements 
PulsarRuntime {
         try {
             if (operator != null) {
                 operator.close();
-                this.operator = null;
             }
             container.stop();
             started.compareAndSet(true, false);
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
index 07981cc3d2a..2ca9a51f3c5 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/embedded/PulsarEmbeddedRuntime.java
@@ -86,7 +86,6 @@ public class PulsarEmbeddedRuntime implements PulsarRuntime {
         try {
             if (operator != null) {
                 operator.close();
-                this.operator = null;
             }
             if (pulsarService != null) {
                 pulsarService.close();
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
index a86ff5283f5..326c3ff6a89 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -65,8 +65,9 @@ public class PulsarMockRuntime implements PulsarRuntime {
     public void tearDown() {
         try {
             pulsarService.close();
-            operator.close();
-            this.operator = null;
+            if (operator != null) {
+                operator.close();
+            }
         } catch (Exception e) {
             throw new IllegalStateException(e);
         }
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index b345b4114e5..9fe1900be90 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -64,17 +64,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.Collections.singletonList;
+import static java.util.concurrent.CompletableFuture.runAsync;
 import static 
org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
 import static 
org.apache.flink.connector.testframe.utils.MetricQuerier.getJobDetails;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
@@ -153,11 +152,11 @@ public abstract class SourceTestSuiteBase<T> {
         try (CollectResultIterator<T> resultIterator = 
iteratorBuilder.build(jobClient)) {
             // Check test result
             LOG.info("Checking test results");
-            checkResultWithSemantic(resultIterator, 
Arrays.asList(testRecords), semantic, null);
+            checkResultWithSemantic(resultIterator, 
singletonList(testRecords), semantic, null);
         }
 
         // Step 5: Clean up
-        waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.FINISHED));
+        waitForJobStatus(jobClient, singletonList(JobStatus.FINISHED));
     }
 
     /**
@@ -341,7 +340,7 @@ public abstract class SourceTestSuiteBase<T> {
                         .stopWithSavepoint(
                                 true, testEnv.getCheckpointUri(), 
SavepointFormatType.CANONICAL)
                         .get(30, TimeUnit.SECONDS);
-        waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.FINISHED));
+        waitForJobStatus(jobClient, singletonList(JobStatus.FINISHED));
 
         // Step 5: Generate new test data
         final List<List<T>> newTestRecordCollections = new ArrayList<>();
@@ -372,7 +371,7 @@ public abstract class SourceTestSuiteBase<T> {
 
         final JobClient restartJobClient = restartEnv.executeAsync("Restart 
Test");
 
-        waitForJobStatus(restartJobClient, 
Collections.singletonList(JobStatus.RUNNING));
+        waitForJobStatus(restartJobClient, singletonList(JobStatus.RUNNING));
 
         try {
             iterator.setJobClient(restartJobClient);
@@ -526,7 +525,7 @@ public abstract class SourceTestSuiteBase<T> {
         }
 
         // Step 5: Clean up
-        waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.FINISHED));
+        waitForJobStatus(jobClient, singletonList(JobStatus.FINISHED));
     }
 
     /**
@@ -589,7 +588,7 @@ public abstract class SourceTestSuiteBase<T> {
         LOG.info("Checking records before killing TaskManagers");
         checkResultWithSemantic(
                 iterator,
-                Arrays.asList(testRecordsBeforeFailure),
+                singletonList(testRecordsBeforeFailure),
                 semantic,
                 testRecordsBeforeFailure.size());
 
@@ -598,7 +597,7 @@ public abstract class SourceTestSuiteBase<T> {
         controller.triggerTaskManagerFailover(jobClient, () -> {});
 
         LOG.info("Waiting for job recovering from failure");
-        waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.RUNNING));
+        waitForJobStatus(jobClient, singletonList(JobStatus.RUNNING));
 
         // Step 6: Write test data again to external system
         List<T> testRecordsAfterFailure =
@@ -614,13 +613,13 @@ public abstract class SourceTestSuiteBase<T> {
         LOG.info("Checking records after job failover");
         checkResultWithSemantic(
                 iterator,
-                Arrays.asList(testRecordsAfterFailure),
+                singletonList(testRecordsAfterFailure),
                 semantic,
                 testRecordsAfterFailure.size());
 
         // Step 8: Clean up
         terminateJob(jobClient);
-        waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.CANCELED));
+        waitForJobStatus(jobClient, singletonList(JobStatus.CANCELED));
         iterator.close();
     }
 
@@ -727,21 +726,19 @@ public abstract class SourceTestSuiteBase<T> {
      * @param semantic the supported semantic, see {@link CheckpointingMode}
      * @param limit expected number of the data to read from the job
      */
-    private void checkResultWithSemantic(
+    protected void checkResultWithSemantic(
             CloseableIterator<T> resultIterator,
             List<List<T>> testData,
             CheckpointingMode semantic,
             Integer limit) {
         if (limit != null) {
-            assertThat(
-                            CompletableFuture.supplyAsync(
-                                    () -> {
-                                        
CollectIteratorAssertions.assertThat(resultIterator)
-                                                .withNumRecordsLimit(limit)
-                                                
.matchesRecordsFromSource(testData, semantic);
-                                        return true;
-                                    }))
-                    .succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
+            Runnable runnable =
+                    () ->
+                            
CollectIteratorAssertions.assertThat(resultIterator)
+                                    .withNumRecordsLimit(limit)
+                                    .matchesRecordsFromSource(testData, 
semantic);
+
+            
assertThat(runAsync(runnable)).succeedsWithin(DEFAULT_COLLECT_DATA_TIMEOUT);
         } else {
             CollectIteratorAssertions.assertThat(resultIterator)
                     .matchesRecordsFromSource(testData, semantic);
@@ -768,7 +765,7 @@ public abstract class SourceTestSuiteBase<T> {
 
     private void killJob(JobClient jobClient) throws Exception {
         terminateJob(jobClient);
-        waitForJobStatus(jobClient, 
Collections.singletonList(JobStatus.CANCELED));
+        waitForJobStatus(jobClient, singletonList(JobStatus.CANCELED));
     }
 
     /** Builder class for constructing {@link CollectResultIterator} of 
collect sink. */
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
index 8bbb52972ef..8e10c60488e 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
@@ -24,8 +24,17 @@ import java.util.Iterator;
  * Entry point for assertion methods for {@link CollectIteratorAssert}. Each 
method in this class is
  * a static factory.
  */
-public class CollectIteratorAssertions {
+public final class CollectIteratorAssertions {
+
+    private CollectIteratorAssertions() {
+        // Utility class; not meant to be instantiated.
+    }
+
     public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual) {
         return new CollectIteratorAssert<>(actual);
     }
+
+    public static <T> UnorderedCollectIteratorAssert<T> 
assertUnordered(Iterator<T> actual) {
+        return new UnorderedCollectIteratorAssert<>(actual);
+    }
 }
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
new file mode 100644
index 00000000000..8b03ecd798f
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/UnorderedCollectIteratorAssert.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.assertj.core.api.AbstractAssert;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.flink.shaded.guava30.com.google.common.base.Predicates.not;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * This assertion is used to compare the records in the collect iterator to the target test data
+ * with different semantics (AT_LEAST_ONCE, EXACTLY_ONCE) when the records may arrive in any order.
+ *
+ * <p>Records are matched by equality instead of by position, so every expected record has to be
+ * unique across all the splits.
+ *
+ * @param <T> The type of records in the test data and collect iterator
+ */
+public class UnorderedCollectIteratorAssert<T>
+        extends AbstractAssert<UnorderedCollectIteratorAssert<T>, Iterator<T>> {
+
+    private final Iterator<T> collectorIterator;
+
+    /** Every record the source is expected to emit, gathered from all the splits. */
+    private final Set<T> allRecords;
+
+    /** The expected records which have already been read from the collect iterator. */
+    private final Set<T> matchedRecords;
+
+    /** Number of distinct records to validate, {@code null} means validating all of them. */
+    private Integer limit = null;
+
+    protected UnorderedCollectIteratorAssert(Iterator<T> collectorIterator) {
+        super(collectorIterator, UnorderedCollectIteratorAssert.class);
+        this.collectorIterator = collectorIterator;
+        this.allRecords = new HashSet<>();
+        this.matchedRecords = new HashSet<>();
+    }
+
+    /**
+     * Stops the validation as soon as the given number of distinct records has been matched. This
+     * is required for iterators which never end.
+     *
+     * @param limit the number of distinct records to validate
+     * @return this assertion object
+     */
+    public UnorderedCollectIteratorAssert<T> withNumRecordsLimit(int limit) {
+        this.limit = limit;
+        return this;
+    }
+
+    /**
+     * Reads the collect iterator and checks its records against the records expected from the
+     * source, regardless of their order.
+     *
+     * @param recordsBySplitsFromSource the expected records grouped by split, every record must
+     *     be unique across all the splits
+     * @param semantic the semantic to validate, duplicated records are only tolerated for {@link
+     *     CheckpointingMode#AT_LEAST_ONCE}
+     * @throws IllegalArgumentException if the limit is larger than the number of expected
+     *     records, if the iterator returns a record which is not expected, or if no limit is set
+     *     and some expected records are missing
+     */
+    public void matchesRecordsFromSource(
+            List<List<T>> recordsBySplitsFromSource, CheckpointingMode semantic) {
+        for (List<T> list : recordsBySplitsFromSource) {
+            for (T t : list) {
+                assertTrue(allRecords.add(t), "All the records should be unique.");
+            }
+        }
+
+        if (limit != null && limit > allRecords.size()) {
+            throw new IllegalArgumentException(
+                    "Limit validation size should be less than or equal to total number of records from source");
+        }
+
+        switch (semantic) {
+            case AT_LEAST_ONCE:
+                compareRecords(true);
+                break;
+            case EXACTLY_ONCE:
+                compareRecords(false);
+                break;
+            default:
+                throw new IllegalArgumentException("Unrecognized semantic \"" + semantic + "\"");
+        }
+    }
+
+    /**
+     * Drains the collect iterator until it ends or the limit is reached. Only the first occurrence
+     * of a record is counted, a repeated record is either skipped or reported as a failure.
+     *
+     * @param duplicatesAllowed {@code true} for at-least-once, {@code false} for exactly-once
+     */
+    private void compareRecords(boolean duplicatesAllowed) {
+        int recordCounter = 0;
+        while (collectorIterator.hasNext()) {
+            final T record = collectorIterator.next();
+            if (!allRecords.contains(record)) {
+                throw new IllegalArgumentException("Record " + record + " is not expected.");
+            }
+
+            if (matchedRecords.add(record)) {
+                recordCounter++;
+            } else {
+                // The record has been seen before, this is only legal for at-least-once.
+                assertTrue(
+                        duplicatesAllowed,
+                        "Record " + record + " is duplicated in exactly-once.");
+            }
+
+            if (limit != null && recordCounter >= limit) {
+                break;
+            }
+        }
+
+        verifyMatchedRecords(recordCounter);
+    }
+
+    /**
+     * Verifies that enough records have been matched after the iterator was read.
+     *
+     * @param recordCounter the number of distinct records matched by the current validation
+     */
+    private void verifyMatchedRecords(int recordCounter) {
+        if (limit != null) {
+            // The iterator may end before the limit is reached, which used to pass silently.
+            assertTrue(
+                    recordCounter >= limit,
+                    "Expected to have at least "
+                            + limit
+                            + " distinct records, but only received "
+                            + recordCounter
+                            + ".");
+            return;
+        }
+
+        // Only expected records are added to matchedRecords, a smaller size means missing records.
+        if (allRecords.size() > matchedRecords.size()) {
+            Set<T> missingResults =
+                    allRecords.stream().filter(not(matchedRecords::contains)).collect(toSet());
+            throw new IllegalArgumentException(
+                    "Expected to have "
+                            + allRecords.size()
+                            + " elements. But we missing: "
+                            + missingResults);
+        }
+    }
+}

Reply via email to