rdblue commented on a change in pull request #1939:
URL: https://github.com/apache/iceberg/pull/1939#discussion_r545994813
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -184,78 +185,106 @@ public void notifyCheckpointComplete(long checkpointId)
throws Exception {
}
}
- private void commitUpToCheckpoint(NavigableMap<Long, byte[]> manifestsMap,
+ private void commitUpToCheckpoint(NavigableMap<Long, byte[]>
deltaManifestsMap,
String newFlinkJobId,
long checkpointId) throws IOException {
- NavigableMap<Long, byte[]> pendingManifestMap =
manifestsMap.headMap(checkpointId, true);
+ NavigableMap<Long, byte[]> pendingMap =
deltaManifestsMap.headMap(checkpointId, true);
- List<ManifestFile> manifestFiles = Lists.newArrayList();
- List<DataFile> pendingDataFiles = Lists.newArrayList();
- for (byte[] manifestData : pendingManifestMap.values()) {
- if (Arrays.equals(EMPTY_MANIFEST_DATA, manifestData)) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
+ for (Map.Entry<Long, byte[]> e : pendingMap.entrySet()) {
+ if (Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
// Skip the empty flink manifest.
continue;
}
- ManifestFile manifestFile =
-
SimpleVersionedSerialization.readVersionAndDeSerialize(FlinkManifestSerializer.INSTANCE,
manifestData);
-
- manifestFiles.add(manifestFile);
- pendingDataFiles.addAll(FlinkManifestUtil.readDataFiles(manifestFile,
table.io()));
+ DeltaManifests deltaManifests = SimpleVersionedSerialization
+ .readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE,
e.getValue());
+ pendingResults.put(e.getKey(),
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+ Iterables.addAll(manifests, deltaManifests);
}
if (replacePartitions) {
- replacePartitions(pendingDataFiles, newFlinkJobId, checkpointId);
+ replacePartitions(pendingResults, newFlinkJobId, checkpointId);
} else {
- append(pendingDataFiles, newFlinkJobId, checkpointId);
+ commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
}
- pendingManifestMap.clear();
+ pendingMap.clear();
- // Delete the committed manifests and clear the committed data files from
dataFilesPerCheckpoint.
- for (ManifestFile manifestFile : manifestFiles) {
+ // Delete the committed manifests.
+ for (ManifestFile manifest : manifests) {
try {
- table.io().deleteFile(manifestFile.path());
+ table.io().deleteFile(manifest.path());
} catch (Exception e) {
// The flink manifests cleaning failure shouldn't abort the completed
checkpoint.
String details = MoreObjects.toStringHelper(this)
.add("flinkJobId", newFlinkJobId)
.add("checkpointId", checkpointId)
- .add("manifestPath", manifestFile.path())
+ .add("manifestPath", manifest.path())
.toString();
LOG.warn("The iceberg transaction has been committed, but we failed to
clean the temporary flink manifests: {}",
details, e);
}
}
}
- private void replacePartitions(List<DataFile> dataFiles, String
newFlinkJobId, long checkpointId) {
+ private void replacePartitions(NavigableMap<Long, WriteResult>
pendingResults, String newFlinkJobId,
+ long checkpointId) {
+ // Partition overwrite does not support delete files.
+ int deleteFilesNum = pendingResults.values().stream().mapToInt(r ->
r.deleteFiles().length).sum();
+ Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions
with delete files.");
+
+ // Commit the overwrite transaction.
ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
int numFiles = 0;
- for (DataFile file : dataFiles) {
- numFiles += 1;
- dynamicOverwrite.addFile(file);
+ for (WriteResult result : pendingResults.values()) {
+ numFiles += result.dataFiles().length;
+ Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
}
- commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite",
newFlinkJobId, checkpointId);
+ commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition
overwrite", newFlinkJobId, checkpointId);
}
- private void append(List<DataFile> dataFiles, String newFlinkJobId, long
checkpointId) {
- AppendFiles appendFiles = table.newAppend();
+ private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults,
String newFlinkJobId, long checkpointId) {
+ int deleteFilesNum = pendingResults.values().stream().mapToInt(r ->
r.deleteFiles().length).sum();
- int numFiles = 0;
- for (DataFile file : dataFiles) {
- numFiles += 1;
- appendFiles.appendFile(file);
- }
+ if (deleteFilesNum == 0) {
+ // To be compatible with iceberg format V1.
+ AppendFiles appendFiles = table.newAppend();
- commitOperation(appendFiles, numFiles, "append", newFlinkJobId,
checkpointId);
+ int numFiles = 0;
+ for (WriteResult result : pendingResults.values()) {
Review comment:
> Maybe we can add a similar API in DeleteFiles interface?

We don't currently do this because we need delete entries to exist when we
delete files. That way we can track when something was deleted and clean it up
incrementally in `ExpireSnapshots`. If we did have a method like this, it would
either always have to rewrite the manifest with deletes, or need to ensure that
the manifest that is added contains only deletes, and neither requirement is
very obvious. I think it is better to pass the deleted files through the
existing methods.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]