wombatu-kun commented on code in PR #16360:
URL: https://github.com/apache/iceberg/pull/16360#discussion_r3377402522
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -229,8 +229,37 @@ private void commitToTable(
// records for other partitions. Merge the updated topic partitions with the last committed
// offsets.
Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);
+
+ // Detect if the control topic was reset (e.g., after Kafka cluster recreation).
+ // If the current coordinator's observed control topic offsets are lower than the
+ // previously committed offsets stored in the snapshot, the control topic has likely
+ // been reset and the stored offsets are stale. In this case, skip deduplication
+ // to avoid silently dropping all data files and blocking metadata commits.
+ boolean controlTopicReset =
+ !committedOffsets.isEmpty()
+ && committedOffsets.entrySet().stream()
+ .anyMatch(
+ e -> {
+ Long current = controlTopicOffsets.get(e.getKey());
+ return current != null && current < e.getValue();
+ });
+
+ if (controlTopicReset) {
+ LOG.warn(
+ "Coordinator {}: detected possible Kafka cluster recreation for table {}. "
+ + "Control topic offsets {} are lower than previously committed offsets {}. "
+ + "Skipping offset deduplication and resetting stored offset baseline.",
+ taskId,
+ tableIdentifier,
+ controlTopicOffsets,
+ committedOffsets);
+ }
+
+ // When a reset is detected, base the stored offsets only on the current control topic
+ // offsets so subsequent commits use the correct (new-cluster) baseline.
+ Map<Integer, Long> baseOffsets = controlTopicReset ? Map.of() : committedOffsets;
Review Comment:
`baseOffsets = controlTopicReset ? Map.of() : committedOffsets` empties the
dedup baseline for every partition as soon as any one partition looks reset,
but the description states that the reset is per-partition. Scoping it per
partition keeps dedup wherever it is still valid: with
`current = controlTopicOffsets.get(p)` and `committed` the stored offset for
partition `p`, build `baseOffsets` by filtering `committedOffsets` to the
entries where `current == null || current >= committed`, which drops the reset
ones. Then run both the merge and the dedup filter off `baseOffsets`: reset
partitions have no entry, so `Long::max` stores their new low offset and the
`minOffset == null` branch passes their events. That leaves
`controlTopicReset` to gate only the warn log. The behavior is identical for a
single-partition control topic, so this introduces no regression.
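
A minimal sketch of the per-partition filter suggested above, written as a
standalone class so it can be tried in isolation. `BaseOffsetsSketch`,
`validBaseline`, and `mergeOffsets` are hypothetical names used only for
illustration and are not part of `Coordinator.java`; the merge step assumes
the existing code combines offsets with `Long::max`, as described above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of Coordinator.java.
final class BaseOffsetsSketch {
  private BaseOffsetsSketch() {}

  // Keeps a committed offset only when the currently observed control topic
  // offset for that partition is absent or not lower than the committed one.
  // Partitions that look reset (current < committed) get no baseline entry.
  static Map<Integer, Long> validBaseline(
      Map<Integer, Long> committedOffsets, Map<Integer, Long> controlTopicOffsets) {
    Map<Integer, Long> baseOffsets = new TreeMap<>();
    committedOffsets.forEach(
        (partition, committed) -> {
          Long current = controlTopicOffsets.get(partition);
          if (current == null || current >= committed) {
            baseOffsets.put(partition, committed);
          }
        });
    return baseOffsets;
  }

  // Assumed merge step: take the max of baseline and current per partition.
  // A reset partition has no baseline entry, so its new low offset is stored.
  static Map<Integer, Long> mergeOffsets(
      Map<Integer, Long> baseOffsets, Map<Integer, Long> controlTopicOffsets) {
    Map<Integer, Long> merged = new TreeMap<>(baseOffsets);
    controlTopicOffsets.forEach((partition, offset) -> merged.merge(partition, offset, Long::max));
    return merged;
  }

  public static void main(String[] args) {
    // Partition 1 looks reset (5 < 200); partition 2 has no current offset.
    Map<Integer, Long> committed = Map.of(0, 100L, 1, 200L, 2, 300L);
    Map<Integer, Long> current = Map.of(0, 150L, 1, 5L);

    Map<Integer, Long> base = validBaseline(committed, current);
    System.out.println("baseline: " + base);
    System.out.println("merged:   " + mergeOffsets(base, current));
  }
}
```

With this shape, a reset on one partition no longer discards the baseline of
the others, and the warn log can still be driven by whether any entry was
dropped (`baseOffsets.size() < committedOffsets.size()`).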
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]