openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r544343472
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -229,33 +232,71 @@ private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Merge all the pending results into a single write result.
+    WriteResult result = WriteResult.builder().add(pendingResults.values()).build();
+
+    // Partition overwrite does not support delete files.
+    Preconditions.checkArgument(result.deleteFiles().length == 0,
+        "Cannot overwrite partitions with delete files.");
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
+    // Commit the overwrite transaction.
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
+    for (DataFile file : result.dataFiles()) {
       numFiles += 1;
       dynamicOverwrite.addFile(file);
     }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
   }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+    // Merge all pending results into a single write result.
+    WriteResult mergedResult = WriteResult.builder().add(pendingResults.values()).build();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (mergedResult.deleteFiles().length < 1) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
+
+      int numFiles = 0;
+      for (DataFile file : mergedResult.dataFiles()) {
+        numFiles += 1;
+        appendFiles.appendFile(file);
+      }
+
+      commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
+    } else {
+      // To be compatible with iceberg format V2.
+      for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
+        // We don't commit the merged result into a single transaction because for the sequential transaction txn1 and
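[Editor's note: the hunk is cut off at the commented line above, so the v2 commit loop is incomplete here. For orientation only, below is a minimal sketch, not the PR's actual code, of how a per-checkpoint delta commit can look using Iceberg's public RowDelta API; the class and method names (DeltaTxnSketch, commitPerCheckpoint) are placeholders.]

import java.util.Arrays;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;

public class DeltaTxnSketch {
  // Commit each checkpoint's WriteResult as its own RowDelta transaction, in
  // ascending checkpoint order, so that delete files produced at checkpoint N
  // are applied on top of data files committed at earlier checkpoints.
  static void commitPerCheckpoint(Table table, NavigableMap<Long, WriteResult> pendingResults) {
    for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
      WriteResult result = e.getValue();
      RowDelta rowDelta = table.newRowDelta();
      Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
      Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
      rowDelta.commit();  // one transaction per pending checkpoint
    }
  }
}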
Review comment:
    I will provide a unit test to address it.
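[Editor's note: for context, a rough sketch of the scenario such a test would likely need to cover, assuming a format v2 table; the helper and variable names (commitSequentially, dataFileCkp1, deletesCkp2) are illustrative, not from the PR.]

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;

public class SequentialDeltaCommitSketch {
  // txn1 appends the data file from checkpoint 1; txn2 applies the delete file
  // from checkpoint 2. If both were squashed into one commit, the data and
  // delete files would share a sequence number, and equality deletes only
  // apply to data files with a smaller sequence number, so the deletes could
  // miss the rows they are supposed to remove.
  static void commitSequentially(Table table, DataFile dataFileCkp1, DeleteFile deletesCkp2) {
    table.newRowDelta().addRows(dataFileCkp1).commit();   // txn1
    table.newRowDelta().addDeletes(deletesCkp2).commit(); // txn2
  }
}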
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]