This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bfe06a540d Spark: Move DeleteFiltering out from the vectorized reader (#14652)
bfe06a540d is described below
commit bfe06a540d9d31d89b5161b20dcdcf6b1b6e2ef4
Author: pvary <[email protected]>
AuthorDate: Wed Dec 3 14:03:40 2025 +0100
Spark: Move DeleteFiltering out from the vectorized reader (#14652)
---
.baseline/checkstyle/checkstyle-suppressions.xml | 2 +
.../spark/data/vectorized/ColumnarBatchReader.java | 57 +-----------
.../data/vectorized/CometColumnarBatchReader.java | 64 +------------
.../data/vectorized/CometDeleteColumnReader.java | 27 +++---
...mnVector.java => CometDeletedColumnVector.java} | 41 ++++++++-
.../vectorized/CometVectorizedReaderBuilder.java | 13 +--
.../spark/data/vectorized/DeletedColumnVector.java | 3 +-
.../vectorized/UpdatableDeletedColumnVector.java | 23 +++++
.../vectorized/VectorizedSparkParquetReaders.java | 32 +------
.../iceberg/spark/source/BaseBatchReader.java | 101 ++++++++++++++++++---
.../iceberg/spark/source/BatchDataReader.java | 8 +-
.../data/TestSparkParquetReadMetadataColumns.java | 71 ++++++++++++---
...estParquetDictionaryEncodedVectorizedReads.java | 2 +-
.../parquet/TestParquetVectorizedReads.java | 5 +-
.../iceberg/spark/source/BatchReaderUtil.java | 34 +++++++
15 files changed, 273 insertions(+), 210 deletions(-)
diff --git a/.baseline/checkstyle/checkstyle-suppressions.xml b/.baseline/checkstyle/checkstyle-suppressions.xml
index 1f180e40a1..1e79b1a7aa 100644
--- a/.baseline/checkstyle/checkstyle-suppressions.xml
+++ b/.baseline/checkstyle/checkstyle-suppressions.xml
@@ -55,6 +55,8 @@
<!-- Suppress checks for CometColumnReader -->
<suppress files="org.apache.iceberg.spark.data.vectorized.CometColumnReader" checks="IllegalImport"/>
+ <!-- Suppress checks for CometDeletedColumnVector -->
+ <suppress files="org.apache.iceberg.spark.data.vectorized.CometDeletedColumnVector" checks="IllegalImport"/>
<!-- Suppress TestClassNamingConvention for main source files -->
<suppress files=".*[/\\]src[/\\]main[/\\].*" id="TestClassNamingConvention" />
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index a0670c2bbe..38d505d250 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -22,15 +22,11 @@ import java.util.List;
import java.util.Map;
import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
-import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.Pair;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -40,31 +36,15 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
* populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
*/
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
- private final boolean hasIsDeletedColumn;
- private DeleteFilter<InternalRow> deletes = null;
- private long rowStartPosInBatch = 0;
public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
super(readers);
- this.hasIsDeletedColumn =
- readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
}
@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
super.setRowGroupInfo(pageStore, metaData);
- this.rowStartPosInBatch =
- pageStore
- .getRowIndexOffset()
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "PageReadStore does not contain row index offset"));
- }
-
- public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
- this.deletes = deleteFilter;
}
@Override
@@ -73,9 +53,7 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
closeVectors();
}
- ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
- rowStartPosInBatch += numRowsToRead;
- return columnarBatch;
+ return new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
}
private class ColumnBatchLoader {
@@ -89,43 +67,12 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
ColumnarBatch loadDataToColumnBatch() {
ColumnVector[] vectors = readDataToColumnVectors();
- int numLiveRows = batchSize;
-
- if (hasIsDeletedColumn) {
- boolean[] isDeleted = buildIsDeleted(vectors);
- for (ColumnVector vector : vectors) {
- if (vector instanceof DeletedColumnVector) {
- ((DeletedColumnVector) vector).setValue(isDeleted);
- }
- }
- } else {
- Pair<int[], Integer> pair = buildRowIdMapping(vectors);
- if (pair != null) {
- int[] rowIdMapping = pair.first();
- numLiveRows = pair.second();
- for (int i = 0; i < vectors.length; i++) {
- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
- }
- }
- }
-
- if (deletes != null && deletes.hasEqDeletes()) {
- vectors = ColumnarBatchUtil.removeExtraColumns(deletes, vectors);
- }
ColumnarBatch batch = new ColumnarBatch(vectors);
- batch.setNumRows(numLiveRows);
+ batch.setNumRows(batchSize);
return batch;
}
- private boolean[] buildIsDeleted(ColumnVector[] vectors) {
- return ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, batchSize);
- }
-
- private Pair<int[], Integer> buildRowIdMapping(ColumnVector[] vectors) {
- return ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, batchSize);
- }
-
ColumnVector[] readDataToColumnVectors() {
ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
index 04ac69476a..3d3e9aca24 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
@@ -25,15 +25,12 @@ import java.util.Map;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.BatchReader;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.util.Pair;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -46,7 +43,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
private final CometColumnReader[] readers;
- private final boolean hasIsDeletedColumn;
// The delegated BatchReader on the Comet side does the real work of loading a batch of rows.
// The Comet BatchReader contains an array of ColumnReader. There is no need to explicitly call
@@ -56,14 +52,10 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
// DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
// available.
private final BatchReader delegate;
- private DeleteFilter<InternalRow> deletes = null;
- private long rowStartPosInBatch = 0;
CometColumnarBatchReader(List<VectorizedReader<?>> readers, Schema schema) {
this.readers =
readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new);
- this.hasIsDeletedColumn =
- readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
this.delegate = new BatchReader(abstractColumnReaders);
@@ -89,25 +81,11 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
for (int i = 0; i < readers.length; i++) {
delegate.getColumnReaders()[i] = this.readers[i].delegate();
}
-
- this.rowStartPosInBatch =
- pageStore
- .getRowIndexOffset()
- .orElseThrow(
- () ->
- new IllegalArgumentException(
- "PageReadStore does not contain row index offset"));
- }
-
- public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
- this.deletes = deleteFilter;
}
@Override
public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
- ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
- rowStartPosInBatch += numRowsToRead;
- return columnarBatch;
+ return new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
}
@Override
@@ -139,39 +117,12 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
ColumnarBatch loadDataToColumnBatch() {
ColumnVector[] vectors = readDataToColumnVectors();
- int numLiveRows = batchSize;
-
- if (hasIsDeletedColumn) {
- boolean[] isDeleted = buildIsDeleted(vectors);
- readDeletedColumn(vectors, isDeleted);
- } else {
- Pair<int[], Integer> pair = buildRowIdMapping(vectors);
- if (pair != null) {
- int[] rowIdMapping = pair.first();
- numLiveRows = pair.second();
- for (int i = 0; i < vectors.length; i++) {
- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
- }
- }
- }
-
- if (deletes != null && deletes.hasEqDeletes()) {
- vectors = ColumnarBatchUtil.removeExtraColumns(deletes, vectors);
- }
ColumnarBatch batch = new ColumnarBatch(vectors);
- batch.setNumRows(numLiveRows);
+ batch.setNumRows(batchSize);
return batch;
}
- private boolean[] buildIsDeleted(ColumnVector[] vectors) {
- return ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, batchSize);
- }
-
- private Pair<int[], Integer> buildRowIdMapping(ColumnVector[] vectors) {
- return ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, batchSize);
- }
-
ColumnVector[] readDataToColumnVectors() {
ColumnVector[] columnVectors = new ColumnVector[readers.length];
// Fetch rows for all readers in the delegate
@@ -182,16 +133,5 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
return columnVectors;
}
-
- void readDeletedColumn(ColumnVector[] columnVectors, boolean[] isDeleted) {
- for (int i = 0; i < readers.length; i++) {
- if (readers[i] instanceof CometDeleteColumnReader) {
- CometDeleteColumnReader deleteColumnReader = new CometDeleteColumnReader<>(isDeleted);
- deleteColumnReader.setBatchSize(batchSize);
- deleteColumnReader.delegate().readBatch(batchSize);
- columnVectors[i] = deleteColumnReader.delegate().currentBatch();
- }
- }
- }
}
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe486..26219014f7 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
import org.apache.comet.parquet.MetadataColumnReader;
import org.apache.comet.parquet.Native;
import org.apache.comet.parquet.TypeUtil;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.comet.vector.CometVector;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
@@ -33,11 +33,6 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
setDelegate(new DeleteColumnReader());
}
- CometDeleteColumnReader(boolean[] isDeleted) {
- super(MetadataColumns.IS_DELETED);
- setDelegate(new DeleteColumnReader(isDeleted));
- }
-
@Override
public void setBatchSize(int batchSize) {
super.setBatchSize(batchSize);
@@ -46,30 +41,34 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
}
private static class DeleteColumnReader extends MetadataColumnReader {
- private boolean[] isDeleted;
+ private final CometDeletedColumnVector deletedVector;
DeleteColumnReader() {
+ this(new boolean[0]);
+ }
+
+ DeleteColumnReader(boolean[] isDeleted) {
super(
DataTypes.BooleanType,
TypeUtil.convertToParquet(
new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
false /* useDecimal128 = false */,
false /* isConstant = false */);
- this.isDeleted = new boolean[0];
- }
-
- DeleteColumnReader(boolean[] isDeleted) {
- this();
- this.isDeleted = isDeleted;
+ this.deletedVector = new CometDeletedColumnVector(isDeleted);
}
@Override
public void readBatch(int total) {
Native.resetBatch(nativeHandle);
// set isDeleted on the native side to be consumed by native execution
- Native.setIsDeleted(nativeHandle, isDeleted);
+ Native.setIsDeleted(nativeHandle, deletedVector.isDeleted());
super.readBatch(total);
}
+
+ @Override
+ public CometVector currentBatch() {
+ return deletedVector;
+ }
}
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeletedColumnVector.java
similarity index 72%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeletedColumnVector.java
index a453247068..5817f2c20a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeletedColumnVector.java
@@ -18,25 +18,58 @@
*/
package org.apache.iceberg.spark.data.vectorized;
+import org.apache.comet.shaded.arrow.vector.ValueVector;
+import org.apache.comet.vector.CometVector;
import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
-public class DeletedColumnVector extends ColumnVector {
+public class CometDeletedColumnVector extends CometVector implements UpdatableDeletedColumnVector {
private boolean[] isDeleted;
- public DeletedColumnVector(Type type) {
- super(SparkSchemaUtil.convert(type));
+ public CometDeletedColumnVector(boolean[] isDeleted) {
+ super(SparkSchemaUtil.convert(Types.BooleanType.get()), false);
+ this.isDeleted = isDeleted;
}
+ @Override
public void setValue(boolean[] deleted) {
this.isDeleted = deleted;
}
+ boolean[] isDeleted() {
+ return isDeleted;
+ }
+
+ @Override
+ public void setNumNulls(int numNulls) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void setNumValues(int numValues) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public int numValues() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public ValueVector getValueVector() {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public CometVector slice(int offset, int length) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
@Override
public void close() {}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
index d36f1a7274..779dc240d4 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
@@ -24,7 +24,6 @@ import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -37,7 +36,6 @@ import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-import org.apache.spark.sql.catalyst.InternalRow;
class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
@@ -45,19 +43,16 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade
private final Schema icebergSchema;
private final Map<Integer, ?> idToConstant;
private final Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
- private final DeleteFilter<InternalRow> deleteFilter;
CometVectorizedReaderBuilder(
Schema expectedSchema,
MessageType parquetSchema,
Map<Integer, ?> idToConstant,
- Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
- DeleteFilter<InternalRow> deleteFilter) {
+ Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory) {
this.parquetSchema = parquetSchema;
this.icebergSchema = expectedSchema;
this.idToConstant = idToConstant;
this.readerFactory = readerFactory;
- this.deleteFilter = deleteFilter;
}
@Override
@@ -107,11 +102,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade
}
protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
- VectorizedReader<?> reader = readerFactory.apply(reorderedFields);
- if (deleteFilter != null) {
- ((CometColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
- }
- return reader;
+ return readerFactory.apply(reorderedFields);
}
@Override
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
index a453247068..fa3bcfdd00 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
@@ -26,13 +26,14 @@ import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;
-public class DeletedColumnVector extends ColumnVector {
+public class DeletedColumnVector extends ColumnVector implements UpdatableDeletedColumnVector {
private boolean[] isDeleted;
public DeletedColumnVector(Type type) {
super(SparkSchemaUtil.convert(type));
}
+ @Override
public void setValue(boolean[] deleted) {
this.isDeleted = deleted;
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/UpdatableDeletedColumnVector.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/UpdatableDeletedColumnVector.java
new file mode 100644
index 0000000000..99bedc42bf
--- /dev/null
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/UpdatableDeletedColumnVector.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+public interface UpdatableDeletedColumnVector {
+ void setValue(boolean[] isDeleted);
+}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index d95baa724b..8e25e81a05 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -26,12 +26,10 @@ import org.apache.arrow.vector.NullCheckingForGet;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
-import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +56,6 @@ public class VectorizedSparkParquetReaders {
Schema expectedSchema,
MessageType fileSchema,
Map<Integer, ?> idToConstant,
- DeleteFilter<InternalRow> deleteFilter,
BufferAllocator bufferAllocator) {
return (ColumnarBatchReader)
TypeWithSchemaVisitor.visit(
@@ -70,24 +67,16 @@ public class VectorizedSparkParquetReaders {
NullCheckingForGet.NULL_CHECKING_ENABLED,
idToConstant,
ColumnarBatchReader::new,
- deleteFilter,
bufferAllocator));
}
public static ColumnarBatchReader buildReader(
- Schema expectedSchema,
- MessageType fileSchema,
- Map<Integer, ?> idToConstant,
- DeleteFilter<InternalRow> deleteFilter) {
- return buildReader(
- expectedSchema, fileSchema, idToConstant, deleteFilter, ArrowAllocation.rootAllocator());
+ Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
+ return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
}
public static CometColumnarBatchReader buildCometReader(
- Schema expectedSchema,
- MessageType fileSchema,
- Map<Integer, ?> idToConstant,
- DeleteFilter<InternalRow> deleteFilter) {
+ Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return (CometColumnarBatchReader)
TypeWithSchemaVisitor.visit(
expectedSchema.asStruct(),
@@ -96,8 +85,7 @@ public class VectorizedSparkParquetReaders {
expectedSchema,
fileSchema,
idToConstant,
- readers -> new CometColumnarBatchReader(readers, expectedSchema),
- deleteFilter));
+ readers -> new CometColumnarBatchReader(readers, expectedSchema)));
}
// enables unsafe memory access to avoid costly checks to see if index is within bounds
@@ -134,7 +122,6 @@ public class VectorizedSparkParquetReaders {
}
private static class ReaderBuilder extends VectorizedReaderBuilder {
- private final DeleteFilter<InternalRow> deleteFilter;
ReaderBuilder(
Schema expectedSchema,
@@ -142,7 +129,6 @@ public class VectorizedSparkParquetReaders {
boolean setArrowValidityVector,
Map<Integer, ?> idToConstant,
Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
- DeleteFilter<InternalRow> deleteFilter,
BufferAllocator bufferAllocator) {
super(
expectedSchema,
@@ -152,16 +138,6 @@ public class VectorizedSparkParquetReaders {
readerFactory,
SparkUtil::internalToSpark,
bufferAllocator);
- this.deleteFilter = deleteFilter;
- }
-
- @Override
- protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
- VectorizedReader<?> reader = super.vectorizedReader(reorderedFields);
- if (deleteFilter != null) {
- ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
- }
- return reader;
}
}
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index a0f45e7610..ff30f29aea 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -20,24 +20,33 @@ package org.apache.iceberg.spark.source;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nonnull;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.OrcBatchReadConf;
import org.apache.iceberg.spark.ParquetBatchReadConf;
import org.apache.iceberg.spark.ParquetReaderType;
+import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
+import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
+import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -66,18 +75,23 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
long length,
Expression residual,
Map<Integer, ?> idToConstant,
- SparkDeleteFilter deleteFilter) {
+ @Nonnull SparkDeleteFilter deleteFilter) {
+ CloseableIterable<ColumnarBatch> iterable;
switch (format) {
case PARQUET:
- return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
-
+ iterable =
+ newParquetIterable(
+ inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema());
+ break;
case ORC:
- return newOrcIterable(inputFile, start, length, residual, idToConstant);
-
+ iterable = newOrcIterable(inputFile, start, length, residual, idToConstant);
+ break;
default:
throw new UnsupportedOperationException(
"Format: " + format + " not supported for batched reads");
}
+
+ return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
}
private CloseableIterable<ColumnarBatch> newParquetIterable(
@@ -86,10 +100,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
long length,
Expression residual,
Map<Integer, ?> idToConstant,
- SparkDeleteFilter deleteFilter) {
- // get required schema if there are deletes
- Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-
+ Schema requiredSchema) {
return Parquet.read(inputFile)
.project(requiredSchema)
.split(start, length)
@@ -97,10 +108,10 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
fileSchema -> {
if (parquetConf.readerType() == ParquetReaderType.COMET) {
return VectorizedSparkParquetReaders.buildCometReader(
- requiredSchema, fileSchema, idToConstant, deleteFilter);
+ requiredSchema, fileSchema, idToConstant);
} else {
return VectorizedSparkParquetReaders.buildReader(
- requiredSchema, fileSchema, idToConstant, deleteFilter);
+ requiredSchema, fileSchema, idToConstant);
}
})
.recordsPerBatch(parquetConf.batchSize())
@@ -139,4 +150,72 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
.withNameMapping(nameMapping())
.build();
}
+
+ @VisibleForTesting
+ static class BatchDeleteFilter {
+ private final DeleteFilter<InternalRow> deletes;
+ private boolean hasIsDeletedColumn;
+ private int rowPositionColumnIndex = -1;
+
+ BatchDeleteFilter(DeleteFilter<InternalRow> deletes) {
+ this.deletes = deletes;
+
+ Schema schema = deletes.requiredSchema();
+ for (int i = 0; i < schema.columns().size(); i++) {
+ if (schema.columns().get(i).fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+ this.rowPositionColumnIndex = i;
+ } else if (schema.columns().get(i).fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
+ this.hasIsDeletedColumn = true;
+ }
+ }
+ }
+
+ ColumnarBatch filterBatch(ColumnarBatch batch) {
+ if (!needDeletes()) {
+ return batch;
+ }
+
+ ColumnVector[] vectors = new ColumnVector[batch.numCols()];
+ for (int i = 0; i < batch.numCols(); i++) {
+ vectors[i] = batch.column(i);
+ }
+
+ int numLiveRows = batch.numRows();
+ long rowStartPosInBatch =
+ rowPositionColumnIndex == -1 ? -1 : vectors[rowPositionColumnIndex].getLong(0);
+
+ if (hasIsDeletedColumn) {
+ boolean[] isDeleted =
+ ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, numLiveRows);
+ for (ColumnVector vector : vectors) {
+ if (vector instanceof UpdatableDeletedColumnVector) {
+ ((UpdatableDeletedColumnVector) vector).setValue(isDeleted);
+ }
+ }
+ } else {
+ Pair<int[], Integer> pair =
+ ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, numLiveRows);
+ if (pair != null) {
+ int[] rowIdMapping = pair.first();
+ numLiveRows = pair.second();
+ for (int i = 0; i < vectors.length; i++) {
+ vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
+ }
+ }
+ }
+
+ if (deletes != null && deletes.hasEqDeletes()) {
+ vectors = ColumnarBatchUtil.removeExtraColumns(deletes, vectors);
+ }
+
+ ColumnarBatch output = new ColumnarBatch(vectors);
+ output.setNumRows(numLiveRows);
+ return output;
+ }
+
+ private boolean needDeletes() {
+ return hasIsDeletedColumn
+ || (deletes != null && (deletes.hasEqDeletes() || deletes.hasPosDeletes()));
+ }
+ }
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 20128ccef8..9ec0f88577 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -105,15 +105,13 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
// update the current file for Spark's filename() function
InputFileBlockHolder.set(filePath, task.start(), task.length());
- Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-
InputFile inputFile = getInputFile(filePath);
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
SparkDeleteFilter deleteFilter =
- task.deletes().isEmpty()
- ? null
- : new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
+ new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
+
+ Map<Integer, ?> idToConstant = constantsMap(task, deleteFilter.requiredSchema());
return newBatchIterable(
inputFile,
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index 044ea3d93c..ccd783915c 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -21,8 +21,6 @@ package org.apache.iceberg.spark.data;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
@@ -37,12 +35,15 @@ import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.DeleteCounter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -51,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.spark.source.BatchReaderUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -181,22 +183,52 @@ public class TestSparkParquetReadMetadataColumns {
Parquet.ReadBuilder builder =
Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
- DeleteFilter deleteFilter = mock(DeleteFilter.class);
- when(deleteFilter.hasPosDeletes()).thenReturn(true);
- PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
- deletedRowPos.delete(98, 103);
- when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos);
+ DeleteFilter<InternalRow> deleteFilter = new TestDeleteFilter(true);
builder.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkParquetReaders.buildReader(
- PROJECTION_SCHEMA, fileSchema, Maps.newHashMap(), deleteFilter));
+ PROJECTION_SCHEMA, fileSchema, Maps.newHashMap()));
builder.recordsPerBatch(RECORDS_PER_BATCH);
- validate(expectedRowsAfterDelete, builder);
+ validate(expectedRowsAfterDelete, builder, deleteFilter);
}
- private class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+ private static class TestDeleteFilter extends DeleteFilter<InternalRow> {
+ private final boolean hasDeletes;
+
+ protected TestDeleteFilter(boolean hasDeletes) {
+ super("", List.of(), DATA_SCHEMA, PROJECTION_SCHEMA, new DeleteCounter(), true);
+ this.hasDeletes = hasDeletes;
+ }
+
+ @Override
+ protected StructLike asStructLike(InternalRow record) {
+ return null;
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return null;
+ }
+
+ @Override
+ public boolean hasPosDeletes() {
+ return hasDeletes;
+ }
+
+ @Override
+ public PositionDeleteIndex deletedRowPositions() {
+ PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+ if (hasDeletes) {
+ deletedRowPos.delete(98, 103);
+ }
+
+ return deletedRowPos;
+ }
+ }
+
+ private static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
private final Set<Long> deleteIndex;
private CustomizedPositionDeleteIndex() {
@@ -266,7 +298,7 @@ public class TestSparkParquetReadMetadataColumns {
builder.createBatchedReaderFunc(
fileSchema ->
VectorizedSparkParquetReaders.buildReader(
- PROJECTION_SCHEMA, fileSchema, Maps.newHashMap(), null));
+ PROJECTION_SCHEMA, fileSchema, Maps.newHashMap()));
builder.recordsPerBatch(RECORDS_PER_BATCH);
} else {
builder =
@@ -282,13 +314,13 @@ public class TestSparkParquetReadMetadataColumns {
builder = builder.split(splitStart, splitLength);
}
- validate(expected, builder);
+ validate(expected, builder, new TestDeleteFilter(false));
}
- private void validate(List<InternalRow> expected, Parquet.ReadBuilder builder)
+ private void validate(
+ List<InternalRow> expected, Parquet.ReadBuilder builder, DeleteFilter<InternalRow> filter)
throws IOException {
- try (CloseableIterable<InternalRow> reader =
- vectorized ? batchesToRows(builder.build()) : builder.build()) {
+ try (CloseableIterable<InternalRow> reader = reader(builder, filter)) {
final Iterator<InternalRow> actualRows = reader.iterator();
for (InternalRow internalRow : expected) {
@@ -300,6 +332,15 @@ public class TestSparkParquetReadMetadataColumns {
}
}
+ private CloseableIterable<InternalRow> reader(
+ Parquet.ReadBuilder builder, DeleteFilter<InternalRow> filter) {
+ if (!vectorized) {
+ return builder.build();
+ } else {
+ return batchesToRows(BatchReaderUtil.applyDeleteFilter(builder.build(), filter));
+ }
+ }
+
private CloseableIterable<InternalRow> batchesToRows(CloseableIterable<ColumnarBatch> batches) {
return CloseableIterable.combine(
Iterables.concat(Iterables.transform(batches, b -> (Iterable<InternalRow>) b::rowIterator)),
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
index 973a17c9a3..284fa0b055 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
@@ -179,7 +179,7 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
.createBatchedReaderFunc(
type ->
VectorizedSparkParquetReaders.buildReader(
- schema, type, ImmutableMap.of(), null, allocator));
+ schema, type, ImmutableMap.of(), allocator));
try (CloseableIterable<ColumnarBatch> batchReader = readBuilder.build()) {
Iterator<Row> expectedIter = expected.iterator();
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 2e46088cfe..46a6a302e1 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -277,7 +277,7 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
.createBatchedReaderFunc(
type ->
VectorizedSparkParquetReaders.buildReader(
- schema, type, idToConstant, null, allocator));
+ schema, type, idToConstant, allocator));
if (reuseContainers) {
readBuilder.reuseContainers();
}
@@ -308,8 +308,7 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
new Schema(required(1, "struct", SUPPORTED_PRIMITIVES))),
new MessageType(
"struct", new GroupType(Type.Repetition.OPTIONAL, "struct").withId(1)),
- Maps.newHashMap(),
- null))
+ Maps.newHashMap()))
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Vectorized reads are not supported yet for struct fields");
}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/BatchReaderUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/BatchReaderUtil.java
new file mode 100644
index 0000000000..e5d03a4efb
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/BatchReaderUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class BatchReaderUtil {
+ private BatchReaderUtil() {}
+
+ public static CloseableIterable<ColumnarBatch> applyDeleteFilter(
+ CloseableIterable<ColumnarBatch> batches, DeleteFilter<InternalRow> filter) {
+ return CloseableIterable.transform(
+ batches, new BaseBatchReader.BatchDeleteFilter(filter)::filterBatch);
+ }
+}