bryanck commented on code in PR #14525:
URL: https://github.com/apache/iceberg/pull/14525#discussion_r2503975452


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -206,7 +206,15 @@ private void commitToTable(
 
     String branch = config.tableConfig(tableIdentifier.toString()).commitBranch();
 
+    // Control topic partition offsets may include a subset of partition ids if there were no
+    // records for other partitions.  Merge the updated topic partitions with the last committed
+    // offsets.
     Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);
+    Map<Integer, Long> mergedOffsets =
+        Stream.of(committedOffsets, controlTopicOffsets)
+            .flatMap(map -> map.entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));

Review Comment:
   This is the correct behavior. The offsets stored in the table correspond to the 
data that has been committed. If a user wants to roll back the offsets, they also 
need to roll back the offsets in the table, either by rolling back the snapshot or 
by updating the offsets manually. (The hope was that we'd have tools to help with 
that at some point.)
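
For illustration only, here is a minimal standalone sketch of the merge shown in the diff. The class name `OffsetMergeSketch`, the helper `mergeOffsets`, and the sample offsets are hypothetical and not part of the PR. It assumes both maps are keyed by partition id and shows two things: a partition missing from the control topic offsets keeps its committed value, and a control topic offset lower than the committed one does not move the stored offset backwards.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OffsetMergeSketch {

  // Hypothetical helper mirroring the merge in the diff: for each partition id,
  // keep the larger of the committed offset and the control topic offset.
  public static Map<Integer, Long> mergeOffsets(
      Map<Integer, Long> committedOffsets, Map<Integer, Long> controlTopicOffsets) {
    return Stream.of(committedOffsets, controlTopicOffsets)
        .flatMap(map -> map.entrySet().stream())
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::max));
  }

  public static void main(String[] args) {
    // Made-up sample data: offsets last committed to the table for partitions 0, 1, 2.
    Map<Integer, Long> committed = Map.of(0, 100L, 1, 250L, 2, 75L);
    // Control topic offsets: partition 2 had no records, partition 1 is behind.
    Map<Integer, Long> controlTopic = Map.of(0, 120L, 1, 200L);

    Map<Integer, Long> merged = mergeOffsets(committed, controlTopic);

    // Sort by partition id for stable output.
    System.out.println(new TreeMap<>(merged)); // prints {0=120, 1=250, 2=75}
  }
}
```

In this sketch partition 1 stays at 250 even though the control topic reports 200, which matches the point above: lowering the offsets requires changing what is stored in the table, not just the incoming offsets.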



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


