PatrickRen commented on code in PR #40:
URL: https://github.com/apache/flink-connector-kafka/pull/40#discussion_r1262106035
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -501,6 +501,44 @@ public void testPartitionChangeChecking() throws Throwable {
         }
     }
+    @Test
+    public void testEnablePartitionDiscoveryByDefault() {
+        KafkaSourceEnumerator enumerator =
+                new KafkaSourceEnumerator(
+                        KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2)),
+                        OffsetsInitializer.earliest(),
+                        new NoStoppingOffsetsInitializer(),
+                        new Properties(),
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS),
+                        Boundedness.CONTINUOUS_UNBOUNDED,
+                        new KafkaSourceEnumState(
+                                Collections.emptySet(), Collections.emptySet(), false));
+        long partitionDiscoveryIntervalMs =
+                (long) Whitebox.getInternalState(enumerator, "partitionDiscoveryIntervalMs");
+        assertThat(partitionDiscoveryIntervalMs)
+                .isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue());
+    }
+
+    @Test
+    public void testDisablePartitionDiscovery() {
+        Properties props = new Properties();
+        props.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), String.valueOf(0));
+        KafkaSourceEnumerator enumerator =
+                new KafkaSourceEnumerator(
+                        KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2)),
+                        OffsetsInitializer.earliest(),
+                        new NoStoppingOffsetsInitializer(),
+                        props,
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS),
+                        Boundedness.CONTINUOUS_UNBOUNDED,
+                        new KafkaSourceEnumState(
+                                Collections.emptySet(), Collections.emptySet(), false));
+        long partitionDiscoveryIntervalMs =
+                (long) Whitebox.getInternalState(enumerator, "partitionDiscoveryIntervalMs");
+        assertThat(partitionDiscoveryIntervalMs).isEqualTo(0);
Review Comment:
If we only check the value of `partitionDiscoveryIntervalMs` here, this test
case guards just the logic that parses the properties and sets the
corresponding field. I think we can dig deeper and check that setting the
partition discovery interval to 0 actually disables partition discovery, by
adding another assertion:
```
assertThat(context.getPeriodicCallables()).isEmpty();
```
where `context` is the `MockSplitEnumeratorContext` that is passed into the
enumerator (kept in a local variable so the test can inspect it).
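
To illustrate why the extra assertion is stronger, here is a minimal,
self-contained sketch of the idea. It does not use the Flink classes:
`RecordingContext` and `ToyEnumerator` are hypothetical stand-ins, and the
assumption that periodic discovery is registered in `start()` only for a
positive interval is mine, not something confirmed by this PR.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical stand-ins for illustration only; these are NOT the Flink classes.
class PartitionDiscoverySketch {

    /** Records periodic callables instead of scheduling them, as a mock context might. */
    static final class RecordingContext {
        private final List<Callable<?>> periodicCallables = new ArrayList<>();

        void callAsync(Callable<?> callable, long initialDelayMs, long periodMs) {
            periodicCallables.add(callable);
        }

        List<Callable<?>> getPeriodicCallables() {
            return periodicCallables;
        }
    }

    /** Toy enumerator: assumed to register periodic discovery only for a positive interval. */
    static final class ToyEnumerator {
        private final long partitionDiscoveryIntervalMs;
        private final RecordingContext context;

        ToyEnumerator(long partitionDiscoveryIntervalMs, RecordingContext context) {
            this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
            this.context = context;
        }

        void start() {
            if (partitionDiscoveryIntervalMs > 0) {
                context.callAsync(
                        () -> "discover partitions", 0L, partitionDiscoveryIntervalMs);
            }
        }
    }

    public static void main(String[] args) {
        // Interval 0: the field check alone would pass even if start() still
        // scheduled discovery; inspecting the context checks the behavior itself.
        RecordingContext disabled = new RecordingContext();
        new ToyEnumerator(0L, disabled).start();
        System.out.println(disabled.getPeriodicCallables().isEmpty());

        // Arbitrary positive interval: exactly one periodic callable is recorded.
        RecordingContext enabled = new RecordingContext();
        new ToyEnumerator(1_000L, enabled).start();
        System.out.println(enabled.getPeriodicCallables().size());
    }
}
```
In the real test this would presumably mean holding the
`MockSplitEnumeratorContext` in a local variable and starting the enumerator
before asserting on the context.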