AHeise commented on code in PR #192:
URL:
https://github.com/apache/flink-connector-kafka/pull/192#discussion_r2408038369
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumStateSerializer.java:
##########
@@ -132,22 +197,43 @@ private static KafkaSourceEnumState
deserializeAssignedTopicPartitions(
}
}
- private static KafkaSourceEnumState
deserializeTopicPartitionAndAssignmentStatus(
- byte[] serialized) throws IOException {
+ @VisibleForTesting
+ static byte[] serializeV2(
+ Collection<SplitAndAssignmentStatus> splits, boolean
initialDiscoveryFinished)
+ throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeInt(splits.size());
+ for (SplitAndAssignmentStatus splitAndAssignmentStatus : splits) {
+ final TopicPartition topicPartition =
+ splitAndAssignmentStatus.split().getTopicPartition();
+ out.writeUTF(topicPartition.topic());
+ out.writeInt(topicPartition.partition());
+
out.writeInt(splitAndAssignmentStatus.assignmentStatus().getStatusCode());
+ }
+ out.writeBoolean(initialDiscoveryFinished);
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ private static KafkaSourceEnumState deserializeVersion2(byte[] serialized)
throws IOException {
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
final int numPartitions = in.readInt();
- Set<TopicPartitionAndAssignmentStatus> partitions = new
HashSet<>(numPartitions);
+ Set<SplitAndAssignmentStatus> partitions = new
HashSet<>(numPartitions);
for (int i = 0; i < numPartitions; i++) {
final String topic = in.readUTF();
final int partition = in.readInt();
final int statusCode = in.readInt();
partitions.add(
- new TopicPartitionAndAssignmentStatus(
- new TopicPartition(topic, partition),
+ new SplitAndAssignmentStatus(
+ new KafkaPartitionSplit(
+ new TopicPartition(topic, partition),
+ DEFAULT_STARTING_OFFSET),
Review Comment:
Yes, added a new MIGRATED offset to indicate that this needs to be
initialized on recovery.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]