aiborodin commented on code in PR #14312:
URL: https://github.com/apache/iceberg/pull/14312#discussion_r2427677246
##########
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java:
##########
@@ -125,26 +126,42 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
}
/**
- * Write all the completed data files to a newly created manifest file and return the manifest's
+ * Write all the completed data files to a newly created manifest files and return the manifests'
* avro serialized bytes.
*/
@VisibleForTesting
- byte[] writeToManifest(
- WriteTarget key, Collection<DynamicWriteResult> writeResults, long checkpointId)
+ byte[][] writeToManifests(
+ String tableName, Collection<WriteResult> writeResults, long checkpointId)
Review Comment:
Yes, a manifest can only contain files for a single partition spec.
Yes, `RowDelta` writes multiple manifests behind the scenes: it tracks new `DataFile`s and `DeleteFile`s grouped by partition spec ID. See the implementation of
[MergingSnapshotProducer](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L87):
```java
// update data
private final Map<Integer, DataFileSet> newDataFilesBySpec = Maps.newHashMap();
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, DeleteFileSet> newDeleteFilesBySpec = Maps.newHashMap();
```
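To make the "one manifest per spec" rule concrete, here is a minimal sketch of bucketing files by spec ID, loosely modelled on the `newDataFilesBySpec` / `newDeleteFilesBySpec` maps above. `ContentFileRef` and `FilesBySpec` are hypothetical stand-ins, not Iceberg types: the real code uses `DataFile`/`DeleteFile` and the set-based `DataFileSet`/`DeleteFileSet`. Data and delete files are kept in separate maps because a manifest holds either data files or delete files, never both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for an Iceberg DataFile/DeleteFile: only the path,
// the partition spec ID, and whether it is a delete file matter here.
record ContentFileRef(String path, int specId, boolean isDelete) {}

final class FilesBySpec {
  // Same shape as newDataFilesBySpec / newDeleteFilesBySpec: spec ID -> files.
  // TreeMap keeps iteration order deterministic (the real code uses a HashMap).
  final Map<Integer, List<ContentFileRef>> dataFilesBySpec = new TreeMap<>();
  final Map<Integer, List<ContentFileRef>> deleteFilesBySpec = new TreeMap<>();

  void add(ContentFileRef file) {
    Map<Integer, List<ContentFileRef>> target =
        file.isDelete() ? deleteFilesBySpec : dataFilesBySpec;
    target.computeIfAbsent(file.specId(), id -> new ArrayList<>()).add(file);
  }

  // One manifest per (content type, spec ID) bucket, since a manifest may only
  // reference files written with a single partition spec.
  int manifestsNeeded() {
    return dataFilesBySpec.size() + deleteFilesBySpec.size();
  }
}
```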
The current implementation implicitly writes a new temporary manifest for each unique `WriteTarget`, which creates multiple `DynamicCommittable`s per (table, branch, checkpoint) triplet. This is incorrect behaviour. See the implementation of
[DynamicWriteResultAggregator](https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java#L101).
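A toy count shows why keying on `WriteTarget` is the problem. This is a sketch under simplifying assumptions: `Target` is a hypothetical, trimmed-down `WriteTarget` (the real class carries more fields), and `CommitKey` is a hypothetical name for the (table, branch, checkpoint) unit. Any two targets that differ only in, say, spec ID or upsert mode produce separate committables for the same triplet.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified WriteTarget; extra fields only make collisions likelier.
record Target(String table, String branch, int specId, boolean upsertMode) {}

// Hypothetical key: the unit the committer should see once per checkpoint.
record CommitKey(String table, String branch, long checkpointId) {}

final class CommittableCounts {
  // Old behaviour: one temporary manifest, and so one committable,
  // per distinct WriteTarget, counted per (table, branch, checkpoint).
  static Map<CommitKey, Long> perWriteTarget(List<Target> targets, long checkpointId) {
    return targets.stream()
        .distinct()
        .collect(Collectors.groupingBy(
            t -> new CommitKey(t.table(), t.branch(), checkpointId),
            Collectors.counting()));
  }

  // Intended behaviour: exactly one committable per (table, branch, checkpoint).
  static Map<CommitKey, Long> perCommitKey(List<Target> targets, long checkpointId) {
    return targets.stream()
        .map(t -> new CommitKey(t.table(), t.branch(), checkpointId))
        .distinct()
        .collect(Collectors.toMap(k -> k, k -> 1L));
  }
}
```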
With this refactor, we write even fewer manifests (one per unique partition spec), and the implementation becomes explicit:
1. Create multiple manifests only for different partition specs (similar to `RowDelta`)
2. Create only one `DynamicCommittable` per checkpoint, and use multiple manifests for serialisation
3. Remove all assumptions about multiple commit requests in `DynamicCommitter`
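Steps 1 and 2 can be sketched as follows. Everything here is hypothetical: `WrittenFile`, `Committable`, `ManifestsPerSpec` and `serializeManifest` are illustrative names, and `serializeManifest` is only a placeholder for real manifest writing plus Avro serialization. The sketch does not reproduce the PR's `writeToManifests(String tableName, ...)` signature; it only shows the shape: group files by spec, emit one serialized manifest per spec, and pack them into a single committable carrying `byte[][]`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical: a written file tagged with its partition spec ID.
record WrittenFile(String path, int specId) {}

// Hypothetical: one committable per (table, branch, checkpoint), one manifest per spec.
record Committable(String table, String branch, long checkpointId, byte[][] manifests) {}

final class ManifestsPerSpec {
  // Placeholder for writing a manifest and serializing it; here we just encode
  // the spec ID and file paths so the sketch stays self-contained.
  static byte[] serializeManifest(int specId, List<WrittenFile> files) {
    StringBuilder sb = new StringBuilder("spec=" + specId);
    for (WrittenFile f : files) {
      sb.append('\n').append(f.path());
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  static Committable toCommittable(
      String table, String branch, long checkpointId, Collection<WrittenFile> files) {
    // Step 1: bucket files by spec ID (TreeMap for a stable manifest order).
    Map<Integer, List<WrittenFile>> bySpec = new TreeMap<>();
    for (WrittenFile f : files) {
      bySpec.computeIfAbsent(f.specId(), id -> new ArrayList<>()).add(f);
    }
    // Step 2: one manifest per spec, all inside a single committable.
    byte[][] manifests = new byte[bySpec.size()][];
    int i = 0;
    for (Map.Entry<Integer, List<WrittenFile>> e : bySpec.entrySet()) {
      manifests[i++] = serializeManifest(e.getKey(), e.getValue());
    }
    return new Committable(table, branch, checkpointId, manifests);
  }
}
```

In this sketch the committer receives one `Committable` per checkpoint and iterates over `manifests()`, which is the property step 3 relies on.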
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.