This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2a5b089aa5 Spark: Read DVs when reading from .position_deletes table (#11657)
2a5b089aa5 is described below
commit 2a5b089aa52b2253318985b007af951909adfade
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Dec 16 08:50:49 2024 +0100
Spark: Read DVs when reading from .position_deletes table (#11657)
---
.../apache/iceberg/spark/source/DVIterator.java | 101 +++++++
.../spark/source/PositionDeletesRowReader.java | 5 +
.../spark/source/TestPositionDeletesReader.java | 301 +++++++++++++++++++++
3 files changed, 407 insertions(+)
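
For context, the .position_deletes metadata table touched by this change can be queried from Spark like any other Iceberg metadata table. A minimal sketch (not part of this commit; the catalog/table identifier "demo.db.tbl" is a placeholder), assuming the standard position_deletes columns file_path, pos, and delete_file_path:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class PositionDeletesExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // On format v3 tables, rows that come from a deletion vector (DV) are now
        // synthesized by the new DVIterator instead of being read from a delete file.
        Dataset<Row> deletes =
            spark.sql("SELECT file_path, pos, delete_file_path FROM demo.db.tbl.position_deletes");
        deletes.show(false);
      }
    }
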
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
new file mode 100644
index 0000000000..7b08b86cbf
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseDeleteLoader;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class DVIterator implements CloseableIterator<InternalRow> {
+ private final DeleteFile deleteFile;
+ private final Schema projection;
+ private final Map<Integer, ?> idToConstant;
+ private final Iterator<Long> positions;
+ private Integer deletedPositionIndex;
+ private GenericInternalRow row;
+
+ DVIterator(
+ InputFile inputFile, DeleteFile deleteFile, Schema projection, Map<Integer, ?> idToConstant) {
+ this.deleteFile = deleteFile;
+ this.projection = projection;
+ this.idToConstant = idToConstant;
+ List<Long> pos = Lists.newArrayList();
+ new BaseDeleteLoader(ignored -> inputFile)
+ .loadPositionDeletes(ImmutableList.of(deleteFile), deleteFile.referencedDataFile())
+ .forEach(pos::add);
+ this.positions = pos.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return positions.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ long position = positions.next();
+
+ if (null == row) {
+ List<Object> rowValues = Lists.newArrayList();
+ for (Types.NestedField column : projection.columns()) {
+ int fieldId = column.fieldId();
+ if (fieldId == MetadataColumns.DELETE_FILE_PATH.fieldId()) {
+ rowValues.add(UTF8String.fromString(deleteFile.referencedDataFile()));
+ } else if (fieldId == MetadataColumns.DELETE_FILE_POS.fieldId()) {
+ rowValues.add(position);
+ // remember the index where the deleted position needs to be set
+ deletedPositionIndex = rowValues.size() - 1;
+ } else if (fieldId == MetadataColumns.PARTITION_COLUMN_ID) {
+ rowValues.add(idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
+ } else if (fieldId == MetadataColumns.SPEC_ID_COLUMN_ID) {
+ rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID));
+ } else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) {
+ rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID));
+ }
+ }
+
+ this.row = new GenericInternalRow(rowValues.toArray());
+ } else if (null != deletedPositionIndex) {
+ // only update the deleted position if necessary, everything else stays the same
+ row.update(deletedPositionIndex, position);
+ }
+
+ return row;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Remove is not supported");
+ }
+
+ @Override
+ public void close() {}
+}
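
Note that DVIterator reuses a single GenericInternalRow and only rewrites the deleted-position slot on each next() call, so consumers that buffer rows must copy them. The new tests below follow this pattern; a minimal consumer sketch along the same lines:

    List<InternalRow> actualRows = Lists.newArrayList();
    while (reader.next()) {
      // copy() is required because the reader hands back the same reused row instance
      actualRows.add(reader.get().copy());
    }
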
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
index 1a894df291..329bcf0855 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
+import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.rdd.InputFileBlockHolder;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -90,6 +91,10 @@ class PositionDeletesRowReader extends BaseRowReader<PositionDeletesScanTask>
ExpressionUtil.extractByIdInclusive(
task.residual(), expectedSchema(), caseSensitive(), Ints.toArray(nonConstantFieldIds));
+ if (ContentFileUtil.isDV(task.file())) {
+ return new DVIterator(inputFile, task.file(), expectedSchema(), idToConstant);
+ }
+
return newIterable(
inputFile,
task.file().format(),
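
In the reader change above, ContentFileUtil.isDV(task.file()) routes deletion vectors to the new DVIterator, while file-based position deletes keep going through newIterable as before.
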
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
new file mode 100644
index 0000000000..5b876dfc57
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.TestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestPositionDeletesReader extends TestBase {
+ private static final Schema SCHEMA =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
+ private static final PartitionSpec SPEC =
+ PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+ private Table table;
+ private DataFile dataFile1;
+ private DataFile dataFile2;
+
+ @TempDir private Path temp;
+
+ @Parameter(index = 0)
+ private int formatVersion;
+
+ @Parameters(name = "formatVersion = {0}")
+ protected static List<Object> parameters() {
+ return ImmutableList.of(2, 3);
+ }
+
+ @BeforeEach
+ public void before() throws IOException {
+ table =
+ catalog.createTable(
+ TableIdentifier.of("default", "test"),
+ SCHEMA,
+ SPEC,
+ ImmutableMap.of(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion)));
+
+ GenericRecord record = GenericRecord.create(table.schema());
+ List<Record> records1 = Lists.newArrayList();
+ records1.add(record.copy("id", 29, "data", "a"));
+ records1.add(record.copy("id", 43, "data", "b"));
+ records1.add(record.copy("id", 61, "data", "c"));
+ records1.add(record.copy("id", 89, "data", "d"));
+
+ List<Record> records2 = Lists.newArrayList();
+ records2.add(record.copy("id", 100, "data", "e"));
+ records2.add(record.copy("id", 121, "data", "f"));
+ records2.add(record.copy("id", 122, "data", "g"));
+
+ dataFile1 = writeDataFile(records1);
+ dataFile2 = writeDataFile(records2);
+ table.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+ }
+
+ @AfterEach
+ public void after() {
+ catalog.dropTable(TableIdentifier.of("default", "test"));
+ }
+
+ @TestTemplate
+ public void readPositionDeletesTableWithNoDeleteFiles() {
+ Table positionDeletesTable =
+ catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+ assertThat(positionDeletesTable.newBatchScan().planFiles()).isEmpty();
+ }
+
+ @TestTemplate
+ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException {
+ Pair<DeleteFile, CharSequenceSet> posDeletes1 =
+ FileHelpers.writeDeleteFile(
+ table,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(0),
+ Lists.newArrayList(
+ Pair.of(dataFile1.location(), 0L), Pair.of(dataFile1.location(), 1L)),
+ formatVersion);
+
+ Pair<DeleteFile, CharSequenceSet> posDeletes2 =
+ FileHelpers.writeDeleteFile(
+ table,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(0),
+ Lists.newArrayList(
+ Pair.of(dataFile2.location(), 2L), Pair.of(dataFile2.location(), 3L)),
+ formatVersion);
+
+ DeleteFile deleteFile1 = posDeletes1.first();
+ DeleteFile deleteFile2 = posDeletes2.first();
+ table
+ .newRowDelta()
+ .addDeletes(deleteFile1)
+ .addDeletes(deleteFile2)
+ .validateDataFilesExist(posDeletes1.second())
+ .validateDataFilesExist(posDeletes2.second())
+ .commit();
+
+ Table positionDeletesTable =
+ catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+ Schema projectedSchema =
+ positionDeletesTable
+ .schema()
+ .select(
+ MetadataColumns.DELETE_FILE_PATH.name(),
+ MetadataColumns.DELETE_FILE_POS.name(),
+ PositionDeletesTable.DELETE_FILE_PATH);
+
+ List<ScanTask> scanTasks =
+ Lists.newArrayList(
+ positionDeletesTable.newBatchScan().project(projectedSchema).planFiles());
+ assertThat(scanTasks).hasSize(2);
+
+ assertThat(scanTasks.get(0)).isInstanceOf(PositionDeletesScanTask.class);
+ PositionDeletesScanTask scanTask1 = (PositionDeletesScanTask) scanTasks.get(0);
+
+ try (PositionDeletesRowReader reader =
+ new PositionDeletesRowReader(
+ table,
+ new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
+ positionDeletesTable.schema(),
+ projectedSchema,
+ false)) {
+ List<InternalRow> actualRows = Lists.newArrayList();
+ while (reader.next()) {
+ actualRows.add(reader.get().copy());
+ }
+
+ String dataFileLocation =
+ formatVersion >= 3 ? deleteFile1.referencedDataFile() : dataFile1.location();
+ Object[] first = {
+ UTF8String.fromString(dataFileLocation), 0L, UTF8String.fromString(deleteFile1.location())
+ };
+ Object[] second = {
+ UTF8String.fromString(dataFileLocation), 1L, UTF8String.fromString(deleteFile1.location())
+ };
+ assertThat(internalRowsToJava(actualRows, projectedSchema))
+ .hasSize(2)
+ .containsExactly(first, second);
+ }
+
+ assertThat(scanTasks.get(1)).isInstanceOf(PositionDeletesScanTask.class);
+ PositionDeletesScanTask scanTask2 = (PositionDeletesScanTask) scanTasks.get(1);
+ try (PositionDeletesRowReader reader =
+ new PositionDeletesRowReader(
+ table,
+ new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask2)),
+ positionDeletesTable.schema(),
+ projectedSchema,
+ false)) {
+ List<InternalRow> actualRows = Lists.newArrayList();
+ while (reader.next()) {
+ actualRows.add(reader.get().copy());
+ }
+
+ String dataFileLocation =
+ formatVersion >= 3 ? deleteFile2.referencedDataFile() : dataFile2.location();
+ Object[] first = {
+ UTF8String.fromString(dataFileLocation), 2L, UTF8String.fromString(deleteFile2.location())
+ };
+ Object[] second = {
+ UTF8String.fromString(dataFileLocation), 3L, UTF8String.fromString(deleteFile2.location())
+ };
+ assertThat(internalRowsToJava(actualRows, projectedSchema))
+ .hasSize(2)
+ .containsExactly(first, second);
+ }
+ }
+
+ @TestTemplate
+ public void readPositionDeletesTableWithDifferentColumnOrdering() throws IOException {
+ Pair<DeleteFile, CharSequenceSet> posDeletes1 =
+ FileHelpers.writeDeleteFile(
+ table,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(0),
+ Lists.newArrayList(
+ Pair.of(dataFile1.location(), 0L), Pair.of(dataFile1.location(), 1L)),
+ formatVersion);
+
+ DeleteFile deleteFile1 = posDeletes1.first();
+ table
+ .newRowDelta()
+ .addDeletes(deleteFile1)
+ .validateDataFilesExist(posDeletes1.second())
+ .commit();
+
+ Table positionDeletesTable =
+ catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));
+
+ // select a few fields in backwards order
+ Schema projectedSchema =
+ new Schema(MetadataColumns.DELETE_FILE_POS, MetadataColumns.DELETE_FILE_PATH);
+
+ List<ScanTask> scanTasks =
+ Lists.newArrayList(
+ positionDeletesTable.newBatchScan().project(projectedSchema).planFiles());
+ assertThat(scanTasks).hasSize(1);
+
+ assertThat(scanTasks.get(0)).isInstanceOf(PositionDeletesScanTask.class);
+ PositionDeletesScanTask scanTask1 = (PositionDeletesScanTask) scanTasks.get(0);
+
+ try (PositionDeletesRowReader reader =
+ new PositionDeletesRowReader(
+ table,
+ new BaseScanTaskGroup<>(null, ImmutableList.of(scanTask1)),
+ positionDeletesTable.schema(),
+ projectedSchema,
+ false)) {
+ List<InternalRow> actualRows = Lists.newArrayList();
+ while (reader.next()) {
+ actualRows.add(reader.get().copy());
+ }
+
+ assertThat(internalRowsToJava(actualRows, projectedSchema))
+ .hasSize(2)
+ .containsExactly(
+ new Object[] {0L, UTF8String.fromString(dataFile1.location())},
+ new Object[] {1L, UTF8String.fromString(dataFile1.location())});
+ }
+ }
+
+ private DataFile writeDataFile(List<Record> records) throws IOException {
+ return FileHelpers.writeDataFile(
+ table,
+ Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
+ TestHelpers.Row.of(0),
+ records);
+ }
+
+ private List<Object[]> internalRowsToJava(List<InternalRow> rows, Schema projection) {
+ return rows.stream().map(row -> toJava(row, projection)).collect(Collectors.toList());
+ }
+
+ private Object[] toJava(InternalRow row, Schema projection) {
+ Object[] values = new Object[row.numFields()];
+ for (int i = 0; i < projection.columns().size(); i++) {
+ values[i] = row.get(i, SparkSchemaUtil.convert(projection.columns().get(i).type()));
+ }
+ return values;
+ }
+}