PatrickRen commented on code in PR #40:
URL: https://github.com/apache/flink-connector-kafka/pull/40#discussion_r1262106035


##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -501,6 +501,44 @@ public void testPartitionChangeChecking() throws Throwable {
         }
     }
 
+    @Test
+    public void testEnablePartitionDiscoveryByDefault() {
+        KafkaSourceEnumerator enumerator =
+                new KafkaSourceEnumerator(
+                        KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2)),
+                        OffsetsInitializer.earliest(),
+                        new NoStoppingOffsetsInitializer(),
+                        new Properties(),
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS),
+                        Boundedness.CONTINUOUS_UNBOUNDED,
+                        new KafkaSourceEnumState(
+                                Collections.emptySet(), Collections.emptySet(), false));
+        long partitionDiscoveryIntervalMs =
+                (long) Whitebox.getInternalState(enumerator, "partitionDiscoveryIntervalMs");
+        assertThat(partitionDiscoveryIntervalMs)
+                .isEqualTo(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.defaultValue());
+    }
+
+    @Test
+    public void testDisablePartitionDiscovery() {
+        Properties props = new Properties();
+        props.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), String.valueOf(0));
+        KafkaSourceEnumerator enumerator =
+                new KafkaSourceEnumerator(
+                        KafkaSubscriber.getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2)),
+                        OffsetsInitializer.earliest(),
+                        new NoStoppingOffsetsInitializer(),
+                        props,
+                        new MockSplitEnumeratorContext<>(NUM_SUBTASKS),
+                        Boundedness.CONTINUOUS_UNBOUNDED,
+                        new KafkaSourceEnumState(
+                                Collections.emptySet(), Collections.emptySet(), false));
+        long partitionDiscoveryIntervalMs =
+                (long) Whitebox.getInternalState(enumerator, "partitionDiscoveryIntervalMs");
+        assertThat(partitionDiscoveryIntervalMs).isEqualTo(0);

Review Comment:
   If we only check the value of `partitionDiscoveryIntervalMs` here, this test 
case only guards the logic that parses the properties and sets the 
corresponding field. I think we can dig deeper and check that setting the 
partition discovery interval to 0 actually disables partition discovery, by 
adding another assertion:
   ```
   assertThat(context.getPeriodicCallables()).isEmpty();
   ```
   where `context` is the `MockSplitEnumeratorContext` that is passed into the 
enumerator.
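
   To illustrate the difference between asserting on the parsed field and 
asserting on the scheduling behavior, here is a minimal, self-contained sketch. 
It does not use the real Flink classes: `RecordingContext` and 
`SketchEnumerator` are hypothetical stand-ins, the default interval is an 
arbitrary value chosen for the sketch, and the sketch assumes that discovery is 
scheduled in `start()` and that a non-positive interval means "run once, never 
periodically".

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Properties;

   public class PartitionDiscoverySketch {

       static final String INTERVAL_KEY = "partition.discovery.interval.ms";
       // Arbitrary default chosen for this sketch, not taken from the connector.
       static final long ASSUMED_DEFAULT_INTERVAL_MS = 300_000L;

       /** Hypothetical stand-in for a mock context: it only records what gets scheduled. */
       static class RecordingContext {
           final List<Runnable> oneTimeCallables = new ArrayList<>();
           final List<Runnable> periodicCallables = new ArrayList<>();

           void callAsync(Runnable task) {
               oneTimeCallables.add(task);
           }

           void callAsync(Runnable task, long initialDelayMs, long periodMs) {
               periodicCallables.add(task);
           }
       }

       /** Hypothetical stand-in for the enumerator under test. */
       static class SketchEnumerator {
           final long partitionDiscoveryIntervalMs;
           private final RecordingContext context;

           SketchEnumerator(Properties props, RecordingContext context) {
               this.partitionDiscoveryIntervalMs =
                       Long.parseLong(
                               props.getProperty(
                                       INTERVAL_KEY,
                                       String.valueOf(ASSUMED_DEFAULT_INTERVAL_MS)));
               this.context = context;
           }

           void start() {
               if (partitionDiscoveryIntervalMs > 0) {
                   // Discovery enabled: schedule a periodic task.
                   context.callAsync(this::discoverPartitions, 0L, partitionDiscoveryIntervalMs);
               } else {
                   // Discovery disabled: fetch partitions once, never periodically.
                   context.callAsync(this::discoverPartitions);
               }
           }

           private void discoverPartitions() {
               // No-op in this sketch.
           }
       }

       public static void main(String[] args) {
           Properties props = new Properties();
           props.setProperty(INTERVAL_KEY, String.valueOf(0));
           RecordingContext context = new RecordingContext();
           SketchEnumerator enumerator = new SketchEnumerator(props, context);
           enumerator.start();

           // Checking the field only proves that the property was parsed ...
           if (enumerator.partitionDiscoveryIntervalMs != 0) {
               throw new AssertionError("interval was not parsed as 0");
           }
           // ... checking the context proves that nothing periodic was scheduled.
           if (!context.periodicCallables.isEmpty()) {
               throw new AssertionError("periodic discovery was scheduled");
           }
           System.out.println("periodic discovery disabled");
       }
   }
   ```

   In this sketch the second check only becomes meaningful after `start()` has 
been called, and the context has to be held in a local variable so the test can 
inspect it afterwards.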



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
