rdblue commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2719100024


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1073,6 +1088,125 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
     return cachedNewDeleteManifests;
   }
 
+  // Merge duplicates, internally takes care of updating newDeleteFilesBySpec to remove
+  // duplicates and add the newly merged DV
+  private void mergeDVsAndWrite() {
+    Map<String, DeleteFileSet> dataFilesWithDuplicateDVs =
+        dvsByReferencedFile.entrySet().stream()
+            .filter(entry -> entry.getValue().size() > 1)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    List<MergedDVContent> mergedDVs = Collections.synchronizedList(Lists.newArrayList());
+    Tasks.foreach(dataFilesWithDuplicateDVs.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            entry -> {
+              String referencedLocation = entry.getKey();
+              DeleteFileSet duplicateDVs = entry.getValue();
+              mergedDVs.add(mergePositions(referencedLocation, duplicateDVs));
+            });
+
+    // Update newDeleteFilesBySpec to remove all the duplicates
+    mergedDVs.forEach(
+        mergedDV -> newDeleteFilesBySpec.get(mergedDV.specId).removeAll(mergedDV.duplicateDVs));

Review Comment:
   I recommend avoiding methods like this that modify the instance state of 
the snapshot producer. Instance state should only be written when we need to 
cache results, and cached results should be held in separate instance state. 
For example, this is how `newDataFilesBySpec` and `cachedNewDataManifests` 
work. The new data files are kept organized by spec, but that state is not 
rewritten. Instead, `cachedNewDataManifests` is used in case of retry.
   
   Modifying the instance state like this hides how the class was configured by 
the caller after a merge happens, and relies on side-effects that are not 
obvious to people working with the code later.
   
   I think this should be refactored so that as many of these new methods as 
possible are static and in a `DVUtil` class rather than inline in this already 
huge class. Merging DVs is independently useful and shouldn't live here. This 
method may also be a good candidate for being a utility if you refactor it to 
pass a thread pool to `Tasks`, but it may make more sense to leave it as an 
instance method if it needs to update a merged DV cache.
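
   A minimal sketch of the shape described above, not the actual Iceberg code. 
`DVUtilSketch`, `ProducerSketch`, `mergeDuplicates`, and `cachedMergedDVs` are 
hypothetical names, a DV is modeled as a plain `TreeSet<Long>` of positions, and 
a standard `ExecutorService` stands in for the pool that would be passed to 
`Tasks`. The point is only that the configured state is never rewritten and the 
merge result lives in a separate cache field.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical utility class; stands in for the suggested `DVUtil`.
final class DVUtilSketch {
  private DVUtilSketch() {}

  // Merges the position sets of every data file that has more than one DV.
  // Static and side-effect free: the input map is only read, and the pool is
  // supplied by the caller instead of being looked up internally.
  static Map<String, TreeSet<Long>> mergeDuplicates(
      Map<String, List<TreeSet<Long>>> dvsByReferencedFile, ExecutorService pool) {
    Map<String, Future<TreeSet<Long>>> pending = new HashMap<>();
    for (Map.Entry<String, List<TreeSet<Long>>> entry : dvsByReferencedFile.entrySet()) {
      if (entry.getValue().size() > 1) {
        List<TreeSet<Long>> duplicates = entry.getValue();
        pending.put(
            entry.getKey(),
            pool.submit(
                () -> {
                  TreeSet<Long> merged = new TreeSet<>();
                  duplicates.forEach(merged::addAll);
                  return merged;
                }));
      }
    }

    Map<String, TreeSet<Long>> result = new HashMap<>();
    for (Map.Entry<String, Future<TreeSet<Long>>> entry : pending.entrySet()) {
      try {
        result.put(entry.getKey(), entry.getValue().get());
      } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException("Failed to merge DVs for " + entry.getKey(), e);
      }
    }
    return result;
  }
}

// Hypothetical stand-in for the snapshot producer.
class ProducerSketch {
  // What the caller configured; never rewritten by a merge.
  private final Map<String, List<TreeSet<Long>>> dvsByReferencedFile = new HashMap<>();
  // Derived result, held separately so a retry can reuse it.
  private Map<String, TreeSet<Long>> cachedMergedDVs = null;

  void addDV(String referencedFile, TreeSet<Long> positions) {
    dvsByReferencedFile.computeIfAbsent(referencedFile, key -> new ArrayList<>()).add(positions);
    cachedMergedDVs = null; // new input invalidates the cached merge
  }

  Map<String, TreeSet<Long>> mergedDVs(ExecutorService pool) {
    if (cachedMergedDVs == null) {
      cachedMergedDVs = DVUtilSketch.mergeDuplicates(dvsByReferencedFile, pool);
    }
    return cachedMergedDVs;
  }

  // Exposes the configured state so callers can see it is unchanged after a merge.
  int configuredDVCount(String referencedFile) {
    List<TreeSet<Long>> dvs = dvsByReferencedFile.get(referencedFile);
    return dvs == null ? 0 : dvs.size();
  }
}
```

   With this split, a retry calls `mergedDVs` again and gets the cached map, 
while `configuredDVCount` still reports what the caller originally added.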



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

