This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 166edc7298 Core: Support DVs in DeleteLoader (#11481)
166edc7298 is described below

commit 166edc7298825321f677e1e70cf88d7249e8035c
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 8 18:09:49 2024 +0100

    Core: Support DVs in DeleteLoader (#11481)
---
 .../org/apache/iceberg/util/ContentFileUtil.java   |   5 +
 .../org/apache/iceberg/data/BaseDeleteLoader.java  |  82 ++++++++
 .../java/org/apache/iceberg/data/DeleteLoader.java |   5 +-
 .../java/org/apache/iceberg/io/TestDVWriters.java  | 221 ++++++++++++++++++++-
 4 files changed, 310 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
index 9e4a65be02..beffd3a955 100644
--- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
 
@@ -94,6 +95,10 @@ public class ContentFileUtil {
     return deleteFile.format() == FileFormat.PUFFIN;
   }
 
+  public static boolean containsSingleDV(Iterable<DeleteFile> deleteFiles) {
+    return Iterables.size(deleteFiles) == 1 && Iterables.all(deleteFiles, ContentFileUtil::isDV);
+  }
+
+
   public static String dvDesc(DeleteFile deleteFile) {
     return String.format(
         "DV{location=%s, offset=%s, length=%s, referencedDataFile=%s}",
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 8a1ebf95ab..796f4f92be 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -42,15 +42,20 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.orc.OrcRowReader;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.CharSequenceMap;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
@@ -143,9 +148,48 @@ public class BaseDeleteLoader implements DeleteLoader {
     }
   }
 
+  /**
+   * Loads the content of a deletion vector or position delete files for a given data file path into
+   * a position index.
+   *
+   * <p>The deletion vector is currently loaded without caching as the existing Puffin reader
+   * requires at least 3 requests to fetch the entire file. Caching a single deletion vector may
+   * only be useful when multiple data file splits are processed on the same node, which is unlikely
+   * as task locality is not guaranteed.
+   *
+   * <p>For position delete files, however, there is no efficient way to read deletes for a
+   * particular data file. Therefore, caching may be more effective as such delete files potentially
+   * apply to many data files, especially in unpartitioned tables and tables with deep partitions.
+   * If a position delete file qualifies for caching, this method will attempt to cache a position
+   * index for each referenced data file.
+   *
+   * @param deleteFiles a deletion vector or position delete files
+   * @param filePath the data file path for which to load deletes
+   * @return a position delete index for the provided data file path
+   */
   @Override
   public PositionDeleteIndex loadPositionDeletes(
       Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
+    if (ContentFileUtil.containsSingleDV(deleteFiles)) {
+      DeleteFile dv = Iterables.getOnlyElement(deleteFiles);
+      validateDV(dv, filePath);
+      return readDV(dv);
+    } else {
+      return getOrReadPosDeletes(deleteFiles, filePath);
+    }
+  }
+
+  private PositionDeleteIndex readDV(DeleteFile dv) {
+    LOG.trace("Opening DV file {}", dv.location());
+    InputFile inputFile = loadInputFile.apply(dv);
+    long offset = dv.contentOffset();
+    int length = dv.contentSizeInBytes().intValue();
+    byte[] bytes = readBytes(inputFile, offset, length);
+    return PositionDeleteIndex.deserialize(bytes, dv);
+  }
+
+  private PositionDeleteIndex getOrReadPosDeletes(
+      Iterable<DeleteFile> deleteFiles, CharSequence filePath) {
     Iterable<PositionDeleteIndex> deletes =
         execute(deleteFiles, deleteFile -> getOrReadPosDeletes(deleteFile, 
filePath));
     return PositionDeleteIndexUtil.merge(deletes);
@@ -259,4 +303,42 @@ public class BaseDeleteLoader implements DeleteLoader {
   private int estimateRecordSize(Schema schema) {
     return schema.columns().stream().mapToInt(TypeUtil::estimateSize).sum();
   }
+
+  private void validateDV(DeleteFile dv, CharSequence filePath) {
+    Preconditions.checkArgument(
+        dv.contentOffset() != null,
+        "Invalid DV, offset cannot be null: %s",
+        ContentFileUtil.dvDesc(dv));
+    Preconditions.checkArgument(
+        dv.contentSizeInBytes() != null,
+        "Invalid DV, length is null: %s",
+        ContentFileUtil.dvDesc(dv));
+    Preconditions.checkArgument(
+        dv.contentSizeInBytes() <= Integer.MAX_VALUE,
+        "Can't read DV larger than 2GB: %s",
+        dv.contentSizeInBytes());
+    Preconditions.checkArgument(
+        filePath.toString().equals(dv.referencedDataFile()),
+        "DV is expected to reference %s, not %s",
+        filePath,
+        dv.referencedDataFile());
+  }
+
+  private static byte[] readBytes(InputFile inputFile, long offset, int length) {
+    try (SeekableInputStream stream = inputFile.newStream()) {
+      byte[] bytes = new byte[length];
+
+      if (stream instanceof RangeReadable) {
+        RangeReadable rangeReadable = (RangeReadable) stream;
+        rangeReadable.readFully(offset, bytes);
+      } else {
+        stream.seek(offset);
+        ByteStreams.readFully(stream, bytes);
+      }
+
+      return bytes;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
index 07bdce6d83..0fc0b93f73 100644
--- a/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteLoader.java
@@ -35,9 +35,10 @@ public interface DeleteLoader {
   StructLikeSet loadEqualityDeletes(Iterable<DeleteFile> deleteFiles, Schema 
projection);
 
   /**
-   * Loads the content of position delete files for a given data file path into a position index.
+   * Loads the content of a deletion vector or position delete files for a given data file path into
+   * a position index.
    *
-   * @param deleteFiles position delete files
+   * @param deleteFiles a deletion vector or position delete files
    * @param filePath the data file path for which to load deletes
    * @return a position delete index for the provided data file path
    */
diff --git a/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java b/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
index ce742b1c46..23e0090ca4 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestDVWriters.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.io;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -28,17 +29,25 @@ import java.util.function.Function;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.BaseDeleteLoader;
 import org.apache.iceberg.data.DeleteLoader;
 import org.apache.iceberg.deletes.BaseDVFileWriter;
 import org.apache.iceberg.deletes.DVFileWriter;
+import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -49,10 +58,11 @@ public abstract class TestDVWriters<T> extends WriterTestBase<T> {
 
   @Parameters(name = "formatVersion = {0}")
   protected static List<Object> parameters() {
-    return Arrays.asList(new Object[] {3});
+    return Arrays.asList(new Object[] {2, 3});
   }

   private OutputFileFactory fileFactory = null;
+  private OutputFileFactory parquetFileFactory = null;

   protected abstract StructLikeSet toSet(Iterable<T> records);

@@ -65,10 +75,14 @@ public abstract class TestDVWriters<T> extends WriterTestBase<T> {
   public void setupTable() throws Exception {
     this.table = create(SCHEMA, PartitionSpec.unpartitioned());
     this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
+    this.parquetFileFactory =
+        OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PARQUET).build();
   }
 
   @TestTemplate
   public void testBasicDVs() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
     FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());

     // add the first data file
@@ -100,6 +114,211 @@ public abstract class TestDVWriters<T> extends WriterTestBase<T> {
         .contains(dataFile1.location())
         .contains(dataFile2.location());
     assertThat(result.referencesDataFiles()).isTrue();
+
+    // commit the deletes
+    commit(result);
+
+    // verify correctness
+    assertRows(ImmutableList.of(toRow(11, "aaa"), toRow(12, "aaa")));
+  }
+
+  @TestTemplate
+  public void testRewriteDVs() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add a data file with 3 data records
+    List<T> rows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "aaa"));
+    DataFile dataFile = writeData(writerFactory, parquetFileFactory, rows, table.spec(), null);
+    table.newFastAppend().appendFile(dataFile).commit();
+
+    // write the first DV
+    DVFileWriter dvWriter1 =
+        new BaseDVFileWriter(fileFactory, new PreviousDeleteLoader(table, ImmutableMap.of()));
+    dvWriter1.delete(dataFile.location(), 1L, table.spec(), null);
+    dvWriter1.close();
+
+    // validate the writer result
+    DeleteWriteResult result1 = dvWriter1.result();
+    assertThat(result1.deleteFiles()).hasSize(1);
+    assertThat(result1.referencedDataFiles()).containsOnly(dataFile.location());
+    assertThat(result1.referencesDataFiles()).isTrue();
+    assertThat(result1.rewrittenDeleteFiles()).isEmpty();
+
+    // commit the first DV
+    commit(result1);
+    assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+    assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();
+
+    // verify correctness after committing the first DV
+    assertRows(ImmutableList.of(toRow(1, "aaa"), toRow(3, "aaa")));
+
+    // write the second DV, merging with the first one
+    DeleteFile dv1 = Iterables.getOnlyElement(result1.deleteFiles());
+    DVFileWriter dvWriter2 =
+        new BaseDVFileWriter(
+            fileFactory,
+            new PreviousDeleteLoader(table, ImmutableMap.of(dataFile.location(), dv1)));
+    dvWriter2.delete(dataFile.location(), 2L, table.spec(), null);
+    dvWriter2.close();
+
+    // validate the writer result
+    DeleteWriteResult result2 = dvWriter2.result();
+    assertThat(result2.deleteFiles()).hasSize(1);
+    assertThat(result2.referencedDataFiles()).containsOnly(dataFile.location());
+    assertThat(result2.referencesDataFiles()).isTrue();
+    assertThat(result2.rewrittenDeleteFiles()).hasSize(1);
+
+    // replace DVs
+    commit(result2);
+    assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+    assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);
+
+    // verify correctness after replacing DVs
+    assertRows(ImmutableList.of(toRow(1, "aaa")));
+  }
+
+  @TestTemplate
+  public void testRewriteFileScopedPositionDeletes() throws IOException {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add a data file with 3 records
+    List<T> rows = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "aaa"));
+    DataFile dataFile = writeData(writerFactory, parquetFileFactory, rows, table.spec(), null);
+    table.newFastAppend().appendFile(dataFile).commit();
+
+    // add a file-scoped position delete file
+    DeleteFile deleteFile =
+        writePositionDeletes(writerFactory, ImmutableList.of(Pair.of(dataFile.location(), 0L)));
+    table.newRowDelta().addDeletes(deleteFile).commit();
+
+    // verify correctness after adding the file-scoped position delete
+    assertRows(ImmutableList.of(toRow(2, "aaa"), toRow(3, "aaa")));
+
+    // upgrade the table to V3 to enable DVs
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();
+
+    // write a DV, merging with the file-scoped position delete
+    DVFileWriter dvWriter =
+        new BaseDVFileWriter(
+            fileFactory,
+            new PreviousDeleteLoader(table, ImmutableMap.of(dataFile.location(), deleteFile)));
+    dvWriter.delete(dataFile.location(), 1L, table.spec(), null);
+    dvWriter.close();
+
+    // validate the writer result
+    DeleteWriteResult result = dvWriter.result();
+    assertThat(result.deleteFiles()).hasSize(1);
+    assertThat(result.referencedDataFiles()).containsOnly(dataFile.location());
+    assertThat(result.referencesDataFiles()).isTrue();
+    assertThat(result.rewrittenDeleteFiles()).hasSize(1);
+
+    // replace the position delete file with the DV
+    commit(result);
+    assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+    assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).hasSize(1);
+
+    // verify correctness
+    assertRows(ImmutableList.of(toRow(3, "aaa")));
+  }
+
+  @TestTemplate
+  public void testApplyPartitionScopedPositionDeletes() throws IOException {
+    assumeThat(formatVersion).isEqualTo(2);
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+    // add the first data file with 3 records
+    List<T> rows1 = ImmutableList.of(toRow(1, "aaa"), toRow(2, "aaa"), toRow(3, "aaa"));
+    DataFile dataFile1 = writeData(writerFactory, parquetFileFactory, rows1, table.spec(), null);
+    table.newFastAppend().appendFile(dataFile1).commit();
+
+    // add the second data file with 3 records
+    List<T> rows2 = ImmutableList.of(toRow(4, "aaa"), toRow(5, "aaa"), toRow(6, "aaa"));
+    DataFile dataFile2 = writeData(writerFactory, parquetFileFactory, rows2, table.spec(), null);
+    table.newFastAppend().appendFile(dataFile2).commit();
+
+    // add a position delete file with deletes for both data files
+    DeleteFile deleteFile =
+        writePositionDeletes(
+            writerFactory,
+            ImmutableList.of(
+                Pair.of(dataFile1.location(), 0L),
+                Pair.of(dataFile1.location(), 1L),
+                Pair.of(dataFile2.location(), 0L)));
+    table.newRowDelta().addDeletes(deleteFile).commit();
+
+    // verify correctness with the position delete file
+    assertRows(ImmutableList.of(toRow(3, "aaa"), toRow(5, "aaa"), toRow(6, "aaa")));
+
+    // upgrade the table to V3 to enable DVs
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();
+
+    // write a DV, applying old positions but keeping the position delete file in place
+    DVFileWriter dvWriter =
+        new BaseDVFileWriter(
+            fileFactory,
+            new PreviousDeleteLoader(table, ImmutableMap.of(dataFile2.location(), deleteFile)));
+    dvWriter.delete(dataFile2.location(), 1L, table.spec(), null);
+    dvWriter.close();
+
+    // validate the writer result
+    DeleteWriteResult result = dvWriter.result();
+    assertThat(result.deleteFiles()).hasSize(1);
+    assertThat(result.referencedDataFiles()).containsOnly(dataFile2.location());
+    assertThat(result.referencesDataFiles()).isTrue();
+    assertThat(result.rewrittenDeleteFiles()).isEmpty();
+    DeleteFile dv = Iterables.getOnlyElement(result.deleteFiles());
+
+    // commit the DV, ensuring the position delete file remains
+    commit(result);
+    assertThat(table.currentSnapshot().addedDeleteFiles(table.io())).hasSize(1);
+    assertThat(table.currentSnapshot().removedDeleteFiles(table.io())).isEmpty();
+
+    // verify correctness with DVs and position delete files
+    assertRows(ImmutableList.of(toRow(3, "aaa"), toRow(6, "aaa")));
+
+    // verify the position delete file applies only to the data file without the DV
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      for (FileScanTask task : tasks) {
+        DeleteFile taskDeleteFile = Iterables.getOnlyElement(task.deletes());
+        if (task.file().location().equals(dataFile1.location())) {
+          assertThat(taskDeleteFile.location()).isEqualTo(deleteFile.location());
+        } else {
+          assertThat(taskDeleteFile.location()).isEqualTo(dv.location());
+        }
+      }
+    }
+  }
+
+  private void commit(DeleteWriteResult result) {
+    RowDelta rowDelta = table.newRowDelta();
+    result.rewrittenDeleteFiles().forEach(rowDelta::removeDeletes);
+    result.deleteFiles().forEach(rowDelta::addDeletes);
+    rowDelta.commit();
+  }
+
+  private void assertRows(Iterable<T> expectedRows) throws IOException {
+    assertThat(actualRowSet("*")).isEqualTo(toSet(expectedRows));
+  }
+
+  private DeleteFile writePositionDeletes(
+      FileWriterFactory<T> writerFactory, List<Pair<String, Long>> deletes) throws IOException {
+    EncryptedOutputFile file = parquetFileFactory.newOutputFile(table.spec(), null);
+    PositionDeleteWriter<T> writer =
+        writerFactory.newPositionDeleteWriter(file, table.spec(), null);
+    PositionDelete<T> posDelete = PositionDelete.create();
+
+    try (PositionDeleteWriter<T> closableWriter = writer) {
+      for (Pair<String, Long> delete : deletes) {
+        closableWriter.write(posDelete.set(delete.first(), delete.second()));
+      }
+    }
+
+    return writer.toDeleteFile();
   }
 
   private static class PreviousDeleteLoader implements Function<String, 
PositionDeleteIndex> {

Reply via email to