becketqin commented on a change in pull request #15531:
URL: https://github.com/apache/flink/pull/15531#discussion_r655026006



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
##########
@@ -347,6 +347,41 @@ public void testSnapshotState() throws Throwable {
                 state2.assignedPartitions());
     }
 
+    @Test
+    public void testPartitionChangeChecking() throws Throwable {
+        final MockSplitEnumeratorContext<KafkaPartitionSplit> context =
+                new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
+        final KafkaSourceEnumerator enumerator =

Review comment:
       It looks like there is a resource leak here. The
`KafkaSourceEnumerator` should be closed after the test. As you may notice, the
other test methods use a try block to do this. However,
`MockSplitEnumeratorContext#close()` does not shut down the `mainExecutor` and
`workerExecutor` correctly. This is an existing bug. It would be good to fix it
either in this patch or in a follow-up patch. Either way, we need to close the
`KafkaSourceEnumerator` here in the unit tests.
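
       To illustrate the pattern, here is a minimal, self-contained sketch. The
classes `SketchEnumeratorContext`, `SketchEnumerator`, and `ResourceLeakSketch`
are hypothetical stand-ins, not the real Flink classes. The sketch only assumes
that both the context and the enumerator are `AutoCloseable` and that the
context owns two executors named as in the comment above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for MockSplitEnumeratorContext (not the real Flink class).
class SketchEnumeratorContext implements AutoCloseable {
    final ExecutorService mainExecutor = Executors.newSingleThreadExecutor();
    final ExecutorService workerExecutor = Executors.newSingleThreadExecutor();

    @Override
    public void close() {
        // Shut down both executors so no threads outlive the test.
        mainExecutor.shutdownNow();
        workerExecutor.shutdownNow();
    }
}

// Hypothetical stand-in for KafkaSourceEnumerator (not the real Flink class).
class SketchEnumerator implements AutoCloseable {
    private boolean closed;

    SketchEnumerator(SketchEnumeratorContext context) {
        // A real enumerator would keep the context; this sketch ignores it.
    }

    void start() {}

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

class ResourceLeakSketch {
    // Kept only so the outcome can be inspected after the try block exits.
    static SketchEnumeratorContext lastContext;
    static SketchEnumerator lastEnumerator;

    static void runTestBody() {
        // try-with-resources closes the enumerator first, then the context,
        // even if the test body throws.
        try (SketchEnumeratorContext context = new SketchEnumeratorContext();
                SketchEnumerator enumerator = new SketchEnumerator(context)) {
            lastContext = context;
            lastEnumerator = enumerator;
            enumerator.start();
        }
    }
}
```

Declaring both resources in the same try header keeps the cleanup in one place,
so a failing assertion in the test body cannot skip the `close()` calls.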

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriber.java
##########
@@ -41,36 +41,12 @@
 public interface KafkaSubscriber extends Serializable {
 
     /**
-     * Get the partitions changes compared to the current partition assignment.
+     * Get a set of subscribed {@link TopicPartition}s.
      *
-     * <p>Although Kafka partitions can only expand and will not shrink, the partitions may still
-     * disappear when the topic is deleted.
-     *
-     * @param adminClient The admin client used to retrieve partition information.
-     * @param currentAssignment the partitions that are currently assigned to the source readers.
-     * @return The partition changes compared with the currently assigned partitions.
+     * @param adminClient The admin client used to retrieve subscribed topic partitions.
+     * @return A set of subscribed {@link TopicPartition}s
      */
-    PartitionChange getPartitionChanges(
-            AdminClient adminClient, Set<TopicPartition> currentAssignment);
-
-    /** A container class to hold the newly added partitions and removed partitions. */
-    class PartitionChange {
-        private final Set<TopicPartition> newPartitions;
-        private final Set<TopicPartition> removedPartitions;
-
-        PartitionChange(Set<TopicPartition> newPartitions, Set<TopicPartition> removedPartitions) {
-            this.newPartitions = newPartitions;
-            this.removedPartitions = removedPartitions;
-        }
-
-        public Set<TopicPartition> getNewPartitions() {
-            return newPartitions;
-        }
-
-        public Set<TopicPartition> getRemovedPartitions() {
-            return removedPartitions;
-        }
-    }
+    Set<TopicPartition> listSubscribedTopicPartitions(AdminClient adminClient);

Review comment:
       Nit: `getSubscribedTopicPartitions()` might be slightly better from a
naming consistency point of view.
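
       For context on the interface change: once the subscriber only returns the
subscribed set, the comparison against the current assignment presumably moves
to the caller. A minimal sketch of that comparison follows. The class
`PartitionChangeSketch` and its method names are hypothetical, and plain strings
stand in for `TopicPartition` to keep the example self-contained. This is not
the code in this patch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; strings such as "topic-0" stand in for TopicPartition.
class PartitionChangeSketch {

    // Partitions that were fetched but are not assigned yet.
    static Set<String> newPartitions(Set<String> fetched, Set<String> assigned) {
        Set<String> added = new HashSet<>(fetched);
        added.removeAll(assigned);
        return added;
    }

    // Partitions that are assigned but no longer fetched,
    // for example because the topic was deleted.
    static Set<String> removedPartitions(Set<String> fetched, Set<String> assigned) {
        Set<String> removed = new HashSet<>(assigned);
        removed.removeAll(fetched);
        return removed;
    }
}
```

Both methods copy their input before calling `removeAll`, so the caller's sets
are left unmodified.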





