This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c8fe01e71f Core: Support combining position deletes during writes (#11222)
c8fe01e71f is described below
commit c8fe01e71f9996fcaae973c6bfaa8d90f7dd8c6c
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Sep 30 21:12:56 2024 -0700
Core: Support combining position deletes during writes (#11222)
---
.../iceberg/deletes/BitmapPositionDeleteIndex.java | 25 ++++-
.../java/org/apache/iceberg/deletes/Deletes.java | 32 +++++-
.../org/apache/iceberg/deletes/PositionDelete.java | 7 ++
.../iceberg/deletes/PositionDeleteIndex.java | 15 +++
.../deletes/SortingPositionOnlyDeleteWriter.java | 58 ++++++++---
.../iceberg/io/FanoutPositionOnlyDeleteWriter.java | 28 +++++-
.../src/test/java/org/apache/iceberg/TestBase.java | 4 +
.../org/apache/iceberg/data/BaseDeleteLoader.java | 4 +-
.../apache/iceberg/io/TestPartitioningWriters.java | 111 +++++++++++++++++++++
9 files changed, 264 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
index 3a04487856..a1b57a3866 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/BitmapPositionDeleteIndex.java
@@ -18,18 +18,35 @@
*/
package org.apache.iceberg.deletes;
+import java.util.Collection;
+import java.util.List;
import java.util.function.LongConsumer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.roaringbitmap.longlong.Roaring64Bitmap;
class BitmapPositionDeleteIndex implements PositionDeleteIndex {
private final Roaring64Bitmap roaring64Bitmap;
+ private final List<DeleteFile> deleteFiles;
BitmapPositionDeleteIndex() {
- roaring64Bitmap = new Roaring64Bitmap();
+ this.roaring64Bitmap = new Roaring64Bitmap();
+ this.deleteFiles = Lists.newArrayList();
+ }
+
+ BitmapPositionDeleteIndex(Collection<DeleteFile> deleteFiles) {
+ this.roaring64Bitmap = new Roaring64Bitmap();
+ this.deleteFiles = Lists.newArrayList(deleteFiles);
+ }
+
+ BitmapPositionDeleteIndex(DeleteFile deleteFile) {
+ this.roaring64Bitmap = new Roaring64Bitmap();
+ this.deleteFiles = deleteFile != null ? Lists.newArrayList(deleteFile) :
Lists.newArrayList();
}
void merge(BitmapPositionDeleteIndex that) {
roaring64Bitmap.or(that.roaring64Bitmap);
+ deleteFiles.addAll(that.deleteFiles);
}
@Override
@@ -48,6 +65,7 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
merge((BitmapPositionDeleteIndex) that);
} else {
that.forEach(this::delete);
+ deleteFiles.addAll(that.deleteFiles());
}
}
@@ -65,4 +83,9 @@ class BitmapPositionDeleteIndex implements PositionDeleteIndex {
public void forEach(LongConsumer consumer) {
roaring64Bitmap.forEach(consumer::accept);
}
+
+ @Override
+ public Collection<DeleteFile> deleteFiles() {
+ return deleteFiles;
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
index a72e016130..2256b378f6 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.iceberg.Accessor;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
@@ -123,6 +124,11 @@ public class Deletes {
}
}
+ public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex>
toPositionIndexes(
+ CloseableIterable<T> posDeletes) {
+ return toPositionIndexes(posDeletes, null /* unknown delete file */);
+ }
+
/**
* Builds a map of position delete indexes by path.
*
@@ -131,10 +137,11 @@ public class Deletes {
* entire delete file content is needed (e.g. caching).
*
* @param posDeletes position deletes
+ * @param file the source delete file for the deletes
* @return the map of position delete indexes by path
*/
public static <T extends StructLike> CharSequenceMap<PositionDeleteIndex>
toPositionIndexes(
- CloseableIterable<T> posDeletes) {
+ CloseableIterable<T> posDeletes, DeleteFile file) {
CharSequenceMap<PositionDeleteIndex> indexes = CharSequenceMap.create();
try (CloseableIterable<T> deletes = posDeletes) {
@@ -142,7 +149,7 @@ public class Deletes {
CharSequence filePath = (CharSequence) FILENAME_ACCESSOR.get(delete);
long position = (long) POSITION_ACCESSOR.get(delete);
PositionDeleteIndex index =
- indexes.computeIfAbsent(filePath, key -> new
BitmapPositionDeleteIndex());
+ indexes.computeIfAbsent(filePath, key -> new
BitmapPositionDeleteIndex(file));
index.delete(position);
}
} catch (IOException e) {
@@ -152,6 +159,20 @@ public class Deletes {
return indexes;
}
+ public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
+ CharSequence dataLocation, CloseableIterable<T> posDeletes, DeleteFile
file) {
+ CloseableIterable<Long> positions = extractPositions(dataLocation,
posDeletes);
+ List<DeleteFile> files = ImmutableList.of(file);
+ return toPositionIndex(positions, files);
+ }
+
+ private static <T extends StructLike> CloseableIterable<Long>
extractPositions(
+ CharSequence dataLocation, CloseableIterable<T> rows) {
+ DataFileFilter<T> filter = new DataFileFilter<>(dataLocation);
+ CloseableIterable<T> filteredRows = filter.filter(rows);
+ return CloseableIterable.transform(filteredRows, row -> (Long)
POSITION_ACCESSOR.get(row));
+ }
+
public static <T extends StructLike> PositionDeleteIndex toPositionIndex(
CharSequence dataLocation, List<CloseableIterable<T>> deleteFiles) {
return toPositionIndex(dataLocation, deleteFiles,
ThreadPools.getDeleteWorkerPool());
@@ -176,8 +197,13 @@ public class Deletes {
}
public static PositionDeleteIndex toPositionIndex(CloseableIterable<Long>
posDeletes) {
+ return toPositionIndex(posDeletes, ImmutableList.of());
+ }
+
+ private static PositionDeleteIndex toPositionIndex(
+ CloseableIterable<Long> posDeletes, List<DeleteFile> files) {
try (CloseableIterable<Long> deletes = posDeletes) {
- PositionDeleteIndex positionDeleteIndex = new
BitmapPositionDeleteIndex();
+ PositionDeleteIndex positionDeleteIndex = new
BitmapPositionDeleteIndex(files);
deletes.forEach(positionDeleteIndex::delete);
return positionDeleteIndex;
} catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
index 655428ce77..57e188567f 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
@@ -31,6 +31,13 @@ public class PositionDelete<R> implements StructLike {
private PositionDelete() {}
+ public PositionDelete<R> set(CharSequence newPath, long newPos) {
+ this.path = newPath;
+ this.pos = newPos;
+ this.row = null;
+ return this;
+ }
+
public PositionDelete<R> set(CharSequence newPath, long newPos, R newRow) {
this.path = newPath;
this.pos = newPos;
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
index c0086fe6aa..3655b8b7e8 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteIndex.java
@@ -18,7 +18,10 @@
*/
package org.apache.iceberg.deletes;
+import java.util.Collection;
import java.util.function.LongConsumer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
public interface PositionDeleteIndex {
/**
@@ -42,6 +45,9 @@ public interface PositionDeleteIndex {
* @param that the other index to merge
*/
default void merge(PositionDeleteIndex that) {
+ if (!that.deleteFiles().isEmpty()) {
+ throw new UnsupportedOperationException(getClass().getName() + " does
not support merge");
+ }
that.forEach(this::delete);
}
@@ -72,6 +78,15 @@ public interface PositionDeleteIndex {
}
}
+ /**
+ * Returns delete files that this index was created from or an empty collection if unknown.
+ *
+ * @return delete files that this index was created from
+ */
+ default Collection<DeleteFile> deleteFiles() {
+ return ImmutableList.of();
+ }
+
/** Returns an empty immutable position delete index. */
static PositionDeleteIndex empty() {
return EmptyPositionDeleteIndex.get();
diff --git a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
index 1d4d131dfe..818529c024 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java
@@ -21,17 +21,18 @@ package org.apache.iceberg.deletes;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceMap;
import org.apache.iceberg.util.CharSequenceSet;
-import org.roaringbitmap.longlong.PeekableLongIterator;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.apache.iceberg.util.ContentFileUtil;
/**
* A position delete writer that is capable of handling unordered deletes without rows.
@@ -41,6 +42,11 @@ import org.roaringbitmap.longlong.Roaring64Bitmap;
* records are not ordered by file and position as required by the spec. If the incoming deletes are
* ordered by an external process, use {@link PositionDeleteWriter} instead.
*
+ * <p>If configured, this writer can also load previous deletes using the provided function and
+ * merge them with incoming ones prior to flushing the deletes into a file. Callers must ensure only
+ * previous file-scoped deletes are loaded because partition-scoped deletes can apply to multiple
+ * data files and can't be safely discarded.
+ *
* <p>Note this writer stores only positions. It does not store deleted records.
*/
public class SortingPositionOnlyDeleteWriter<T>
@@ -48,7 +54,8 @@ public class SortingPositionOnlyDeleteWriter<T>
private final Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>>
writers;
private final DeleteGranularity granularity;
- private final CharSequenceMap<Roaring64Bitmap> positionsByPath;
+ private final CharSequenceMap<PositionDeleteIndex> positionsByPath;
+ private final Function<CharSequence, PositionDeleteIndex>
loadPreviousDeletes;
private DeleteWriteResult result = null;
public SortingPositionOnlyDeleteWriter(FileWriter<PositionDelete<T>,
DeleteWriteResult> writer) {
@@ -58,17 +65,26 @@ public class SortingPositionOnlyDeleteWriter<T>
public SortingPositionOnlyDeleteWriter(
Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
DeleteGranularity granularity) {
+ this(writers, granularity, path -> null /* no access to previous deletes
*/);
+ }
+
+ public SortingPositionOnlyDeleteWriter(
+ Supplier<FileWriter<PositionDelete<T>, DeleteWriteResult>> writers,
+ DeleteGranularity granularity,
+ Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
this.writers = writers;
this.granularity = granularity;
this.positionsByPath = CharSequenceMap.create();
+ this.loadPreviousDeletes = loadPreviousDeletes;
}
@Override
public void write(PositionDelete<T> positionDelete) {
CharSequence path = positionDelete.path();
long position = positionDelete.pos();
- Roaring64Bitmap positions = positionsByPath.computeIfAbsent(path,
Roaring64Bitmap::new);
- positions.add(position);
+ PositionDeleteIndex positions =
+ positionsByPath.computeIfAbsent(path, key -> new
BitmapPositionDeleteIndex());
+ positions.delete(position);
}
@Override
@@ -106,14 +122,16 @@ public class SortingPositionOnlyDeleteWriter<T>
private DeleteWriteResult writeFileDeletes() throws IOException {
List<DeleteFile> deleteFiles = Lists.newArrayList();
CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+ List<DeleteFile> rewrittenDeleteFiles = Lists.newArrayList();
for (CharSequence path : positionsByPath.keySet()) {
DeleteWriteResult writeResult = writeDeletes(ImmutableList.of(path));
deleteFiles.addAll(writeResult.deleteFiles());
referencedDataFiles.addAll(writeResult.referencedDataFiles());
+ rewrittenDeleteFiles.addAll(writeResult.rewrittenDeleteFiles());
}
- return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles,
rewrittenDeleteFiles);
}
@SuppressWarnings("CollectionUndefinedEquality")
@@ -123,22 +141,38 @@ public class SortingPositionOnlyDeleteWriter<T>
}
FileWriter<PositionDelete<T>, DeleteWriteResult> writer = writers.get();
+ List<DeleteFile> rewrittenDeleteFiles = Lists.newArrayList();
try {
PositionDelete<T> positionDelete = PositionDelete.create();
for (CharSequence path : sort(paths)) {
- // the iterator provides values in ascending sorted order
- PeekableLongIterator positions =
positionsByPath.get(path).getLongIterator();
- while (positions.hasNext()) {
- long position = positions.next();
- writer.write(positionDelete.set(path, position, null /* no row */));
+ PositionDeleteIndex positions = positionsByPath.get(path);
+ PositionDeleteIndex previousPositions =
loadPreviousDeletes.apply(path);
+ if (previousPositions != null && previousPositions.isNotEmpty()) {
+ validatePreviousDeletes(previousPositions);
+ positions.merge(previousPositions);
+ rewrittenDeleteFiles.addAll(previousPositions.deleteFiles());
}
+ positions.forEach(position -> writer.write(positionDelete.set(path,
position)));
}
} finally {
writer.close();
}
- return writer.result();
+ DeleteWriteResult writerResult = writer.result();
+ List<DeleteFile> deleteFiles = writerResult.deleteFiles();
+ CharSequenceSet referencedDataFiles = writerResult.referencedDataFiles();
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles,
rewrittenDeleteFiles);
+ }
+
+ private void validatePreviousDeletes(PositionDeleteIndex index) {
+ Preconditions.checkArgument(
+ index.deleteFiles().stream().allMatch(this::isFileScoped),
+ "Previous deletes must be file-scoped");
+ }
+
+ private boolean isFileScoped(DeleteFile deleteFile) {
+ return ContentFileUtil.referencedDataFile(deleteFile) != null;
}
private Collection<CharSequence> sort(Collection<CharSequence> paths) {
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
index c6a55064b7..9c527f4b32 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutPositionOnlyDeleteWriter.java
@@ -19,11 +19,13 @@
package org.apache.iceberg.io;
import java.util.List;
+import java.util.function.Function;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.deletes.SortingPositionOnlyDeleteWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
@@ -45,6 +47,8 @@ public class FanoutPositionOnlyDeleteWriter<T>
private final DeleteGranularity granularity;
private final List<DeleteFile> deleteFiles;
private final CharSequenceSet referencedDataFiles;
+ private final List<DeleteFile> rewrittenDeleteFiles;
+ private final Function<CharSequence, PositionDeleteIndex>
loadPreviousDeletes;
public FanoutPositionOnlyDeleteWriter(
FileWriterFactory<T> writerFactory,
@@ -60,6 +64,22 @@ public class FanoutPositionOnlyDeleteWriter<T>
FileIO io,
long targetFileSizeInBytes,
DeleteGranularity granularity) {
+ this(
+ writerFactory,
+ fileFactory,
+ io,
+ targetFileSizeInBytes,
+ granularity,
+ path -> null /* no access to previous deletes */);
+ }
+
+ public FanoutPositionOnlyDeleteWriter(
+ FileWriterFactory<T> writerFactory,
+ OutputFileFactory fileFactory,
+ FileIO io,
+ long targetFileSizeInBytes,
+ DeleteGranularity granularity,
+ Function<CharSequence, PositionDeleteIndex> loadPreviousDeletes) {
this.writerFactory = writerFactory;
this.fileFactory = fileFactory;
this.io = io;
@@ -67,6 +87,8 @@ public class FanoutPositionOnlyDeleteWriter<T>
this.granularity = granularity;
this.deleteFiles = Lists.newArrayList();
this.referencedDataFiles = CharSequenceSet.empty();
+ this.rewrittenDeleteFiles = Lists.newArrayList();
+ this.loadPreviousDeletes = loadPreviousDeletes;
}
@Override
@@ -76,17 +98,19 @@ public class FanoutPositionOnlyDeleteWriter<T>
() ->
new RollingPositionDeleteWriter<>(
writerFactory, fileFactory, io, targetFileSizeInBytes, spec,
partition),
- granularity);
+ granularity,
+ loadPreviousDeletes);
}
@Override
protected void addResult(DeleteWriteResult result) {
deleteFiles.addAll(result.deleteFiles());
referencedDataFiles.addAll(result.referencedDataFiles());
+ rewrittenDeleteFiles.addAll(result.rewrittenDeleteFiles());
}
@Override
protected DeleteWriteResult aggregatedResult() {
- return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles,
rewrittenDeleteFiles);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java
index a0b52b346b..f3bbb79795 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -665,6 +665,10 @@ public class TestBase {
.build();
}
+ protected <T> PositionDelete<T> positionDelete(CharSequence path, long pos) {
+ return positionDelete(path, pos, null /* no row */);
+ }
+
protected <T> PositionDelete<T> positionDelete(CharSequence path, long pos,
T row) {
PositionDelete<T> positionDelete = PositionDelete.create();
return positionDelete.set(path, pos, row);
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 91b7fd1c1d..8a1ebf95ab 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -166,13 +166,13 @@ public class BaseDeleteLoader implements DeleteLoader {
private CharSequenceMap<PositionDeleteIndex> readPosDeletes(DeleteFile
deleteFile) {
CloseableIterable<Record> deletes = openDeletes(deleteFile,
POS_DELETE_SCHEMA);
- return Deletes.toPositionIndexes(deletes);
+ return Deletes.toPositionIndexes(deletes, deleteFile);
}
private PositionDeleteIndex readPosDeletes(DeleteFile deleteFile,
CharSequence filePath) {
Expression filter =
Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), filePath);
CloseableIterable<Record> deletes = openDeletes(deleteFile,
POS_DELETE_SCHEMA, filter);
- return Deletes.toPositionIndex(filePath, ImmutableList.of(deletes));
+ return Deletes.toPositionIndex(filePath, deletes, deleteFile);
}
private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema
projection) {
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index 8dc031314e..1c8453bd6a 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -20,13 +20,17 @@ package org.apache.iceberg.io;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
@@ -34,10 +38,16 @@ import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
@@ -719,4 +729,105 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
List<T> expectedRows = ImmutableList.of(toRow(11, "aaa"), toRow(12,
"aaa"));
assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
}
+
+ @TestTemplate
+ public void testRewriteOfPreviousDeletes() throws IOException {
+ assumeThat(format()).isIn(FileFormat.PARQUET, FileFormat.ORC);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add the first data file
+ List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"),
toRow(11, "aaa"));
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile1).commit();
+
+ // add the second data file
+ List<T> rows2 = ImmutableList.of(toRow(3, "aaa"), toRow(4, "aaa"),
toRow(12, "aaa"));
+ DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile2).commit();
+
+ PartitionSpec spec = table.spec();
+
+ // init the first delete writer without access to previous deletes
+ FanoutPositionOnlyDeleteWriter<T> writer1 =
+ new FanoutPositionOnlyDeleteWriter<>(
+ writerFactory, fileFactory, table.io(), TARGET_FILE_SIZE,
DeleteGranularity.FILE);
+
+ // write initial deletes for both data files
+ writer1.write(positionDelete(dataFile1.path(), 1L), spec, null);
+ writer1.write(positionDelete(dataFile2.path(), 1L), spec, null);
+ writer1.close();
+
+ // verify the writer result
+ DeleteWriteResult result1 = writer1.result();
+ assertThat(result1.deleteFiles()).hasSize(2);
+ assertThat(result1.referencedDataFiles()).hasSize(2);
+ assertThat(result1.referencesDataFiles()).isTrue();
+ assertThat(result1.rewrittenDeleteFiles()).isEmpty();
+
+ // commit the initial deletes
+ RowDelta rowDelta1 = table.newRowDelta();
+ result1.deleteFiles().forEach(rowDelta1::addDeletes);
+ rowDelta1.commit();
+
+ // verify correctness of the first delete operation
+ List<T> expectedRows1 =
+ ImmutableList.of(toRow(1, "aaa"), toRow(3, "aaa"), toRow(11, "aaa"),
toRow(12, "aaa"));
+ assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows1));
+
+ // populate previous delete mapping
+ Map<String, DeleteFile> previousDeletes = Maps.newHashMap();
+ for (DeleteFile deleteFile : result1.deleteFiles()) {
+ String dataLocation =
ContentFileUtil.referencedDataFile(deleteFile).toString();
+ previousDeletes.put(dataLocation, deleteFile);
+ }
+
+ // init the second delete writer with access to previous deletes
+ FanoutPositionOnlyDeleteWriter<T> writer2 =
+ new FanoutPositionOnlyDeleteWriter<>(
+ writerFactory,
+ fileFactory,
+ table.io(),
+ TARGET_FILE_SIZE,
+ DeleteGranularity.FILE,
+ new PreviousDeleteLoader(table, previousDeletes));
+
+ // write more deletes for both data files
+ writer2.write(positionDelete(dataFile1.path(), 0L), spec, null);
+ writer2.write(positionDelete(dataFile2.path(), 0L), spec, null);
+ writer2.close();
+
+ // verify the writer result
+ DeleteWriteResult result2 = writer2.result();
+ assertThat(result2.deleteFiles()).hasSize(2);
+ assertThat(result2.referencedDataFiles()).hasSize(2);
+ assertThat(result2.referencesDataFiles()).isTrue();
+ assertThat(result2.rewrittenDeleteFiles()).hasSize(2);
+
+ // add new and remove rewritten delete files
+ RowDelta rowDelta2 = table.newRowDelta();
+ result2.deleteFiles().forEach(rowDelta2::addDeletes);
+ result2.rewrittenDeleteFiles().forEach(rowDelta2::removeDeletes);
+ rowDelta2.commit();
+
+ // verify correctness of the second delete operation
+ List<T> expectedRows2 = ImmutableList.of(toRow(11, "aaa"), toRow(12,
"aaa"));
+ assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows2));
+ }
+
+ private static class PreviousDeleteLoader implements Function<CharSequence,
PositionDeleteIndex> {
+ private final Map<String, DeleteFile> deleteFiles;
+ private final DeleteLoader deleteLoader;
+
+ PreviousDeleteLoader(Table table, Map<String, DeleteFile> deleteFiles) {
+ this.deleteFiles = deleteFiles;
+ this.deleteLoader = new BaseDeleteLoader(deleteFile ->
table.io().newInputFile(deleteFile));
+ }
+
+ @Override
+ public PositionDeleteIndex apply(CharSequence path) {
+ DeleteFile deleteFile = deleteFiles.get(path);
+ return deleteLoader.loadPositionDeletes(ImmutableList.of(deleteFile),
path);
+ }
+ }
}