wombatu-kun commented on code in PR #16360:
URL: https://github.com/apache/iceberg/pull/16360#discussion_r3377402522


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -229,8 +229,37 @@ private void commitToTable(
     // records for other partitions.  Merge the updated topic partitions with the last committed
     // offsets.
     Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);
+
+    // Detect if the control topic was reset (e.g., after Kafka cluster recreation).
+    // If the current coordinator's observed control topic offsets are lower than the
+    // previously committed offsets stored in the snapshot, the control topic has likely
+    // been reset and the stored offsets are stale.  In this case, skip deduplication
+    // to avoid silently dropping all data files and blocking metadata commits.
+    boolean controlTopicReset =
+        !committedOffsets.isEmpty()
+            && committedOffsets.entrySet().stream()
+                .anyMatch(
+                    e -> {
+                      Long current = controlTopicOffsets.get(e.getKey());
+                      return current != null && current < e.getValue();
+                    });
+
+    if (controlTopicReset) {
+      LOG.warn(
+          "Coordinator {}: detected possible Kafka cluster recreation for table {}. "
+              + "Control topic offsets {} are lower than previously committed offsets {}. "
+              + "Skipping offset deduplication and resetting stored offset baseline.",
+          taskId,
+          tableIdentifier,
+          controlTopicOffsets,
+          committedOffsets);
+    }
+
+    // When a reset is detected, base the stored offsets only on the current control topic
+    // offsets so subsequent commits use the correct (new-cluster) baseline.
+    Map<Integer, Long> baseOffsets = controlTopicReset ? Map.of() : committedOffsets;

Review Comment:
   `baseOffsets = controlTopicReset ? Map.of() : committedOffsets` empties the
dedup baseline for every partition as soon as any one partition looks reset,
but the description states the reset is per-partition. Scoping it per
partition keeps dedup where it is still valid: build `baseOffsets` by filtering
`committedOffsets` to the entries where `current == null || current >= committed`
(with `current = controlTopicOffsets.get(p)`), dropping the partitions that look
reset. Both the merge and the dedup filter then run off `baseOffsets`: reset
partitions have no entry, so `Long::max` stores their new, lower offset and the
`minOffset == null` branch passes their events. That leaves `controlTopicReset`
to gate only the warning log. For a single-partition control topic this is
behavior-identical to the current patch, so there is no regression.
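A minimal Java sketch of the per-partition scoping suggested above, set against the all-or-nothing version from the diff. The class `OffsetBaseline`, its methods, and the `ControlEvent` record are hypothetical stand-ins, not the real `Coordinator` API. The `Long::max` merge and the `minOffset == null` dedup check are modeled on the reviewer's description, and the rule that an event passes when its offset is at or past the stored baseline is an assumption. Requires Java 16+ for the record.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper illustrating per-partition reset scoping; not Iceberg's actual API. */
public class OffsetBaseline {

  /** Hypothetical stand-in for a control-topic event: its partition and offset. */
  public record ControlEvent(int partition, long offset) {}

  /** Global variant (as in the diff): one reset partition clears the baseline for all. */
  public static Map<Integer, Long> globalBaseOffsets(
      Map<Integer, Long> committedOffsets, Map<Integer, Long> controlTopicOffsets) {
    boolean controlTopicReset =
        committedOffsets.entrySet().stream()
            .anyMatch(
                e -> {
                  Long current = controlTopicOffsets.get(e.getKey());
                  return current != null && current < e.getValue();
                });
    return controlTopicReset ? Map.of() : committedOffsets;
  }

  /** Per-partition variant (as suggested): drop only the entries whose partition looks reset. */
  public static Map<Integer, Long> perPartitionBaseOffsets(
      Map<Integer, Long> committedOffsets, Map<Integer, Long> controlTopicOffsets) {
    return committedOffsets.entrySet().stream()
        .filter(
            e -> {
              Long current = controlTopicOffsets.get(e.getKey());
              // Keep the entry if the partition was not observed or has not gone backwards.
              return current == null || current >= e.getValue();
            })
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /** Merge observed offsets into the baseline, keeping the larger offset per partition. */
  public static Map<Integer, Long> mergeOffsets(
      Map<Integer, Long> baseOffsets, Map<Integer, Long> controlTopicOffsets) {
    Map<Integer, Long> merged = new HashMap<>(baseOffsets);
    // Reset partitions have no entry in baseOffsets, so their new (lower) offset is stored as-is.
    controlTopicOffsets.forEach((partition, offset) -> merged.merge(partition, offset, Long::max));
    return merged;
  }

  /** Keep events at or past the baseline; partitions without a baseline pass everything. */
  public static List<ControlEvent> dedup(
      List<ControlEvent> events, Map<Integer, Long> baseOffsets) {
    return events.stream()
        .filter(
            event -> {
              Long minOffset = baseOffsets.get(event.partition());
              return minOffset == null || event.offset() >= minOffset;
            })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<Integer, Long> committed = Map.of(0, 100L, 1, 50L);
    // Partition 0 went backwards (looks reset); partition 1 moved forward normally.
    Map<Integer, Long> current = Map.of(0, 5L, 1, 60L);

    System.out.println(globalBaseOffsets(committed, current)); // {}
    Map<Integer, Long> base = perPartitionBaseOffsets(committed, current);
    System.out.println(base); // {1=50}
    System.out.println(mergeOffsets(base, current)); // {0=5, 1=60}

    List<ControlEvent> events =
        List.of(new ControlEvent(0, 3L), new ControlEvent(1, 40L), new ControlEvent(1, 55L));
    // Partition 0 has no baseline, so offset 3 passes; partition 1 drops the stale offset 40.
    System.out.println(dedup(events, base).size()); // 2
  }
}
```

With partition 0 reset and partition 1 healthy, the global variant drops the baseline for both partitions, while the per-partition variant keeps partition 1's baseline, so its stale event at offset 40 is still filtered out. With a single partition, the two variants return the same baseline.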



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

