AHeise commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2409711198


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -131,19 +134,84 @@ public KafkaSourceEnumerator(
         this.context = context;
         this.boundedness = boundedness;
 
-        this.assignedPartitions = new HashSet<>(kafkaSourceEnumState.assignedPartitions());
+        Map<AssignmentStatus, List<KafkaPartitionSplit>> splits =
+                initializeMigratedSplits(kafkaSourceEnumState.splits());
+        this.assignedSplits = indexByPartition(splits.get(AssignmentStatus.ASSIGNED));
+        this.unassignedSplits = indexByPartition(splits.get(AssignmentStatus.UNASSIGNED));
         this.pendingPartitionSplitAssignment = new HashMap<>();
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
                         properties,
                         KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS,
                         Long::parseLong);
         this.consumerGroupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-        this.unassignedInitialPartitions =
-                new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions());
         this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
     }
 
+    /**
+     * Initializes migrated splits into splits with concrete starting offsets. This method
+     * ensures that the costly offset resolution is performed only when there are splits that
+     * were checkpointed with previous enumerator versions.
+     *
+     * <p>Note that this method is deliberately executed in the main thread to avoid
+     * checkpointing splits without a starting offset.
+     */
+    private Map<AssignmentStatus, List<KafkaPartitionSplit>> initializeMigratedSplits(
+            Set<SplitAndAssignmentStatus> splits) {
+        final Set<TopicPartition> migratedPartitions =
+                splits.stream()
+                        .filter(
+                                splitStatus ->
+                                        splitStatus.split().getStartingOffset()
+                                                == KafkaPartitionSplit.MIGRATED)
+                        .map(splitStatus -> splitStatus.split().getTopicPartition())
+                        .collect(Collectors.toSet());
+
+        if (migratedPartitions.isEmpty()) {
+            return splitByAssignmentStatus(splits.stream());
+        }
+
+        final Map<TopicPartition, Long> startOffsets =
+                startingOffsetInitializer.getPartitionOffsets(
+                        migratedPartitions, getOffsetsRetriever());
+        return splitByAssignmentStatus(
+                splits.stream()
+                        .map(splitStatus -> resolveMigratedSplit(splitStatus, startOffsets)));
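
(For context: the hunk above calls two helpers whose bodies fall outside the quoted range. A minimal sketch of what they plausibly look like, assuming `SplitAndAssignmentStatus` exposes `split()` and an `assignmentStatus()` accessor; these signatures are assumptions, not the PR's actual code.)

```java
// Sketch only; the PR's actual helpers may differ.
private static Map<AssignmentStatus, List<KafkaPartitionSplit>> splitByAssignmentStatus(
        Stream<SplitAndAssignmentStatus> splits) {
    // Group splits by assignment status, keeping only the split payload.
    return splits.collect(
            Collectors.groupingBy(
                    SplitAndAssignmentStatus::assignmentStatus,
                    Collectors.mapping(SplitAndAssignmentStatus::split, Collectors.toList())));
}

private static Map<TopicPartition, KafkaPartitionSplit> indexByPartition(
        List<KafkaPartitionSplit> splits) {
    // groupingBy yields no entry for an absent status, so tolerate null input;
    // enumerator state holds at most one split per partition, so toMap is safe.
    if (splits == null) {
        return new HashMap<>();
    }
    return splits.stream()
            .collect(Collectors.toMap(KafkaPartitionSplit::getTopicPartition, s -> s));
}
```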

Review Comment:
   It's unfortunately necessary by design:
   * Line 161 extracts the partitions that are used to jointly look up the partition offsets.
   * That lookup is expensive because it uses the admin client to contact the Kafka cluster.
   * The offset initializer is designed to look up all partitions jointly, so that only one request goes to the Kafka brokers.
   * Now that we have received all offsets, line 179 applies them to the splits. It could be a simple map lookup, but I decided to add some assertions, so it went into a separate method; see the sketch below.
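   
   A rough sketch of that application step, with the assertion mentioned above (the `SplitAndAssignmentStatus` constructor and accessors are assumed as used in the hunk; this is not the exact PR code):
   
   ```java
   // Sketch only; the actual implementation may differ in details.
   private SplitAndAssignmentStatus resolveMigratedSplit(
           SplitAndAssignmentStatus splitStatus, Map<TopicPartition, Long> startOffsets) {
       final KafkaPartitionSplit split = splitStatus.split();
       if (split.getStartingOffset() != KafkaPartitionSplit.MIGRATED) {
           // Not a migrated split; nothing to resolve.
           return splitStatus;
       }
       final TopicPartition partition = split.getTopicPartition();
       final Long startOffset = startOffsets.get(partition);
       // The assertion: the joint lookup must have covered every migrated partition.
       Preconditions.checkState(
               startOffset != null,
               "No starting offset resolved for migrated partition %s",
               partition);
       return new SplitAndAssignmentStatus(
               new KafkaPartitionSplit(
                       partition,
                       startOffset,
                       split.getStoppingOffset().orElse(KafkaPartitionSplit.NO_STOPPING_OFFSET)),
               splitStatus.assignmentStatus());
   }
   ```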


