amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2695348368
##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1073,6 +1088,139 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
return cachedNewDeleteManifests;
}
+  // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove
+  // duplicates and add the newly merged DV
+  private void mergeDVsAndWrite() {
+    Map<Integer, List<MergedDVContent>> mergedIndicesBySpec = Maps.newConcurrentMap();
+
+    Tasks.foreach(dvsByReferencedFile.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            entry -> {
+              String referencedLocation = entry.getKey();
+              DeleteFileSet dvsToMerge = entry.getValue();
+              // Nothing to merge
+              if (dvsToMerge.size() < 2) {
+                return;
+              }
+
+              MergedDVContent merged = mergePositions(referencedLocation, dvsToMerge);
Review Comment:
We'd have to make newDeleteFilesBySpec a concurrent map, which I can do, but
I think there was hesitation around making the state more complex. The content
class is also helpful for keeping track of the spec for a given file, which we
end up needing for the output file factory. Though if we use a dummy
unpartitioned spec for the output file factory just for this case, then we can
reduce complexity here as well.
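
A minimal sketch of the thread-safety concern discussed above: when merge
results are published from a worker pool, the collecting map must be
concurrent (hence `Maps.newConcurrentMap()` in the diff) rather than a plain
map like `newDeleteFilesBySpec`. The `MergedDV` record and `collect` method
below are hypothetical stand-ins, not the PR's actual types; the sketch only
shows the grouping-by-spec pattern using `java.util.concurrent`:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class MergedDVSketch {
  // Hypothetical stand-in for the PR's MergedDVContent: just the spec id
  // and the data file the DV references.
  record MergedDV(int specId, String referencedFile) {}

  // Group merged DVs by partition spec id. Because workers run in
  // parallel, both the outer map and the per-spec lists must be
  // thread-safe; computeIfAbsent gives an atomic get-or-create.
  static Map<Integer, List<MergedDV>> collectBySpec(List<MergedDV> merged) {
    Map<Integer, List<MergedDV>> bySpec = new ConcurrentHashMap<>();
    merged.parallelStream()
        .forEach(
            dv ->
                bySpec
                    .computeIfAbsent(dv.specId(), id -> new CopyOnWriteArrayList<>())
                    .add(dv));
    return bySpec;
  }
}
```

A plain `HashMap` here could lose entries or corrupt buckets under concurrent
writes, which is the trade-off the comment weighs against keeping the
producer's state simple.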
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]