shangxinli commented on code in PR #14182: URL: https://github.com/apache/iceberg/pull/14182#discussion_r2378994041
########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java: ########## @@ -137,38 +139,68 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests) commitRequestMap.entrySet()) { Table table = catalog.loadTable(TableIdentifier.parse(entry.getKey().tableName())); DynamicCommittable last = entry.getValue().lastEntry().getValue().get(0).getCommittable(); - long maxCommittedCheckpointId = + + CheckpointInfo maxCommittedCheckpointIds = getMaxCommittedCheckpointId( table, last.jobId(), last.operatorId(), entry.getKey().branch()); // Mark the already committed FilesCommittable(s) as finished entry .getValue() - .headMap(maxCommittedCheckpointId, true) + .headMap(maxCommittedCheckpointIds.maxProcessedCheckpointId, false) .values() .forEach(list -> list.forEach(CommitRequest::signalAlreadyCommitted)); + // Filter out all committables until the last checkpoint id, which may be partially committed. NavigableMap<Long, List<CommitRequest<DynamicCommittable>>> uncommitted = - entry.getValue().tailMap(maxCommittedCheckpointId, false); + entry.getValue().tailMap(maxCommittedCheckpointIds.maxProcessedCheckpointId, true); + if (!uncommitted.isEmpty()) { commitPendingRequests( - table, entry.getKey().branch(), uncommitted, last.jobId(), last.operatorId()); + table, + entry.getKey().branch(), + uncommitted, + last.jobId(), + last.operatorId(), + maxCommittedCheckpointIds); + } + } + } + + private static class CheckpointInfo { + private final long maxProcessedCheckpointId; + private final int maxCommittedWriteResultIndex; + + private CheckpointInfo(long maxProcessedCheckpointId, int maxCommittedWriteResultIndex) { + this.maxProcessedCheckpointId = maxProcessedCheckpointId; + this.maxCommittedWriteResultIndex = maxCommittedWriteResultIndex; + } + + int writeResultIndexFor(long checkpointId) { + if (checkpointId == this.maxProcessedCheckpointId) { + return this.maxCommittedWriteResultIndex; } + return -1; } } - private 
static long getMaxCommittedCheckpointId( + private static CheckpointInfo getMaxCommittedCheckpointId( Table table, String flinkJobId, String operatorId, String branch) { Snapshot snapshot = table.snapshot(branch); long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID; + int writeResultIndex = INITIAL_WRITE_RESULT_INDEX; while (snapshot != null) { Map<String, String> summary = snapshot.summary(); String snapshotFlinkJobId = summary.get(FLINK_JOB_ID); String snapshotOperatorId = summary.get(OPERATOR_ID); if (flinkJobId.equals(snapshotFlinkJobId) && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) { - String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID); - if (value != null) { - lastCommittedCheckpointId = Long.parseLong(value); + String maxCheckpointIdString = summary.get(MAX_COMMITTED_CHECKPOINT_ID); + if (maxCheckpointIdString != null) { + lastCommittedCheckpointId = Long.parseLong(maxCheckpointIdString); + String writeResultIndexString = summary.get(MAX_WRITE_RESULT_INDEX); + if (writeResultIndexString != null) { + writeResultIndex = Integer.parseInt(writeResultIndexString); + } Review Comment: I'm not sure whether we have a test that covers the case where 'writeResultIndexString == null'. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org