openinx commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545715370
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,78 +185,106 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
     }
   }
 
-  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+  private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap,
                                     String newFlinkJobId,
                                     long checkpointId) throws IOException {
-    NavigableMap<Long, byte[]> pendingManifestMap = manifestsMap.headMap(checkpointId, true);
+    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
 
-    List<ManifestFile> manifestFiles = Lists.newArrayList();
-    List<DataFile> pendingDataFiles = Lists.newArrayList();
-    for (byte[] manifestData : pendingManifestMap.values()) {
-      if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+    List<ManifestFile> manifests = Lists.newArrayList();
+    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+    for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+      if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
         // Skip the empty flink manifest.
         continue;
      }
 
-      ManifestFile manifestFile =
-          SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE, manifestData);
-
-      manifestFiles.add(manifestFile);
-      pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile, table.io()));
+      DeltaManifests deltaManifests = SimpleVersionedSerialization
+          .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
+      pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+      Iterables.addAll(manifests, deltaManifests);
     }
 
     if (replacePartitions) {
-      replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId);
+      replacePartitions(pendingResults, newFlinkJobId, checkpointId);
     } else {
-      append(pendingDataFiles, newFlinkJobId, checkpointId);
+      commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
     }
 
-    pendingManifestMap.clear();
+    pendingMap.clear();
 
-    // Delete the committed manifests and clear the committed data files from dataFilesPerCheckpoint.
-    for (ManifestFile manifestFile : manifestFiles) {
+    // Delete the committed manifests.
+    for (ManifestFile manifest : manifests) {
       try {
-        table.io().deleteFile(manifestFile.path());
+        table.io().deleteFile(manifest.path());
       } catch (Exception e) {
         // The flink manifests cleaning failure shouldn't abort the completed checkpoint.
         String details = MoreObjects.toStringHelper(this)
             .add("flinkJobId", newFlinkJobId)
             .add("checkpointId", checkpointId)
-            .add("manifestPath", manifestFile.path())
+            .add("manifestPath", manifest.path())
             .toString();
         LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}",
             details, e);
       }
     }
   }
 
-  private void replacePartitions(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
+  private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId,
+                                 long checkpointId) {
+    // Partition overwrite does not support delete files.
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
+    Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");
+
+    // Commit the overwrite transaction.
     ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
 
     int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      dynamicOverwrite.addFile(file);
+    for (WriteResult result : pendingResults.values()) {
+      numFiles += result.dataFiles().length;
+      Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
    }
 
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite", newFlinkJobId, checkpointId);
+    commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
  }
 
-  private void append(List<DataFile> dataFiles, String newFlinkJobId, long checkpointId) {
-    AppendFiles appendFiles = table.newAppend();
+  private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
+    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();
 
-    int numFiles = 0;
-    for (DataFile file : dataFiles) {
-      numFiles += 1;
-      appendFiles.appendFile(file);
-    }
+    if (deleteFilesNum == 0) {
+      // To be compatible with iceberg format V1.
+      AppendFiles appendFiles = table.newAppend();
 
-    commitOperation(appendFiles, numFiles, "append", newFlinkJobId, checkpointId);
+      int numFiles = 0;
+      for (WriteResult result : pendingResults.values()) {
Review comment:
It sounds like a separate improvement, so I created an issue for it; let's discuss there: https://github.com/apache/iceberg/issues/1959.
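
For readers following the diff: it is truncated at the commented line, before the branch of commitDeltaTxn that handles a non-zero delete-file count. Below is a minimal sketch of what that branch might look like, assuming Iceberg's public RowDelta API (table.newRowDelta(), addRows, addDeletes) and the commitOperation(operation, numDataFiles, numDeleteFiles, description, flinkJobId, checkpointId) helper whose shape is visible in the replacePartitions hunk. The method name and everything not shown in the diff is illustrative, not the PR's verbatim code.

    import java.util.Arrays;
    import java.util.Map;
    import java.util.NavigableMap;

    import org.apache.iceberg.RowDelta;
    import org.apache.iceberg.io.WriteResult;

    // Hedged sketch only: a possible V2 branch of commitDeltaTxn. The `table`
    // field and commitOperation(...) helper belong to IcebergFilesCommitter.
    private void commitRowDeltaSketch(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId) {
      for (Map.Entry<Long, WriteResult> e : pendingResults.entrySet()) {
        WriteResult result = e.getValue();

        // One RowDelta per checkpoint, so delete files written at a later
        // checkpoint still apply to data files committed by an earlier one.
        RowDelta rowDelta = table.newRowDelta();

        int numDataFiles = result.dataFiles().length;
        Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);       // new data files

        int numDeleteFiles = result.deleteFiles().length;
        Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);  // position/equality deletes

        commitOperation(rowDelta, numDataFiles, numDeleteFiles, "row delta", newFlinkJobId, e.getKey());
      }
    }

One reason a per-checkpoint commit (rather than merging all pending results into a single transaction) would matter here: in Iceberg's V2 semantics, equality deletes only apply to data files with an earlier sequence number, so deletes from checkpoint N must land in a later commit than the data files from checkpoint N-1 they reference.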
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]