This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new bb318a51a feat: [iceberg] delete rows support using selection vectors (#2346)
bb318a51a is described below

commit bb318a51ad355bdb13ae42336f100ff6c95b0cdb
Author: Parth Chandra <par...@apache.org>
AuthorDate: Tue Sep 9 12:08:30 2025 -0700

    feat: [iceberg] delete rows support using selection vectors (#2346)
---
 .../apache/comet/vector/CometSelectionVector.java  | 279 +++++++++++++++++++++
 .../org/apache/comet/vector/HasRowIdMapping.java   |  39 ---
 .../scala/org/apache/comet/vector/NativeUtil.scala |  55 ++++
 dev/diffs/iceberg/1.8.1.diff                       | 133 ++++++----
 native/core/src/execution/operators/scan.rs        | 176 +++++++++++--
 native/core/src/jvm_bridge/batch_iterator.rs       |  16 ++
 .../java/org/apache/comet/CometBatchIterator.java  |  46 ++++
 7 files changed, 625 insertions(+), 119 deletions(-)
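
For context: the core idea in this change is that a selection vector presents a
filtered logical view over unmodified physical data. A minimal standalone Rust
sketch of that mapping (illustrative values only, not code from this commit):

    fn main() {
        // Physical values v0..v7; rows 2 and 6 have been position-deleted.
        let values = [10i64, 11, 12, 13, 14, 15, 16, 17];
        // Selection indices, as in the CometSelectionVector example below.
        let selection = [0usize, 1, 3, 4, 5, 7];
        // Logical row i reads physical row selection[i]; nothing is copied.
        for (logical, &physical) in selection.iter().enumerate() {
            println!("logical[{logical}] = v{physical} = {}", values[physical]);
        }
    }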

diff --git a/common/src/main/java/org/apache/comet/vector/CometSelectionVector.java b/common/src/main/java/org/apache/comet/vector/CometSelectionVector.java
new file mode 100644
index 000000000..3c353976d
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/vector/CometSelectionVector.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.vector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A zero-copy selection vector that extends CometVector. This implementation stores the original
+ * data vector and the selection indices as separate CometVectors, providing zero-copy access to
+ * the underlying data.
+ *
+ * <p>If the original vector has values [v0, v1, v2, v3, v4, v5, v6, v7] and the selection indices
+ * are [0, 1, 3, 4, 5, 7], then this selection vector will logically represent [v0, v1, v3, v4, v5,
+ * v7] without actually copying the data.
+ *
+ * <p>Most of the CometVector methods are implemented only for completeness. We don't use this
+ * class except to transfer the original data and the selection indices to the native code.
+ */
+public class CometSelectionVector extends CometVector {
+  /** The original vector containing all values */
+  private final CometVector values;
+
+  /**
+   * The valid indices in the values vector. This array is converted into an Arrow vector so we
+   * can transfer the data to native in one JNI call. This is used to represent the rowid mapping
+   * used by Iceberg.
+   */
+  private final int[] selectionIndices;
+
+  /**
+   * The indices vector containing the selection indices. This is currently allocated by the JVM
+   * side, unlike the values vector, which is allocated on the native side.
+   */
+  private final CometVector indices;
+
+  /**
+   * Number of selected elements. The indices array may have a length greater than this, but only
+   * the first numValues elements of the array are valid.
+   */
+  private final int numValues;
+
+  /**
+   * Creates a new selection vector from the given vector and indices.
+   *
+   * @param values The original vector to select from
+   * @param indices The indices to select from the original vector
+   * @param numValues The number of valid values in the indices array
+   * @throws IllegalArgumentException if any index is out of bounds
+   */
+  public CometSelectionVector(CometVector values, int[] indices, int numValues) {
+    // Use the values vector's datatype, useDecimal128, and dictionary provider
+    super(values.dataType(), values.useDecimal128);
+
+    this.values = values;
+    this.selectionIndices = indices;
+    this.numValues = numValues;
+
+    // Validate indices
+    int originalLength = values.numValues();
+    for (int idx : indices) {
+      if (idx < 0 || idx >= originalLength) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Index %d is out of bounds for vector of length %d", idx, originalLength));
+      }
+    }
+
+    // Create indices vector
+    BufferAllocator allocator = values.getValueVector().getAllocator();
+    IntVector indicesVector = new IntVector("selection_indices", allocator);
+    indicesVector.allocateNew(numValues);
+    for (int i = 0; i < numValues; i++) {
+      indicesVector.set(i, indices[i]);
+    }
+    indicesVector.setValueCount(numValues);
+
+    this.indices =
+        CometVector.getVector(indicesVector, values.useDecimal128, values.getDictionaryProvider());
+  }
+
+  /**
+   * Returns the index in the values vector for the given selection vector index.
+   *
+   * @param selectionIndex Index in the selection vector
+   * @return The corresponding index in the original vector
+   * @throws IndexOutOfBoundsException if selectionIndex is out of bounds
+   */
+  private int getValuesIndex(int selectionIndex) {
+    if (selectionIndex < 0 || selectionIndex >= numValues) {
+      throw new IndexOutOfBoundsException(
+          String.format(
+              "Selection index %d is out of bounds for selection vector of length %d",
+              selectionIndex, numValues));
+    }
+    return indices.getInt(selectionIndex);
+  }
+
+  /**
+   * Returns a reference to the values vector.
+   *
+   * @return The CometVector containing the values
+   */
+  public CometVector getValues() {
+    return values;
+  }
+
+  /**
+   * Returns the indices vector.
+   *
+   * @return The CometVector containing the indices
+   */
+  public CometVector getIndices() {
+    return indices;
+  }
+
+  /**
+   * Returns the selected indices.
+   *
+   * @return Array of selected indices
+   */
+  private int[] getSelectedIndices() {
+    return selectionIndices;
+  }
+
+  @Override
+  public int numValues() {
+    return numValues;
+  }
+
+  @Override
+  public void setNumValues(int numValues) {
+    // For selection vectors, we don't allow changing the number of values
+    // as it would break the mapping between selection indices and values
+    throw new UnsupportedOperationException("CometSelectionVector doesn't support setNumValues");
+  }
+
+  @Override
+  public void setNumNulls(int numNulls) {
+    // For selection vectors, null count should be delegated to the underlying values vector.
+    // The selection doesn't change the null semantics
+    values.setNumNulls(numNulls);
+  }
+
+  @Override
+  public boolean hasNull() {
+    return values.hasNull();
+  }
+
+  @Override
+  public int numNulls() {
+    return values.numNulls();
+  }
+
+  // ColumnVector method implementations - delegate to original vector with index mapping
+  @Override
+  public boolean isNullAt(int rowId) {
+    return values.isNullAt(getValuesIndex(rowId));
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return values.getBoolean(getValuesIndex(rowId));
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    return values.getByte(getValuesIndex(rowId));
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    return values.getShort(getValuesIndex(rowId));
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    return values.getInt(getValuesIndex(rowId));
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    return values.getLong(getValuesIndex(rowId));
+  }
+
+  @Override
+  public long getLongDecimal(int rowId) {
+    return values.getLongDecimal(getValuesIndex(rowId));
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    return values.getFloat(getValuesIndex(rowId));
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    return values.getDouble(getValuesIndex(rowId));
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    return values.getUTF8String(getValuesIndex(rowId));
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    return values.getBinary(getValuesIndex(rowId));
+  }
+
+  @Override
+  public ColumnarArray getArray(int rowId) {
+    return values.getArray(getValuesIndex(rowId));
+  }
+
+  @Override
+  public ColumnarMap getMap(int rowId) {
+    return values.getMap(getValuesIndex(rowId));
+  }
+
+  @Override
+  public ColumnVector getChild(int ordinal) {
+    // Return the child from the original vector
+    return values.getChild(ordinal);
+  }
+
+  @Override
+  public DictionaryProvider getDictionaryProvider() {
+    return values.getDictionaryProvider();
+  }
+
+  @Override
+  public CometVector slice(int offset, int length) {
+    if (offset < 0 || length < 0 || offset + length > numValues) {
+      throw new IllegalArgumentException("Invalid slice parameters");
+    }
+    // Get the current indices and slice them
+    int[] currentIndices = getSelectedIndices();
+    int[] slicedIndices = new int[length];
+    // This is not a very efficient version of slicing, but that is
+    // not important because we are not likely to use it.
+    System.arraycopy(currentIndices, offset, slicedIndices, 0, length);
+    return new CometSelectionVector(values, slicedIndices, length);
+  }
+
+  @Override
+  public org.apache.arrow.vector.ValueVector getValueVector() {
+    return values.getValueVector();
+  }
+
+  @Override
+  public void close() {
+    // Close both the values and indices vectors
+    values.close();
+    indices.close();
+  }
+}
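
A note on the slice() implementation above: slicing a selection vector copies
only the index window; the underlying values vector is shared. A small Rust
sketch of that behavior (hypothetical helper, not code from this commit):

    // Mirrors CometSelectionVector.slice(offset, length): copy the index
    // window, keep sharing the values.
    fn slice_indices(indices: &[i32], offset: usize, length: usize) -> Vec<i32> {
        assert!(offset + length <= indices.len(), "Invalid slice parameters");
        indices[offset..offset + length].to_vec()
    }

    fn main() {
        let indices = [0, 1, 3, 4, 5, 7];
        assert_eq!(slice_indices(&indices, 2, 3), vec![3, 4, 5]);
    }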
diff --git a/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java b/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java
deleted file mode 100644
index 8794902b4..000000000
--- a/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.vector;
-
-/**
- * An interface could be implemented by vectors that have row id mapping.
- *
- * <p>For example, Iceberg's DeleteFile has a row id mapping to map row id to position. This
- * interface is used to set and get the row id mapping. The row id mapping is an array of integers,
- * where the index is the row id and the value is the position. Here is an example:
- * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array Position delete 2, 6
- * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
- */
-public interface HasRowIdMapping {
-  default void setRowIdMapping(int[] rowIdMapping) {
-    throw new UnsupportedOperationException("setRowIdMapping is not supported");
-  }
-
-  default int[] getRowIdMapping() {
-    throw new UnsupportedOperationException("getRowIdMapping is not supported");
-  }
-}
diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
index fba4e29e5..d5d6ded55 100644
--- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
+++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -96,6 +96,30 @@ class NativeUtil {
 
     (0 until batch.numCols()).foreach { index =>
       batch.column(index) match {
+        case selectionVector: CometSelectionVector =>
+          // For CometSelectionVector, export only the values vector
+          val valuesVector = selectionVector.getValues
+          val valueVector = valuesVector.getValueVector
+
+          // Use the selection vector's logical row count
+          numRows += selectionVector.numValues()
+
+          val provider = if (valueVector.getField.getDictionary != null) {
+            valuesVector.getDictionaryProvider
+          } else {
+            null
+          }
+
+          // The array and schema structures are allocated by native side.
+          // Don't need to deallocate them here.
+          val arrowSchema = ArrowSchema.wrap(schemaAddrs(index))
+          val arrowArray = ArrowArray.wrap(arrayAddrs(index))
+          Data.exportVector(
+            allocator,
+            getFieldVector(valueVector, "export"),
+            provider,
+            arrowArray,
+            arrowSchema)
         case a: CometVector =>
           val valueVector = a.getValueVector
 
@@ -133,9 +157,40 @@ class NativeUtil {
     // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in
     // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report
     // logical number of rows which is less than actual number of rows due to row deletion.
+    // Similarly, CometSelectionVector represents a different number of logical rows than the
+    // underlying vector.
     numRows.headOption.getOrElse(batch.numRows())
   }
 
+  /**
+   * Exports a single CometVector to native side.
+   *
+   * @param vector
+   *   The CometVector to export
+   * @param arrayAddr
+   *   The address of the ArrowArray structure
+   * @param schemaAddr
+   *   The address of the ArrowSchema structure
+   */
+  def exportSingleVector(vector: CometVector, arrayAddr: Long, schemaAddr: Long): Unit = {
+    val valueVector = vector.getValueVector
+
+    val provider = if (valueVector.getField.getDictionary != null) {
+      vector.getDictionaryProvider
+    } else {
+      null
+    }
+
+    val arrowSchema = ArrowSchema.wrap(schemaAddr)
+    val arrowArray = ArrowArray.wrap(arrayAddr)
+    Data.exportVector(
+      allocator,
+      getFieldVector(valueVector, "export"),
+      provider,
+      arrowArray,
+      arrowSchema)
+  }
+
   /**
    * Gets the next batch from native execution.
    *
diff --git a/dev/diffs/iceberg/1.8.1.diff b/dev/diffs/iceberg/1.8.1.diff
index 2f80453de..675208f5f 100644
--- a/dev/diffs/iceberg/1.8.1.diff
+++ b/dev/diffs/iceberg/1.8.1.diff
@@ -1,5 +1,5 @@
 diff --git a/build.gradle b/build.gradle
-index 7327b38..7967109 100644
+index 7327b38905d..7967109f039 100644
 --- a/build.gradle
 +++ b/build.gradle
 @@ -780,6 +780,13 @@ project(':iceberg-parquet') {
@@ -17,7 +17,7 @@ index 7327b38..7967109 100644
        exclude group: 'org.apache.avro', module: 'avro'
        // already shaded by Parquet
 diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
-index 04ffa8f..cc0099c 100644
+index 04ffa8f4edc..cc0099ccc93 100644
 --- a/gradle/libs.versions.toml
 +++ b/gradle/libs.versions.toml
 @@ -34,6 +34,7 @@ azuresdk-bom = "1.2.31"
@@ -39,7 +39,7 @@ index 04ffa8f..cc0099c 100644
  tez010 = "0.10.4"
 diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
 new file mode 100644
-index 0000000..ddf6c7d
+index 00000000000..ddf6c7de5ae
 --- /dev/null
 +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
 @@ -0,0 +1,255 @@
@@ -300,7 +300,7 @@ index 0000000..ddf6c7d
 +}
 diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
 new file mode 100644
-index 0000000..88b195b
+index 00000000000..88b195b76a2
 --- /dev/null
 +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
 @@ -0,0 +1,255 @@
@@ -560,7 +560,7 @@ index 0000000..88b195b
 +  }
 +}
 diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
-index 2c37a52..3442cfc 100644
+index 2c37a52449e..3442cfc4375 100644
 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
 @@ -1075,6 +1075,7 @@ public class Parquet {
@@ -639,7 +639,7 @@ index 2c37a52..3442cfc 100644
            return new org.apache.iceberg.parquet.ParquetReader<>(
               file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive);
 diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
-index 1fb2372..142e5fb 100644
+index 1fb2372ba56..142e5fbadf1 100644
 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java
 @@ -157,6 +157,14 @@ class ReadConf<T> {
@@ -658,7 +658,7 @@ index 1fb2372..142e5fb 100644
      return model;
    }
 diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
-index e2d2c7a..f64232d 100644
+index e2d2c7a7ac0..f64232dc57f 100644
 --- a/spark/v3.5/build.gradle
 +++ b/spark/v3.5/build.gradle
 @@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
@@ -699,7 +699,7 @@ index e2d2c7a..f64232d 100644
      relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
     relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
 diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
-index 578845e..0118b30 100644
+index 578845e3da2..0118b30683d 100644
 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
 +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java
 @@ -57,6 +57,16 @@ public abstract class ExtensionsTestBase extends CatalogTestBase {
@@ -720,7 +720,7 @@ index 578845e..0118b30 100644
              .getOrCreate();
  
 diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
-index ade19de..150a2cd 100644
+index ade19de36fe..150a2cddbc8 100644
 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
 +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java
 @@ -56,6 +56,16 @@ public class TestCallStatementParser {
@@ -741,7 +741,7 @@ index ade19de..150a2cd 100644
      TestCallStatementParser.parser = spark.sessionState().sqlParser();
    }
 diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
-index 64edb10..5bb449f 100644
+index 64edb1002e9..5bb449f1ac7 100644
 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
 +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java
 @@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark {
@@ -762,7 +762,7 @@ index 64edb10..5bb449f 100644
      spark = builder.getOrCreate();
    }
 diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
-index a5d0456..4af408f 100644
+index a5d0456b0b2..4af408f4861 100644
 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
 +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java
 @@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark {
@@ -783,7 +783,7 @@ index a5d0456..4af408f 100644
      spark = builder.getOrCreate();
      Configuration sparkHadoopConf = spark.sessionState().newHadoopConf();
 diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
-index c6794e4..f735919 100644
+index c6794e43c63..f7359197407 100644
 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
 +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java
 @@ -239,6 +239,16 @@ public class DVReaderBenchmark {
@@ -804,7 +804,7 @@ index c6794e4..f735919 100644
              .getOrCreate();
    }
 diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
-index ac74fb5..e011b8b 100644
+index ac74fb5a109..e011b8b2510 100644
 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
 +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java
 @@ -223,6 +223,16 @@ public class DVWriterBenchmark {
@@ -825,7 +825,7 @@ index ac74fb5..e011b8b 100644
              .getOrCreate();
    }
 diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
-index 68c537e..f66be2f 100644
+index 68c537e34a4..f66be2f3896 100644
 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
 +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
 @@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark {
@@ -850,7 +850,7 @@ index 68c537e..f66be2f 100644
        builder
            .config("parquet.dictionary.page.size", "1")
 diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
-index 4794863..8bb508f 100644
+index 4794863ab1b..8bb508f19f8 100644
 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
 +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java
 @@ -20,21 +20,25 @@ package org.apache.iceberg.spark.data.vectorized;
@@ -953,20 +953,24 @@ index 4794863..8bb508f 100644
  
    @Override
 diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
-index 1440e5d..fc6b283 100644
+index 1440e5d1d3f..85cca62e90f 100644
 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
 +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
-@@ -23,7 +23,8 @@ import java.io.UncheckedIOException;
+@@ -22,8 +22,12 @@ import java.io.IOException;
+ import java.io.UncheckedIOException;
  import java.util.List;
  import java.util.Map;
++import org.apache.comet.CometRuntimeException;
  import org.apache.comet.parquet.AbstractColumnReader;
 -import org.apache.comet.parquet.BatchReader;
 +import org.apache.comet.parquet.IcebergCometBatchReader;
 +import org.apache.comet.parquet.RowGroupReader;
++import org.apache.comet.vector.CometSelectionVector;
++import org.apache.comet.vector.CometVector;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.data.DeleteFilter;
  import org.apache.iceberg.parquet.VectorizedReader;
-@@ -55,7 +56,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
   // calling BatchReader.nextBatch, the isDeleted value is not yet available, so
   // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
    // available.
@@ -975,7 +979,7 @@ index 1440e5d..fc6b283 100644
    private DeleteFilter<InternalRow> deletes = null;
    private long rowStartPosInBatch = 0;
  
-@@ -65,9 +66,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
      this.hasIsDeletedColumn =
          readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
  
@@ -986,7 +990,7 @@ index 1440e5d..fc6b283 100644
    }
  
    @Override
-@@ -85,19 +84,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+@@ -85,19 +87,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
              && !(readers[i] instanceof CometPositionColumnReader)
              && !(readers[i] instanceof CometDeleteColumnReader)) {
            readers[i].reset();
@@ -1012,8 +1016,29 @@ index 1440e5d..fc6b283 100644
              .getRowIndexOffset()
              .orElseThrow(
                  () ->
+@@ -154,9 +159,17 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
+         Pair<int[], Integer> pair = buildRowIdMapping(vectors);
+         if (pair != null) {
+           int[] rowIdMapping = pair.first();
+-          numLiveRows = pair.second();
+-          for (int i = 0; i < vectors.length; i++) {
+-            vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
++          if (pair.second() != null) {
++            numLiveRows = pair.second();
++            for (int i = 0; i < vectors.length; i++) {
++              if (vectors[i] instanceof CometVector) {
++                vectors[i] =
++                    new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows);
++              } else {
++                throw new CometRuntimeException(
++                    "Unsupported column vector type: " + vectors[i].getClass());
++              }
++            }
+           }
+         }
+       }
 diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
-index 047c963..88d691a 100644
+index 047c96314b1..88d691a607a 100644
 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
 +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
 @@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
@@ -1038,7 +1063,7 @@ index 047c963..88d691a 100644
  
    @Override
 diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
-index 6235bfe..cba108e 100644
+index 6235bfe4865..cba108e4326 100644
 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
 +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
 @@ -51,10 +51,10 @@ class CometDeleteColumnReader<T> extends CometColumnReader {
@@ -1055,7 +1080,7 @@ index 6235bfe..cba108e 100644
      }
  
 diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
-index bcc0e51..98e8006 100644
+index bcc0e514c28..98e80068c51 100644
 --- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
 +++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
 @@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized;
@@ -1076,7 +1101,7 @@ index bcc0e51..98e8006 100644
            false /* isConstant = false */);
      }
 diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
-index d36f1a7..56f8c9b 100644
+index d36f1a72747..56f8c9bff93 100644
 --- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
 +++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
 @@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends 
TypeWithSchemaVisitor<VectorizedReade
@@ -1089,7 +1114,7 @@ index d36f1a7..56f8c9b 100644
    }
  }
 diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
-index 780e175..57892ac 100644
+index 780e1750a52..57892ac4c59 100644
 --- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 +++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
 @@ -109,6 +109,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends 
BaseReader<ColumnarBa
@@ -1101,7 +1126,7 @@ index 780e175..57892ac 100644
    }
  
 diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
-index 11f054b..b37dd33 100644
+index 11f054b1171..f443042ba48 100644
 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
 +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
 @@ -172,17 +172,32 @@ class SparkBatch implements Batch {
@@ -1129,7 +1154,7 @@ index 11f054b..b37dd33 100644
 +    } else if (task.isFileScanTask() && !task.isDataTask()) {
 +      FileScanTask fileScanTask = task.asFileScanTask();
 +      // Comet can't handle delete files for now
-+      return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty();
++      return fileScanTask.file().format() == FileFormat.PARQUET;
 +
 +    } else {
 +      return false;
@@ -1140,7 +1165,7 @@ index 11f054b..b37dd33 100644
    // - ORC vectorization is enabled
   // - all tasks are of type FileScanTask and read only ORC files with no delete files
 diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
-index 019f391..656e060 100644
+index 019f3919dc5..656e0600ac0 100644
 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
 +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
 @@ -23,6 +23,7 @@ import java.util.List;
@@ -1172,7 +1197,7 @@ index 019f391..656e060 100644
 +  }
  }
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
-index 404ba72..19d0cd5 100644
+index 404ba728460..19d0cd5c827 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
 @@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase
@@ -1193,7 +1218,7 @@ index 404ba72..19d0cd5 100644
    }
  }
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
-index 659507e..eb9cedc 100644
+index 659507e4c5e..eb9cedc34c5 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
 @@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes
@@ -1214,7 +1239,7 @@ index 659507e..eb9cedc 100644
    }
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
-index a218f96..395c024 100644
+index a218f965ea6..395c02441e7 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
 @@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles
@@ -1235,7 +1260,7 @@ index a218f96..395c024 100644
    }
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
-index 2665d7b..306e859 100644
+index 2665d7ba8d3..306e859ce1a 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
 @@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting
@@ -1256,7 +1281,7 @@ index 2665d7b..306e859 100644
    }
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
-index de68351..3ab0d3d 100644
+index de68351f6e3..3ab0d3db477 100644
 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
 +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
 @@ -77,6 +77,17 @@ public abstract class TestBase extends SparkTestHelperBase {
@@ -1278,7 +1303,7 @@ index de68351..3ab0d3d 100644
              .getOrCreate();
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
-index bc4e722..1a40d8e 100644
+index bc4e722bc86..1a40d8edcb3 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
 @@ -59,7 +59,20 @@ public class TestParquetDictionaryEncodedVectorizedReads 
extends TestParquetVect
@@ -1304,7 +1329,7 @@ index bc4e722..1a40d8e 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
-index 3a26974..59592eb 100644
+index 3a269740b70..59592eb2ad7 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
 @@ -54,7 +54,20 @@ public abstract class ScanTestBase extends AvroDataTest {
@@ -1330,7 +1355,7 @@ index 3a26974..59592eb 100644
    }
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
-index f411920..367c2e2 100644
+index f411920a5dc..367c2e2cb45 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
 @@ -144,7 +144,20 @@ public class TestCompressionSettings extends 
CatalogTestBase {
@@ -1356,7 +1381,7 @@ index f411920..367c2e2 100644
  
    @BeforeEach
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
-index c4ba96e..a0e77db 100644
+index c4ba96e6340..a0e77db99b9 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
 @@ -75,7 +75,20 @@ public class TestDataSourceOptions extends 
TestBaseWithCatalog {
@@ -1382,7 +1407,7 @@ index c4ba96e..a0e77db 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
-index 3481735..bdd3152 100644
+index 348173596e4..bdd31528d3f 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
 @@ -110,7 +110,20 @@ public class TestFilteredScan {
@@ -1408,7 +1433,7 @@ index 3481735..bdd3152 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
-index 84c99a5..6f4b2af 100644
+index 84c99a575c8..6f4b2af003b 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
 @@ -93,7 +93,20 @@ public class TestForwardCompatibility {
@@ -1434,7 +1459,7 @@ index 84c99a5..6f4b2af 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
-index 7eff93d..1774acd 100644
+index 7eff93d204e..1774acd056b 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
 @@ -46,7 +46,20 @@ public class TestIcebergSpark {
@@ -1460,7 +1485,7 @@ index 7eff93d..1774acd 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
-index 9464f68..b5dbb88 100644
+index 9464f687b0e..b5dbb88dd5a 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
 @@ -112,7 +112,20 @@ public class TestPartitionPruning {
@@ -1486,7 +1511,7 @@ index 9464f68..b5dbb88 100644
  
     String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme);
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
-index 5c218f2..e48e6d1 100644
+index 5c218f21c47..e48e6d121f0 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
 @@ -107,7 +107,20 @@ public class TestPartitionValues {
@@ -1512,7 +1537,7 @@ index 5c218f2..e48e6d1 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
-index a7334a5..dca7fdd 100644
+index a7334a580ca..dca7fdd9c69 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
 @@ -87,7 +87,20 @@ public class TestSnapshotSelection {
@@ -1538,7 +1563,7 @@ index a7334a5..dca7fdd 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
-index 182b1ef..5da4a3c 100644
+index 182b1ef8f5a..5da4a3c90aa 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
 @@ -120,7 +120,20 @@ public class TestSparkDataFile {
@@ -1564,7 +1589,7 @@ index 182b1ef..5da4a3c 100644
    }
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
-index fb2b312..5927a40 100644
+index fb2b312bed9..5927a408c6b 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
 @@ -96,7 +96,20 @@ public class TestSparkDataWrite {
@@ -1671,7 +1696,7 @@ index fb2b312..5927a40 100644
      assertThat(actual)
          .describedAs("Result rows should match")
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
-index becf6a0..beb7c80 100644
+index becf6a064dc..beb7c801b05 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
 @@ -83,7 +83,20 @@ public class TestSparkReadProjection extends 
TestReadProjection {
@@ -1697,7 +1722,7 @@ index becf6a0..beb7c80 100644
          ImmutableMap.of(
              "type", "hive",
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
-index 4f1cef5..0e34e08 100644
+index 4f1cef5d373..0e34e08db4a 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 @@ -136,6 +136,16 @@ public class TestSparkReaderDeletes extends 
DeleteReadTests {
@@ -1718,7 +1743,7 @@ index 4f1cef5..0e34e08 100644
              .getOrCreate();
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
-index baf7fa8..509c5de 100644
+index baf7fa8f88a..509c5deba51 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
 @@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter {
@@ -1739,7 +1764,7 @@ index baf7fa8..509c5de 100644
              .getOrCreate();
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
-index 17db46b..d68b638 100644
+index 17db46b85c3..d68b638ce09 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
 @@ -65,6 +65,16 @@ public class TestStructuredStreaming {
@@ -1760,7 +1785,7 @@ index 17db46b..d68b638 100644
    }
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
-index 306444b..0ed52fd 100644
+index 306444b9f29..0ed52fd6cfb 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
 @@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase {
@@ -1786,7 +1811,7 @@ index 306444b..0ed52fd 100644
  
    @AfterAll
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
-index 841268a..e290c24 100644
+index 841268a6be0..e290c246f7b 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
 @@ -80,7 +80,20 @@ public class TestWriteMetricsConfig {
@@ -1812,7 +1837,7 @@ index 841268a..e290c24 100644
    }
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
-index 6e09252..438bc4d 100644
+index 6e09252704a..438bc4da575 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
 @@ -60,6 +60,16 @@ public class TestAggregatePushDown extends CatalogTestBase {
@@ -1833,7 +1858,7 @@ index 6e09252..438bc4d 100644
              .getOrCreate();
  
 diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
-index 9d2ce2b..5e23368 100644
+index 9d2ce2b388a..5e233688488 100644
 --- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
 +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
 @@ -598,9 +598,7 @@ public class TestFilterPushDown extends 
TestBaseWithCatalog {
@@ -1848,7 +1873,7 @@ index 9d2ce2b..5e23368 100644
       assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter (");
      }
 diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
-index 6719c45..2515454 100644
+index 6719c45ca96..2515454401a 100644
 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
 +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
 @@ -616,7 +616,7 @@ public class TestStoragePartitionedJoins extends TestBaseWithCatalog {
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index fd3fb63b5..502fa0bd1 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -24,7 +24,7 @@ use crate::{
     jvm_bridge::{jni_call, JVMClasses},
 };
 use arrow::array::{make_array, ArrayData, ArrayRef, RecordBatch, RecordBatchOptions};
-use arrow::compute::{cast_with_options, CastOptions};
+use arrow::compute::{cast_with_options, take, CastOptions};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::ffi::FFI_ArrowArray;
 use arrow::ffi::FFI_ArrowSchema;
@@ -237,8 +237,89 @@ impl ScanExec {
             return Ok(InputBatch::EOF);
         }
 
+        // Check for selection vectors and get selection indices if needed from
+        // JVM via FFI
+        // Selection vectors can be provided by, for instance, Iceberg to
+        // remove rows that have been deleted.
+        let selection_indices_arrays =
+            Self::get_selection_indices(&mut env, iter, num_cols, jvm_fetch_time, arrow_ffi_time)?;
+
+        // fetch batch data from JVM via FFI
         let mut timer = arrow_ffi_time.timer();
+        let (num_rows, array_addrs, schema_addrs) =
+            Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;
+
+        let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols);
+
+        // Process each column
+        for i in 0..num_cols {
+            let array_ptr = array_addrs[i];
+            let schema_ptr = schema_addrs[i];
+            let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
+
+            // TODO: validate array input data
+            // array_data.validate_full()?;
+
+            let array = make_array(array_data);
+
+            // Apply selection if selection vectors exist (applies to all columns)
+            let array = if let Some(ref selection_arrays) = selection_indices_arrays {
+                let indices = &selection_arrays[i];
+                // Apply the selection using Arrow's take kernel
+                match take(&*array, &**indices, None) {
+                    Ok(selected_array) => selected_array,
+                    Err(e) => {
+                        return Err(CometError::from(ExecutionError::ArrowError(format!(
+                            "Failed to apply selection for column {i}: {e}",
+                        ))));
+                    }
+                }
+            } else {
+                array
+            };
+
+            let array = if arrow_ffi_safe {
+                // ownership of this array has been transferred to native
+                array
+            } else {
+                // it is necessary to copy the array because the contents may be
+                // overwritten on the JVM side in the future
+                copy_array(&array)
+            };
+
+            inputs.push(array);
+
+            // Drop the Arcs to avoid memory leak
+            unsafe {
+                Rc::from_raw(array_ptr as *const FFI_ArrowArray);
+                Rc::from_raw(schema_ptr as *const FFI_ArrowSchema);
+            }
+        }
+
+        timer.stop();
+
+        // If selection was applied, determine the actual row count from the selected arrays
+        let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays {
+            if !selection_arrays.is_empty() {
+                // Use the length of the first selection array as the actual row count
+                selection_arrays[0].len()
+            } else {
+                num_rows as usize
+            }
+        } else {
+            num_rows as usize
+        };
+
+        Ok(InputBatch::new(inputs, Some(actual_num_rows)))
+    }
 
+    /// Allocates Arrow FFI structures and calls JNI to get the next batch data.
+    /// Returns the number of rows and the allocated array/schema addresses.
+    fn allocate_and_fetch_batch(
+        env: &mut jni::JNIEnv,
+        iter: &JObject,
+        num_cols: usize,
+    ) -> Result<(i32, Vec<i64>, Vec<i64>), CometError> {
         let mut array_addrs = Vec::with_capacity(num_cols);
         let mut schema_addrs = Vec::with_capacity(num_cols);
 
@@ -268,7 +349,7 @@ impl ScanExec {
         let schema_obj = JValueGen::Object(schema_obj.as_ref());
 
         let num_rows: i32 = unsafe {
-            jni_call!(&mut env,
+            jni_call!(env,
         comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)?
         };
 
@@ -276,39 +357,82 @@ impl ScanExec {
         // have a valid row count when calling next()
         assert!(num_rows != -1);
 
-        let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols);
+        Ok((num_rows, array_addrs, schema_addrs))
+    }
 
-        for i in 0..num_cols {
-            let array_ptr = array_addrs[i];
-            let schema_ptr = schema_addrs[i];
-            let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
+    /// Checks for selection vectors and exports selection indices if needed.
+    /// Returns selection arrays if they exist (applies to all columns).
+    fn get_selection_indices(
+        env: &mut jni::JNIEnv,
+        iter: &JObject,
+        num_cols: usize,
+        jvm_fetch_time: &Time,
+        arrow_ffi_time: &Time,
+    ) -> Result<Option<Vec<ArrayRef>>, CometError> {
+        // Check if all columns have selection vectors
+        let mut timer = jvm_fetch_time.timer();
+        let has_selection_vectors_result: jni::sys::jboolean = unsafe {
+            jni_call!(env,
+                comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)?
+        };
+        timer.stop();
+        let has_selection_vectors = has_selection_vectors_result != 0;
 
-            // TODO: validate array input data
-            // array_data.validate_full()?;
+        let selection_indices_arrays = if has_selection_vectors {
+            let mut timer = arrow_ffi_time.timer();
 
-            let array = make_array(array_data);
+            // Allocate arrays for selection indices export (one per column)
+            let mut indices_array_addrs = Vec::with_capacity(num_cols);
+            let mut indices_schema_addrs = Vec::with_capacity(num_cols);
 
-            let array = if arrow_ffi_safe {
-                // ownership of this array has been transferred to native
-                array
-            } else {
-                // it is necessary to copy the array because the contents may be
-                // overwritten on the JVM side in the future
-                copy_array(&array)
-            };
+            for _ in 0..num_cols {
+                let arrow_array = Rc::new(FFI_ArrowArray::empty());
+                let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
+                indices_array_addrs.push(Rc::into_raw(arrow_array) as i64);
+                indices_schema_addrs.push(Rc::into_raw(arrow_schema) as i64);
+            }
 
-            inputs.push(array);
+            // Prepare JNI arrays for the export call
+            let indices_array_obj = env.new_long_array(num_cols as jsize)?;
+            let indices_schema_obj = env.new_long_array(num_cols as jsize)?;
+            env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?;
+            env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?;
 
-            // Drop the Arcs to avoid memory leak
-            unsafe {
-                Rc::from_raw(array_ptr as *const FFI_ArrowArray);
-                Rc::from_raw(schema_ptr as *const FFI_ArrowSchema);
+            timer.stop();
+
+            // Export selection indices from JVM
+            let mut timer = jvm_fetch_time.timer();
+            let _exported_count: i32 = unsafe {
+                jni_call!(env,
+                    comet_batch_iterator(iter).export_selection_indices(
+                        JValueGen::Object(JObject::from(indices_array_obj).as_ref()),
+                        JValueGen::Object(JObject::from(indices_schema_obj).as_ref())
+                    ) -> i32)?
+            };
+            timer.stop();
+
+            // Convert to ArrayRef for easier handling
+            let mut timer = arrow_ffi_time.timer();
+            let mut selection_arrays = Vec::with_capacity(num_cols);
+            for i in 0..num_cols {
+                let array_data =
+                    ArrayData::from_spark((indices_array_addrs[i], indices_schema_addrs[i]))?;
+                selection_arrays.push(make_array(array_data));
+
+                // Drop the references to the FFI arrays
+                unsafe {
+                    Rc::from_raw(indices_array_addrs[i] as *const FFI_ArrowArray);
+                    Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema);
+                }
             }
-        }
+            timer.stop();
 
-        timer.stop();
+            Some(selection_arrays)
+        } else {
+            None
+        };
 
-        Ok(InputBatch::new(inputs, Some(num_rows as usize)))
+        Ok(selection_indices_arrays)
     }
 }
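
The native side applies the exported indices with Arrow's take kernel, as in
the hunk above. A standalone arrow-rs sketch of that step (stand-in data; the
exact crate version and error handling differ in Comet):

    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, Int32Array, Int64Array};
    use arrow::compute::take;
    use arrow::error::ArrowError;

    fn main() -> Result<(), ArrowError> {
        // The batch column as imported over FFI (stand-in data).
        let values: ArrayRef = Arc::new(Int64Array::from(vec![10, 11, 12, 13, 14, 15, 16, 17]));
        // Selection indices exported by the JVM side: rows 2 and 6 are deleted.
        let indices = Int32Array::from(vec![0, 1, 3, 4, 5, 7]);
        // take() gathers the selected rows into a new array, one column at a time.
        let selected = take(values.as_ref(), &indices, None)?;
        assert_eq!(selected.len(), 6);
        Ok(())
    }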
 
diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs
index 998e540c7..2824bdbfc 100644
--- a/native/core/src/jvm_bridge/batch_iterator.rs
+++ b/native/core/src/jvm_bridge/batch_iterator.rs
@@ -31,6 +31,10 @@ pub struct CometBatchIterator<'a> {
     pub method_has_next_ret: ReturnType,
     pub method_next: JMethodID,
     pub method_next_ret: ReturnType,
+    pub method_has_selection_vectors: JMethodID,
+    pub method_has_selection_vectors_ret: ReturnType,
+    pub method_export_selection_indices: JMethodID,
+    pub method_export_selection_indices_ret: ReturnType,
 }
 
 impl<'a> CometBatchIterator<'a> {
@@ -45,6 +49,18 @@ impl<'a> CometBatchIterator<'a> {
             method_has_next_ret: ReturnType::Primitive(Primitive::Int),
             method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)I")?,
             method_next_ret: ReturnType::Primitive(Primitive::Int),
+            method_has_selection_vectors: env.get_method_id(
+                Self::JVM_CLASS,
+                "hasSelectionVectors",
+                "()Z",
+            )?,
+            method_has_selection_vectors_ret: ReturnType::Primitive(Primitive::Boolean),
+            method_export_selection_indices: env.get_method_id(
+                Self::JVM_CLASS,
+                "exportSelectionIndices",
+                "([J[J)I",
+            )?,
+            method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int),
         })
     }
 }
diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java
index 9b48a47c5..4f45f98a6 100644
--- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java
+++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java
@@ -23,6 +23,7 @@ import scala.collection.Iterator;
 
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
+import org.apache.comet.vector.CometSelectionVector;
 import org.apache.comet.vector.NativeUtil;
 
 /**
@@ -90,4 +91,49 @@ public class CometBatchIterator {
 
     return numRows;
   }
+
+  /**
+   * Check if the current batch has selection vectors for all columns.
+   *
+   * @return true if all columns are CometSelectionVector instances, false otherwise
+   */
+  public boolean hasSelectionVectors() {
+    if (currentBatch == null) {
+      return false;
+    }
+
+    // Check if all columns are CometSelectionVector instances
+    for (int i = 0; i < currentBatch.numCols(); i++) {
+      if (!(currentBatch.column(i) instanceof CometSelectionVector)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Export selection indices for all columns when they are selection vectors.
+   *
+   * @param arrayAddrs The addresses of the ArrowArray structures for indices
+   * @param schemaAddrs The addresses of the ArrowSchema structures for indices
+   * @return Number of selection indices arrays exported
+   */
+  public int exportSelectionIndices(long[] arrayAddrs, long[] schemaAddrs) {
+    if (currentBatch == null) {
+      return 0;
+    }
+
+    int exportCount = 0;
+    for (int i = 0; i < currentBatch.numCols(); i++) {
+      if (currentBatch.column(i) instanceof CometSelectionVector) {
+        CometSelectionVector selectionVector = (CometSelectionVector) currentBatch.column(i);
+
+        // Export the indices vector
+        nativeUtil.exportSingleVector(
+            selectionVector.getIndices(), arrayAddrs[exportCount], schemaAddrs[exportCount]);
+        exportCount++;
+      }
+    }
+    return exportCount;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
