This is an automated email from the ASF dual-hosted git repository.
martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git
The following commit(s) were added to refs/heads/main by this push:
new f5372f2 [FLINK-33018][Tests] Fix flaky test when cancelling source
f5372f2 is described below
commit f5372f25cfc1954d00a4b2fc9342e8ed5a3ef3ab
Author: Ryan Skraba <[email protected]>
AuthorDate: Mon Sep 11 17:44:13 2023 +0200
[FLINK-33018][Tests] Fix flaky test when cancelling source
---
.../connectors/gcp/pubsub/PubSubConsumingTest.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git
a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
index c6049d8..abcb15e 100644
---
a/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
+++
b/flink-connector-gcp-pubsub/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubConsumingTest.java
@@ -108,19 +108,19 @@ class PubSubConsumingTest {
Thread thread = createSourceThread(pubSubSource, lock, results);
try {
thread.start();
- awaitRecordCount(results, 2);
-
- // we do not emit the end of stream record
- assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A",
"B"));
- pubSubSource.snapshotState(0, 0);
- pubSubSource.notifyCheckpointComplete(0);
- // we acknowledge also the end of the stream record
- assertThat(testPubSubSubscriber.getAcknowledgedIds())
- .isEqualTo(Arrays.asList("1", "2", "3"));
+ // The source thread will finish automatically, without waiting
for records or
+ // explicitly cancelling the source.
} finally {
- pubSubSource.cancel();
thread.join();
}
+
+ // we do not emit the end of stream record
+ assertThat(new ArrayList<>(results)).isEqualTo(Arrays.asList("A",
"B"));
+ pubSubSource.snapshotState(0, 0);
+ pubSubSource.notifyCheckpointComplete(0);
+ // we acknowledge also the end of the stream record
+ assertThat(testPubSubSubscriber.getAcknowledgedIds())
+ .isEqualTo(Arrays.asList("1", "2", "3"));
}
@Test