mxm commented on code in PR #14559:
URL: https://github.com/apache/iceberg/pull/14559#discussion_r2517557012
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java:
##########
@@ -302,30 +303,56 @@ private void commitDeltaTxn(
CommitSummary summary,
String newFlinkJobId,
String operatorId) {
- for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
- long checkpointId = e.getKey();
- List<WriteResult> writeResults = e.getValue();
-
- RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
- for (WriteResult result : writeResults) {
- // Row delta validations are not needed for streaming changes that
write equality deletes.
- // Equality deletes are applied to data in all previous sequence
numbers, so retries may
- // push deletes further in the future, but do not affect correctness.
Position deletes
- // committed to the table in this path are used only to delete rows
from data files that are
- // being added in this commit. There is no way for data files added
along with the delete
- // files to be concurrently removed, so there is no need to validate
the files referenced by
- // the position delete files that are being committed.
- Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
- Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
+ if (summary.deleteFilesCount() == 0) {
+ // Use append snapshot operation where possible
+ AppendFiles appendFiles =
table.newAppend().scanManifestsWith(workerPool);
+ for (List<WriteResult> resultList : pendingResults.values()) {
Review Comment:
Can we move this loop up one level, as in
https://github.com/apache/iceberg/pull/14559/files#r2513719871? This avoids
repeating it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]