amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2695645099
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1073,6 +1088,139 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
return cachedNewDeleteManifests;
}
+ // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove
+ // duplicates and add the newly merged DV
+ private void mergeDVsAndWrite() {
+ Map<Integer, List<MergedDVContent>> mergedIndicesBySpec = Maps.newConcurrentMap();
+
+ Tasks.foreach(dvsByReferencedFile.entrySet())
+ .executeWith(ThreadPools.getDeleteWorkerPool())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .run(
+ entry -> {
+ String referencedLocation = entry.getKey();
+ DeleteFileSet dvsToMerge = entry.getValue();
+ // Nothing to merge
+ if (dvsToMerge.size() < 2) {
+ return;
+ }
+
+ MergedDVContent merged = mergePositions(referencedLocation, dvsToMerge);
+
+ mergedIndicesBySpec
+ .computeIfAbsent(
+ merged.specId, spec -> Collections.synchronizedList(Lists.newArrayList()))
+ .add(merged);
+ });
+
+ // Update newDeleteFilesBySpec to remove all the duplicates
+ mergedIndicesBySpec.forEach(
+ (specId, mergedDVContent) -> {
+ mergedDVContent.stream()
+ .map(content -> content.mergedDVs)
+ .forEach(duplicateDVs -> newDeleteFilesBySpec.get(specId).removeAll(duplicateDVs));
+ });
+
+ writeMergedDVs(mergedIndicesBySpec);
+ }
+
+ // Produces a Puffin per partition spec containing the merged DVs for that spec
Review Comment:
Discussed offline: since there is generally going to be only one partition
spec, we're OK here and can keep the existing behavior. cc @rdblue
I did double-check: while the OutputFileFactory requires a spec to be
passed in, the DVFileWriter isn't really bound to a spec (as expected). The
DVFileWriter uses a newLocation() API, which doesn't care about the spec or any
tuples (again, as expected).
So I think the dummy partition spec idea I had would work if we want to
further simplify all this grouping logic.
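
To make the dummy-spec idea concrete, here is a minimal sketch. It uses
stand-in types only: MergedDV, LocationFactory, DUMMY_SPEC_ID, and both
write methods are hypothetical names for illustration, not the real Iceberg
classes or the code in this PR. It assumes only what is stated above, namely
that the factory needs a spec at construction while newLocation() ignores it.
It does not model manifest entries or the Puffin format.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

public class DummySpecSketch {
  // Hypothetical id standing in for a dummy (e.g. unpartitioned) spec.
  static final int DUMMY_SPEC_ID = 0;

  // Stand-in for a merged DV; not the real MergedDVContent.
  static final class MergedDV {
    final int specId;
    final String referencedLocation;

    MergedDV(int specId, String referencedLocation) {
      this.specId = specId;
      this.referencedLocation = referencedLocation;
    }
  }

  // Stand-in for a file factory: a spec id is required at construction,
  // but newLocation() never consults it.
  static final class LocationFactory {
    final int specId;

    LocationFactory(int specId) {
      this.specId = specId;
    }

    String newLocation() {
      return "dv-" + UUID.randomUUID() + ".puffin";
    }
  }

  // Current shape: group by spec id, then one output file per spec.
  static Map<String, List<MergedDV>> writePerSpec(List<MergedDV> merged) {
    Map<Integer, List<MergedDV>> bySpec = new TreeMap<>();
    for (MergedDV dv : merged) {
      bySpec.computeIfAbsent(dv.specId, id -> new ArrayList<>()).add(dv);
    }
    Map<String, List<MergedDV>> files = new LinkedHashMap<>();
    bySpec.forEach((specId, dvs) -> files.put(new LocationFactory(specId).newLocation(), dvs));
    return files;
  }

  // Simplified shape: no grouping, one factory built with the dummy spec id,
  // and every merged DV goes into the same output file.
  static Map<String, List<MergedDV>> writeWithDummySpec(List<MergedDV> merged) {
    Map<String, List<MergedDV>> files = new LinkedHashMap<>();
    if (!merged.isEmpty()) {
      files.put(new LocationFactory(DUMMY_SPEC_ID).newLocation(), new ArrayList<>(merged));
    }
    return files;
  }

  public static void main(String[] args) {
    List<MergedDV> merged = List.of(
        new MergedDV(0, "data-a.parquet"),
        new MergedDV(0, "data-b.parquet"),
        new MergedDV(1, "data-c.parquet"));
    System.out.println("per-spec files: " + writePerSpec(merged).size());
    System.out.println("dummy-spec files: " + writeWithDummySpec(merged).size());
  }
}
```

With three merged DVs spread across two specs, the per-spec path produces two
output files and the dummy-spec path produces one, which is where the
grouping logic would drop out.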
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.