nastra commented on code in PR #15006:
URL: https://github.com/apache/iceberg/pull/15006#discussion_r2698501632


##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1073,6 +1088,136 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
     return cachedNewDeleteManifests;
   }
 
+  // Merge duplicates, internally takes care of updating newDeleteFilesBySpec 
to remove
+  // duplicates and add the newly merged DV
+  private void mergeDVsAndWrite() {
+    Map<Integer, List<MergedDVContent>> mergedIndicesBySpec = 
Maps.newConcurrentMap();
+    Map<String, DeleteFileSet> dataFilesWithDuplicateDVs =
+        dvsByReferencedFile.entrySet().stream()
+            .filter(entry -> entry.getValue().size() > 1)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    Tasks.foreach(dataFilesWithDuplicateDVs.entrySet())
+        .executeWith(ThreadPools.getDeleteWorkerPool())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .run(
+            entry -> {
+              String referencedLocation = entry.getKey();
+              DeleteFileSet duplicateDVs = entry.getValue();
+              MergedDVContent merged = mergePositions(referencedLocation, 
duplicateDVs);
+              mergedIndicesBySpec
+                  .computeIfAbsent(
+                      merged.specId, spec -> 
Collections.synchronizedList(Lists.newArrayList()))
+                  .add(merged);
+            });
+
+    // Update newDeleteFilesBySpec to remove all the duplicates
+    mergedIndicesBySpec.forEach(
+        (specId, mergedDVContent) -> {
+          mergedDVContent.stream()
+              .map(content -> content.duplicateDVs)
+              .forEach(duplicateDVs -> 
newDeleteFilesBySpec.get(specId).removeAll(duplicateDVs));
+        });
+
+    writeMergedDVs(mergedIndicesBySpec);
+  }
+
+  // Produces a Puffin per partition spec containing the merged DVs for that 
spec
+  private void writeMergedDVs(Map<Integer, List<MergedDVContent>> 
mergedDVContentBySpec) {
+    Map<Integer, DeleteFileSet> mergedDVsBySpec = Maps.newHashMap();
+
+    mergedDVContentBySpec.forEach(
+        (specId, mergedDVsForSpec) -> {
+          try (DVFileWriter dvFileWriter =
+              new BaseDVFileWriter(
+                  OutputFileFactory.builderFor(ops(), spec(specId), 
FileFormat.PUFFIN, 1, 1)
+                      .build(),
+                  path -> null)) {
+
+            for (MergedDVContent mergedDV : mergedDVsForSpec) {
+              LOG.warn(
+                  "Merged {} duplicate deletion vectors for data file {} in 
table {}. The duplicate DVs are orphaned, and writers should merge DVs per file 
before committing",
+                  mergedDV.duplicateDVs.size(),
+                  mergedDV.referencedLocation,
+                  tableName);
+              dvFileWriter.delete(
+                  mergedDV.referencedLocation,
+                  mergedDV.mergedPositions,
+                  spec(mergedDV.specId),
+                  mergedDV.partition);
+            }
+
+            dvFileWriter.close();
+            DeleteWriteResult result = dvFileWriter.result();
+
+            DeleteFileSet dvsForSpec =
+                mergedDVsBySpec.computeIfAbsent(specId, k -> 
DeleteFileSet.create());
+            dvsForSpec.addAll(
+                result.deleteFiles().stream()
+                    .map(file -> Delegates.pendingDeleteFile(file, 
file.dataSequenceNumber()))
+                    .collect(Collectors.toList()));
+
+            // Add the merged DV to the delete files by spec
+            newDeleteFilesBySpec.get(specId).addAll(dvsForSpec);
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+  }
+
+  // Data class for referenced file, DVs that were merged, the merged position 
delete index,
+  // partition spec and tuple
+  private static class MergedDVContent {
+    private DeleteFileSet duplicateDVs;

Review Comment:
   Looks like these fields can all be declared `final`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to