danielcweeks commented on code in PR #14506:
URL: https://github.com/apache/iceberg/pull/14506#discussion_r2493032054
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -284,6 +289,29 @@ private void commitToTable(
}
}
+ private void validateAndCommit(
+ PendingUpdate<?> pendingUpdate, String branch, Map<Integer, Long>
expectedOffsets) {
+ CommitValidator validator =
+ (base, metadata) -> {
+ Map<Integer, Long> lastCommittedOffsets =
lastCommittedOffsetsForTable(base, branch);
+
+ if (expectedOffsets == null || expectedOffsets.isEmpty()) {
+ return; // there are no stored offsets, so assume we're starting
with new offsets
+ }
+
+ if (!expectedOffsets.equals(lastCommittedOffsets)) {
+ throw new CommitFailedException(
+ "Latest offsets do not match expected offsets for this
commit.");
+ }
+ };
Review Comment:
I debated this, but I'm not sure what the expected behavior should be in
that case. It seems likely that we want to error in that case as well, but the
scenario around it is less clear to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]