This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b2f3a4ce7b Spark: Backport ORC vectorized reader to use the delete filter (#14794)
b2f3a4ce7b is described below
commit b2f3a4ce7bcb21b966cd3928ae964cc98b9cbdf5
Author: pvary <[email protected]>
AuthorDate: Tue Dec 9 13:31:27 2025 +0100
Spark: Backport ORC vectorized reader to use the delete filter (#14794)
Backports #14746.
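In short: the ORC vectorized reader used to fill the _deleted metadata column
with a ConstantColumnVector that was always false, so position deletes were not
reflected in batch reads. It now uses a DeletedColumnVector that the delete
filter can populate per batch. A minimal sketch of the idea, assuming the
PositionDeleteIndex API visible in the diff below (buildIsDeleted is an
illustrative helper, not part of this patch):

    import org.apache.iceberg.deletes.PositionDeleteIndex;

    // Sketch: compute the boolean[] handed to DeletedColumnVector.setValue(...).
    // batchOffsetInFile is the file row position of the first row in the batch.
    static boolean[] buildIsDeleted(
        PositionDeleteIndex deletes, long batchOffsetInFile, int batchSize) {
      boolean[] isDeleted = new boolean[batchSize]; // all false by default, as before
      for (int i = 0; i < batchSize; i++) {
        isDeleted[i] = deletes.isDeleted(batchOffsetInFile + i);
      }
      return isDeleted;
    }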
---
.../data/vectorized/VectorizedSparkOrcReaders.java | 4 +-
.../org/apache/iceberg/spark/data/TestHelpers.java | 71 ++++++++++++++++++++
.../data/TestSparkOrcReadMetadataColumns.java | 54 ++++++++++++---
.../data/TestSparkParquetReadMetadataColumns.java | 78 ++--------------------
.../data/vectorized/VectorizedSparkOrcReaders.java | 4 +-
.../org/apache/iceberg/spark/data/TestHelpers.java | 71 ++++++++++++++++++++
.../data/TestSparkOrcReadMetadataColumns.java | 54 ++++++++++++---
.../data/TestSparkParquetReadMetadataColumns.java | 78 ++--------------------
8 files changed, 250 insertions(+), 164 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index 76e0ee3811..8dceb075e6 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -462,7 +462,9 @@ public class VectorizedSparkOrcReaders {
} else if (field.equals(MetadataColumns.ROW_POSITION)) {
fieldVectors.add(new RowPositionColumnVector(batchOffsetInFile));
} else if (field.equals(MetadataColumns.IS_DELETED)) {
- fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, false));
+ DeletedColumnVector deletedVector = new DeletedColumnVector(field.type());
+ deletedVector.setValue(new boolean[batchSize]);
+ fieldVectors.add(deletedVector);
} else {
fieldVectors.add(
fieldConverters
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 120d6eeb17..72f9a36609 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -52,10 +52,16 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.DeleteCounter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
@@ -895,4 +901,69 @@ public class TestHelpers {
public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
}
+
+ public static class CustomizedDeleteFilter extends DeleteFilter<InternalRow> {
+ private final boolean hasDeletes;
+
+ protected CustomizedDeleteFilter(
+ boolean hasDeletes, Schema tableSchema, Schema projectedSchema) {
+ super("", List.of(), tableSchema, projectedSchema, new DeleteCounter(),
true);
+ this.hasDeletes = hasDeletes;
+ }
+
+ @Override
+ protected StructLike asStructLike(InternalRow record) {
+ return null;
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return null;
+ }
+
+ @Override
+ public boolean hasPosDeletes() {
+ return hasDeletes;
+ }
+
+ @Override
+ public PositionDeleteIndex deletedRowPositions() {
+ PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+ if (hasDeletes) {
+ deletedRowPos.delete(98, 103);
+ }
+
+ return deletedRowPos;
+ }
+ }
+
+ public static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+ private final Set<Long> deleteIndex;
+
+ private CustomizedPositionDeleteIndex() {
+ deleteIndex = Sets.newHashSet();
+ }
+
+ @Override
+ public void delete(long position) {
+ deleteIndex.add(position);
+ }
+
+ @Override
+ public void delete(long posStart, long posEnd) {
+ for (long l = posStart; l < posEnd; l++) {
+ delete(l);
+ }
+ }
+
+ @Override
+ public boolean isDeleted(long position) {
+ return deleteIndex.contains(position);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return deleteIndex.isEmpty();
+ }
+ }
}
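The CustomizedDeleteFilter above reports positions 98..102 as deleted when
hasDeletes is true, because PositionDeleteIndex.delete(start, end) treats end
as exclusive. A hypothetical usage sketch (callers sit in the same package, so
the protected constructor is reachable; DATA_SCHEMA and PROJECTION_SCHEMA come
from the calling test class):

    DeleteFilter<InternalRow> filter =
        new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
    PositionDeleteIndex deletes = filter.deletedRowPositions();
    assertThat(deletes.isDeleted(98)).isTrue();
    assertThat(deletes.isDeleted(102)).isTrue();
    assertThat(deletes.isDeleted(103)).isFalse(); // delete(98, 103) is end-exclusive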
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 9d725250d3..13acaa1e3a 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -36,6 +37,7 @@ import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -46,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.source.BatchReaderUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.orc.OrcConf;
@@ -74,7 +77,11 @@ public class TestSparkOrcReadMetadataColumns {
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED);
+ private static final DeleteFilter<InternalRow> NO_DELETES_FILTER =
+ new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA);
+
private static final int NUM_ROWS = 1000;
+ private static final int RECORDS_PER_BATCH = 10;
private static final List<InternalRow> DATA_ROWS;
private static final List<InternalRow> EXPECTED_ROWS;
@@ -128,13 +135,35 @@ public class TestSparkOrcReadMetadataColumns {
@TestTemplate
public void testReadRowNumbers() throws IOException {
- readAndValidate(null, null, null, EXPECTED_ROWS);
+ readAndValidate(null, null, null, EXPECTED_ROWS, NO_DELETES_FILTER);
+ }
+
+ @TestTemplate
+ public void testReadRowNumbersWithDelete() throws IOException {
+ assumeThat(vectorized).isTrue();
+
+ List<InternalRow> expectedRowsAfterDelete = Lists.newArrayList();
+ EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy()));
+ // remove rows at positions 98, 99, 100, 101, 102; this crosses two row groups [0, 100) and [100,
+ // 200)
+ for (int i = 98; i <= 102; i++) {
+ expectedRowsAfterDelete.get(i).update(3, true);
+ }
+
+ DeleteFilter<InternalRow> deleteFilter =
+ new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
+
+ readAndValidate(null, null, null, expectedRowsAfterDelete, deleteFilter);
}
@TestTemplate
public void testReadRowNumbersWithFilter() throws IOException {
readAndValidate(
- Expressions.greaterThanOrEqual("id", 500), null, null, EXPECTED_ROWS.subList(500, 1000));
+ Expressions.greaterThanOrEqual("id", 500),
+ null,
+ null,
+ EXPECTED_ROWS.subList(500, 1000),
+ NO_DELETES_FILTER);
}
@TestTemplate
@@ -157,12 +186,17 @@ public class TestSparkOrcReadMetadataColumns {
null,
splitOffsets.get(i),
splitLengths.get(i),
- EXPECTED_ROWS.subList(i * 100, (i + 1) * 100));
+ EXPECTED_ROWS.subList(i * 100, (i + 1) * 100),
+ NO_DELETES_FILTER);
}
}
private void readAndValidate(
- Expression filter, Long splitStart, Long splitLength, List<InternalRow> expected)
+ Expression filter,
+ Long splitStart,
+ Long splitLength,
+ List<InternalRow> expected,
+ DeleteFilter<InternalRow> deleteFilter)
throws IOException {
Schema projectionWithoutMetadataFields =
TypeUtil.selectNot(PROJECTION_SCHEMA, MetadataColumns.metadataFieldIds());
@@ -173,10 +207,12 @@ public class TestSparkOrcReadMetadataColumns {
if (vectorized) {
builder =
- builder.createBatchedReaderFunc(
- readOrcSchema ->
- VectorizedSparkOrcReaders.buildReader(
- PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
+ builder
+ .recordsPerBatch(RECORDS_PER_BATCH)
+ .createBatchedReaderFunc(
+ readOrcSchema ->
+ VectorizedSparkOrcReaders.buildReader(
+ PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
} else {
builder =
builder.createReaderFunc(
@@ -192,7 +228,7 @@ public class TestSparkOrcReadMetadataColumns {
}
if (vectorized) {
- reader = batchesToRows(builder.build());
+ reader = batchesToRows(BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter));
} else {
reader = builder.build();
}
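Note the new RECORDS_PER_BATCH = 10: with 10-row batches, the deleted positions
98..102 straddle a batch boundary (98-99 fall in one batch, 100-102 in the
next), so the test exercises the per-batch offset handling rather than a single
batch. A quick sanity check of that mapping, assuming batches advance by
batchSize rows:

    int batchSize = 10;                        // RECORDS_PER_BATCH
    long position = 102;                       // a deleted row position
    long batchIndex = position / batchSize;    // 10 -> eleventh batch
    long offsetInBatch = position % batchSize; // 2  -> third row of that batch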
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index ccd783915c..e2e5a98ccb 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -26,7 +26,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
@@ -35,21 +34,16 @@ import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.DeleteCounter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.spark.source.BatchReaderUtil;
@@ -183,7 +177,8 @@ public class TestSparkParquetReadMetadataColumns {
Parquet.ReadBuilder builder =
Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
- DeleteFilter<InternalRow> deleteFilter = new TestDeleteFilter(true);
+ DeleteFilter<InternalRow> deleteFilter =
+ new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
builder.createBatchedReaderFunc(
fileSchema ->
@@ -194,70 +189,6 @@ public class TestSparkParquetReadMetadataColumns {
validate(expectedRowsAfterDelete, builder, deleteFilter);
}
- private static class TestDeleteFilter extends DeleteFilter<InternalRow> {
- private final boolean hasDeletes;
-
- protected TestDeleteFilter(boolean hasDeletes) {
- super("", List.of(), DATA_SCHEMA, PROJECTION_SCHEMA, new
DeleteCounter(), true);
- this.hasDeletes = hasDeletes;
- }
-
- @Override
- protected StructLike asStructLike(InternalRow record) {
- return null;
- }
-
- @Override
- protected InputFile getInputFile(String location) {
- return null;
- }
-
- @Override
- public boolean hasPosDeletes() {
- return hasDeletes;
- }
-
- @Override
- public PositionDeleteIndex deletedRowPositions() {
- PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
- if (hasDeletes) {
- deletedRowPos.delete(98, 103);
- }
-
- return deletedRowPos;
- }
- }
-
- private static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
- private final Set<Long> deleteIndex;
-
- private CustomizedPositionDeleteIndex() {
- deleteIndex = Sets.newHashSet();
- }
-
- @Override
- public void delete(long position) {
- deleteIndex.add(position);
- }
-
- @Override
- public void delete(long posStart, long posEnd) {
- for (long l = posStart; l < posEnd; l++) {
- delete(l);
- }
- }
-
- @Override
- public boolean isDeleted(long position) {
- return deleteIndex.contains(position);
- }
-
- @Override
- public boolean isEmpty() {
- return deleteIndex.isEmpty();
- }
- }
-
@TestTemplate
public void testReadRowNumbersWithFilter() throws IOException {
// current iceberg supports row group filter.
@@ -314,7 +245,10 @@ public class TestSparkParquetReadMetadataColumns {
builder = builder.split(splitStart, splitLength);
}
- validate(expected, builder, new TestDeleteFilter(false));
+ validate(
+ expected,
+ builder,
+ new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA));
}
private void validate(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index 5f68c233f6..4f32423988 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -462,7 +462,9 @@ public class VectorizedSparkOrcReaders {
} else if (field.equals(MetadataColumns.ROW_POSITION)) {
fieldVectors.add(new RowPositionColumnVector(batchOffsetInFile));
} else if (field.equals(MetadataColumns.IS_DELETED)) {
- fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, false));
+ DeletedColumnVector deletedVector = new DeletedColumnVector(field.type());
+ deletedVector.setValue(new boolean[batchSize]);
+ fieldVectors.add(deletedVector);
} else if (field.type().equals(Types.UnknownType.get())) {
fieldVectors.add(new ConstantColumnVector(field.type(), batchSize, null));
} else {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 82ebb1d950..dae8612f7d 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -52,10 +52,16 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.DeleteCounter;
+import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Type;
@@ -876,4 +882,69 @@ public class TestHelpers {
public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
}
+
+ public static class CustomizedDeleteFilter extends DeleteFilter<InternalRow> {
+ private final boolean hasDeletes;
+
+ protected CustomizedDeleteFilter(
+ boolean hasDeletes, Schema tableSchema, Schema projectedSchema) {
+ super("", List.of(), tableSchema, projectedSchema, new DeleteCounter(),
true);
+ this.hasDeletes = hasDeletes;
+ }
+
+ @Override
+ protected StructLike asStructLike(InternalRow record) {
+ return null;
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return null;
+ }
+
+ @Override
+ public boolean hasPosDeletes() {
+ return hasDeletes;
+ }
+
+ @Override
+ public PositionDeleteIndex deletedRowPositions() {
+ PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+ if (hasDeletes) {
+ deletedRowPos.delete(98, 103);
+ }
+
+ return deletedRowPos;
+ }
+ }
+
+ public static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+ private final Set<Long> deleteIndex;
+
+ private CustomizedPositionDeleteIndex() {
+ deleteIndex = Sets.newHashSet();
+ }
+
+ @Override
+ public void delete(long position) {
+ deleteIndex.add(position);
+ }
+
+ @Override
+ public void delete(long posStart, long posEnd) {
+ for (long l = posStart; l < posEnd; l++) {
+ delete(l);
+ }
+ }
+
+ @Override
+ public boolean isDeleted(long position) {
+ return deleteIndex.contains(position);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return deleteIndex.isEmpty();
+ }
+ }
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
index 9d725250d3..13acaa1e3a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkOrcReadMetadataColumns.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -36,6 +37,7 @@ import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -46,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
+import org.apache.iceberg.spark.source.BatchReaderUtil;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.orc.OrcConf;
@@ -74,7 +77,11 @@ public class TestSparkOrcReadMetadataColumns {
MetadataColumns.ROW_POSITION,
MetadataColumns.IS_DELETED);
+ private static final DeleteFilter<InternalRow> NO_DELETES_FILTER =
+ new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA);
+
private static final int NUM_ROWS = 1000;
+ private static final int RECORDS_PER_BATCH = 10;
private static final List<InternalRow> DATA_ROWS;
private static final List<InternalRow> EXPECTED_ROWS;
@@ -128,13 +135,35 @@ public class TestSparkOrcReadMetadataColumns {
@TestTemplate
public void testReadRowNumbers() throws IOException {
- readAndValidate(null, null, null, EXPECTED_ROWS);
+ readAndValidate(null, null, null, EXPECTED_ROWS, NO_DELETES_FILTER);
+ }
+
+ @TestTemplate
+ public void testReadRowNumbersWithDelete() throws IOException {
+ assumeThat(vectorized).isTrue();
+
+ List<InternalRow> expectedRowsAfterDelete = Lists.newArrayList();
+ EXPECTED_ROWS.forEach(row -> expectedRowsAfterDelete.add(row.copy()));
+ // remove rows at positions 98, 99, 100, 101, 102; this crosses two row groups [0, 100) and [100,
+ // 200)
+ for (int i = 98; i <= 102; i++) {
+ expectedRowsAfterDelete.get(i).update(3, true);
+ }
+
+ DeleteFilter<InternalRow> deleteFilter =
+ new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
+
+ readAndValidate(null, null, null, expectedRowsAfterDelete, deleteFilter);
}
@TestTemplate
public void testReadRowNumbersWithFilter() throws IOException {
readAndValidate(
- Expressions.greaterThanOrEqual("id", 500), null, null, EXPECTED_ROWS.subList(500, 1000));
+ Expressions.greaterThanOrEqual("id", 500),
+ null,
+ null,
+ EXPECTED_ROWS.subList(500, 1000),
+ NO_DELETES_FILTER);
}
@TestTemplate
@@ -157,12 +186,17 @@ public class TestSparkOrcReadMetadataColumns {
null,
splitOffsets.get(i),
splitLengths.get(i),
- EXPECTED_ROWS.subList(i * 100, (i + 1) * 100));
+ EXPECTED_ROWS.subList(i * 100, (i + 1) * 100),
+ NO_DELETES_FILTER);
}
}
private void readAndValidate(
- Expression filter, Long splitStart, Long splitLength, List<InternalRow> expected)
+ Expression filter,
+ Long splitStart,
+ Long splitLength,
+ List<InternalRow> expected,
+ DeleteFilter<InternalRow> deleteFilter)
throws IOException {
Schema projectionWithoutMetadataFields =
TypeUtil.selectNot(PROJECTION_SCHEMA, MetadataColumns.metadataFieldIds());
@@ -173,10 +207,12 @@ public class TestSparkOrcReadMetadataColumns {
if (vectorized) {
builder =
- builder.createBatchedReaderFunc(
- readOrcSchema ->
- VectorizedSparkOrcReaders.buildReader(
- PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
+ builder
+ .recordsPerBatch(RECORDS_PER_BATCH)
+ .createBatchedReaderFunc(
+ readOrcSchema ->
+ VectorizedSparkOrcReaders.buildReader(
+ PROJECTION_SCHEMA, readOrcSchema, ImmutableMap.of()));
} else {
builder =
builder.createReaderFunc(
@@ -192,7 +228,7 @@ public class TestSparkOrcReadMetadataColumns {
}
if (vectorized) {
- reader = batchesToRows(builder.build());
+ reader = batchesToRows(BatchReaderUtil.applyDeleteFilter(builder.build(), deleteFilter));
} else {
reader = builder.build();
}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index ccd783915c..e2e5a98ccb 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -26,7 +26,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
@@ -35,21 +34,16 @@ import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.DeleteCounter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.spark.source.BatchReaderUtil;
@@ -183,7 +177,8 @@ public class TestSparkParquetReadMetadataColumns {
Parquet.ReadBuilder builder =
Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
- DeleteFilter<InternalRow> deleteFilter = new TestDeleteFilter(true);
+ DeleteFilter<InternalRow> deleteFilter =
+ new TestHelpers.CustomizedDeleteFilter(true, DATA_SCHEMA, PROJECTION_SCHEMA);
builder.createBatchedReaderFunc(
fileSchema ->
@@ -194,70 +189,6 @@ public class TestSparkParquetReadMetadataColumns {
validate(expectedRowsAfterDelete, builder, deleteFilter);
}
- private static class TestDeleteFilter extends DeleteFilter<InternalRow> {
- private final boolean hasDeletes;
-
- protected TestDeleteFilter(boolean hasDeletes) {
- super("", List.of(), DATA_SCHEMA, PROJECTION_SCHEMA, new
DeleteCounter(), true);
- this.hasDeletes = hasDeletes;
- }
-
- @Override
- protected StructLike asStructLike(InternalRow record) {
- return null;
- }
-
- @Override
- protected InputFile getInputFile(String location) {
- return null;
- }
-
- @Override
- public boolean hasPosDeletes() {
- return hasDeletes;
- }
-
- @Override
- public PositionDeleteIndex deletedRowPositions() {
- PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
- if (hasDeletes) {
- deletedRowPos.delete(98, 103);
- }
-
- return deletedRowPos;
- }
- }
-
- private static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
- private final Set<Long> deleteIndex;
-
- private CustomizedPositionDeleteIndex() {
- deleteIndex = Sets.newHashSet();
- }
-
- @Override
- public void delete(long position) {
- deleteIndex.add(position);
- }
-
- @Override
- public void delete(long posStart, long posEnd) {
- for (long l = posStart; l < posEnd; l++) {
- delete(l);
- }
- }
-
- @Override
- public boolean isDeleted(long position) {
- return deleteIndex.contains(position);
- }
-
- @Override
- public boolean isEmpty() {
- return deleteIndex.isEmpty();
- }
- }
-
@TestTemplate
public void testReadRowNumbersWithFilter() throws IOException {
// current iceberg supports row group filter.
@@ -314,7 +245,10 @@ public class TestSparkParquetReadMetadataColumns {
builder = builder.split(splitStart, splitLength);
}
- validate(expected, builder, new TestDeleteFilter(false));
+ validate(
+ expected,
+ builder,
+ new TestHelpers.CustomizedDeleteFilter(false, DATA_SCHEMA, PROJECTION_SCHEMA));
}
private void validate(