becketqin commented on a change in pull request #15397:
URL: https://github.com/apache/flink/pull/15397#discussion_r603120795
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
##########
@@ -170,7 +170,7 @@ public UserCodeClassLoader getUserCodeClassLoader() {
stoppingOffsetsInitializer,
props,
enumContext,
- checkpoint.getCurrentAssignment());
+ checkpoint.currentAssignedSplits());
Review comment:
Maybe just `assignedSplits`?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
##########
@@ -107,18 +112,17 @@ public KafkaSourceEnumerator(
OffsetsInitializer stoppingOffsetInitializer,
Properties properties,
SplitEnumeratorContext<KafkaPartitionSplit> context,
- Map<Integer, Set<KafkaPartitionSplit>> currentSplitsAssignments) {
+ Set<KafkaPartitionSplit> currentAssignedSplits) {
this.subscriber = subscriber;
this.startingOffsetInitializer = startingOffsetInitializer;
this.stoppingOffsetInitializer = stoppingOffsetInitializer;
this.properties = properties;
this.context = context;
this.discoveredPartitions = new HashSet<>();
- this.readerIdToSplitAssignments = new
HashMap<>(currentSplitsAssignments);
- this.readerIdToSplitAssignments.forEach(
- (reader, splits) ->
- splits.forEach(s ->
discoveredPartitions.add(s.getTopicPartition())));
+ this.currentAssignedSplits = new HashSet<>(currentAssignedSplits);
+ currentAssignedSplits.forEach(
Review comment:
Could probably be replaced with `discoveredPartitions.addAll()`.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java
##########
@@ -35,7 +35,7 @@
public class KafkaSourceEnumStateSerializer
implements SimpleVersionedSerializer<KafkaSourceEnumState> {
- private static final int CURRENT_VERSION = 0;
+ private static final int CURRENT_VERSION = 1;
Review comment:
Can we add a unit test to test the backwards compatibility?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]