This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c8fe01e71f Core: Support combining position deletes during writes (#11222)
c8fe01e71f is described below

commit c8fe01e71f9996fcaae973c6bfaa8d90f7dd8c6c
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Sep 30 21:12:56 2024 -0700

    Core: Support combining position deletes during writes (#11222)
---
 .../iceberg/deletes/BitmapPositionDeleteIndex.java |  25 ++++-
 .../java/org/apache/iceberg/deletes/Deletes.java   |  32 +++++-
 .../org/apache/iceberg/deletes/PositionDelete.java |   7 ++
 .../iceberg/deletes/PositionDeleteIndex.java       |  15 +++
 .../deletes/SortingPositionOnlyDeleteWriter.java   |  58 ++++++++---
 .../iceberg/io/FanoutPositionOnlyDeleteWriter.java |  28 +++++-
 .../src/test/java/org/apache/iceberg/TestBase.java |   4 +
 .../org/apache/iceberg/data/BaseDeleteLoader.java  |   4 +-
 .../apache/iceberg/io/TestPartitioningWriters.java | 111 +++++++++++++++++++++
 9 files changed, 264 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 3a04487856..a1b57a3866 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -18,18 +18,35 @@
  */
 package org.apache.iceberg.deletes;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.function.LongConsumer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.roaringbitmap.longlong.Roaring64Bitmap;
 
 class BitmapPositionDeleteIndex implements PositionDeleteIndex {
   private final Roaring64Bitmap roaring64Bitmap;
+  private final List<DeleteFile> deleteFiles;
 
   BitmapPositionDeleteIndex() {
-    roaring64Bitmap = new Roaring64Bitmap();
+    this.roaring64Bitmap = new Roaring64Bitmap();
+    this.deleteFiles = Lists.newArrayList();
+  }
+
+  BitmapPositionDeleteIndex(Collection<DeleteFile> deleteFiles) {
+    this.roaring64Bitmap = new Roaring64Bitmap();
+    this.deleteFiles = Lists.newArrayList(deleteFiles);
+  }
+
+  BitmapPositionDeleteIndex(DeleteFile deleteFile) {
+    this.roaring64Bitmap = new Roaring64Bitmap();
+    this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) : Lists.newArrayList();
   }
 
   void merge(BitmapPositionDeleteIndex that) {
     roaring64Bitmap.or(that.roaring64Bitmap);
+    deleteFiles.addAll(that.deleteFiles);
   }
 
   @Override
@@ -48,6 +65,7 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
       merge((BitmapPositionDeleteIndex) that);
     } else {
       that.forEach(this::delete);
+      deleteFiles.addAll(that.deleteFiles());
     }
   }
 
@@ -65,4 +83,9 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
   public void forEach(LongConsumer consumer) {
     roaring64Bitmap.forEach(consumer::accept);
   }
+
+  @Override
+  public Collection<DeleteFile> deleteFiles() {
+    return deleteFiles;
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index a72e016130..2256b378f6 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.iceberg.Accessor;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
@@ -123,6 +124,11 @@ public class Deletes {
     }
   }
 
+  public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
+      CloseableIterable<T> posDeletes) {
+    return toPositionIndexes(posDeletes, null /* unknown delete file */);
+  }
+
   /**
    * Builds a map of position delete indexes by path.
    *
@@ -131,10 +137,11 @@ public class Deletes {
    * entire delete file content is needed (e.g. caching).
    *
    * @param posDeletes position deletes
+   * @param file the source delete file for the deletes
    * @return the map of position delete indexes by path
    */
   public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex> toPositionIndexes(
-      CloseableIterable<T> posDeletes) {
+      CloseableIterable<T> posDeletes, DeleteFile file) {
     CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();
 
     try (CloseableIterable<T> deletes = posDeletes) {
@@ -142,7 +149,7 @@ public class Deletes {
         CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
         long position = (long) POSITION_ACCESSOR.get(delete);
         PositionDeleteIndex index =
-            indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex());
+            indexes.computeIfAbsent(filePath, key -> new BitmapPositionDeleteIndex(file));
         index.delete(position);
       }
     } catch (IOException e) {
@@ -152,6 +159,20 @@ public class Deletes {
     return indexes;
   }
 
+  public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
+      CharSequence dataLocation, CloseableIterable<T> posDeletes, DeleteFile file) {
+    CloseableIterable<Long> positions = extractPositions(dataLocation, posDeletes);
+    List<DeleteFile> files = ImmutableList.of(file);
+    return toPositionIndex(positions, files);
+  }
+
+  private static <T extends StructLike> CloseableIterable<Long> extractPositions(
+      CharSequence dataLocation, CloseableIterable<T> rows) {
+    DataFileFilter<T> filter = new DataFileFilter<>(dataLocation);
+    CloseableIterable<T> filteredRows = filter.filter(rows);
+    return CloseableIterable.transform(filteredRows, row -> (Long) POSITION_ACCESSOR.get(row));
+  }
+
   public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
       CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
     return toPositionIndex(dataLocation, deleteFiles, ThreadPools.getDeleteWorkerPool());
