fapaul commented on code in PR #192:
URL: https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2409626057
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java:
##########
@@ -131,19 +134,84 @@ public KafkaSourceEnumerator(
         this.context = context;
         this.boundedness = boundedness;
-        this.assignedPartitions = new HashSet<>(kafkaSourceEnumState.assignedPartitions());
+        Map<AssignmentStatus, List<KafkaPartitionSplit>> splits =
+                initializeMigratedSplits(kafkaSourceEnumState.splits());
+        this.assignedSplits = indexByPartition(splits.get(AssignmentStatus.ASSIGNED));
+        this.unassignedSplits = indexByPartition(splits.get(AssignmentStatus.UNASSIGNED));
         this.pendingPartitionSplitAssignment = new HashMap<>();
         this.partitionDiscoveryIntervalMs =
                 KafkaSourceOptions.getOption(
                         properties,
                         KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS,
                         Long::parseLong);
         this.consumerGroupId = properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
-        this.unassignedInitialPartitions =
-                new HashSet<>(kafkaSourceEnumState.unassignedInitialPartitions());
         this.initialDiscoveryFinished = kafkaSourceEnumState.initialDiscoveryFinished();
     }
+    /**
+     * Initializes migrated splits into splits with concrete starting offsets. This method ensures
+     * that the costly offset resolution is performed only when there are splits that were
+     * checkpointed by a previous enumerator version.
+     *
+     * <p>Note that this method deliberately runs in the main thread to avoid checkpointing the
+     * splits without a starting offset.
+     */
+    private Map<AssignmentStatus, List<KafkaPartitionSplit>> initializeMigratedSplits(
+            Set<SplitAndAssignmentStatus> splits) {
+        final Set<TopicPartition> migratedPartitions =
+                splits.stream()
+                        .filter(
+                                splitStatus ->
+                                        splitStatus.split().getStartingOffset()
+                                                == KafkaPartitionSplit.MIGRATED)
+                        .map(splitStatus -> splitStatus.split().getTopicPartition())
+                        .collect(Collectors.toSet());
+
+        if (migratedPartitions.isEmpty()) {
+            return splitByAssignmentStatus(splits.stream());
+        }
+
+        final Map<TopicPartition, Long> startOffsets =
+                startingOffsetInitializer.getPartitionOffsets(
+                        migratedPartitions, getOffsetsRetriever());
+        return splitByAssignmentStatus(
+                splits.stream()
+                        .map(splitStatus -> resolveMigratedSplit(splitStatus, startOffsets)));
Review Comment:
   Nit: The flow for extracting the migrated partitions is overly complex: we extract them twice, once in line 161 and again in line 179.
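   For illustration, one way the two extractions could collapse into a single check. This is only a sketch: it reuses the helpers visible in this diff (`splitByAssignmentStatus`, `getOffsetsRetriever`, `startingOffsetInitializer`) and assumes `SplitAndAssignmentStatus` has an `assignmentStatus()` accessor and a `(split, status)` constructor, which may not match the actual PR:

   ```java
   private Map<AssignmentStatus, List<KafkaPartitionSplit>> initializeMigratedSplits(
           Set<SplitAndAssignmentStatus> splits) {
       // Single pass to find partitions whose splits still carry the MIGRATED sentinel.
       final Set<TopicPartition> migratedPartitions =
               splits.stream()
                       .map(SplitAndAssignmentStatus::split)
                       .filter(split -> split.getStartingOffset() == KafkaPartitionSplit.MIGRATED)
                       .map(KafkaPartitionSplit::getTopicPartition)
                       .collect(Collectors.toSet());

       if (migratedPartitions.isEmpty()) {
           return splitByAssignmentStatus(splits.stream());
       }

       // Resolve concrete offsets once; only migrated partitions have an entry here,
       // so the map doubles as the "is migrated" check below.
       final Map<TopicPartition, Long> startOffsets =
               startingOffsetInitializer.getPartitionOffsets(
                       migratedPartitions, getOffsetsRetriever());
       return splitByAssignmentStatus(
               splits.stream()
                       .map(
                               splitStatus -> {
                                   KafkaPartitionSplit split = splitStatus.split();
                                   Long resolved = startOffsets.get(split.getTopicPartition());
                                   if (resolved == null) {
                                       return splitStatus; // not migrated, keep as-is
                                   }
                                   return new SplitAndAssignmentStatus( // assumed constructor
                                           new KafkaPartitionSplit(
                                                   split.getTopicPartition(),
                                                   resolved,
                                                   split.getStoppingOffset()
                                                           .orElse(KafkaPartitionSplit.NO_STOPPING_OFFSET)),
                                           splitStatus.assignmentStatus()); // assumed accessor
                               }));
   }
   ```

   That would keep the MIGRATED check in one place and make a separate `resolveMigratedSplit` helper unnecessary.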
##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java:
##########
@@ -316,39 +341,67 @@ public void testDiscoverPartitionsPeriodically() throws Throwable {
         }
     }

-    @Test
-    public void testAddSplitsBack() throws Throwable {
+    @ParameterizedTest
+    @EnumSource(StandardOffsetsInitializer.class)
+    public void testAddSplitsBack(StandardOffsetsInitializer offsetsInitializer) throws Throwable {
Review Comment:
Can you also add a test to cover the newly added migration story?
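   Something along these lines could cover the restore path. A sketch only: the `KafkaSourceEnumState` constructor shape, `TOPIC1`, `NUM_SUBTASKS`, and the `createEnumerator` overload are assumptions based on this test class and may need adjusting:

   ```java
   @Test
   public void testRestoreWithMigratedSplits() throws Throwable {
       // Simulate a split checkpointed by an older enumerator version: it carries
       // the MIGRATED sentinel instead of a concrete starting offset.
       KafkaPartitionSplit migratedSplit =
               new KafkaPartitionSplit(
                       new TopicPartition(TOPIC1, 0), KafkaPartitionSplit.MIGRATED);
       KafkaSourceEnumState state =
               new KafkaSourceEnumState( // assumed constructor shape
                       Collections.singleton(
                               new SplitAndAssignmentStatus(
                                       migratedSplit, AssignmentStatus.UNASSIGNED)),
                       /* initialDiscoveryFinished= */ true);

       try (MockSplitEnumeratorContext<KafkaPartitionSplit> context =
                       new MockSplitEnumeratorContext<>(NUM_SUBTASKS);
               KafkaSourceEnumerator enumerator =
                       createEnumerator(context, false, state)) { // assumed overload
           enumerator.start();
           // After restore, no split may still carry the MIGRATED sentinel, so a
           // subsequent checkpoint persists concrete offsets only.
           KafkaSourceEnumState snapshot = enumerator.snapshotState(1L);
           assertThat(snapshot.splits())
                   .allSatisfy(
                           splitStatus ->
                                   assertThat(splitStatus.split().getStartingOffset())
                                           .isNotEqualTo(KafkaPartitionSplit.MIGRATED));
       }
   }
   ```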