openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544862996
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -85,9 +88,9 @@
 // iceberg table when the next checkpoint happen.
 private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
- // The data files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
+ // The completed files cache for current checkpoint. Once the snapshot barrier received, it will be flushed to the
 // 'dataFilesPerCheckpoint'.
- private final List<DataFile> dataFilesOfCurrentCheckpoint = Lists.newArrayList();
+ private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
Review comment:
Yes, it's correct here. If there are 5 `IcebergStreamWriter` instances, each
writer emits its own `WriteResult`. The `IcebergFilesCommitter` runs with a
parallelism of one, so it collects all of those `WriteResult`s in the
`writeResultsOfCurrentCkpt` cache and then merges them into a single
`WriteResult`. Finally, it writes those files into delete and data manifests
and updates the Flink state backend.
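A minimal sketch of that collect-then-merge flow, assuming simplified types. `SketchWriteResult`, `CommitterSketch`, `processElement` and `snapshotState` are hypothetical names chosen for illustration (the last two only echo Flink operator hooks); they are not Iceberg's or Flink's actual API. The real committer serializes the merged files into manifests and stores a `byte[]` per checkpoint, whereas this sketch keeps the merged result in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a WriteResult: only data and delete file paths.
final class SketchWriteResult {
  final List<String> dataFiles;
  final List<String> deleteFiles;

  SketchWriteResult(List<String> dataFiles, List<String> deleteFiles) {
    this.dataFiles = Collections.unmodifiableList(new ArrayList<>(dataFiles));
    this.deleteFiles = Collections.unmodifiableList(new ArrayList<>(deleteFiles));
  }

  // Merge the per-writer results into one by concatenating their file lists.
  static SketchWriteResult merge(List<SketchWriteResult> results) {
    List<String> data = new ArrayList<>();
    List<String> deletes = new ArrayList<>();
    for (SketchWriteResult r : results) {
      data.addAll(r.dataFiles);
      deletes.addAll(r.deleteFiles);
    }
    return new SketchWriteResult(data, deletes);
  }
}

// Hypothetical single-parallelism committer (illustration only).
final class CommitterSketch {
  // Results received from upstream writers since the last checkpoint barrier.
  private final List<SketchWriteResult> writeResultsOfCurrentCkpt = new ArrayList<>();

  // One merged result per checkpoint id, kept in checkpoint order.
  final NavigableMap<Long, SketchWriteResult> resultsPerCheckpoint = new TreeMap<>();

  // Called once per WriteResult emitted by an upstream writer.
  void processElement(SketchWriteResult result) {
    writeResultsOfCurrentCkpt.add(result);
  }

  // On the checkpoint barrier: merge the cached results, record them, reset the cache.
  void snapshotState(long checkpointId) {
    resultsPerCheckpoint.put(checkpointId, SketchWriteResult.merge(writeResultsOfCurrentCkpt));
    writeResultsOfCurrentCkpt.clear();
  }
}
```

With five writers each emitting one result, a single `snapshotState(1L)` call yields one entry for checkpoint 1 whose merged result lists all five data files, and the per-checkpoint cache is empty again for the next checkpoint.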