@@ -176,8 +197,13 @@ public class Deletes {
   }
 
   public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long> posDeletes) {
+    return toPositionIndex(posDeletes, ImmutableList.of());
+  }
+
+  private static PositionDeleteIndex toPositionIndex(
+      CloseableIterable<Long> posDeletes, List<DeleteFile> files) {
     try (CloseableIterable<Long> deletes = posDeletes) {
-      PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex();
+      PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(files);
       deletes.forEach(positionDeleteIndex::delete);
       return positionDeleteIndex;
     } catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
index 655428ce77..57e188567f 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
@@ -31,6 +31,13 @@ public class PositionDelete<R> implements StructLike {
 
   private PositionDelete() {}
 
+  public PositionDelete<R> set(CharSequence newPath, long newPos) {
+    this.path = newPath;
+    this.pos = newPos;
+    this.row = null;
+    return this;
+  }
+
   public PositionDelete<R> set(CharSequence newPath, long newPos, R newRow) {
     this.path = newPath;
     this.pos = newPos;
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
index c0086fe6aa..3655b8b7e8 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -18,7 +18,10 @@
  */
 package org.apache.iceberg.deletes;
 
+import java.util.Collection;
 import java.util.function.LongConsumer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 
 public interface PositionDeleteIndex {
   /**
@@ -42,6 +45,9 @@ public interface PositionDeleteIndex {
    * @param that the other index to merge
    */
   default void merge(PositionDeleteIndex that) {
+    if (!that.deleteFiles().isEmpty()) {
+      throw new UnsupportedOperationException(getClass().getName() + " does not support merge");
+    }
     that.forEach(this::delete);
   }
 
@@ -72,6 +78,15 @@ public interface PositionDeleteIndex {
     }
   }
 
+  /**
+   * Returns delete files that this index was created from or an empty collection if unknown.
+   *
+   * @return delete files that this index was created from
+   */
+  default Collection<DeleteFile> deleteFiles() {
+    return ImmutableList.of();
+  }
+
   /** Returns an empty immutable position delete index. */
   static PositionDeleteIndex empty() {
     return EmptyPositionDeleteIndex.get();
diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
index 1d4d131dfe..818529c024 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -21,17 +21,18 @@ package org.apache.iceberg.deletes;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.io.DeleteWriteResult;
 import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.util.CharSequenceMap;
 import org.apache.iceberg.util.CharSequenceSet;
-import org.roaringbitmap.longlong.PeekableLongIterator;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.apache.iceberg.util.ContentFileUtil;
 
 /**
  * A position delete writer that is capable of handling unordered deletes without rows.
@@ -41,6 +42,11 @@ import org.roaringbitmap.longlong.Roaring64Bitmap;
  * records are not ordered by file and position as required by the spec. If the incoming deletes are
  * ordered by an external process, use {@link PositionDeleteWriter} instead.
  *
+ * <p>If configured, this writer can also load previous deletes using the provided function and
+ * merge them with incoming ones prior to flushing the deletes into a file. Callers must ensure only
+ * previous file-scoped deletes are loaded because partition-scoped deletes can apply to multiple
+ * data files and can't be safely discarded.
+ *
  * <p>Note this writer stores only positions. It does not store deleted records.
  */
 public class SortingPositionOnlyDeleteWriter<T>
@@ -48,7 +54,8 @@ public class SortingPositionOnlyDeleteWriter<T>
 
   private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers;
   private final DeleteGranularity granularity;
-  private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
+  private final CharSequenceMap<PositionDeleteIndex> positionsByPath;
+  private final Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes;
   private DeleteWriteResult result = null;
 
   public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>, DeleteWriteResult> writer) {
@@ -58,17 +65,26 @@ public class SortingPositionOnlyDeleteWriter<T>
   public SortingPositionOnlyDeleteWriter(
       Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
       DeleteGranularity granularity) {
+    this(writers, granularity, path -> null /* no access to previous deletes */);
+  }
+
+  public SortingPositionOnlyDeleteWriter(
+      Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
+      DeleteGranularity granularity,
+      Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
     this.writers = writers;
     this.granularity = granularity;
     this.positionsByPath = CharSequenceMap.create();
+    this.loadPreviousDeletes = loadPreviousDeletes;
   }
 
   @Override
   public void write(PositionDelete<T> positionDelete) {
     CharSequence path = positionDelete.path();
     long position = positionDelete.pos();
-    Roaring64Bitmap positions = positionsByPath.computeIfAbsent(path, Roaring64Bitmap::new);
-    positions.add(position);
+    PositionDeleteIndex positions =
+        positionsByPath.computeIfAbsent(path, key -> new BitmapPositionDeleteIndex());
+    positions.delete(position);
   }
 
   @Override
@@ -106,14 +122,16 @@ public class SortingPositionOnlyDeleteWriter<T>
   private DeleteWriteResult writeFileDeletes() throws IOException {
     List<DeleteFile> deleteFiles = Lists.newArrayList();
     CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+    List<DeleteFile> rewrittenDeleteFiles = Lists.newArrayList();
 
     for (CharSequence path : positionsByPath.keySet()) {
       DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
       deleteFiles.addAll(writeResult.deleteFiles());
       referencedDataFiles.addAll(writeResult.referencedDataFiles());
+      rewrittenDeleteFiles.addAll(writeResult.rewrittenDeleteFiles());
     }
 
-    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles);
   }
 
   @SuppressWarnings("CollectionUndefinedEquality")
@@ -123,22 +141,38 @@ public class SortingPositionOnlyDeleteWriter<T>
     }
 
     FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();
+    List<DeleteFile> rewrittenDeleteFiles = Lists.newArrayList();
 
     try {
       PositionDelete<T> positionDelete = PositionDelete.create();
       for (CharSequence path : sort(paths)) {
-        // the iterator provides values in ascending sorted order
-        PeekableLongIterator positions = positionsByPath.get(path).getLongIterator();
-        while (positions.hasNext()) {
-          long position = positions.next();
-          writer.write(positionDelete.set(path, position, null /* no row */));
+        PositionDeleteIndex positions = positionsByPath.get(path);
+        PositionDeleteIndex previousPositions = loadPreviousDeletes.apply(path);
+        if (previousPositions != null && previousPositions.isNotEmpty()) {
+          validatePreviousDeletes(previousPositions);
+          positions.merge(previousPositions);
+          rewrittenDeleteFiles.addAll(previousPositions.deleteFiles());
         }
+        positions.forEach(position -> writer.write(positionDelete.set(path, position)));
       }
     } finally {
       writer.close();
     }
 
-    return writer.result();
+    DeleteWriteResult writerResult = writer.result();
+    List<DeleteFile> deleteFiles = writerResult.deleteFiles();
+    CharSequenceSet referencedDataFiles = writerResult.referencedDataFiles();
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles);
+  }
+
+  private void validatePreviousDeletes(PositionDeleteIndex index) {
+    Preconditions.checkArgument(
+        index.deleteFiles().stream().allMatch(this::isFileScoped),
+        "Previous deletes must be file-scoped");
+  }
+
+  private boolean isFileScoped(DeleteFile deleteFile) {
+    return ContentFileUtil.referencedDataFile(deleteFile) != null;
   }
 
   private Collection<CharSequence> sort(Collection<CharSequence> paths) {
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
index c6a55064b7..9c527f4b32 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
@@ -19,11 +19,13 @@
 package org.apache.iceberg.io;
 
 import java.util.List;
+import java.util.function.Function;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.CharSequenceSet;
@@ -45,6 +47,8 @@ public class FanoutPositionOnlyDeleteWriter<T>
   private final DeleteGranularity granularity;
   private final List<DeleteFile> deleteFiles;
   private final CharSequenceSet referencedDataFiles;
+  private final List<DeleteFile> rewrittenDeleteFiles;
+  private final Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes;
 
   public FanoutPositionOnlyDeleteWriter(
       FileWriterFactory<T> writerFactory,
@@ -60,6 +64,22 @@ public class FanoutPositionOnlyDeleteWriter<T>
       FileIO io,
       long targetFileSizeInBytes,
       DeleteGranularity granularity) {
+    this(
+        writerFactory,
+        fileFactory,
+        io,
+        targetFileSizeInBytes,
+        granularity,
+        path -> null /* no access to previous deletes */);
+  }
+
+  public FanoutPositionOnlyDeleteWriter(
+      FileWriterFactory<T> writerFactory,
+      OutputFileFactory fileFactory,
+      FileIO io,
+      long targetFileSizeInBytes,
+      DeleteGranularity granularity,
+      Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
     this.writerFactory = writerFactory;
     this.fileFactory = fileFactory;
     this.io = io;
@@ -67,6 +87,8 @@ public class FanoutPositionOnlyDeleteWriter<T>
     this.granularity = granularity;
     this.deleteFiles = Lists.newArrayList();
     this.referencedDataFiles = CharSequenceSet.empty();
+    this.rewrittenDeleteFiles = Lists.newArrayList();
+    this.loadPreviousDeletes = loadPreviousDeletes;
   }
 
   @Override
@@ -76,17 +98,19 @@ public class FanoutPositionOnlyDeleteWriter<T>
         () ->
             new RollingPositionDeleteWriter<>(
                 writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition),
-        granularity);
+        granularity,
+        loadPreviousDeletes);
   }
 
   @Override
   protected void addResult(DeleteWriteResult result) {
     deleteFiles.addAll(result.deleteFiles());
     referencedDataFiles.addAll(result.referencedDataFiles());
+    rewrittenDeleteFiles.addAll(result.rewrittenDeleteFiles());
   }
 
   @Override
   protected DeleteWriteResult aggregatedResult() {
-    return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+    return new DeleteWriteResult(deleteFiles, referencedDataFiles, rewrittenDeleteFiles);
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java
index a0b52b346b..f3bbb79795 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -665,6 +665,10 @@ public class TestBase {
         .build();
   }
 
+  protected <T> PositionDelete<T> positionDelete(CharSequence path, long pos) {
+    return positionDelete(path, pos, null /* no row */);
+  }
+
   protected <T> PositionDelete<T> positionDelete(CharSequence path, long pos, T row) {
     PositionDelete<T> positionDelete = PositionDelete.create();
     return positionDelete.set(path, pos, row);
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 91b7fd1c1d..8a1ebf95ab 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -166,13 +166,13 @@ public class BaseDeleteLoader implements DeleteLoader {
 
   private CharSequenceMap<PositionDeleteIndex> readPosDeletes(DeleteFile deleteFile) {
     CloseableIterable<Record> deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA);
-    return Deletes.toPositionIndexes(deletes);
+    return Deletes.toPositionIndexes(deletes, deleteFile);
   }
 
   private PositionDeleteIndex readPosDeletes(DeleteFile deleteFile, CharSequence filePath) {
     Expression filter = Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath);
     CloseableIterable<Record> deletes = openDeletes(deleteFile, POS_DELETE_SCHEMA, filter);
-    return Deletes.toPositionIndex(filePath, ImmutableList.of(deletes));
+    return Deletes.toPositionIndex(filePath, deletes, deleteFile);
   }
 
   private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema projection) {
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 8dc031314e..1c8453bd6a 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -20,13 +20,17 @@ package org.apache.iceberg.io;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
@@ -34,10 +38,16 @@ import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.deletes.DeleteGranularity;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -719,4 +729,105 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
     List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa"));
     assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
   }
