This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new bb318a51a feat: [iceberg] delete rows support using selection vectors (#2346)
bb318a51a is described below

commit bb318a51ad355bdb13ae42336f100ff6c95b0cdb
Author: Parth Chandra <par...@apache.org>
AuthorDate: Tue Sep 9 12:08:30 2025 -0700

    feat: [iceberg] delete rows support using selection vectors (#2346)
---
 .../apache/comet/vector/CometSelectionVector.java | 279 +++++++++++++++++++++
 .../org/apache/comet/vector/HasRowIdMapping.java  |  39 ---
 .../scala/org/apache/comet/vector/NativeUtil.scala |  55 ++++
 dev/diffs/iceberg/1.8.1.diff                       | 133 ++++++----
 native/core/src/execution/operators/scan.rs        | 176 +++++++++++--
 native/core/src/jvm_bridge/batch_iterator.rs       |  16 ++
 .../java/org/apache/comet/CometBatchIterator.java  |  46 ++++
 7 files changed, 625 insertions(+), 119 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/vector/CometSelectionVector.java b/common/src/main/java/org/apache/comet/vector/CometSelectionVector.java
new file mode 100644
index 000000000..3c353976d
--- /dev/null
+++ b/common/src/main/java/org/apache/comet/vector/CometSelectionVector.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.vector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A zero-copy selection vector that extends CometVector. This implementation stores the original
+ * data vector and selection indices as separate CometVectors, providing zero-copy access to the
+ * underlying data.
+ *
+ * <p>If the original vector has values [v0, v1, v2, v3, v4, v5, v6, v7] and the selection indices
+ * are [0, 1, 3, 4, 5, 7], then this selection vector will logically represent [v0, v1, v3, v4, v5,
+ * v7] without actually copying the data.
+ *
+ * <p>Most of the CometVector methods are implemented only for completeness; we don't use this
+ * class except to transfer the original data and the selection indices to the native code.
+ */
+public class CometSelectionVector extends CometVector {
+  /** The original vector containing all values */
+  private final CometVector values;
+
+  /**
+   * The valid indices in the values vector. This array is converted into an Arrow vector so we can
+   * transfer the data to native in one JNI call. This is used to represent the row-id mapping used
+   * by Iceberg.
+   */
+  private final int[] selectionIndices;
+
+  /**
+   * The indices vector containing selection indices.
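Each entry is the position (row id) of a selected row in the values vector.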
This is currently allocated by the JVM side,
+   * unlike the values vector, which is allocated on the native side.
+   */
+  private final CometVector indices;
+
+  /**
+   * Number of selected elements. The indices array may have a length greater than this, but only
+   * the first numValues elements in the array are valid.
+   */
+  private final int numValues;
+
+  /**
+   * Creates a new selection vector from the given vector and indices.
+   *
+   * @param values The original vector to select from
+   * @param indices The indices to select from the original vector
+   * @param numValues The number of valid values in the indices array
+   * @throws IllegalArgumentException if any index is out of bounds
+   */
+  public CometSelectionVector(CometVector values, int[] indices, int numValues) {
+    // Use the values vector's datatype, useDecimal128, and dictionary provider
+    super(values.dataType(), values.useDecimal128);
+
+    this.values = values;
+    this.selectionIndices = indices;
+    this.numValues = numValues;
+
+    // Validate indices; only the first numValues entries are meaningful, so any
+    // trailing slots in the array are ignored
+    int originalLength = values.numValues();
+    for (int i = 0; i < numValues; i++) {
+      int idx = indices[i];
+      if (idx < 0 || idx >= originalLength) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Index %d is out of bounds for vector of length %d", idx, originalLength));
+      }
+    }
+
+    // Create indices vector
+    BufferAllocator allocator = values.getValueVector().getAllocator();
+    IntVector indicesVector = new IntVector("selection_indices", allocator);
+    indicesVector.allocateNew(numValues);
+    for (int i = 0; i < numValues; i++) {
+      indicesVector.set(i, indices[i]);
+    }
+    indicesVector.setValueCount(numValues);
+
+    this.indices =
+        CometVector.getVector(indicesVector, values.useDecimal128, values.getDictionaryProvider());
+  }
+
+  /**
+   * Returns the index in the values vector for the given selection vector index.
+   *
+   * @param selectionIndex Index in the selection vector
+   * @return The corresponding index in the original vector
+   * @throws IndexOutOfBoundsException if selectionIndex is out of bounds
+   */
+  private int getValuesIndex(int selectionIndex) {
+    if (selectionIndex < 0 || selectionIndex >= numValues) {
+      throw new IndexOutOfBoundsException(
+          String.format(
+              "Selection index %d is out of bounds for selection vector of length %d",
+              selectionIndex, numValues));
+    }
+    return indices.getInt(selectionIndex);
+  }
+
+  /**
+   * Returns a reference to the values vector.
+   *
+   * @return The CometVector containing the values
+   */
+  public CometVector getValues() {
+    return values;
+  }
+
+  /**
+   * Returns the indices vector.
+   *
+   * @return The CometVector containing the indices
+   */
+  public CometVector getIndices() {
+    return indices;
+  }
+
+  /**
+   * Returns the selected indices.
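Only the first {@code numValues} entries of the returned array are meaningful.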
+ * + * @return Array of selected indices + */ + private int[] getSelectedIndices() { + return selectionIndices; + } + + @Override + public int numValues() { + return numValues; + } + + @Override + public void setNumValues(int numValues) { + // For selection vectors, we don't allow changing the number of values + // as it would break the mapping between selection indices and values + throw new UnsupportedOperationException("CometSelectionVector doesn't support setNumValues"); + } + + @Override + public void setNumNulls(int numNulls) { + // For selection vectors, null count should be delegated to the underlying values vector + // The selection doesn't change the null semantics + values.setNumNulls(numNulls); + } + + @Override + public boolean hasNull() { + return values.hasNull(); + } + + @Override + public int numNulls() { + return values.numNulls(); + } + + // ColumnVector method implementations - delegate to original vector with index mapping + @Override + public boolean isNullAt(int rowId) { + return values.isNullAt(getValuesIndex(rowId)); + } + + @Override + public boolean getBoolean(int rowId) { + return values.getBoolean(getValuesIndex(rowId)); + } + + @Override + public byte getByte(int rowId) { + return values.getByte(getValuesIndex(rowId)); + } + + @Override + public short getShort(int rowId) { + return values.getShort(getValuesIndex(rowId)); + } + + @Override + public int getInt(int rowId) { + return values.getInt(getValuesIndex(rowId)); + } + + @Override + public long getLong(int rowId) { + return values.getLong(getValuesIndex(rowId)); + } + + @Override + public long getLongDecimal(int rowId) { + return values.getLongDecimal(getValuesIndex(rowId)); + } + + @Override + public float getFloat(int rowId) { + return values.getFloat(getValuesIndex(rowId)); + } + + @Override + public double getDouble(int rowId) { + return values.getDouble(getValuesIndex(rowId)); + } + + @Override + public UTF8String getUTF8String(int rowId) { + return values.getUTF8String(getValuesIndex(rowId)); + } + + @Override + public byte[] getBinary(int rowId) { + return values.getBinary(getValuesIndex(rowId)); + } + + @Override + public ColumnarArray getArray(int rowId) { + return values.getArray(getValuesIndex(rowId)); + } + + @Override + public ColumnarMap getMap(int rowId) { + return values.getMap(getValuesIndex(rowId)); + } + + @Override + public ColumnVector getChild(int ordinal) { + // Return the child from the original vector + return values.getChild(ordinal); + } + + @Override + public DictionaryProvider getDictionaryProvider() { + return values.getDictionaryProvider(); + } + + @Override + public CometVector slice(int offset, int length) { + if (offset < 0 || length < 0 || offset + length > numValues) { + throw new IllegalArgumentException("Invalid slice parameters"); + } + // Get the current indices and slice them + int[] currentIndices = getSelectedIndices(); + int[] slicedIndices = new int[length]; + // This is not a very efficient version of slicing, but that is + // not important because we are not likely to use it. 
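+    // The values vector is shared with the new slice; only the index window is copied.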
+ System.arraycopy(currentIndices, offset, slicedIndices, 0, length); + return new CometSelectionVector(values, slicedIndices, length); + } + + @Override + public org.apache.arrow.vector.ValueVector getValueVector() { + return values.getValueVector(); + } + + @Override + public void close() { + // Close both the values and indices vectors + values.close(); + indices.close(); + } +} diff --git a/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java b/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java deleted file mode 100644 index 8794902b4..000000000 --- a/common/src/main/java/org/apache/comet/vector/HasRowIdMapping.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.vector; - -/** - * An interface could be implemented by vectors that have row id mapping. - * - * <p>For example, Iceberg's DeleteFile has a row id mapping to map row id to position. This - * interface is used to set and get the row id mapping. The row id mapping is an array of integers, - * where the index is the row id and the value is the position. Here is an example: - * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array Position delete 2, 6 - * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6] - */ -public interface HasRowIdMapping { - default void setRowIdMapping(int[] rowIdMapping) { - throw new UnsupportedOperationException("setRowIdMapping is not supported"); - } - - default int[] getRowIdMapping() { - throw new UnsupportedOperationException("getRowIdMapping is not supported"); - } -} diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index fba4e29e5..d5d6ded55 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -96,6 +96,30 @@ class NativeUtil { (0 until batch.numCols()).foreach { index => batch.column(index) match { + case selectionVector: CometSelectionVector => + // For CometSelectionVector, export only the values vector + val valuesVector = selectionVector.getValues + val valueVector = valuesVector.getValueVector + + // Use the selection vector's logical row count + numRows += selectionVector.numValues() + + val provider = if (valueVector.getField.getDictionary != null) { + valuesVector.getDictionaryProvider + } else { + null + } + + // The array and schema structures are allocated by native side. + // Don't need to deallocate them here. 
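+          // Wrapping simply gives the JVM a typed view over that native memory.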
+ val arrowSchema = ArrowSchema.wrap(schemaAddrs(index)) + val arrowArray = ArrowArray.wrap(arrayAddrs(index)) + Data.exportVector( + allocator, + getFieldVector(valueVector, "export"), + provider, + arrowArray, + arrowSchema) case a: CometVector => val valueVector = a.getValueVector @@ -133,9 +157,40 @@ class NativeUtil { // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report // logical number of rows which is less than actual number of rows due to row deletion. + // Similarly, CometSelectionVector represents a different number of logical rows than the + // underlying vector. numRows.headOption.getOrElse(batch.numRows()) } + /** + * Exports a single CometVector to native side. + * + * @param vector + * The CometVector to export + * @param arrayAddr + * The address of the ArrowArray structure + * @param schemaAddr + * The address of the ArrowSchema structure + */ + def exportSingleVector(vector: CometVector, arrayAddr: Long, schemaAddr: Long): Unit = { + val valueVector = vector.getValueVector + + val provider = if (valueVector.getField.getDictionary != null) { + vector.getDictionaryProvider + } else { + null + } + + val arrowSchema = ArrowSchema.wrap(schemaAddr) + val arrowArray = ArrowArray.wrap(arrayAddr) + Data.exportVector( + allocator, + getFieldVector(valueVector, "export"), + provider, + arrowArray, + arrowSchema) + } + /** * Gets the next batch from native execution. * diff --git a/dev/diffs/iceberg/1.8.1.diff b/dev/diffs/iceberg/1.8.1.diff index 2f80453de..675208f5f 100644 --- a/dev/diffs/iceberg/1.8.1.diff +++ b/dev/diffs/iceberg/1.8.1.diff @@ -1,5 +1,5 @@ diff --git a/build.gradle b/build.gradle -index 7327b38..7967109 100644 +index 7327b38905d..7967109f039 100644 --- a/build.gradle +++ b/build.gradle @@ -780,6 +780,13 @@ project(':iceberg-parquet') { @@ -17,7 +17,7 @@ index 7327b38..7967109 100644 exclude group: 'org.apache.avro', module: 'avro' // already shaded by Parquet diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml -index 04ffa8f..cc0099c 100644 +index 04ffa8f4edc..cc0099ccc93 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -34,6 +34,7 @@ azuresdk-bom = "1.2.31" @@ -39,7 +39,7 @@ index 04ffa8f..cc0099c 100644 tez010 = "0.10.4" diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java new file mode 100644 -index 0000000..ddf6c7d +index 00000000000..ddf6c7de5ae --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java @@ -0,0 +1,255 @@ @@ -300,7 +300,7 @@ index 0000000..ddf6c7d +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java new file mode 100644 -index 0000000..88b195b +index 00000000000..88b195b76a2 --- /dev/null +++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java @@ -0,0 +1,255 @@ @@ -560,7 +560,7 @@ index 0000000..88b195b + } +} diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java -index 2c37a52..3442cfc 100644 +index 2c37a52449e..3442cfc4375 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1075,6 +1075,7 @@ public class 
Parquet { @@ -639,7 +639,7 @@ index 2c37a52..3442cfc 100644 return new org.apache.iceberg.parquet.ParquetReader<>( file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java -index 1fb2372..142e5fb 100644 +index 1fb2372ba56..142e5fbadf1 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java @@ -157,6 +157,14 @@ class ReadConf<T> { @@ -658,7 +658,7 @@ index 1fb2372..142e5fb 100644 return model; } diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle -index e2d2c7a..f64232d 100644 +index e2d2c7a7ac0..f64232dc57f 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { @@ -699,7 +699,7 @@ index e2d2c7a..f64232d 100644 relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5' diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java -index 578845e..0118b30 100644 +index 578845e3da2..0118b30683d 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java @@ -57,6 +57,16 @@ public abstract class ExtensionsTestBase extends CatalogTestBase { @@ -720,7 +720,7 @@ index 578845e..0118b30 100644 .getOrCreate(); diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java -index ade19de..150a2cd 100644 +index ade19de36fe..150a2cddbc8 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java @@ -56,6 +56,16 @@ public class TestCallStatementParser { @@ -741,7 +741,7 @@ index ade19de..150a2cd 100644 TestCallStatementParser.parser = spark.sessionState().sqlParser(); } diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java -index 64edb10..5bb449f 100644 +index 64edb1002e9..5bb449f1ac7 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java @@ -179,6 +179,16 @@ public class DeleteOrphanFilesBenchmark { @@ -762,7 +762,7 @@ index 64edb10..5bb449f 100644 spark = builder.getOrCreate(); } diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java -index a5d0456..4af408f 100644 +index a5d0456b0b2..4af408f4861 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java +++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java @@ -392,6 +392,16 @@ public class IcebergSortCompactionBenchmark { @@ -783,7 +783,7 @@ index a5d0456..4af408f 100644 spark = builder.getOrCreate(); Configuration sparkHadoopConf = spark.sessionState().newHadoopConf(); diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java -index c6794e4..f735919 100644 +index c6794e43c63..f7359197407 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java @@ -239,6 +239,16 @@ public class DVReaderBenchmark { @@ -804,7 +804,7 @@ index c6794e4..f735919 100644 .getOrCreate(); } diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java -index ac74fb5..e011b8b 100644 +index ac74fb5a109..e011b8b2510 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java @@ -223,6 +223,16 @@ public class DVWriterBenchmark { @@ -825,7 +825,7 @@ index ac74fb5..e011b8b 100644 .getOrCreate(); } diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java -index 68c537e..f66be2f 100644 +index 68c537e34a4..f66be2f3896 100644 --- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java @@ -94,7 +94,19 @@ public abstract class IcebergSourceBenchmark { @@ -850,7 +850,7 @@ index 68c537e..f66be2f 100644 builder .config("parquet.dictionary.page.size", "1") diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java -index 4794863..8bb508f 100644 +index 4794863ab1b..8bb508f19f8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java @@ -20,21 +20,25 @@ package org.apache.iceberg.spark.data.vectorized; @@ -953,20 +953,24 @@ index 4794863..8bb508f 100644 @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java -index 1440e5d..fc6b283 100644 +index 1440e5d1d3f..85cca62e90f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java -@@ -23,7 +23,8 @@ import java.io.UncheckedIOException; +@@ -22,8 +22,12 @@ import java.io.IOException; + import java.io.UncheckedIOException; import java.util.List; import java.util.Map; ++import org.apache.comet.CometRuntimeException; import org.apache.comet.parquet.AbstractColumnReader; -import org.apache.comet.parquet.BatchReader; +import 
org.apache.comet.parquet.IcebergCometBatchReader; +import org.apache.comet.parquet.RowGroupReader; ++import org.apache.comet.vector.CometSelectionVector; ++import org.apache.comet.vector.CometVector; import org.apache.iceberg.Schema; import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.parquet.VectorizedReader; -@@ -55,7 +56,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { +@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { // calling BatchReader.nextBatch, the isDeleted value is not yet available, so // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is // available. @@ -975,7 +979,7 @@ index 1440e5d..fc6b283 100644 private DeleteFilter<InternalRow> deletes = null; private long rowStartPosInBatch = 0; -@@ -65,9 +66,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { +@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { this.hasIsDeletedColumn = readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader); @@ -986,7 +990,7 @@ index 1440e5d..fc6b283 100644 } @Override -@@ -85,19 +84,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { +@@ -85,19 +87,22 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { && !(readers[i] instanceof CometPositionColumnReader) && !(readers[i] instanceof CometDeleteColumnReader)) { readers[i].reset(); @@ -1012,8 +1016,29 @@ index 1440e5d..fc6b283 100644 .getRowIndexOffset() .orElseThrow( () -> +@@ -154,9 +159,17 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> { + Pair<int[], Integer> pair = buildRowIdMapping(vectors); + if (pair != null) { + int[] rowIdMapping = pair.first(); +- numLiveRows = pair.second(); +- for (int i = 0; i < vectors.length; i++) { +- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping); ++ if (pair.second() != null) { ++ numLiveRows = pair.second(); ++ for (int i = 0; i < vectors.length; i++) { ++ if (vectors[i] instanceof CometVector) { ++ vectors[i] = ++ new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows); ++ } else { ++ throw new CometRuntimeException( ++ "Unsupported column vector type: " + vectors[i].getClass()); ++ } ++ } + } + } + } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java -index 047c963..88d691a 100644 +index 047c96314b1..88d691a607a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java @@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized; @@ -1038,7 +1063,7 @@ index 047c963..88d691a 100644 @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java -index 6235bfe..cba108e 100644 +index 6235bfe4865..cba108e4326 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java @@ -51,10 +51,10 @@ class 
CometDeleteColumnReader<T> extends CometColumnReader { @@ -1055,7 +1080,7 @@ index 6235bfe..cba108e 100644 } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java -index bcc0e51..98e8006 100644 +index bcc0e514c28..98e80068c51 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java @@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized; @@ -1076,7 +1101,7 @@ index bcc0e51..98e8006 100644 false /* isConstant = false */); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java -index d36f1a7..56f8c9b 100644 +index d36f1a72747..56f8c9bff93 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java @@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReade @@ -1089,7 +1114,7 @@ index d36f1a7..56f8c9b 100644 } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java -index 780e175..57892ac 100644 +index 780e1750a52..57892ac4c59 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -109,6 +109,7 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBa @@ -1101,7 +1126,7 @@ index 780e175..57892ac 100644 } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java -index 11f054b..b37dd33 100644 +index 11f054b1171..f443042ba48 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -172,17 +172,32 @@ class SparkBatch implements Batch { @@ -1129,7 +1154,7 @@ index 11f054b..b37dd33 100644 + } else if (task.isFileScanTask() && !task.isDataTask()) { + FileScanTask fileScanTask = task.asFileScanTask(); + // Comet can't handle delete files for now -+ return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty(); ++ return fileScanTask.file().format() == FileFormat.PARQUET; + + } else { + return false; @@ -1140,7 +1165,7 @@ index 11f054b..b37dd33 100644 // - ORC vectorization is enabled // - all tasks are of type FileScanTask and read only ORC files with no delete files diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java -index 019f391..656e060 100644 +index 019f3919dc5..656e0600ac0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -23,6 +23,7 @@ import java.util.List; @@ -1172,7 +1197,7 @@ 
index 019f391..656e060 100644 + } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java -index 404ba72..19d0cd5 100644 +index 404ba728460..19d0cd5c827 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java @@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase @@ -1193,7 +1218,7 @@ index 404ba72..19d0cd5 100644 } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java -index 659507e..eb9cedc 100644 +index 659507e4c5e..eb9cedc34c5 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java @@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes @@ -1214,7 +1239,7 @@ index 659507e..eb9cedc 100644 } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java -index a218f96..395c024 100644 +index a218f965ea6..395c02441e7 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java @@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles @@ -1235,7 +1260,7 @@ index a218f96..395c024 100644 } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java -index 2665d7b..306e859 100644 +index 2665d7ba8d3..306e859ce1a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java @@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting @@ -1256,7 +1281,7 @@ index 2665d7b..306e859 100644 } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java -index de68351..3ab0d3d 100644 +index de68351f6e3..3ab0d3db477 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java @@ -77,6 +77,17 @@ public abstract class TestBase extends SparkTestHelperBase { @@ -1278,7 +1303,7 @@ index de68351..3ab0d3d 100644 .getOrCreate(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java -index bc4e722..1a40d8e 100644 +index bc4e722bc86..1a40d8edcb3 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java @@ -59,7 +59,20 @@ 
public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect @@ -1304,7 +1329,7 @@ index bc4e722..1a40d8e 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java -index 3a26974..59592eb 100644 +index 3a269740b70..59592eb2ad7 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java @@ -54,7 +54,20 @@ public abstract class ScanTestBase extends AvroDataTest { @@ -1330,7 +1355,7 @@ index 3a26974..59592eb 100644 } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java -index f411920..367c2e2 100644 +index f411920a5dc..367c2e2cb45 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java @@ -144,7 +144,20 @@ public class TestCompressionSettings extends CatalogTestBase { @@ -1356,7 +1381,7 @@ index f411920..367c2e2 100644 @BeforeEach diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java -index c4ba96e..a0e77db 100644 +index c4ba96e6340..a0e77db99b9 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java @@ -75,7 +75,20 @@ public class TestDataSourceOptions extends TestBaseWithCatalog { @@ -1382,7 +1407,7 @@ index c4ba96e..a0e77db 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java -index 3481735..bdd3152 100644 +index 348173596e4..bdd31528d3f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -110,7 +110,20 @@ public class TestFilteredScan { @@ -1408,7 +1433,7 @@ index 3481735..bdd3152 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java -index 84c99a5..6f4b2af 100644 +index 84c99a575c8..6f4b2af003b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java @@ -93,7 +93,20 @@ public class TestForwardCompatibility { @@ -1434,7 +1459,7 @@ index 84c99a5..6f4b2af 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java -index 7eff93d..1774acd 100644 +index 7eff93d204e..1774acd056b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java @@ -46,7 +46,20 @@ public class TestIcebergSpark { @@ -1460,7 
+1485,7 @@ index 7eff93d..1774acd 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java -index 9464f68..b5dbb88 100644 +index 9464f687b0e..b5dbb88dd5a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java @@ -112,7 +112,20 @@ public class TestPartitionPruning { @@ -1486,7 +1511,7 @@ index 9464f68..b5dbb88 100644 String optionKey = String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java -index 5c218f2..e48e6d1 100644 +index 5c218f21c47..e48e6d121f0 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java @@ -107,7 +107,20 @@ public class TestPartitionValues { @@ -1512,7 +1537,7 @@ index 5c218f2..e48e6d1 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java -index a7334a5..dca7fdd 100644 +index a7334a580ca..dca7fdd9c69 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java @@ -87,7 +87,20 @@ public class TestSnapshotSelection { @@ -1538,7 +1563,7 @@ index a7334a5..dca7fdd 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java -index 182b1ef..5da4a3c 100644 +index 182b1ef8f5a..5da4a3c90aa 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java @@ -120,7 +120,20 @@ public class TestSparkDataFile { @@ -1564,7 +1589,7 @@ index 182b1ef..5da4a3c 100644 } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java -index fb2b312..5927a40 100644 +index fb2b312bed9..5927a408c6b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java @@ -96,7 +96,20 @@ public class TestSparkDataWrite { @@ -1671,7 +1696,7 @@ index fb2b312..5927a40 100644 assertThat(actual) .describedAs("Result rows should match") diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java -index becf6a0..beb7c80 100644 +index becf6a064dc..beb7c801b05 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java @@ -83,7 +83,20 @@ public class TestSparkReadProjection extends TestReadProjection { @@ 
-1697,7 +1722,7 @@ index becf6a0..beb7c80 100644 ImmutableMap.of( "type", "hive", diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java -index 4f1cef5..0e34e08 100644 +index 4f1cef5d373..0e34e08db4a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java @@ -136,6 +136,16 @@ public class TestSparkReaderDeletes extends DeleteReadTests { @@ -1718,7 +1743,7 @@ index 4f1cef5..0e34e08 100644 .getOrCreate(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java -index baf7fa8..509c5de 100644 +index baf7fa8f88a..509c5deba51 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java @@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter { @@ -1739,7 +1764,7 @@ index baf7fa8..509c5de 100644 .getOrCreate(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java -index 17db46b..d68b638 100644 +index 17db46b85c3..d68b638ce09 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java @@ -65,6 +65,16 @@ public class TestStructuredStreaming { @@ -1760,7 +1785,7 @@ index 17db46b..d68b638 100644 } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java -index 306444b..0ed52fd 100644 +index 306444b9f29..0ed52fd6cfb 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java @@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase { @@ -1786,7 +1811,7 @@ index 306444b..0ed52fd 100644 @AfterAll diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java -index 841268a..e290c24 100644 +index 841268a6be0..e290c246f7b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java @@ -80,7 +80,20 @@ public class TestWriteMetricsConfig { @@ -1812,7 +1837,7 @@ index 841268a..e290c24 100644 } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java -index 6e09252..438bc4d 100644 +index 6e09252704a..438bc4da575 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java @@ -60,6 +60,16 @@ public class 
TestAggregatePushDown extends CatalogTestBase { @@ -1833,7 +1858,7 @@ index 6e09252..438bc4d 100644 .getOrCreate(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java -index 9d2ce2b..5e23368 100644 +index 9d2ce2b388a..5e233688488 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java @@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog { @@ -1848,7 +1873,7 @@ index 9d2ce2b..5e23368 100644 assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter ("); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java -index 6719c45..2515454 100644 +index 6719c45ca96..2515454401a 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java @@ -616,7 +616,7 @@ public class TestStoragePartitionedJoins extends TestBaseWithCatalog { diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index fd3fb63b5..502fa0bd1 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -24,7 +24,7 @@ use crate::{ jvm_bridge::{jni_call, JVMClasses}, }; use arrow::array::{make_array, ArrayData, ArrayRef, RecordBatch, RecordBatchOptions}; -use arrow::compute::{cast_with_options, CastOptions}; +use arrow::compute::{cast_with_options, take, CastOptions}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::ffi::FFI_ArrowArray; use arrow::ffi::FFI_ArrowSchema; @@ -237,8 +237,89 @@ impl ScanExec { return Ok(InputBatch::EOF); } + // Check for selection vectors and get selection indices if needed from + // JVM via FFI + // Selection vectors can be provided by, for instance, Iceberg to + // remove rows that have been deleted. 
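+        // When present, there is one Int32 index array per column, and the indices are
+        // applied to the fetched columns below with Arrow's `take` kernel.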
+ let selection_indices_arrays = + Self::get_selection_indices(&mut env, iter, num_cols, jvm_fetch_time, arrow_ffi_time)?; + + // fetch batch data from JVM via FFI let mut timer = arrow_ffi_time.timer(); + let (num_rows, array_addrs, schema_addrs) = + Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?; + + let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols); + + // Process each column + for i in 0..num_cols { + let array_ptr = array_addrs[i]; + let schema_ptr = schema_addrs[i]; + let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; + + // TODO: validate array input data + // array_data.validate_full()?; + + let array = make_array(array_data); + + // Apply selection if selection vectors exist (applies to all columns) + let array = if let Some(ref selection_arrays) = selection_indices_arrays { + let indices = &selection_arrays[i]; + // Apply the selection using Arrow's take kernel + match take(&*array, &**indices, None) { + Ok(selected_array) => selected_array, + Err(e) => { + return Err(CometError::from(ExecutionError::ArrowError(format!( + "Failed to apply selection for column {i}: {e}", + )))); + } + } + } else { + array + }; + + let array = if arrow_ffi_safe { + // ownership of this array has been transferred to native + array + } else { + // it is necessary to copy the array because the contents may be + // overwritten on the JVM side in the future + copy_array(&array) + }; + + inputs.push(array); + + // Drop the Arcs to avoid memory leak + unsafe { + Rc::from_raw(array_ptr as *const FFI_ArrowArray); + Rc::from_raw(schema_ptr as *const FFI_ArrowSchema); + } + } + + timer.stop(); + + // If selection was applied, determine the actual row count from the selected arrays + let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays { + if !selection_arrays.is_empty() { + // Use the length of the first selection array as the actual row count + selection_arrays[0].len() + } else { + num_rows as usize + } + } else { + num_rows as usize + }; + + Ok(InputBatch::new(inputs, Some(actual_num_rows))) + } + /// Allocates Arrow FFI structures and calls JNI to get the next batch data. + /// Returns the number of rows and the allocated array/schema addresses. + fn allocate_and_fetch_batch( + env: &mut jni::JNIEnv, + iter: &JObject, + num_cols: usize, + ) -> Result<(i32, Vec<i64>, Vec<i64>), CometError> { let mut array_addrs = Vec::with_capacity(num_cols); let mut schema_addrs = Vec::with_capacity(num_cols); @@ -268,7 +349,7 @@ impl ScanExec { let schema_obj = JValueGen::Object(schema_obj.as_ref()); let num_rows: i32 = unsafe { - jni_call!(&mut env, + jni_call!(env, comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)? }; @@ -276,39 +357,82 @@ impl ScanExec { // have a valid row count when calling next() assert!(num_rows != -1); - let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols); + Ok((num_rows, array_addrs, schema_addrs)) + } - for i in 0..num_cols { - let array_ptr = array_addrs[i]; - let schema_ptr = schema_addrs[i]; - let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; + /// Checks for selection vectors and exports selection indices if needed. + /// Returns selection arrays if they exist (applies to all columns). 
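+    /// The indices arrive from the JVM as Int32 arrays via the Arrow C data interface.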
+ fn get_selection_indices( + env: &mut jni::JNIEnv, + iter: &JObject, + num_cols: usize, + jvm_fetch_time: &Time, + arrow_ffi_time: &Time, + ) -> Result<Option<Vec<ArrayRef>>, CometError> { + // Check if all columns have selection vectors + let mut timer = jvm_fetch_time.timer(); + let has_selection_vectors_result: jni::sys::jboolean = unsafe { + jni_call!(env, + comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)? + }; + timer.stop(); + let has_selection_vectors = has_selection_vectors_result != 0; - // TODO: validate array input data - // array_data.validate_full()?; + let selection_indices_arrays = if has_selection_vectors { + let mut timer = arrow_ffi_time.timer(); - let array = make_array(array_data); + // Allocate arrays for selection indices export (one per column) + let mut indices_array_addrs = Vec::with_capacity(num_cols); + let mut indices_schema_addrs = Vec::with_capacity(num_cols); - let array = if arrow_ffi_safe { - // ownership of this array has been transferred to native - array - } else { - // it is necessary to copy the array because the contents may be - // overwritten on the JVM side in the future - copy_array(&array) - }; + for _ in 0..num_cols { + let arrow_array = Rc::new(FFI_ArrowArray::empty()); + let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); + indices_array_addrs.push(Rc::into_raw(arrow_array) as i64); + indices_schema_addrs.push(Rc::into_raw(arrow_schema) as i64); + } - inputs.push(array); + // Prepare JNI arrays for the export call + let indices_array_obj = env.new_long_array(num_cols as jsize)?; + let indices_schema_obj = env.new_long_array(num_cols as jsize)?; + env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?; + env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?; - // Drop the Arcs to avoid memory leak - unsafe { - Rc::from_raw(array_ptr as *const FFI_ArrowArray); - Rc::from_raw(schema_ptr as *const FFI_ArrowSchema); + timer.stop(); + + // Export selection indices from JVM + let mut timer = jvm_fetch_time.timer(); + let _exported_count: i32 = unsafe { + jni_call!(env, + comet_batch_iterator(iter).export_selection_indices( + JValueGen::Object(JObject::from(indices_array_obj).as_ref()), + JValueGen::Object(JObject::from(indices_schema_obj).as_ref()) + ) -> i32)? 
+ }; + timer.stop(); + + // Convert to ArrayRef for easier handling + let mut timer = arrow_ffi_time.timer(); + let mut selection_arrays = Vec::with_capacity(num_cols); + for i in 0..num_cols { + let array_data = + ArrayData::from_spark((indices_array_addrs[i], indices_schema_addrs[i]))?; + selection_arrays.push(make_array(array_data)); + + // Drop the references to the FFI arrays + unsafe { + Rc::from_raw(indices_array_addrs[i] as *const FFI_ArrowArray); + Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema); + } } - } + timer.stop(); - timer.stop(); + Some(selection_arrays) + } else { + None + }; - Ok(InputBatch::new(inputs, Some(num_rows as usize))) + Ok(selection_indices_arrays) } } diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs index 998e540c7..2824bdbfc 100644 --- a/native/core/src/jvm_bridge/batch_iterator.rs +++ b/native/core/src/jvm_bridge/batch_iterator.rs @@ -31,6 +31,10 @@ pub struct CometBatchIterator<'a> { pub method_has_next_ret: ReturnType, pub method_next: JMethodID, pub method_next_ret: ReturnType, + pub method_has_selection_vectors: JMethodID, + pub method_has_selection_vectors_ret: ReturnType, + pub method_export_selection_indices: JMethodID, + pub method_export_selection_indices_ret: ReturnType, } impl<'a> CometBatchIterator<'a> { @@ -45,6 +49,18 @@ impl<'a> CometBatchIterator<'a> { method_has_next_ret: ReturnType::Primitive(Primitive::Int), method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)I")?, method_next_ret: ReturnType::Primitive(Primitive::Int), + method_has_selection_vectors: env.get_method_id( + Self::JVM_CLASS, + "hasSelectionVectors", + "()Z", + )?, + method_has_selection_vectors_ret: ReturnType::Primitive(Primitive::Boolean), + method_export_selection_indices: env.get_method_id( + Self::JVM_CLASS, + "exportSelectionIndices", + "([J[J)I", + )?, + method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int), }) } } diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java index 9b48a47c5..4f45f98a6 100644 --- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java +++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java @@ -23,6 +23,7 @@ import scala.collection.Iterator; import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.comet.vector.CometSelectionVector; import org.apache.comet.vector.NativeUtil; /** @@ -90,4 +91,49 @@ public class CometBatchIterator { return numRows; } + + /** + * Check if the current batch has selection vectors for all columns. + * + * @return true if all columns are CometSelectionVector instances, false otherwise + */ + public boolean hasSelectionVectors() { + if (currentBatch == null) { + return false; + } + + // Check if all columns are CometSelectionVector instances + for (int i = 0; i < currentBatch.numCols(); i++) { + if (!(currentBatch.column(i) instanceof CometSelectionVector)) { + return false; + } + } + return true; + } + + /** + * Export selection indices for all columns when they are selection vectors. 
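The native side pre-allocates one ArrowArray/ArrowSchema address pair per column and passes them in.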
+ * + * @param arrayAddrs The addresses of the ArrowArray structures for indices + * @param schemaAddrs The addresses of the ArrowSchema structures for indices + * @return Number of selection indices arrays exported + */ + public int exportSelectionIndices(long[] arrayAddrs, long[] schemaAddrs) { + if (currentBatch == null) { + return 0; + } + + int exportCount = 0; + for (int i = 0; i < currentBatch.numCols(); i++) { + if (currentBatch.column(i) instanceof CometSelectionVector) { + CometSelectionVector selectionVector = (CometSelectionVector) currentBatch.column(i); + + // Export the indices vector + nativeUtil.exportSingleVector( + selectionVector.getIndices(), arrayAddrs[exportCount], schemaAddrs[exportCount]); + exportCount++; + } + } + return exportCount; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org