amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2680201703


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1060,19 +1086,119 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
     }
 
     if (cachedNewDeleteManifests.isEmpty()) {
+      // If duplicates were found, merge them, then update newDeleteFilesBySpec to remove the
+      // duplicates and add the new merged DV
+      if (!duplicateDVsForDataFile.isEmpty()) {
+        Map<String, DeleteFile> mergedDVs = mergeDuplicateDVs();
+        for (Map.Entry<String, DeleteFile> mergedDV : mergedDVs.entrySet()) {
+          String referencedFile = mergedDV.getKey();
+          DeleteFile newDV = mergedDV.getValue();
+          DeleteFileSet duplicateDVs = duplicateDVsForDataFile.get(referencedFile);
+          DeleteFileSet allDeleteFilesForSpec = newDeleteFilesBySpec.get(newDV.specId());
+          allDeleteFilesForSpec.removeAll(duplicateDVs);
+          allDeleteFilesForSpec.add(newDV);
+        }
+      }
+
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops().current().spec(specId);
+            // Update summaries for all added delete files including eq. deletes for this
+            // partition spec
+            deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
             List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
             cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
 
       this.hasNewDeleteFiles = false;
+      this.duplicateDVsForDataFile.clear();
     }
 
     return cachedNewDeleteManifests;
   }
 
+  // Merge the duplicate DVs recorded for each referenced data file and return a map from the
+  // referenced data file location to the merged DV
+  private Map<String, DeleteFile> mergeDuplicateDVs() {
+    Map<String, DeleteFile> mergedDVs = Maps.newConcurrentMap();
+    Tasks.foreach(duplicateDVsForDataFile.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            dvsToMergeForDataFile -> {
+              String referencedLocation = dvsToMergeForDataFile.getKey();
+              mergedDVs.put(
+                  referencedLocation,
+                  mergeAndWriteDV(referencedLocation, dvsToMergeForDataFile.getValue()));
+            });
+
+    return mergedDVs;
+  }
+
+  private DeleteFile mergeAndWriteDV(String referencedDataFile, DeleteFileSet dvs) {
+    Iterator<DeleteFile> dvIterator = dvs.iterator();
+    DeleteFile firstDV = dvIterator.next();
+    PositionDeleteIndex positionDeleteIndex =
+        Deletes.readDV(firstDV, ops().io(), ops().encryption());
+    PartitionSpec spec = spec(firstDV.specId());
+    while (dvIterator.hasNext()) {
+      DeleteFile dv = dvIterator.next();
+      Preconditions.checkArgument(
+          Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+          "Cannot merge duplicate added DVs when data sequence numbers are different, "
+              + "expected all to be added with sequence %s, but got %s",
+          firstDV.dataSequenceNumber(),
+          dv.dataSequenceNumber());
+      Preconditions.checkArgument(
+          dv.specId() == firstDV.specId(),
+          "Cannot merge duplicate added DVs when partition specs are 
different,"
+              + "expected all to be added with spec %s, but got %s",
+          firstDV.specId(),
+          dv.specId());
+      Preconditions.checkArgument(
+          Objects.equals(dv.partition(), firstDV.partition()),
+          "Cannot merge duplicate added DVs when partition tuples are 
different");
+      positionDeleteIndex.merge(Deletes.readDV(dv, ops().io(), ops().encryption()));
+    }
+
+    return writeDV(
+        referencedDataFile,
+        positionDeleteIndex,
+        spec,
+        firstDV.partition(),
+        firstDV.dataSequenceNumber());
+  }
+
+  private DeleteFile writeDV(

Review Comment:
   I would group all the merged DVs and then write out the Puffin file. Right now, in the case of duplicates, we'd be producing a tiny Puffin file per data file that has duplicates.
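
   A rough sketch of the shape I mean, assuming a hypothetical `writeAllDVs` helper that opens a single Puffin writer and appends one blob per referenced data file (`mergeIndexes` here stands for the validate-and-merge portion of `mergeAndWriteDV`, without the write):

   ```java
   // Sketch only: merge all duplicate DVs in memory first, then write them out together
   private Map<String, DeleteFile> mergeDuplicateDVs() {
     Map<String, PositionDeleteIndex> mergedIndexes = Maps.newConcurrentMap();
     Tasks.foreach(duplicateDVsForDataFile.entrySet())
         .executeWith(ThreadPools.getDeleteWorkerPool())
         .stopOnFailure()
         .throwFailureWhenFinished()
         .run(entry -> mergedIndexes.put(entry.getKey(), mergeIndexes(entry.getValue())));

     // writeAllDVs (hypothetical) would write all merged indexes as blobs of one
     // Puffin file and return the resulting DeleteFile per data file location
     return writeAllDVs(mergedIndexes);
   }
   ```

   That way the number of Puffin files written doesn't scale with the number of data files that had duplicate DVs.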