+
+  @TestTemplate
+  public void testRewriteOfPreviousDeletes() throws IOException {
+    assumeThat(format()).isIn(FileFormat.PARQUET, FileFormat.ORC);
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add the first data file
+    List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(11, "aaa"));
+    DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+    table.newFastAppend().appendFile(dataFile1).commit();
+
+    // add the second data file
+    List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"), toRow(12, "aaa"));
+    DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), null);
+    table.newFastAppend().appendFile(dataFile2).commit();
+
+    PartitionSpec spec = table.spec();
+
+    // init the first delete writer without access to previous deletes
+    FanoutPositionOnlyDeleteWriter<T> writer1 =
+        new FanoutPositionOnlyDeleteWriter<>(
+            writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE, DeleteGranularity.FILE);
+
+    // write initial deletes for both data files
+    writer1.write(positionDelete(dataFile1.path(), 1L), spec, null);
+    writer1.write(positionDelete(dataFile2.path(), 1L), spec, null);
+    writer1.close();
+
+    // verify the writer result
+    DeleteWriteResult result1 = writer1.result();
+    assertThat(result1.deleteFiles()).hasSize(2);
+    assertThat(result1.referencedDataFiles()).hasSize(2);
+    assertThat(result1.referencesDataFiles()).isTrue();
+    assertThat(result1.rewrittenDeleteFiles()).isEmpty();
+
+    // commit the initial deletes
+    RowDelta rowDelta1 = table.newRowDelta();
+    result1.deleteFiles().forEach(rowDelta1::addDeletes);
+    rowDelta1.commit();
+
+    // verify correctness of the first delete operation
+    List<T> expectedRows1 =
+        ImmutableList.of(toRow(1, "aaa"), toRow(3, "aaa"), toRow(11, "aaa"), toRow(12, "aaa"));
+    assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows1));
+
+    // populate previous delete mapping
+    Map<String, DeleteFile> previousDeletes = Maps.newHashMap();
+    for (DeleteFile deleteFile : result1.deleteFiles()) {
+      String dataLocation = ContentFileUtil.referencedDataFile(deleteFile).toString();
+      previousDeletes.put(dataLocation, deleteFile);
+    }
+
+    // init the second delete writer with access to previous deletes
+    FanoutPositionOnlyDeleteWriter<T> writer2 =
+        new FanoutPositionOnlyDeleteWriter<>(
+            writerFactory,
+            fileFactory,
+            table.io(),
+            TARGET_FILE_SIZE,
+            DeleteGranularity.FILE,
+            new PreviousDeleteLoader(table, previousDeletes));
+
+    // write more deletes for both data files
+    writer2.write(positionDelete(dataFile1.path(), 0L), spec, null);
+    writer2.write(positionDelete(dataFile2.path(), 0L), spec, null);
+    writer2.close();
+
+    // verify the writer result
+    DeleteWriteResult result2 = writer2.result();
+    assertThat(result2.deleteFiles()).hasSize(2);
+    assertThat(result2.referencedDataFiles()).hasSize(2);
+    assertThat(result2.referencesDataFiles()).isTrue();
+    assertThat(result2.rewrittenDeleteFiles()).hasSize(2);
+
+    // add new and remove rewritten delete files
+    RowDelta rowDelta2 = table.newRowDelta();
+    result2.deleteFiles().forEach(rowDelta2::addDeletes);
+    result2.rewrittenDeleteFiles().forEach(rowDelta2::removeDeletes);
+    rowDelta2.commit();
+
+    // verify correctness of the second delete operation
+    List<T> expectedRows2 = ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa"));
+    assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows2));
+  }
+
+  private static class PreviousDeleteLoader implements Function<CharSequence, PositionDeleteIndex> {
+    private final Map<String, DeleteFile> deleteFiles;
+    private final DeleteLoader deleteLoader;
+
+    PreviousDeleteLoader(Table table, Map<String, DeleteFile> deleteFiles) {
+      this.deleteFiles = deleteFiles;
+      this.deleteLoader = new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile));
+    }
+
+    @Override
+    public PositionDeleteIndex apply(CharSequence path) {
+      DeleteFile deleteFile = deleteFiles.get(path);
+      return deleteLoader.loadPositionDeletes(ImmutableList.of(deleteFile), path);
+    }
+  }
 }

Reply via email to