This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a5b089aa5 Spark: Read DVs when reading from .position_deletes table (#11657)
2a5b089aa5 is described below

commit 2a5b089aa52b2253318985b007af951909adfade
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Dec 16 08:50:49 2024 +0100

    Spark: Read DVs when reading from .position_deletes table (#11657)
---
 .../apache/iceberg/spark/source/DVIterator.java    | 101 +++++++
 .../spark/source/PositionDeletesRowReader.java     |   5 +
 .../spark/source/TestPositionDeletesReader.java    | 301 +++++++++++++++++++++
 3 files changed, 407 insertions(+)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
new file mode 100644
index 0000000000..7b08b86cbf
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class DVIterator implements CloseableIterator<InternalRow> {
+  private final DeleteFile deleteFile;
+  private final Schema projection;
+  private final Map<Integer, ?> idToConstant;
+  private final Iterator<Long> positions;
+  private Integer deletedPositionIndex;
+  private GenericInternalRow row;
+
+  DVIterator(
+      InputFile inputFile, DeleteFile deleteFile, Schema projection, Map<Integer, ?> idToConstant) {
+    this.deleteFile = deleteFile;
+    this.projection = projection;
+    this.idToConstant = idToConstant;
+    List<Long> pos = Lists.newArrayList();
+    new BaseDeleteLoader(ignored -> inputFile)
+        .loadPositionDeletes(ImmutableList.of(deleteFile), deleteFile.referencedDataFile())
+        .forEach(pos::add);
+    this.positions = pos.iterator();
+  }
+
+  @Override
+  public boolean hasNext() {
+    return positions.hasNext();
+  }
+
+  @Override
+  public InternalRow next() {
+    long position = positions.next();
+
+    if (null == row) {
+      List<Object> rowValues = Lists.newArrayList();
+      for (Types.NestedField column : projection.columns()) {
+        int fieldId = column.fieldId();
+        if (fieldId == MetadataColumns.DELETE_FILE_PATH.fieldId()) {
+          rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
+        } else if (fieldId == MetadataColumns.DELETE_FILE_POS.fieldId()) {
+          rowValues.add(position);
+          // remember the index where the deleted position needs to be set
+          deletedPositionIndex = rowValues.size() - 1;
+        } else if (fieldId == MetadataColumns.PARTITION_COLUMN_ID) {
+          rowValues.add(idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
+        } else if (fieldId == MetadataColumns.SPEC_ID_COLUMN_ID) {
+          rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID));
+        } else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) {
+          rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID));
+        }
+      }
+
+      this.row = new GenericInternalRow(rowValues.toArray());
+    } else if (null != deletedPositionIndex) {
+      // only update the deleted position if necessary, everything else stays the same
+      row.update(deletedPositionIndex, position);
+    }
+
+    return row;
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Remove is not supported");
+  }
+
+  @Override
+  public void close() {}
+}
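
For context, DVIterator materializes the deleted positions stored in a deletion vector and replays them as rows of the position_deletes table, reusing one mutable row between calls to next(). A minimal consumption sketch (inputFile, dv, projection, and idToConstant are placeholder names, not part of this commit):

    // Sketch only: drain the iterator, copying each row because next() reuses
    // a single GenericInternalRow and only rewrites the deleted-position field.
    try (CloseableIterator<InternalRow> rows =
        new DVIterator(inputFile, dv, projection, idToConstant)) {
      while (rows.hasNext()) {
        InternalRow copy = rows.next().copy();
        // depending on the projection, copy holds the referenced data file path,
        // the deleted position, and constant columns taken from idToConstant
      }
    }
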
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 1a894df291..329bcf0855 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.util.ContentFileUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -90,6 +91,10 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
         ExpressionUtil.extractByIdInclusive(
             task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
 
+    if (ContentFileUtil.isDV(task.file())) {
+      return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant);
+    }
+
     return newIterable(
             inputFile,
             task.file().format(),
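
The reader change above keeps the existing row-based path for classic position delete files and switches to DVIterator only for deletion vectors. ContentFileUtil.isDV makes that distinction; a rough illustration of the idea (not necessarily the library's exact implementation) is that a DV is a position delete stored as a Puffin blob:

    // Illustration only: deletion vectors are written to Puffin files in format v3,
    // so the delete file's format is enough to tell them apart from Parquet, ORC,
    // or Avro position delete files.
    boolean isDV = deleteFile.format() == FileFormat.PUFFIN;
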
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
new file mode 100644
index 0000000000..5b876dfc57
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestPositionDeletesReader extends TestBase {
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  private Table table;
+  private DataFile dataFile1;
+  private DataFile dataFile2;
+
+  @TempDir private Path temp;
+
+  @Parameter(index = 0)
+  private int formatVersion;
+
+  @Parameters(name = "formatVersion = {0}")
+  protected static List<Object> parameters() {
+    return ImmutableList.of(2, 3);
+  }
+
+  @BeforeEach
+  public void before() throws IOException {
+    table =
+        catalog.createTable(
+            TableIdentifier.of("default", "test"),
+            SCHEMA,
+            SPEC,
+            ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)));
+
+    GenericRecord record = GenericRecord.create(table.schema());
+    List<Record> records1 = Lists.newArrayList();
+    records1.add(record.copy("id", 29, "data", "a"));
+    records1.add(record.copy("id", 43, "data", "b"));
+    records1.add(record.copy("id", 61, "data", "c"));
+    records1.add(record.copy("id", 89, "data", "d"));
+
+    List<Record> records2 = Lists.newArrayList();
+    records2.add(record.copy("id", 100, "data", "e"));
+    records2.add(record.copy("id", 121, "data", "f"));
+    records2.add(record.copy("id", 122, "data", "g"));
+
+    dataFile1 = writeDataFile(records1);
+    dataFile2 = writeDataFile(records2);
+    table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+  }
+
+  @AfterEach
+  public void after() {
+    catalog.dropTable(TableIdentifier.of("default", "test"));
+  }
+
+  @TestTemplate
+  public void readPositionDeletesTableWithNoDeleteFiles() {
+    Table positionDeletesTable =
+        catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+    assertThat(positionDeletesTable.newBatchScan().planFiles()).isEmpty();
+  }
+
+  @TestTemplate
+  public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException {
+    Pair<DeleteFile, CharSequenceSet> posDeletes1 =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            Lists.newArrayList(
+                Pair.of(dataFile1.location(), 0L), Pair.of(dataFile1.location(), 1L)),
+            formatVersion);
+
+    Pair<DeleteFile, CharSequenceSet> posDeletes2 =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            Lists.newArrayList(
+                Pair.of(dataFile2.location(), 2L), Pair.of(dataFile2.location(), 3L)),
+            formatVersion);
+
+    DeleteFile deleteFile1 = posDeletes1.first();
+    DeleteFile deleteFile2 = posDeletes2.first();
+    table
+        .newRowDelta()
+        .addDeletes(deleteFile1)
+        .addDeletes(deleteFile2)
+        .validateDataFilesExist(posDeletes1.second())
+        .validateDataFilesExist(posDeletes2.second())
+        .commit();
+
+    Table positionDeletesTable =
+        catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+    Schema projectedSchema =
+        positionDeletesTable
+            .schema()
+            .select(
+                MetadataColumns.DELETE_FILE_PATH.name(),
+                MetadataColumns.DELETE_FILE_POS.name(),
+                PositionDeletesTable.DELETE_FILE_PATH);
+
+    List<ScanTask> scanTasks =
+        Lists.newArrayList(
+            positionDeletesTable.newBatchScan().project(projectedSchema).planFiles());
+    assertThat(scanTasks).hasSize(2);
+
+    assertThat(scanTasks.get(0)).isInstanceOf(PositionDeletesScanTask.class);
+    PositionDeletesScanTask scanTask1 = (PositionDeletesScanTask) scanTasks.get(0);
+
+    try (PositionDeletesRowReader reader =
+        new PositionDeletesRowReader(
+            table,
+            new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
+            positionDeletesTable.schema(),
+            projectedSchema,
+            false)) {
+      List<InternalRow> actualRows = Lists.newArrayList();
+      while (reader.next()) {
+        actualRows.add(reader.get().copy());
+      }
+
+      String dataFileLocation =
+          formatVersion >= 3 ? deleteFile1.referencedDataFile() : dataFile1.location();
+      Object[] first = {
+        UTF8String.fromString(dataFileLocation), 0L, UTF8String.fromString(deleteFile1.location())
+      };
+      Object[] second = {
+        UTF8String.fromString(dataFileLocation), 1L, UTF8String.fromString(deleteFile1.location())
+      };
+      assertThat(internalRowsToJava(actualRows, projectedSchema))
+          .hasSize(2)
+          .containsExactly(first, second);
+    }
+
+    assertThat(scanTasks.get(1)).isInstanceOf(PositionDeletesScanTask.class);
+    PositionDeletesScanTask scanTask2 = (PositionDeletesScanTask) scanTasks.get(1);
+    try (PositionDeletesRowReader reader =
+        new PositionDeletesRowReader(
+            table,
+            new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
+            positionDeletesTable.schema(),
+            projectedSchema,
+            false)) {
+      List<InternalRow> actualRows = Lists.newArrayList();
+      while (reader.next()) {
+        actualRows.add(reader.get().copy());
+      }
+
+      String dataFileLocation =
+          formatVersion >= 3 ? deleteFile2.referencedDataFile() : dataFile2.location();
+      Object[] first = {
+        UTF8String.fromString(dataFileLocation), 2L, UTF8String.fromString(deleteFile2.location())
+      };
+      Object[] second = {
+        UTF8String.fromString(dataFileLocation), 3L, UTF8String.fromString(deleteFile2.location())
+      };
+      assertThat(internalRowsToJava(actualRows, projectedSchema))
+          .hasSize(2)
+          .containsExactly(first, second);
+    }
+  }
+
+  @TestTemplate
+  public void readPositionDeletesTableWithDifferentColumnOrdering() throws IOException {
+    Pair<DeleteFile, CharSequenceSet> posDeletes1 =
+        FileHelpers.writeDeleteFile(
+            table,
+            Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+            TestHelpers.Row.of(0),
+            Lists.newArrayList(
+                Pair.of(dataFile1.location(), 0L), Pair.of(dataFile1.location(), 1L)),
+            formatVersion);
+
+    DeleteFile deleteFile1 = posDeletes1.first();
+    table
+        .newRowDelta()
+        .addDeletes(deleteFile1)
+        .validateDataFilesExist(posDeletes1.second())
+        .commit();
+
+    Table positionDeletesTable =
+        catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+    // select a few fields in backwards order
+    Schema projectedSchema =
+        new Schema(MetadataColumns.DELETE_FILE_POS, MetadataColumns.DELETE_FILE_PATH);
+
+    List<ScanTask> scanTasks =
+        Lists.newArrayList(
+            positionDeletesTable.newBatchScan().project(projectedSchema).planFiles());
+    assertThat(scanTasks).hasSize(1);
+
+    assertThat(scanTasks.get(0)).isInstanceOf(PositionDeletesScanTask.class);
+    PositionDeletesScanTask scanTask1 = (PositionDeletesScanTask) scanTasks.get(0);
+
+    try (PositionDeletesRowReader reader =
+        new PositionDeletesRowReader(
+            table,
+            new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
+            positionDeletesTable.schema(),
+            projectedSchema,
+            false)) {
+      List<InternalRow> actualRows = Lists.newArrayList();
+      while (reader.next()) {
+        actualRows.add(reader.get().copy());
+      }
+
+      assertThat(internalRowsToJava(actualRows, projectedSchema))
+          .hasSize(2)
+          .containsExactly(
+              new Object[] {0L, UTF8String.fromString(dataFile1.location())},
+              new Object[] {1L, UTF8String.fromString(dataFile1.location())});
+    }
+  }
+
+  private DataFile writeDataFile(List<Record> records) throws IOException {
+    return FileHelpers.writeDataFile(
+        table,
+        Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+        TestHelpers.Row.of(0),
+        records);
+  }
+
+  private List<Object[]> internalRowsToJava(List<InternalRow> rows, Schema projection) {
+    return rows.stream().map(row -> toJava(row, projection)).collect(Collectors.toList());
+  }
+
+  private Object[] toJava(InternalRow row, Schema projection) {
+    Object[] values = new Object[row.numFields()];
+    for (int i = 0; i < projection.columns().size(); i++) {
+      values[i] = row.get(i, SparkSchemaUtil.convert(projection.columns().get(i).type()));
+    }
+    return values;
+  }
+}
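
End to end, this means scanning the position_deletes metadata table from Spark keeps working once a format v3 table stores its position deletes as DVs. A short usage sketch (the table name and the active SparkSession are assumptions for illustration):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Sketch: the same metadata-table query works whether the deletes were written
    // as Parquet position delete files (v2) or Puffin deletion vectors (v3).
    SparkSession spark = SparkSession.active();
    Dataset<Row> deletes = spark.table("db.sample.position_deletes");
    deletes.select("file_path", "pos", "delete_file_path").show();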
