fapaul commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2409626057


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -131,19 +134,84 @@ public KafkaSourceEnumerator(
         this.context = context;
         this.boundedness = boundedness;
 
-        this.assignedPartitions = new HashSet<>(kafkaSourceEnumState.assignedPartitions());
+        Map<AssignmentStatus, List<KafkaPartitionSplit>> splits =
+                initializeMigratedSplits(kafkaSourceEnumState.splits());
+        this.assignedSplits = indexByPartition(splits.get(AssignmentStatus.ASSIGNED));
+        this.unassignedSplits = indexByPartition(splits.get(AssignmentStatus.UNASSIGNED));
         this.pendingPartitionSplitAssignment = new HashMap<>();
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
                         properties,
                         KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS,
                         Long::parseLong);
         this.consumerGroupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-        this.unassignedInitialPartitions =
-                new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions());
         this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
     }
 
+    /**
+     * Initializes migrated splits into splits with concrete starting offsets. This method ensures
+     * that the costly offset resolution is performed only when there are splits that were
+     * checkpointed by a previous enumerator version.
+     *
+     * <p>Note that this method deliberately runs in the main thread so that a checkpoint can never
+     * capture splits without a starting offset.
+     */
+    private Map<AssignmentStatus, List<KafkaPartitionSplit>> initializeMigratedSplits(
+            Set<SplitAndAssignmentStatus> splits) {
+        final Set<TopicPartition> migratedPartitions =
+                splits.stream()
+                        .filter(
+                                splitStatus ->
+                                        splitStatus.split().getStartingOffset()
+                                                == KafkaPartitionSplit.MIGRATED)
+                        .map(splitStatus -> splitStatus.split().getTopicPartition())
+                        .collect(Collectors.toSet());
+
+        if (migratedPartitions.isEmpty()) {
+            return splitByAssignmentStatus(splits.stream());
+        }
+
+        final Map<TopicPartition, Long> startOffsets =
+                startingOffsetInitializer.getPartitionOffsets(
+                        migratedPartitions, getOffsetsRetriever());
+        return splitByAssignmentStatus(
+                splits.stream()
+                        .map(splitStatus -> resolveMigratedSplit(splitStatus, startOffsets)));

Review Comment:
   Nit: the flow for extracting the migrated partitions is overly complex because we extract them twice, in line 161 and line 179.
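   
   One possible shape for a single extraction pass (a sketch only, reusing the names from the diff; `byMigration` and `migrated` are illustrative locals, not from the PR):
   ```java
   // Classify every split exactly once, then reuse the result both for the
   // early return and for collecting the partitions that need an offset lookup.
   final Map<Boolean, List<SplitAndAssignmentStatus>> byMigration =
           splits.stream()
                   .collect(
                           Collectors.partitioningBy(
                                   splitStatus ->
                                           splitStatus.split().getStartingOffset()
                                                   == KafkaPartitionSplit.MIGRATED));
   final List<SplitAndAssignmentStatus> migrated = byMigration.get(true);
   if (migrated.isEmpty()) {
       return splitByAssignmentStatus(splits.stream());
   }
   final Set<TopicPartition> migratedPartitions =
           migrated.stream()
                   .map(splitStatus -> splitStatus.split().getTopicPartition())
                   .collect(Collectors.toSet());
   ```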



##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java:
##########
@@ -316,39 +341,67 @@ public void testDiscoverPartitionsPeriodically() throws Throwable {
         }
     }
 
-    @Test
-    public void testAddSplitsBack() throws Throwable {
+    @ParameterizedTest
+    @EnumSource(StandardOffsetsInitializer.class)
+    public void testAddSplitsBack(StandardOffsetsInitializer offsetsInitializer) throws Throwable {

Review Comment:
   Can you also add a test to cover the newly added migration story?
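   
   A rough shape such a test might take (a sketch only; `stateWithMigratedSplits()` and the `createEnumerator` overload accepting a restored state are hypothetical helpers, and `NUM_SUBTASKS` is assumed to be a constant of this test class):
   ```java
   @Test
   public void testRestoreFromStateWithMigratedSplits() throws Throwable {
       try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
                       new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
               // Hypothetical helper: restores the enumerator from a checkpoint whose
               // splits still carry the MIGRATED sentinel as their starting offset.
               KafkaSourceEnumerator enumerator =
                       createEnumerator(context, stateWithMigratedSplits())) {
           enumerator.start();
   
           // After restore, every split must carry a concrete starting offset, so
           // the MIGRATED sentinel can never leak into a new checkpoint.
           enumerator
                   .snapshotState(1L)
                   .splits()
                   .forEach(
                           splitStatus ->
                                   assertThat(splitStatus.split().getStartingOffset())
                                           .isNotEqualTo(KafkaPartitionSplit.MIGRATED));
       }
   }
   ```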



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
