This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 166edc7298 Core: Support DVs in DeleteLoader (#11481)
166edc7298 is described below
commit 166edc7298825321f677e1e70cf88d7249e8035c
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 8 18:09:49 2024 +0100
Core: Support DVs in DeleteLoader (#11481)
---
.../org/apache/iceberg/util/ContentFileUtil.java | 5 +
.../org/apache/iceberg/data/BaseDeleteLoader.java | 82 ++++++++
.../java/org/apache/iceberg/data/DeleteLoader.java | 5 +-
.../java/org/apache/iceberg/io/TestDVWriters.java | 221 ++++++++++++++++++++-
4 files changed, 310 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
index 9e4a65be02..beffd3a955 100644
--- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -94,6 +95,10 @@ public class ContentFileUtil {
return deleteFile.format() == FileFormat.PUFFIN;
}
+  public static boolean containsSingleDV(Iterable<DeleteFile> deleteFiles) {
+    return Iterables.size(deleteFiles) == 1 && Iterables.all(deleteFiles, ContentFileUtil::isDV);
+  }
+
public static String dvDesc(DeleteFile deleteFile) {
return String.format(
"DV{location=%s, offset=%s, length=%s, referencedDataFile=%s}",
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 8a1ebf95ab..796f4f92be 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -42,15 +42,20 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.OrcRowReader;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
@@ -143,9 +148,48 @@ public class BaseDeleteLoader implements DeleteLoader {
}
}
+  /**
+   * Loads the content of a deletion vector or position delete files for a given data file path into
+   * a position index.
+   *
+   * <p>The deletion vector is currently loaded without caching as the existing Puffin reader
+   * requires at least 3 requests to fetch the entire file. Caching a single deletion vector may
+   * only be useful when multiple data file splits are processed on the same node, which is unlikely
+   * as task locality is not guaranteed.
+   *
+   * <p>For position delete files, however, there is no efficient way to read deletes for a
+   * particular data file. Therefore, caching may be more effective as such delete files potentially
+   * apply to many data files, especially in unpartitioned tables and tables with deep partitions.
+   * If a position delete file qualifies for caching, this method will attempt to cache a position
+   * index for each referenced data file.
+   *
+   * @param deleteFiles a deletion vector or position delete files
+   * @param filePath the data file path for which to load deletes
+   * @return a position delete index for the provided data file path
+   */
@Override
public PositionDeleteIndex loadPositionDeletes(
Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
+ if (ContentFileUtil.containsSingleDV(deleteFiles)) {
+ DeleteFile dv = Iterables.getOnlyElement(deleteFiles);
+ validateDV(dv, filePath);
+ return readDV(dv);
+ } else {
+ return getOrReadPosDeletes(deleteFiles, filePath);
+ }
+ }
+
+ private PositionDeleteIndex readDV(DeleteFile dv) {
+ LOG.trace("Opening DV file {}", dv.location());
+ InputFile inputFile = loadInputFile.apply(dv);
+ long offset = dv.contentOffset();
+ int length = dv.contentSizeInBytes().intValue();
+ byte[] bytes = readBytes(inputFile, offset, length);
+ return PositionDeleteIndex.deserialize(bytes, dv);
+ }
+
+  private PositionDeleteIndex getOrReadPosDeletes(
+      Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
     Iterable<PositionDeleteIndex> deletes =
         execute(deleteFiles, deleteFile -> getOrReadPosDeletes(deleteFile, filePath));
     return PositionDeleteIndexUtil.merge(deletes);
@@ -259,4 +303,42 @@ public class BaseDeleteLoader implements DeleteLoader {
private int estimateRecordSize(Schema schema) {
return schema.columns().stream().mapToInt(TypeUtil::estimateSize).sum();
}
+
+ private void validateDV(DeleteFile dv, CharSequence filePath) {
+ Preconditions.checkArgument(
+ dv.contentOffset() != null,
+ "Invalid DV, offset cannot be null: %s",
+ ContentFileUtil.dvDesc(dv));
+ Preconditions.checkArgument(
+ dv.contentSizeInBytes() != null,
+ "Invalid DV, length is null: %s",
+ ContentFileUtil.dvDesc(dv));
+ Preconditions.checkArgument(
+ dv.contentSizeInBytes() <= Integer.MAX_VALUE,
+ "Can't read DV larger than 2GB: %s",
+ dv.contentSizeInBytes());
+ Preconditions.checkArgument(
+ filePath.toString().equals(dv.referencedDataFile()),
+ "DV is expected to reference %s, not %s",
+ filePath,
+ dv.referencedDataFile());
+ }
+
+  private static byte[] readBytes(InputFile inputFile, long offset, int length) {
+ try (SeekableInputStream stream = inputFile.newStream()) {
+ byte[] bytes = new byte[length];
+
+ if (stream instanceof RangeReadable) {
+ RangeReadable rangeReadable = (RangeReadable) stream;
+ rangeReadable.readFully(offset, bytes);
+ } else {
+ stream.seek(offset);
+ ByteStreams.readFully(stream, bytes);
+ }
+
+ return bytes;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
index 07bdce6d83..0fc0b93f73 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
@@ -35,9 +35,10 @@ public interface DeleteLoader {
StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema
projection);
   /**
-   * Loads the content of position delete files for a given data file path into a position index.
+   * Loads the content of a deletion vector or position delete files for a given data file path into
+   * a position index.
    *
-   * @param deleteFiles position delete files
+   * @param deleteFiles a deletion vector or position delete files
    * @param filePath the data file path for which to load deletes
    * @return a position delete index for the provided data file path
    */
diff --git a/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
b/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
index ce742b1c46..23e0090ca4 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.io;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
import java.util.Arrays;
@@ -28,17 +29,25 @@ import java.util.function.Function;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.BaseDeleteLoader;
import org.apache.iceberg.data.DeleteLoader;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
@@ -49,10 +58,11 @@ public abstract class TestDVWriters<T> extends
WriterTestBase<T> {
@Parameters(name = "formatVersion = {0}")
protected static List<Object> parameters() {
- return Arrays.asList(new Object[] {3});
+ return Arrays.asList(new Object[] {2, 3});
}
private OutputFileFactory fileFactory = null;
+ private OutputFileFactory parquetFileFactory = null;
protected abstract StructLikeSet toSet(Iterable<T> records);
@@ -65,10 +75,14 @@ public abstract class TestDVWriters<T> extends
WriterTestBase<T> {
   public void setupTable() throws Exception {
     this.table = create(SCHEMA, PartitionSpec.unpartitioned());
     this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+    this.parquetFileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
   }
@TestTemplate
public void testBasicDVs() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
// add the first data file
@@ -100,6 +114,211 @@ public abstract class TestDVWriters<T> extends
WriterTestBase<T> {
.contains(dataFile1.location())
.contains(dataFile2.location());
assertThat(result.referencesDataFiles()).isTrue();
+
+ // commit the deletes
+ commit(result);
+
+ // verify correctness
+ assertRows(ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa")));
+ }
+
+ @TestTemplate
+ public void testRewriteDVs() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add a data file with 3 data records
+ List<T> rows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3,
"aaa"));
+ DataFile dataFile = writeData(writerFactory, parquetFileFactory, rows,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile).commit();
+
+ // write the first DV
+ DVFileWriter dvWriter1 =
+ new BaseDVFileWriter(fileFactory, new PreviousDeleteLoader(table,
ImmutableMap.of()));
+ dvWriter1.delete(dataFile.location(), 1L, table.spec(), null);
+ dvWriter1.close();
+
+ // validate the writer result
+ DeleteWriteResult result1 = dvWriter1.result();
+ assertThat(result1.deleteFiles()).hasSize(1);
+
assertThat(result1.referencedDataFiles()).containsOnly(dataFile.location());
+ assertThat(result1.referencesDataFiles()).isTrue();
+ assertThat(result1.rewrittenDeleteFiles()).isEmpty();
+
+ // commit the first DV
+ commit(result1);
+
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();
+
+ // verify correctness after committing the first DV
+ assertRows(ImmutableList.of(toRow(1, "aaa"), toRow(3, "aaa")));
+
+ // write the second DV, merging with the first one
+ DeleteFile dv1 = Iterables.getOnlyElement(result1.deleteFiles());
+ DVFileWriter dvWriter2 =
+ new BaseDVFileWriter(
+ fileFactory,
+ new PreviousDeleteLoader(table,
ImmutableMap.of(dataFile.location(), dv1)));
+ dvWriter2.delete(dataFile.location(), 2L, table.spec(), null);
+ dvWriter2.close();
+
+ // validate the writer result
+ DeleteWriteResult result2 = dvWriter2.result();
+ assertThat(result2.deleteFiles()).hasSize(1);
+
assertThat(result2.referencedDataFiles()).containsOnly(dataFile.location());
+ assertThat(result2.referencesDataFiles()).isTrue();
+ assertThat(result2.rewrittenDeleteFiles()).hasSize(1);
+
+ // replace DVs
+ commit(result2);
+
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);
+
+ // verify correctness after replacing DVs
+ assertRows(ImmutableList.of(toRow(1, "aaa")));
+ }
+
+ @TestTemplate
+ public void testRewriteFileScopedPositionDeletes() throws IOException {
+ assumeThat(formatVersion).isEqualTo(2);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add a data file with 3 records
+ List<T> rows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3,
"aaa"));
+ DataFile dataFile = writeData(writerFactory, parquetFileFactory, rows,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile).commit();
+
+ // add a file-scoped position delete file
+ DeleteFile deleteFile =
+ writePositionDeletes(writerFactory,
ImmutableList.of(Pair.of(dataFile.location(), 0L)));
+ table.newRowDelta().addDeletes(deleteFile).commit();
+
+ // verify correctness after adding the file-scoped position delete
+ assertRows(ImmutableList.of(toRow(2, "aaa"), toRow(3, "aaa")));
+
+ // upgrade the table to V3 to enable DVs
+ table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();
+
+ // write a DV, merging with the file-scoped position delete
+ DVFileWriter dvWriter =
+ new BaseDVFileWriter(
+ fileFactory,
+ new PreviousDeleteLoader(table,
ImmutableMap.of(dataFile.location(), deleteFile)));
+ dvWriter.delete(dataFile.location(), 1L, table.spec(), null);
+ dvWriter.close();
+
+ // validate the writer result
+ DeleteWriteResult result = dvWriter.result();
+ assertThat(result.deleteFiles()).hasSize(1);
+ assertThat(result.referencedDataFiles()).containsOnly(dataFile.location());
+ assertThat(result.referencesDataFiles()).isTrue();
+ assertThat(result.rewrittenDeleteFiles()).hasSize(1);
+
+ // replace the position delete file with the DV
+ commit(result);
+
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);
+
+ // verify correctness
+ assertRows(ImmutableList.of(toRow(3, "aaa")));
+ }
+
+ @TestTemplate
+ public void testApplyPartitionScopedPositionDeletes() throws IOException {
+ assumeThat(formatVersion).isEqualTo(2);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add the first data file with 3 records
+ List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"),
toRow(3, "aaa"));
+ DataFile dataFile1 = writeData(writerFactory, parquetFileFactory, rows1,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile1).commit();
+
+ // add the second data file with 3 records
+ List<T> rows2 = ImmutableList.of(toRow(4, "aaa"), toRow(5, "aaa"),
toRow(6, "aaa"));
+ DataFile dataFile2 = writeData(writerFactory, parquetFileFactory, rows2,
table.spec(), null);
+ table.newFastAppend().appendFile(dataFile2).commit();
+
+ // add a position delete file with deletes for both data files
+ DeleteFile deleteFile =
+ writePositionDeletes(
+ writerFactory,
+ ImmutableList.of(
+ Pair.of(dataFile1.location(), 0L),
+ Pair.of(dataFile1.location(), 1L),
+ Pair.of(dataFile2.location(), 0L)));
+ table.newRowDelta().addDeletes(deleteFile).commit();
+
+ // verify correctness with the position delete file
+ assertRows(ImmutableList.of(toRow(3, "aaa"), toRow(5, "aaa"), toRow(6,
"aaa")));
+
+ // upgrade the table to V3 to enable DVs
+ table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();
+
+ // write a DV, applying old positions but keeping the position delete file
in place
+ DVFileWriter dvWriter =
+ new BaseDVFileWriter(
+ fileFactory,
+ new PreviousDeleteLoader(table,
ImmutableMap.of(dataFile2.location(), deleteFile)));
+ dvWriter.delete(dataFile2.location(), 1L, table.spec(), null);
+ dvWriter.close();
+
+ // validate the writer result
+ DeleteWriteResult result = dvWriter.result();
+ assertThat(result.deleteFiles()).hasSize(1);
+
assertThat(result.referencedDataFiles()).containsOnly(dataFile2.location());
+ assertThat(result.referencesDataFiles()).isTrue();
+ assertThat(result.rewrittenDeleteFiles()).isEmpty();
+ DeleteFile dv = Iterables.getOnlyElement(result.deleteFiles());
+
+ // commit the DV, ensuring the position delete file remains
+ commit(result);
+
assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+
assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();
+
+ // verify correctness with DVs and position delete files
+ assertRows(ImmutableList.of(toRow(3, "aaa"), toRow(6, "aaa")));
+
+ // verify the position delete file applies only to the data file without
the DV
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ for (FileScanTask task : tasks) {
+ DeleteFile taskDeleteFile = Iterables.getOnlyElement(task.deletes());
+ if (task.file().location().equals(dataFile1.location())) {
+
assertThat(taskDeleteFile.location()).isEqualTo(deleteFile.location());
+ } else {
+ assertThat(taskDeleteFile.location()).isEqualTo(dv.location());
+ }
+ }
+ }
+ }
+
+ private void commit(DeleteWriteResult result) {
+ RowDelta rowDelta = table.newRowDelta();
+ result.rewrittenDeleteFiles().forEach(rowDelta::removeDeletes);
+ result.deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+ }
+
+ private void assertRows(Iterable<T> expectedRows) throws IOException {
+ assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+ }
+
+  private DeleteFile writePositionDeletes(
+      FileWriterFactory<T> writerFactory, List<Pair<String, Long>> deletes) throws IOException {
+    EncryptedOutputFile file = parquetFileFactory.newOutputFile(table.spec(), null);
+ PositionDeleteWriter<T> writer =
+ writerFactory.newPositionDeleteWriter(file, table.spec(), null);
+ PositionDelete<T> posDelete = PositionDelete.create();
+
+ try (PositionDeleteWriter<T> closableWriter = writer) {
+ for (Pair<String, Long> delete : deletes) {
+ closableWriter.write(posDelete.set(delete.first(), delete.second()));
+ }
+ }
+
+ return writer.toDeleteFile();
}
private static class PreviousDeleteLoader implements Function<String,
PositionDeleteIndex> {