This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 68e9c56d73197d0bf03e4119de236433913f9555 Author: Ran Tao <[email protected]> AuthorDate: Sat Mar 18 17:55:01 2023 +0800 [FLINK-31319][connectors/kafka] Fix kafka new source partitionDiscoveryIntervalMs error condition check cause bounded source can not quit This closes #8. --- .../source/enumerator/KafkaSourceEnumerator.java | 2 +- .../source/enumerator/KafkaEnumeratorTest.java | 113 +++++++++++++++++++-- 2 files changed, 108 insertions(+), 7 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 9cf233c5..137f4204 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -298,7 +298,7 @@ public class KafkaSourceEnumerator if (t != null) { throw new FlinkRuntimeException("Failed to initialize partition splits due to ", t); } - if (partitionDiscoveryIntervalMs < 0) { + if (partitionDiscoveryIntervalMs <= 0) { LOG.debug("Partition discovery is disabled."); noMoreNewPartitionSplits = true; } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index 8c39b547..8d0d3fc1 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -98,6 +98,10 @@ public class KafkaEnumeratorTest { assertThat(context.getOneTimeCallables()) .as("A one time partition discovery callable should have been scheduled") .hasSize(1); + + // enumerator just start noMoreNewPartitionSplits will be false + assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) + .isFalse(); } } @@ -115,6 +119,10 @@ public class KafkaEnumeratorTest { assertThat(context.getPeriodicCallables()) .as("A periodic partition discovery callable should have been scheduled") .hasSize(1); + + // enumerator just start noMoreNewPartitionSplits will be false + assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) + .isFalse(); } } @@ -166,6 +174,78 @@ public class KafkaEnumeratorTest { } } + @Test + public void testRunWithDiscoverPartitionsOnceToCheckNoMoreSplit() throws Throwable { + try (MockSplitEnumeratorContext<KafkaPartitionSplit> context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + KafkaSourceEnumerator enumerator = + createEnumerator(context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) { + + // Start the enumerator and it should schedule a one time task to discover and assign + // partitions. + enumerator.start(); + assertThat(context.getOneTimeCallables()) + .as("A one time partition discovery callable should have been scheduled") + .hasSize(1); + assertThat(context.getPeriodicCallables()).isEmpty(); + // Run the partition discover callable and check the partition assignment. + runOneTimePartitionDiscovery(context); + + // enumerator noMoreNewPartitionSplits first will be false, when execute + // handlePartitionSplitChanges will be set true + assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) + .isTrue(); + } + } + + @Test + public void testRunWithPeriodicPartitionDiscoveryOnceToCheckNoMoreSplit() throws Throwable { + try (MockSplitEnumeratorContext<KafkaPartitionSplit> context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + KafkaSourceEnumerator enumerator = + createEnumerator(context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { + + // Start the enumerator and it should schedule a one time task to discover and assign + // partitions. + enumerator.start(); + assertThat(context.getOneTimeCallables()).isEmpty(); + assertThat(context.getPeriodicCallables()) + .as("A periodic partition discovery callable should have been scheduled") + .hasSize(1); + // Run the partition discover callable and check the partition assignment. + runPeriodicPartitionDiscovery(context); + + // enumerator noMoreNewPartitionSplits first will be false, even when execute + // handlePartitionSplitChanges it still be false + assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) + .isFalse(); + } + } + + @Test + public void testRunWithDiscoverPartitionsOnceWithZeroMsToCheckNoMoreSplit() throws Throwable { + try (MockSplitEnumeratorContext<KafkaPartitionSplit> context = + new MockSplitEnumeratorContext<>(NUM_SUBTASKS); + // set partitionDiscoveryIntervalMs = 0 + KafkaSourceEnumerator enumerator = createEnumerator(context, 0L)) { + + // Start the enumerator, and it should schedule a one time task to discover and assign + // partitions. + enumerator.start(); + assertThat(context.getOneTimeCallables()) + .as("A one time partition discovery callable should have been scheduled") + .hasSize(1); + assertThat(context.getPeriodicCallables()).isEmpty(); + // Run the partition discover callable and check the partition assignment. + runOneTimePartitionDiscovery(context); + + // enumerator noMoreNewPartitionSplits first will be false, when execute + // handlePartitionSplitChanges will be set true + assertThat((Boolean) Whitebox.getInternalState(enumerator, "noMoreNewPartitionSplits")) + .isTrue(); + } + } + @Test(timeout = 30000L) public void testDiscoverPartitionsPeriodically() throws Throwable { try (MockSplitEnumeratorContext<KafkaPartitionSplit> context = @@ -261,7 +341,7 @@ public class KafkaEnumeratorTest { KafkaSourceEnumerator enumerator = createEnumerator( context2, - ENABLE_PERIODIC_PARTITION_DISCOVERY, + ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1, PRE_EXISTING_TOPICS, preexistingAssignments, new Properties())) { @@ -290,7 +370,7 @@ public class KafkaEnumeratorTest { KafkaSourceEnumerator enumerator = createEnumerator( context, - ENABLE_PERIODIC_PARTITION_DISCOVERY, + ENABLE_PERIODIC_PARTITION_DISCOVERY ? 1 : -1, PRE_EXISTING_TOPICS, Collections.emptySet(), properties)) { @@ -403,6 +483,12 @@ public class KafkaEnumeratorTest { enumContext, enablePeriodicPartitionDiscovery, EXCLUDE_DYNAMIC_TOPIC); } + private KafkaSourceEnumerator createEnumerator( + MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext, + long partitionDiscoveryIntervalMs) { + return createEnumerator(enumContext, partitionDiscoveryIntervalMs, EXCLUDE_DYNAMIC_TOPIC); + } + private KafkaSourceEnumerator createEnumerator( MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext, boolean enablePeriodicPartitionDiscovery, @@ -413,7 +499,23 @@ public class KafkaEnumeratorTest { } return createEnumerator( enumContext, - enablePeriodicPartitionDiscovery, + enablePeriodicPartitionDiscovery ? 1 : -1, + topics, + Collections.emptySet(), + new Properties()); + } + + private KafkaSourceEnumerator createEnumerator( + MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext, + long partitionDiscoveryIntervalMs, + boolean includeDynamicTopic) { + List<String> topics = new ArrayList<>(PRE_EXISTING_TOPICS); + if (includeDynamicTopic) { + topics.add(DYNAMIC_TOPIC_NAME); + } + return createEnumerator( + enumContext, + partitionDiscoveryIntervalMs, topics, Collections.emptySet(), new Properties()); @@ -425,7 +527,7 @@ public class KafkaEnumeratorTest { */ private KafkaSourceEnumerator createEnumerator( MockSplitEnumeratorContext<KafkaPartitionSplit> enumContext, - boolean enablePeriodicPartitionDiscovery, + long partitionDiscoveryIntervalMs, Collection<String> topicsToSubscribe, Set<TopicPartition> assignedPartitions, Properties overrideProperties) { @@ -442,10 +544,9 @@ public class KafkaEnumeratorTest { Properties props = new Properties(KafkaSourceTestEnv.getConsumerProperties(StringDeserializer.class)); KafkaSourceEnumerator.deepCopyProperties(overrideProperties, props); - String partitionDiscoverInterval = enablePeriodicPartitionDiscovery ? "1" : "-1"; props.setProperty( KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), - partitionDiscoverInterval); + String.valueOf(partitionDiscoveryIntervalMs)); return new KafkaSourceEnumerator( subscriber,
