This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fe645264 Revert "perf: Remove mutable buffers from scan partition/missing columns (#3411)" (#3486)
4fe645264 is described below

commit 4fe64526448b1d32a5633076271d594a67159480
Author: Matt Butrovich <[email protected]>
AuthorDate: Wed Feb 11 08:52:01 2026 -0500

    Revert "perf: Remove mutable buffers from scan partition/missing columns 
(#3411)" (#3486)
---
 .../comet/parquet/ArrowConstantColumnReader.java   | 255 ---------------------
 .../comet/parquet/ArrowRowIndexColumnReader.java   | 109 ---------
 .../apache/comet/parquet/NativeBatchReader.java    |  12 +-
 .../org/apache/spark/sql/comet/operators.scala     |   2 -
 4 files changed, 5 insertions(+), 373 deletions(-)

diff --git a/common/src/main/java/org/apache/comet/parquet/ArrowConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ArrowConstantColumnReader.java
deleted file mode 100644
index 521eb4aa5..000000000
--- a/common/src/main/java/org/apache/comet/parquet/ArrowConstantColumnReader.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import java.math.BigDecimal;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.*;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import org.apache.comet.vector.CometPlainVector;
-import org.apache.comet.vector.CometVector;
-
-/**
- * A column reader that returns constant vectors using Arrow Java vectors directly (no native
- * mutable buffers). Used for partition columns and missing columns in the native_iceberg_compat
- * scan path.
- *
- * <p>The vector is filled with the constant value repeated for every row in the batch. This is
- * necessary because the underlying Arrow vector's buffers must be large enough to match the
- * reported value count — otherwise variable-width types (strings, binary) would have undersized
- * offset buffers, causing out-of-bounds reads on the native side.
- */
-public class ArrowConstantColumnReader extends AbstractColumnReader {
-  private final BufferAllocator allocator = new RootAllocator();
-
-  private boolean isNull;
-  private Object value;
-  private FieldVector fieldVector;
-  private CometPlainVector vector;
-  private int currentSize;
-
-  /** Constructor for missing columns (default values from schema). */
-  ArrowConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
-    super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false);
-    this.batchSize = batchSize;
-    this.value =
-        ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
-            0];
-    initVector(value, batchSize);
-  }
-
-  /** Constructor for partition columns with values from a row. */
-  ArrowConstantColumnReader(
-      StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) {
-    super(field.dataType(), TypeUtil.convertToParquet(field), useDecimal128, false);
-    this.batchSize = batchSize;
-    Object v = values.get(index, field.dataType());
-    this.value = v;
-    initVector(v, batchSize);
-  }
-
-  @Override
-  public void setBatchSize(int batchSize) {
-    close();
-    this.batchSize = batchSize;
-    initVector(value, batchSize);
-  }
-
-  @Override
-  public void readBatch(int total) {
-    if (total != currentSize) {
-      close();
-      initVector(value, total);
-    }
-  }
-
-  @Override
-  public CometVector currentBatch() {
-    return vector;
-  }
-
-  @Override
-  public void close() {
-    if (vector != null) {
-      vector.close();
-      vector = null;
-    }
-    if (fieldVector != null) {
-      fieldVector.close();
-      fieldVector = null;
-    }
-  }
-
-  private void initVector(Object value, int count) {
-    currentSize = count;
-    if (value == null) {
-      isNull = true;
-      fieldVector = createNullVector(count);
-    } else {
-      isNull = false;
-      fieldVector = createFilledVector(value, count);
-    }
-    vector = new CometPlainVector(fieldVector, useDecimal128, false, true);
-  }
-
-  /** Creates a vector of the correct type with {@code count} null values. */
-  private FieldVector createNullVector(int count) {
-    String name = "constant";
-    FieldVector v;
-    if (type == DataTypes.BooleanType) {
-      v = new BitVector(name, allocator);
-    } else if (type == DataTypes.ByteType) {
-      v = new TinyIntVector(name, allocator);
-    } else if (type == DataTypes.ShortType) {
-      v = new SmallIntVector(name, allocator);
-    } else if (type == DataTypes.IntegerType || type == DataTypes.DateType) {
-      v = new IntVector(name, allocator);
-    } else if (type == DataTypes.LongType
-        || type == DataTypes.TimestampType
-        || type == TimestampNTZType$.MODULE$) {
-      v = new BigIntVector(name, allocator);
-    } else if (type == DataTypes.FloatType) {
-      v = new Float4Vector(name, allocator);
-    } else if (type == DataTypes.DoubleType) {
-      v = new Float8Vector(name, allocator);
-    } else if (type == DataTypes.BinaryType) {
-      v = new VarBinaryVector(name, allocator);
-    } else if (type == DataTypes.StringType) {
-      v = new VarCharVector(name, allocator);
-    } else if (type instanceof DecimalType) {
-      DecimalType dt = (DecimalType) type;
-      if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-        v = new IntVector(name, allocator);
-      } else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-        v = new BigIntVector(name, allocator);
-      } else {
-        v = new DecimalVector(name, allocator, dt.precision(), dt.scale());
-      }
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark type: " + type);
-    }
-    v.setValueCount(count);
-    return v;
-  }
-
-  /** Creates a vector filled with {@code count} copies of the given value. */
-  private FieldVector createFilledVector(Object value, int count) {
-    String name = "constant";
-    if (type == DataTypes.BooleanType) {
-      BitVector v = new BitVector(name, allocator);
-      v.allocateNew(count);
-      int bit = (boolean) value ? 1 : 0;
-      for (int i = 0; i < count; i++) v.setSafe(i, bit);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.ByteType) {
-      TinyIntVector v = new TinyIntVector(name, allocator);
-      v.allocateNew(count);
-      byte val = (byte) value;
-      for (int i = 0; i < count; i++) v.setSafe(i, val);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.ShortType) {
-      SmallIntVector v = new SmallIntVector(name, allocator);
-      v.allocateNew(count);
-      short val = (short) value;
-      for (int i = 0; i < count; i++) v.setSafe(i, val);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.IntegerType || type == DataTypes.DateType) {
-      IntVector v = new IntVector(name, allocator);
-      v.allocateNew(count);
-      int val = (int) value;
-      for (int i = 0; i < count; i++) v.setSafe(i, val);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.LongType
-        || type == DataTypes.TimestampType
-        || type == TimestampNTZType$.MODULE$) {
-      BigIntVector v = new BigIntVector(name, allocator);
-      v.allocateNew(count);
-      long val = (long) value;
-      for (int i = 0; i < count; i++) v.setSafe(i, val);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.FloatType) {
-      Float4Vector v = new Float4Vector(name, allocator);
-      v.allocateNew(count);
-      float val = (float) value;
-      for (int i = 0; i < count; i++) v.setSafe(i, val);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.DoubleType) {
-      Float8Vector v = new Float8Vector(name, allocator);
-      v.allocateNew(count);
-      double val = (double) value;
-      for (int i = 0; i < count; i++) v.setSafe(i, val);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.BinaryType) {
-      VarBinaryVector v = new VarBinaryVector(name, allocator);
-      v.allocateNew(count);
-      byte[] bytes = (byte[]) value;
-      for (int i = 0; i < count; i++) v.setSafe(i, bytes, 0, bytes.length);
-      v.setValueCount(count);
-      return v;
-    } else if (type == DataTypes.StringType) {
-      VarCharVector v = new VarCharVector(name, allocator);
-      v.allocateNew(count);
-      byte[] bytes = ((UTF8String) value).getBytes();
-      for (int i = 0; i < count; i++) v.setSafe(i, bytes, 0, bytes.length);
-      v.setValueCount(count);
-      return v;
-    } else if (type instanceof DecimalType) {
-      DecimalType dt = (DecimalType) type;
-      Decimal d = (Decimal) value;
-      if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) {
-        IntVector v = new IntVector(name, allocator);
-        v.allocateNew(count);
-        int val = (int) d.toUnscaledLong();
-        for (int i = 0; i < count; i++) v.setSafe(i, val);
-        v.setValueCount(count);
-        return v;
-      } else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
-        BigIntVector v = new BigIntVector(name, allocator);
-        v.allocateNew(count);
-        long val = d.toUnscaledLong();
-        for (int i = 0; i < count; i++) v.setSafe(i, val);
-        v.setValueCount(count);
-        return v;
-      } else {
-        DecimalVector v = new DecimalVector(name, allocator, dt.precision(), dt.scale());
-        v.allocateNew(count);
-        BigDecimal bd = d.toJavaBigDecimal();
-        for (int i = 0; i < count; i++) v.setSafe(i, bd);
-        v.setValueCount(count);
-        return v;
-      }
-    } else {
-      throw new UnsupportedOperationException("Unsupported Spark type: " + type);
-    }
-  }
-}
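
As context for this revert (not part of the patch itself): the deleted reader's core pattern was to fill a stock Arrow Java vector with the constant repeated for every row, so that all buffers, including the offset buffer of variable-width vectors, are sized for the full batch. A minimal standalone sketch of that pattern follows; the class name, batch size, and sample value are illustrative assumptions, not taken from this commit.

    import java.nio.charset.StandardCharsets;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VarCharVector;

    // Illustrative sketch only; not Comet code.
    public class ConstantVectorSketch {
      public static void main(String[] args) {
        try (BufferAllocator allocator = new RootAllocator();
            VarCharVector v = new VarCharVector("constant", allocator)) {
          int count = 4; // hypothetical batch size
          byte[] bytes = "part=1".getBytes(StandardCharsets.UTF_8);
          v.allocateNew(count);
          // setSafe grows the validity, offset, and data buffers as needed, so
          // the offset buffer ends up covering all count slots, which is the
          // undersized-offset-buffer hazard the deleted Javadoc describes.
          for (int i = 0; i < count; i++) {
            v.setSafe(i, bytes, 0, bytes.length);
          }
          v.setValueCount(count);
          System.out.println(v.getValueCount() + " rows of " + v.getObject(0));
        }
      }
    }
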
diff --git a/common/src/main/java/org/apache/comet/parquet/ArrowRowIndexColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ArrowRowIndexColumnReader.java
deleted file mode 100644
index 7d17e551d..000000000
--- a/common/src/main/java/org/apache/comet/parquet/ArrowRowIndexColumnReader.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.spark.sql.types.*;
-
-import org.apache.comet.vector.CometPlainVector;
-import org.apache.comet.vector.CometVector;
-
-/**
- * A column reader that computes row indices in Java and creates Arrow BigIntVectors directly (no
- * native mutable buffers). Used for the row index metadata column in the native_iceberg_compat scan
- * path.
- *
- * <p>The {@code indices} array contains alternating pairs of (start_index, count) representing
- * ranges of sequential row indices within each row group.
- */
-public class ArrowRowIndexColumnReader extends AbstractColumnReader {
-  private final BufferAllocator allocator = new RootAllocator();
-
-  /** Alternating (start_index, count) pairs from row groups. */
-  private final long[] indices;
-
-  /** Number of row indices consumed so far across batches. */
-  private long offset;
-
-  private BigIntVector fieldVector;
-  private CometPlainVector vector;
-
-  public ArrowRowIndexColumnReader(StructField field, int batchSize, long[] indices) {
-    super(field.dataType(), TypeUtil.convertToParquet(field), false, false);
-    this.indices = indices;
-    this.batchSize = batchSize;
-  }
-
-  @Override
-  public void setBatchSize(int batchSize) {
-    close();
-    this.batchSize = batchSize;
-  }
-
-  @Override
-  public void readBatch(int total) {
-    close();
-
-    fieldVector = new BigIntVector("row_index", allocator);
-    fieldVector.allocateNew(total);
-
-    // Port of Rust set_indices: iterate (start, count) pairs, skip offset rows, fill up to total.
-    long skipped = 0;
-    int filled = 0;
-    for (int i = 0; i < indices.length && filled < total; i += 2) {
-      long index = indices[i];
-      long count = indices[i + 1];
-      long skip = Math.min(count, offset - skipped);
-      skipped += skip;
-      if (count == skip) {
-        continue;
-      }
-      long remaining = Math.min(count - skip, total - filled);
-      for (long j = 0; j < remaining; j++) {
-        fieldVector.setSafe(filled, index + skip + j);
-        filled++;
-      }
-    }
-    offset += filled;
-
-    fieldVector.setValueCount(filled);
-    vector = new CometPlainVector(fieldVector, false, false, false);
-    vector.setNumValues(filled);
-  }
-
-  @Override
-  public CometVector currentBatch() {
-    return vector;
-  }
-
-  @Override
-  public void close() {
-    if (vector != null) {
-      vector.close();
-      vector = null;
-    }
-    if (fieldVector != null) {
-      fieldVector.close();
-      fieldVector = null;
-    }
-  }
-}
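
As context for this second deletion (not part of the patch itself): readBatch above walks the alternating (start_index, count) pairs, skipping rows already consumed by earlier batches before filling the current one. A standalone sketch of that walk follows, with a plain long[] standing in for the Arrow BigIntVector; the method name and sample row-group layout are hypothetical.

    import java.util.Arrays;

    // Illustrative sketch only; not Comet code.
    public class RowIndexSketch {
      /** indices holds alternating (start_index, count) pairs; offset is rows consumed so far. */
      static long[] nextBatch(long[] indices, long offset, int total) {
        long[] out = new long[total];
        long skipped = 0;
        int filled = 0;
        for (int i = 0; i < indices.length && filled < total; i += 2) {
          long start = indices[i];
          long count = indices[i + 1];
          // Consume this range's share of rows emitted by earlier batches.
          long skip = Math.min(count, offset - skipped);
          skipped += skip;
          if (count == skip) {
            continue; // range fully consumed by earlier batches
          }
          long remaining = Math.min(count - skip, total - filled);
          for (long j = 0; j < remaining; j++) {
            out[filled++] = start + skip + j;
          }
        }
        return Arrays.copyOf(out, filled);
      }

      public static void main(String[] args) {
        // Two hypothetical row groups: rows 0..2 and rows 100..103.
        long[] indices = {0, 3, 100, 4};
        System.out.println(Arrays.toString(nextBatch(indices, 0, 5))); // [0, 1, 2, 100, 101]
        System.out.println(Arrays.toString(nextBatch(indices, 5, 5))); // [102, 103]
      }
    }
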
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 32edcb264..d10a8932b 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -448,8 +448,7 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
           // TODO(SPARK-40059): Allow users to include columns named
           //                    FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
           long[] rowIndices = FileReader.getRowIndices(blocks);
-          columnReaders[i] =
-              new ArrowRowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
+          columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
           hasRowIndexColumn = true;
           missingColumns[i] = true;
         } else if (optFileField.isPresent()) {
@@ -474,8 +473,8 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
                       + filePath);
             }
             if (field.isPrimitive()) {
-              ArrowConstantColumnReader reader =
-                  new ArrowConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
+              ConstantColumnReader reader =
+                  new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
               columnReaders[i] = reader;
               missingColumns[i] = true;
             } else {
@@ -493,9 +492,8 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
         for (int i = fields.size(); i < columnReaders.length; i++) {
           int fieldIndex = i - fields.size();
           StructField field = partitionFields[fieldIndex];
-          ArrowConstantColumnReader reader =
-              new ArrowConstantColumnReader(
-                  field, capacity, partitionValues, fieldIndex, useDecimal128);
+          ConstantColumnReader reader =
+              new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128);
           columnReaders[i] = reader;
         }
       }
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
index 9b6944ac4..eba74c9e2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
@@ -2072,8 +2072,6 @@ case class CometSortMergeJoinExec(
 }
 
 object CometScanWrapper extends CometSink[SparkPlan] {
-  override def isFfiSafe: Boolean = true
-
   override def createExec(nativeOp: Operator, op: SparkPlan): CometNativeExec = {
     CometScanWrapper(nativeOp, op)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
