This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bfe06a540d Spark: Move DeleteFiltering out from the vectorized reader (#14652)
bfe06a540d is described below

commit bfe06a540d9d31d89b5161b20dcdcf6b1b6e2ef4
Author: pvary <[email protected]>
AuthorDate: Wed Dec 3 14:03:40 2025 +0100

    Spark: Move DeleteFiltering out from the vectorized reader (#14652)
---
 .baseline/checkstyle/checkstyle-suppressions.xml   |   2 +
 .../spark/data/vectorized/ColumnarBatchReader.java |  57 +-----------
 .../data/vectorized/CometColumnarBatchReader.java  |  64 +------------
 .../data/vectorized/CometDeleteColumnReader.java   |  27 +++---
 ...mnVector.java => CometDeletedColumnVector.java} |  41 ++++++++-
 .../vectorized/CometVectorizedReaderBuilder.java   |  13 +--
 .../spark/data/vectorized/DeletedColumnVector.java |   3 +-
 .../vectorized/UpdatableDeletedColumnVector.java   |  23 +++++
 .../vectorized/VectorizedSparkParquetReaders.java  |  32 +------
 .../iceberg/spark/source/BaseBatchReader.java      | 101 ++++++++++++++++++---
 .../iceberg/spark/source/BatchDataReader.java      |   8 +-
 .../data/TestSparkParquetReadMetadataColumns.java  |  71 ++++++++++++---
 ...estParquetDictionaryEncodedVectorizedReads.java |   2 +-
 .../parquet/TestParquetVectorizedReads.java        |   5 +-
 .../iceberg/spark/source/BatchReaderUtil.java      |  34 +++++++
 15 files changed, 273 insertions(+), 210 deletions(-)
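The shape of the change: the vectorized Parquet readers no longer take a
DeleteFilter and only load raw batches; delete filtering now happens once,
outside the readers, as a transform over the batch iterable in
BaseBatchReader. A minimal sketch of the new wiring (class and method names
come from the patch below; inputFile, requiredSchema, idToConstant, and
deleteFilter are hypothetical placeholders, not code from the diff):

    // Build a vectorized reader without a DeleteFilter; it returns unfiltered batches.
    CloseableIterable<ColumnarBatch> batches =
        Parquet.read(inputFile)
            .project(requiredSchema)
            .createBatchedReaderFunc(
                fileSchema ->
                    VectorizedSparkParquetReaders.buildReader(requiredSchema, fileSchema, idToConstant))
            .build();

    // Apply delete filtering per batch, after the read, via BatchDeleteFilter.filterBatch.
    CloseableIterable<ColumnarBatch> filtered =
        CloseableIterable.transform(batches, new BatchDeleteFilter(deleteFilter)::filterBatch);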

diff --git a/.baseline/checkstyle/checkstyle-suppressions.xml b/.baseline/checkstyle/checkstyle-suppressions.xml
index 1f180e40a1..1e79b1a7aa 100644
--- a/.baseline/checkstyle/checkstyle-suppressions.xml
+++ b/.baseline/checkstyle/checkstyle-suppressions.xml
@@ -55,6 +55,8 @@
 
     <!-- Suppress checks for CometColumnReader -->
    <suppress files="org.apache.iceberg.spark.data.vectorized.CometColumnReader" checks="IllegalImport"/>
+    <!-- Suppress checks for CometDeletedColumnVector -->
+    <suppress files="org.apache.iceberg.spark.data.vectorized.CometDeletedColumnVector" checks="IllegalImport"/>
 
     <!-- Suppress TestClassNamingConvention for main source files -->
    <suppress files=".*[/\\]src[/\\]main[/\\].*" id="TestClassNamingConvention" />
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index a0670c2bbe..38d505d250 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -22,15 +22,11 @@ import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
-import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.util.Pair;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
@@ -40,31 +36,15 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
  */
 public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
-  private final boolean hasIsDeletedColumn;
-  private DeleteFilter<InternalRow> deletes = null;
-  private long rowStartPosInBatch = 0;
 
   public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
     super(readers);
-    this.hasIsDeletedColumn =
-        readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
   }
 
   @Override
   public void setRowGroupInfo(
       PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData) {
     super.setRowGroupInfo(pageStore, metaData);
-    this.rowStartPosInBatch =
-        pageStore
-            .getRowIndexOffset()
-            .orElseThrow(
-                () ->
-                    new IllegalArgumentException(
-                        "PageReadStore does not contain row index offset"));
-  }
-
-  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
-    this.deletes = deleteFilter;
   }
 
   @Override
@@ -73,9 +53,7 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
       closeVectors();
     }
 
-    ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
-    rowStartPosInBatch += numRowsToRead;
-    return columnarBatch;
+    return new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
   }
 
   private class ColumnBatchLoader {
@@ -89,43 +67,12 @@ public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
 
     ColumnarBatch loadDataToColumnBatch() {
       ColumnVector[] vectors = readDataToColumnVectors();
-      int numLiveRows = batchSize;
-
-      if (hasIsDeletedColumn) {
-        boolean[] isDeleted = buildIsDeleted(vectors);
-        for (ColumnVector vector : vectors) {
-          if (vector instanceof DeletedColumnVector) {
-            ((DeletedColumnVector) vector).setValue(isDeleted);
-          }
-        }
-      } else {
-        Pair<int[], Integer> pair = buildRowIdMapping(vectors);
-        if (pair != null) {
-          int[] rowIdMapping = pair.first();
-          numLiveRows = pair.second();
-          for (int i = 0; i < vectors.length; i++) {
-            vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
-          }
-        }
-      }
-
-      if (deletes != null && deletes.hasEqDeletes()) {
-        vectors = ColumnarBatchUtil.removeExtraColumns(deletes, vectors);
-      }
 
       ColumnarBatch batch = new ColumnarBatch(vectors);
-      batch.setNumRows(numLiveRows);
+      batch.setNumRows(batchSize);
       return batch;
     }
 
-    private boolean[] buildIsDeleted(ColumnVector[] vectors) {
-      return ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, batchSize);
-    }
-
-    private Pair<int[], Integer> buildRowIdMapping(ColumnVector[] vectors) {
-      return ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, batchSize);
-    }
-
     ColumnVector[] readDataToColumnVectors() {
       ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];
 
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
index 04ac69476a..3d3e9aca24 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
@@ -25,15 +25,12 @@ import java.util.Map;
 import org.apache.comet.parquet.AbstractColumnReader;
 import org.apache.comet.parquet.BatchReader;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.util.Pair;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
@@ -46,7 +43,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
 
   private final CometColumnReader[] readers;
-  private final boolean hasIsDeletedColumn;
 
  // The delegated BatchReader on the Comet side does the real work of loading a batch of rows.
  // The Comet BatchReader contains an array of ColumnReader. There is no need to explicitly call
@@ -56,14 +52,10 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
  // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
   // available.
   private final BatchReader delegate;
-  private DeleteFilter<InternalRow> deletes = null;
-  private long rowStartPosInBatch = 0;
 
   CometColumnarBatchReader(List<VectorizedReader<?>> readers, Schema schema) {
    this.readers =
        readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new);
-    this.hasIsDeletedColumn =
-        readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
 
    AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
     this.delegate = new BatchReader(abstractColumnReaders);
@@ -89,25 +81,11 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
     for (int i = 0; i < readers.length; i++) {
       delegate.getColumnReaders()[i] = this.readers[i].delegate();
     }
-
-    this.rowStartPosInBatch =
-        pageStore
-            .getRowIndexOffset()
-            .orElseThrow(
-                () ->
-                    new IllegalArgumentException(
-                        "PageReadStore does not contain row index offset"));
-  }
-
-  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
-    this.deletes = deleteFilter;
   }
 
   @Override
   public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
-    ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
-    rowStartPosInBatch += numRowsToRead;
-    return columnarBatch;
+    return new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
   }
 
   @Override
@@ -139,39 +117,12 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
 
     ColumnarBatch loadDataToColumnBatch() {
       ColumnVector[] vectors = readDataToColumnVectors();
-      int numLiveRows = batchSize;
-
-      if (hasIsDeletedColumn) {
-        boolean[] isDeleted = buildIsDeleted(vectors);
-        readDeletedColumn(vectors, isDeleted);
-      } else {
-        Pair<int[], Integer> pair = buildRowIdMapping(vectors);
-        if (pair != null) {
-          int[] rowIdMapping = pair.first();
-          numLiveRows = pair.second();
-          for (int i = 0; i < vectors.length; i++) {
-            vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
-          }
-        }
-      }
-
-      if (deletes != null && deletes.hasEqDeletes()) {
-        vectors = ColumnarBatchUtil.removeExtraColumns(deletes, vectors);
-      }
 
       ColumnarBatch batch = new ColumnarBatch(vectors);
-      batch.setNumRows(numLiveRows);
+      batch.setNumRows(batchSize);
       return batch;
     }
 
-    private boolean[] buildIsDeleted(ColumnVector[] vectors) {
-      return ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, batchSize);
-    }
-
-    private Pair<int[], Integer> buildRowIdMapping(ColumnVector[] vectors) {
-      return ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, batchSize);
-    }
-
     ColumnVector[] readDataToColumnVectors() {
       ColumnVector[] columnVectors = new ColumnVector[readers.length];
       // Fetch rows for all readers in the delegate
@@ -182,16 +133,5 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
 
       return columnVectors;
     }
-
-    void readDeletedColumn(ColumnVector[] columnVectors, boolean[] isDeleted) {
-      for (int i = 0; i < readers.length; i++) {
-        if (readers[i] instanceof CometDeleteColumnReader) {
-          CometDeleteColumnReader deleteColumnReader = new CometDeleteColumnReader<>(isDeleted);
-          deleteColumnReader.setBatchSize(batchSize);
-          deleteColumnReader.delegate().readBatch(batchSize);
-          columnVectors[i] = deleteColumnReader.delegate().currentBatch();
-        }
-      }
-    }
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
index 6235bfe486..26219014f7 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
@@ -21,7 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
 import org.apache.comet.parquet.MetadataColumnReader;
 import org.apache.comet.parquet.Native;
 import org.apache.comet.parquet.TypeUtil;
-import org.apache.iceberg.MetadataColumns;
+import org.apache.comet.vector.CometVector;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.Metadata;
@@ -33,11 +33,6 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
     setDelegate(new DeleteColumnReader());
   }
 
-  CometDeleteColumnReader(boolean[] isDeleted) {
-    super(MetadataColumns.IS_DELETED);
-    setDelegate(new DeleteColumnReader(isDeleted));
-  }
-
   @Override
   public void setBatchSize(int batchSize) {
     super.setBatchSize(batchSize);
@@ -46,30 +41,34 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
   }
 
   private static class DeleteColumnReader extends MetadataColumnReader {
-    private boolean[] isDeleted;
+    private final CometDeletedColumnVector deletedVector;
 
     DeleteColumnReader() {
+      this(new boolean[0]);
+    }
+
+    DeleteColumnReader(boolean[] isDeleted) {
       super(
           DataTypes.BooleanType,
           TypeUtil.convertToParquet(
            new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
           false /* useDecimal128 = false */,
           false /* isConstant = false */);
-      this.isDeleted = new boolean[0];
-    }
-
-    DeleteColumnReader(boolean[] isDeleted) {
-      this();
-      this.isDeleted = isDeleted;
+      this.deletedVector = new CometDeletedColumnVector(isDeleted);
     }
 
     @Override
     public void readBatch(int total) {
       Native.resetBatch(nativeHandle);
       // set isDeleted on the native side to be consumed by native execution
-      Native.setIsDeleted(nativeHandle, isDeleted);
+      Native.setIsDeleted(nativeHandle, deletedVector.isDeleted());
 
       super.readBatch(total);
     }
+
+    @Override
+    public CometVector currentBatch() {
+      return deletedVector;
+    }
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeletedColumnVector.java
similarity index 72%
copy from spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
copy to spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeletedColumnVector.java
index a453247068..5817f2c20a 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeletedColumnVector.java
@@ -18,25 +18,58 @@
  */
 package org.apache.iceberg.spark.data.vectorized;
 
+import org.apache.comet.shaded.arrow.vector.ValueVector;
+import org.apache.comet.vector.CometVector;
 import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
 
-public class DeletedColumnVector extends ColumnVector {
+public class CometDeletedColumnVector extends CometVector implements UpdatableDeletedColumnVector {
   private boolean[] isDeleted;
 
-  public DeletedColumnVector(Type type) {
-    super(SparkSchemaUtil.convert(type));
+  public CometDeletedColumnVector(boolean[] isDeleted) {
+    super(SparkSchemaUtil.convert(Types.BooleanType.get()), false);
+    this.isDeleted = isDeleted;
   }
 
+  @Override
   public void setValue(boolean[] deleted) {
     this.isDeleted = deleted;
   }
 
+  boolean[] isDeleted() {
+    return isDeleted;
+  }
+
+  @Override
+  public void setNumNulls(int numNulls) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public void setNumValues(int numValues) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public int numValues() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public ValueVector getValueVector() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public CometVector slice(int offset, int length) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
   @Override
   public void close() {}
 
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
index d36f1a7274..779dc240d4 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
@@ -24,7 +24,6 @@ import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -37,7 +36,6 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
-import org.apache.spark.sql.catalyst.InternalRow;
 
class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
 
@@ -45,19 +43,16 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade
   private final Schema icebergSchema;
   private final Map<Integer, ?> idToConstant;
  private final Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
-  private final DeleteFilter<InternalRow> deleteFilter;
 
   CometVectorizedReaderBuilder(
       Schema expectedSchema,
       MessageType parquetSchema,
       Map<Integer, ?> idToConstant,
-      Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
-      DeleteFilter<InternalRow> deleteFilter) {
+      Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory) {
     this.parquetSchema = parquetSchema;
     this.icebergSchema = expectedSchema;
     this.idToConstant = idToConstant;
     this.readerFactory = readerFactory;
-    this.deleteFilter = deleteFilter;
   }
 
   @Override
@@ -107,11 +102,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade
   }
 
  protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
-    VectorizedReader<?> reader = readerFactory.apply(reorderedFields);
-    if (deleteFilter != null) {
-      ((CometColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
-    }
-    return reader;
+    return readerFactory.apply(reorderedFields);
   }
 
   @Override
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
index a453247068..fa3bcfdd00 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java
@@ -26,13 +26,14 @@ import org.apache.spark.sql.vectorized.ColumnarArray;
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
 
-public class DeletedColumnVector extends ColumnVector {
+public class DeletedColumnVector extends ColumnVector implements UpdatableDeletedColumnVector {
   private boolean[] isDeleted;
 
   public DeletedColumnVector(Type type) {
     super(SparkSchemaUtil.convert(type));
   }
 
+  @Override
   public void setValue(boolean[] deleted) {
     this.isDeleted = deleted;
   }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/UpdatableDeletedColumnVector.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/UpdatableDeletedColumnVector.java
new file mode 100644
index 0000000000..99bedc42bf
--- /dev/null
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/UpdatableDeletedColumnVector.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+public interface UpdatableDeletedColumnVector {
+  void setValue(boolean[] isDeleted);
+}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index d95baa724b..8e25e81a05 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -26,12 +26,10 @@ import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowAllocation;
 import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +56,6 @@ public class VectorizedSparkParquetReaders {
       Schema expectedSchema,
       MessageType fileSchema,
       Map<Integer, ?> idToConstant,
-      DeleteFilter<InternalRow> deleteFilter,
       BufferAllocator bufferAllocator) {
     return (ColumnarBatchReader)
         TypeWithSchemaVisitor.visit(
@@ -70,24 +67,16 @@ public class VectorizedSparkParquetReaders {
                 NullCheckingForGet.NULL_CHECKING_ENABLED,
                 idToConstant,
                 ColumnarBatchReader::new,
-                deleteFilter,
                 bufferAllocator));
   }
 
   public static ColumnarBatchReader buildReader(
-      Schema expectedSchema,
-      MessageType fileSchema,
-      Map<Integer, ?> idToConstant,
-      DeleteFilter<InternalRow> deleteFilter) {
-    return buildReader(
-        expectedSchema, fileSchema, idToConstant, deleteFilter, ArrowAllocation.rootAllocator());
+      Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
+    return buildReader(expectedSchema, fileSchema, idToConstant, ArrowAllocation.rootAllocator());
   }
 
   public static CometColumnarBatchReader buildCometReader(
-      Schema expectedSchema,
-      MessageType fileSchema,
-      Map<Integer, ?> idToConstant,
-      DeleteFilter<InternalRow> deleteFilter) {
+      Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
     return (CometColumnarBatchReader)
         TypeWithSchemaVisitor.visit(
             expectedSchema.asStruct(),
@@ -96,8 +85,7 @@ public class VectorizedSparkParquetReaders {
                 expectedSchema,
                 fileSchema,
                 idToConstant,
-                readers -> new CometColumnarBatchReader(readers, expectedSchema),
-                deleteFilter));
+                readers -> new CometColumnarBatchReader(readers, expectedSchema)));
   }
 
  // enables unsafe memory access to avoid costly checks to see if index is within bounds
@@ -134,7 +122,6 @@ public class VectorizedSparkParquetReaders {
   }
 
   private static class ReaderBuilder extends VectorizedReaderBuilder {
-    private final DeleteFilter<InternalRow> deleteFilter;
 
     ReaderBuilder(
         Schema expectedSchema,
@@ -142,7 +129,6 @@ public class VectorizedSparkParquetReaders {
         boolean setArrowValidityVector,
         Map<Integer, ?> idToConstant,
         Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
-        DeleteFilter<InternalRow> deleteFilter,
         BufferAllocator bufferAllocator) {
       super(
           expectedSchema,
@@ -152,16 +138,6 @@ public class VectorizedSparkParquetReaders {
           readerFactory,
           SparkUtil::internalToSpark,
           bufferAllocator);
-      this.deleteFilter = deleteFilter;
-    }
-
-    @Override
-    protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
-      VectorizedReader<?> reader = super.vectorizedReader(reorderedFields);
-      if (deleteFilter != null) {
-        ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
-      }
-      return reader;
     }
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index a0f45e7610..ff30f29aea 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -20,24 +20,33 @@ package org.apache.iceberg.spark.source;
 
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nonnull;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.OrcBatchReadConf;
 import org.apache.iceberg.spark.ParquetBatchReadConf;
 import org.apache.iceberg.spark.ParquetReaderType;
+import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
+import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
+import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
@@ -66,18 +75,23 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       long length,
       Expression residual,
       Map<Integer, ?> idToConstant,
-      SparkDeleteFilter deleteFilter) {
+      @Nonnull SparkDeleteFilter deleteFilter) {
+    CloseableIterable<ColumnarBatch> iterable;
     switch (format) {
       case PARQUET:
-        return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);
-
+        iterable =
+            newParquetIterable(
+                inputFile, start, length, residual, idToConstant, deleteFilter.requiredSchema());
+        break;
       case ORC:
-        return newOrcIterable(inputFile, start, length, residual, idToConstant);
-
+        iterable = newOrcIterable(inputFile, start, length, residual, idToConstant);
+        break;
       default:
         throw new UnsupportedOperationException(
             "Format: " + format + " not supported for batched reads");
     }
+
+    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
   }
 
   private CloseableIterable<ColumnarBatch> newParquetIterable(
@@ -86,10 +100,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
       long length,
       Expression residual,
       Map<Integer, ?> idToConstant,
-      SparkDeleteFilter deleteFilter) {
-    // get required schema if there are deletes
-    Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();
-
+      Schema requiredSchema) {
     return Parquet.read(inputFile)
         .project(requiredSchema)
         .split(start, length)
@@ -97,10 +108,10 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
             fileSchema -> {
               if (parquetConf.readerType() == ParquetReaderType.COMET) {
                 return VectorizedSparkParquetReaders.buildCometReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter);
+                    requiredSchema, fileSchema, idToConstant);
               } else {
                 return VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter);
+                    requiredSchema, fileSchema, idToConstant);
               }
             })
         .recordsPerBatch(parquetConf.batchSize())
@@ -139,4 +150,72 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa
         .withNameMapping(nameMapping())
         .build();
   }
+
+  @VisibleForTesting
+  static class BatchDeleteFilter {
+    private final DeleteFilter<InternalRow> deletes;
+    private boolean hasIsDeletedColumn;
+    private int rowPositionColumnIndex = -1;
+
+    BatchDeleteFilter(DeleteFilter<InternalRow> deletes) {
+      this.deletes = deletes;
+
+      Schema schema = deletes.requiredSchema();
+      for (int i = 0; i < schema.columns().size(); i++) {
+        if (schema.columns().get(i).fieldId() == MetadataColumns.ROW_POSITION.fieldId()) {
+          this.rowPositionColumnIndex = i;
+        } else if (schema.columns().get(i).fieldId() == MetadataColumns.IS_DELETED.fieldId()) {
+          this.hasIsDeletedColumn = true;
+        }
+      }
+    }
+
+    ColumnarBatch filterBatch(ColumnarBatch batch) {
+      if (!needDeletes()) {
+        return batch;
+      }
+
+      ColumnVector[] vectors = new ColumnVector[batch.numCols()];
+      for (int i = 0; i < batch.numCols(); i++) {
+        vectors[i] = batch.column(i);
+      }
+
+      int numLiveRows = batch.numRows();
+      long rowStartPosInBatch =
+          rowPositionColumnIndex == -1 ? -1 : vectors[rowPositionColumnIndex].getLong(0);
+
+      if (hasIsDeletedColumn) {
+        boolean[] isDeleted =
+            ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, numLiveRows);
+        for (ColumnVector vector : vectors) {
+          if (vector instanceof UpdatableDeletedColumnVector) {
+            ((UpdatableDeletedColumnVector) vector).setValue(isDeleted);
+          }
+        }
+      } else {
+        Pair<int[], Integer> pair =
+            ColumnarBatchUtil.buildRowIdMapping(vectors, deletes, rowStartPosInBatch, numLiveRows);
+        if (pair != null) {
+          int[] rowIdMapping = pair.first();
+          numLiveRows = pair.second();
+          for (int i = 0; i < vectors.length; i++) {
+            vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
+          }
+        }
+      }
+
+      if (deletes != null && deletes.hasEqDeletes()) {
+        vectors = ColumnarBatchUtil.removeExtraColumns(deletes, vectors);
+      }
+
+      ColumnarBatch output = new ColumnarBatch(vectors);
+      output.setNumRows(numLiveRows);
+      return output;
+    }
+
+    private boolean needDeletes() {
+      return hasIsDeletedColumn
+          || (deletes != null && (deletes.hasEqDeletes() || deletes.hasPosDeletes()));
+    }
+  }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 20128ccef8..9ec0f88577 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -105,15 +105,13 @@ class BatchDataReader extends BaseBatchReader<FileScanTask>
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(filePath, task.start(), task.length());
 
-    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
-
     InputFile inputFile = getInputFile(filePath);
    Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
 
     SparkDeleteFilter deleteFilter =
-        task.deletes().isEmpty()
-            ? null
-            : new SparkDeleteFilter(filePath, task.deletes(), counter(), false);
+        new SparkDeleteFilter(filePath, task.deletes(), counter(), true);
+
+    Map<Integer, ?> idToConstant = constantsMap(task, deleteFilter.requiredSchema());
 
     return newBatchIterable(
             inputFile,
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index 044ea3d93c..ccd783915c 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -21,8 +21,6 @@ package org.apache.iceberg.spark.data;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,12 +35,15 @@ import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.deletes.DeleteCounter;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -51,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
+import org.apache.iceberg.spark.source.BatchReaderUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -181,22 +183,52 @@ public class TestSparkParquetReadMetadataColumns {
     Parquet.ReadBuilder builder =
         Parquet.read(Files.localInput(testFile)).project(PROJECTION_SCHEMA);
 
-    DeleteFilter deleteFilter = mock(DeleteFilter.class);
-    when(deleteFilter.hasPosDeletes()).thenReturn(true);
-    PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
-    deletedRowPos.delete(98, 103);
-    when(deleteFilter.deletedRowPositions()).thenReturn(deletedRowPos);
+    DeleteFilter<InternalRow> deleteFilter = new TestDeleteFilter(true);
 
     builder.createBatchedReaderFunc(
         fileSchema ->
             VectorizedSparkParquetReaders.buildReader(
-                PROJECTION_SCHEMA, fileSchema, Maps.newHashMap(), deleteFilter));
+                PROJECTION_SCHEMA, fileSchema, Maps.newHashMap()));
     builder.recordsPerBatch(RECORDS_PER_BATCH);
 
-    validate(expectedRowsAfterDelete, builder);
+    validate(expectedRowsAfterDelete, builder, deleteFilter);
   }
 
-  private class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
+  private static class TestDeleteFilter extends DeleteFilter<InternalRow> {
+    private final boolean hasDeletes;
+
+    protected TestDeleteFilter(boolean hasDeletes) {
+      super("", List.of(), DATA_SCHEMA, PROJECTION_SCHEMA, new DeleteCounter(), true);
+      this.hasDeletes = hasDeletes;
+    }
+
+    @Override
+    protected StructLike asStructLike(InternalRow record) {
+      return null;
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return null;
+    }
+
+    @Override
+    public boolean hasPosDeletes() {
+      return hasDeletes;
+    }
+
+    @Override
+    public PositionDeleteIndex deletedRowPositions() {
+      PositionDeleteIndex deletedRowPos = new CustomizedPositionDeleteIndex();
+      if (hasDeletes) {
+        deletedRowPos.delete(98, 103);
+      }
+
+      return deletedRowPos;
+    }
+  }
+
+  private static class CustomizedPositionDeleteIndex implements PositionDeleteIndex {
     private final Set<Long> deleteIndex;
 
     private CustomizedPositionDeleteIndex() {
@@ -266,7 +298,7 @@ public class TestSparkParquetReadMetadataColumns {
       builder.createBatchedReaderFunc(
           fileSchema ->
               VectorizedSparkParquetReaders.buildReader(
-                  PROJECTION_SCHEMA, fileSchema, Maps.newHashMap(), null));
+                  PROJECTION_SCHEMA, fileSchema, Maps.newHashMap()));
       builder.recordsPerBatch(RECORDS_PER_BATCH);
     } else {
       builder =
@@ -282,13 +314,13 @@ public class TestSparkParquetReadMetadataColumns {
       builder = builder.split(splitStart, splitLength);
     }
 
-    validate(expected, builder);
+    validate(expected, builder, new TestDeleteFilter(false));
   }
 
-  private void validate(List<InternalRow> expected, Parquet.ReadBuilder builder)
+  private void validate(
+      List<InternalRow> expected, Parquet.ReadBuilder builder, DeleteFilter<InternalRow> filter)
       throws IOException {
-    try (CloseableIterable<InternalRow> reader =
-        vectorized ? batchesToRows(builder.build()) : builder.build()) {
+    try (CloseableIterable<InternalRow> reader = reader(builder, filter)) {
       final Iterator<InternalRow> actualRows = reader.iterator();
 
       for (InternalRow internalRow : expected) {
@@ -300,6 +332,15 @@ public class TestSparkParquetReadMetadataColumns {
     }
   }
 
+  private CloseableIterable<InternalRow> reader(
+      Parquet.ReadBuilder builder, DeleteFilter<InternalRow> filter) {
+    if (!vectorized) {
+      return builder.build();
+    } else {
+      return batchesToRows(BatchReaderUtil.applyDeleteFilter(builder.build(), filter));
+    }
+  }
+
  private CloseableIterable<InternalRow> batchesToRows(CloseableIterable<ColumnarBatch> batches) {
     return CloseableIterable.combine(
        Iterables.concat(Iterables.transform(batches, b -> (Iterable<InternalRow>) b::rowIterator)),
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
index 973a17c9a3..284fa0b055 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
@@ -179,7 +179,7 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
                   .createBatchedReaderFunc(
                       type ->
                           VectorizedSparkParquetReaders.buildReader(
-                              schema, type, ImmutableMap.of(), null, allocator));
+                              schema, type, ImmutableMap.of(), allocator));
 
          try (CloseableIterable<ColumnarBatch> batchReader = readBuilder.build()) {
             Iterator<Row> expectedIter = expected.iterator();
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 2e46088cfe..46a6a302e1 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -277,7 +277,7 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
                   .createBatchedReaderFunc(
                       type ->
                           VectorizedSparkParquetReaders.buildReader(
-                              schema, type, idToConstant, null, allocator));
+                              schema, type, idToConstant, allocator));
           if (reuseContainers) {
             readBuilder.reuseContainers();
           }
@@ -308,8 +308,7 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
                        new Schema(required(1, "struct", SUPPORTED_PRIMITIVES))),
                    new MessageType(
                        "struct", new GroupType(Type.Repetition.OPTIONAL, "struct").withId(1)),
-                    Maps.newHashMap(),
-                    null))
+                    Maps.newHashMap()))
         .isInstanceOf(UnsupportedOperationException.class)
        .hasMessage("Vectorized reads are not supported yet for struct fields");
   }
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/BatchReaderUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/BatchReaderUtil.java
new file mode 100644
index 0000000000..e5d03a4efb
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/BatchReaderUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class BatchReaderUtil {
+  private BatchReaderUtil() {}
+
+  public static CloseableIterable<ColumnarBatch> applyDeleteFilter(
+      CloseableIterable<ColumnarBatch> batches, DeleteFilter<InternalRow> filter) {
+    return CloseableIterable.transform(
+        batches, new BaseBatchReader.BatchDeleteFilter(filter)::filterBatch);
+  }
+}

