amogh-jahagirdar commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2678917066


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1063,16 +1112,121 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
       newDeleteFilesBySpec.forEach(
           (specId, deleteFiles) -> {
             PartitionSpec spec = ops().current().spec(specId);
+            if (foundDuplicateDVs) {
+              mergeDVsAndUpdateDeleteFiles(deleteFiles, spec);
+            }
+
+            // Update summaries for all delete files including eq. deletes for this partition spec
+            deleteFiles.forEach(file -> addedFilesSummary.addedFile(spec, file));
+
             List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
             cachedNewDeleteManifests.addAll(newDeleteManifests);
           });
 
       this.hasNewDeleteFiles = false;
+      this.foundDuplicateDVs = false;
     }
 
     return cachedNewDeleteManifests;
   }
 
+  private void mergeDVsAndUpdateDeleteFiles(DeleteFileSet deleteFiles, PartitionSpec spec) {
+    // Filter out DVs and group them by referenced data file
+    Map<String, List<DeleteFile>> dvsByReferencedLocation =
+        deleteFiles.stream()
+            .filter(ContentFileUtil::isDV)
+            .collect(
+                Collectors.toMap(
+                    DeleteFile::referencedDataFile,
+                    Lists::newArrayList,
+                    (existingDVs, newDVs) -> {
+                      existingDVs.addAll(newDVs);
+                      return existingDVs;
+                    }));
+
+    // Merge DVs
+    Map<String, List<DeleteFile>> dvsThatNeedMerging =
+        dvsByReferencedLocation.entrySet().stream()
+            .filter(e -> e.getValue().size() > 1)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    List<DeleteFile> newDVs = Lists.newArrayList();
+    Tasks.foreach(dvsThatNeedMerging.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            dvsToMergeForDataFile ->
+                newDVs.add(
+                    mergeAndWriteDV(
+                        dvsToMergeForDataFile.getKey(), dvsToMergeForDataFile.getValue(), spec)));
+
+    // Remove the merged DVs from the tracking set
+    for (List<DeleteFile> dvsThatWereMerged : dvsThatNeedMerging.values()) {
+      dvsThatWereMerged.forEach(deleteFiles::remove);
+    }
+
+    // Add the new DVs to the tracking set
+    deleteFiles.addAll(newDVs);
+  }
+
+  private DeleteFile mergeAndWriteDV(
+      String referencedDataFile, List<DeleteFile> dvs, PartitionSpec spec) {
+    DeleteFile firstDV = dvs.get(0);
+    PositionDeleteIndex positionDeleteIndex =
+        Deletes.readDV(firstDV, ops().io(), ops().encryption());
+    for (int i = 1; i < dvs.size(); i++) {
+      DeleteFile dv = dvs.get(i);
+      Preconditions.checkArgument(
+          Objects.equals(dv.dataSequenceNumber(), firstDV.dataSequenceNumber()),
+          "Cannot merge duplicate added DVs when data sequence numbers are different,"
+              + " expected all to be added with sequence %s, but got %s",

Review Comment:
   Primarily a safety net, since this check only applies when duplicate DVs are added within a single commit.
   
   The way I think about it: if someone adds duplicate DVs in a single commit that also span multiple data sequence numbers, something is probably wrong (the duplicates alone are already a problem; differing sequence numbers for the duplicates make it doubly so), and from a merging perspective we'd have no basis for inferring which sequence number the merged DV should carry. So I think it's reasonable to fail the commit in this case.
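   To make that invariant concrete, here is a minimal, self-contained sketch. The `Dv` record and `merge` helper are hypothetical stand-ins, not Iceberg types: a `java.util.BitSet` plays the role of the real `PositionDeleteIndex`, and the sequence-number check mirrors the `Preconditions.checkArgument` above — positions are unioned only when all duplicates share one data sequence number, otherwise the merge fails rather than guessing.
   
   ```java
   import java.util.BitSet;
   import java.util.List;
   
   // Sketch only: "Dv" reduces Iceberg's DeleteFile + PositionDeleteIndex pair
   // to the two fields the merge cares about.
   public class DvMergeSketch {
     public record Dv(long dataSequenceNumber, BitSet deletedPositions) {}
   
     // Union the deleted positions of duplicate DVs for one data file, but only
     // when every DV carries the same data sequence number: if they differ,
     // there is no sound sequence number for the merged DV, so fail the merge
     // instead of guessing.
     public static Dv merge(List<Dv> dvs) {
       Dv first = dvs.get(0);
       BitSet merged = (BitSet) first.deletedPositions().clone();
       for (int i = 1; i < dvs.size(); i++) {
         Dv dv = dvs.get(i);
         if (dv.dataSequenceNumber() != first.dataSequenceNumber()) {
           throw new IllegalArgumentException(
               "Cannot merge duplicate DVs: expected sequence "
                   + first.dataSequenceNumber()
                   + " but got "
                   + dv.dataSequenceNumber());
         }
         merged.or(dv.deletedPositions());
       }
       return new Dv(first.dataSequenceNumber(), merged);
     }
   }
   ```
   
   Merging `{1, 3}` and `{3, 7}` at the same sequence number yields `{1, 3, 7}`; mixing sequence numbers throws.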



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
