This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 08a4a0cde2 Spark, Arrow, Parquet: Add vectorized read support for parquet BYTE_STREAM_SPLIT encoding (#15373)
08a4a0cde2 is described below

commit 08a4a0cde2f4ae4f67fa776b7d46c31be287c54f
Author: jbewing <[email protected]>
AuthorDate: Wed Feb 25 11:19:57 2026 -0500

    Spark, Arrow, Parquet: Add vectorized read support for parquet BYTE_STREAM_SPLIT encoding (#15373)
---
 LICENSE                                            |   1 +
 .../VectorizedByteStreamSplitValuesReader.java     | 151 +++++++++++++++++++++
 .../vectorized/parquet/VectorizedPageIterator.java |  22 +++
 .../encodings/BYTE_STREAM_SPLIT/double.parquet     | Bin 0 -> 8459 bytes
 .../BYTE_STREAM_SPLIT/double_with_nulls.parquet    | Bin 0 -> 6986 bytes
 .../encodings/BYTE_STREAM_SPLIT/float.parquet      | Bin 0 -> 4427 bytes
 .../BYTE_STREAM_SPLIT/float_with_nulls.parquet     | Bin 0 -> 3754 bytes
 .../encodings/BYTE_STREAM_SPLIT/int32.parquet      | Bin 0 -> 4439 bytes
 .../BYTE_STREAM_SPLIT/int32_with_nulls.parquet     | Bin 0 -> 3766 bytes
 .../encodings/BYTE_STREAM_SPLIT/int64.parquet      | Bin 0 -> 8471 bytes
 .../BYTE_STREAM_SPLIT/int64_with_nulls.parquet     | Bin 0 -> 6998 bytes
 .../encodings/BYTE_STREAM_SPLIT/uuid.parquet       | Bin 0 -> 16643 bytes
 .../BYTE_STREAM_SPLIT/uuid_with_nulls.parquet      | Bin 0 -> 13570 bytes
 .../resources/encodings/PLAIN/double.parquet       | Bin 0 -> 8459 bytes
 .../encodings/PLAIN/double_with_nulls.parquet      | Bin 0 -> 6986 bytes
 .../resources/encodings/PLAIN/uuid.parquet         | Bin 0 -> 16643 bytes
 .../encodings/PLAIN/uuid_with_nulls.parquet        | Bin 0 -> 13570 bytes
 .../parquet/TestParquetVectorizedReads.java        |  20 +--
 .../parquet/TestParquetVectorizedReads.java        |  20 +--
 .../parquet/TestParquetVectorizedReads.java        |  20 +--
 20 files changed, 210 insertions(+), 24 deletions(-)

diff --git a/LICENSE b/LICENSE
index f866893ac5..61499762d6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -229,6 +229,7 @@ This product includes code from Apache Parquet.
 * DynConstructors.java
 * IOUtil.java readFully and tests
 * ByteBufferInputStream implementations and tests
+* implementation of VectorizedByteStreamSplitValuesReader
 
 Copyright: 2014-2017 The Apache Software Foundation.
 Home page: https://parquet.apache.org/
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java
new file mode 100644
index 0000000000..58ea38231c
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.io.EOFException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * A {@link VectorizedValuesReader} implementation for the encoding type BYTE_STREAM_SPLIT. This is
+ * adapted from Parquet's ByteStreamSplitValuesReader.
+ *
+ * @see <a
+ *     href="https://parquet.apache.org/docs/file-format/data-pages/encodings/#byte-stream-split-byte_stream_split--9">
+ *     Parquet format encodings: BYTE_STREAM_SPLIT</a>
+ */
+public class VectorizedByteStreamSplitValuesReader extends ValuesReader
+    implements VectorizedValuesReader {
+
+  private final int elementSizeInBytes;
+  private int totalBytesInStream;
+  private ByteBufferInputStream dataStream;
+  private ByteBuffer decodedDataStream;
+
+  public VectorizedByteStreamSplitValuesReader(int elementSizeInBytes) {
+    this.elementSizeInBytes = elementSizeInBytes;
+  }
+
+  @Override
+  public void initFromPage(int ignoredValueCount, ByteBufferInputStream in) {
+    this.totalBytesInStream = in.available();
+    this.dataStream = in;
+  }
+
+  @Override
+  public int readInteger() {
+    ensureDecoded();
+    return decodedDataStream.getInt();
+  }
+
+  @Override
+  public long readLong() {
+    ensureDecoded();
+    return decodedDataStream.getLong();
+  }
+
+  @Override
+  public float readFloat() {
+    ensureDecoded();
+    return decodedDataStream.getFloat();
+  }
+
+  @Override
+  public double readDouble() {
+    ensureDecoded();
+    return decodedDataStream.getDouble();
+  }
+
+  @Override
+  public Binary readBinary(int len) {
+    ensureDecoded();
+    byte[] bytes = new byte[len];
+    decodedDataStream.get(bytes);
+    return Binary.fromConstantByteArray(bytes);
+  }
+
+  @Override
+  public void readIntegers(int total, FieldVector vec, int rowId) {
+    readBatch(total, vec, rowId);
+  }
+
+  @Override
+  public void readLongs(int total, FieldVector vec, int rowId) {
+    readBatch(total, vec, rowId);
+  }
+
+  @Override
+  public void readFloats(int total, FieldVector vec, int rowId) {
+    readBatch(total, vec, rowId);
+  }
+
+  @Override
+  public void readDoubles(int total, FieldVector vec, int rowId) {
+    readBatch(total, vec, rowId);
+  }
+
+  @Override
+  public void skip() {
+    throw new UnsupportedOperationException("skip is not supported");
+  }
+
+  private void readBatch(int total, FieldVector vec, int rowId) {
+    ensureDecoded();
+    int bytesToRead = total * elementSizeInBytes;
+    long destOffset = (long) rowId * elementSizeInBytes;
+    ByteBuffer slice = decodedDataStream.slice();
+    slice.limit(bytesToRead);
+    vec.getDataBuffer().setBytes(destOffset, slice);
+    decodedDataStream.position(decodedDataStream.position() + bytesToRead);
+  }
+
+  private void ensureDecoded() {
+    if (decodedDataStream == null) {
+      Preconditions.checkState(
+          totalBytesInStream % elementSizeInBytes == 0,
+          "Stream size %s is not a multiple of element size %s",
+          totalBytesInStream,
+          elementSizeInBytes);
+      this.decodedDataStream = decode(totalBytesInStream / elementSizeInBytes);
+    }
+  }
+
+  private ByteBuffer decode(int valuesCount) {
+    ByteBuffer encoded;
+    try {
+      encoded = dataStream.slice(totalBytesInStream).slice();
+    } catch (EOFException e) {
+      throw new UncheckedIOException("Failed to read bytes from stream", e);
+    }
+    byte[] decoded = new byte[encoded.limit()];
+    int destByteIndex = 0;
+    for (int srcValueIndex = 0; srcValueIndex < valuesCount; srcValueIndex++) {
+      for (int stream = 0; stream < elementSizeInBytes; stream++, destByteIndex++) {
+        decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
+      }
+    }
+    return ByteBuffer.wrap(decoded).order(ByteOrder.LITTLE_ENDIAN);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 3c743b9ad0..4a06f64b5a 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -34,6 +34,7 @@ import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
 
 public class VectorizedPageIterator extends BasePageIterator {
   private final boolean setArrowValidityVector;
@@ -106,6 +107,11 @@ public class VectorizedPageIterator extends BasePageIterator {
         case DELTA_BYTE_ARRAY:
           valuesReader = new VectorizedDeltaByteArrayValuesReader();
           break;
+        case BYTE_STREAM_SPLIT:
+          valuesReader =
+              new VectorizedByteStreamSplitValuesReader(
+                  byteStreamSplitElementSize(desc.getPrimitiveType()));
+          break;
         default:
           throw new UnsupportedOperationException(
               "Cannot support vectorized reads for column "
@@ -377,6 +383,22 @@ public class VectorizedPageIterator extends BasePageIterator {
     }
   }
 
+  private static int byteStreamSplitElementSize(PrimitiveType type) {
+    switch (type.getPrimitiveTypeName()) {
+      case INT32:
+      case FLOAT:
+        return VectorizedValuesReader.INT_SIZE;
+      case INT64:
+      case DOUBLE:
+        return VectorizedValuesReader.LONG_SIZE;
+      case FIXED_LEN_BYTE_ARRAY:
+        return type.getTypeLength();
+      default:
+        throw new UnsupportedOperationException(
+            "Byte stream split encoding is not supported for type " + type.getPrimitiveTypeName());
+    }
+  }
+
   private int getActualBatchSize(int expectedBatchSize) {
     return Math.min(expectedBatchSize, triplesCount - triplesRead);
   }
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet
new file mode 100644
index 0000000000..3e0edd2627
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet 
differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet
 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet
new file mode 100644
index 0000000000..d23c1e6b73
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet
 differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet
new file mode 100644
index 0000000000..8ba32a302f
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet 
differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet
 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet
new file mode 100644
index 0000000000..c16e10e680
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet
 differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32.parquet 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32.parquet
new file mode 100644
index 0000000000..46132a924d
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32.parquet 
differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32_with_nulls.parquet
 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32_with_nulls.parquet
new file mode 100644
index 0000000000..174f7e5233
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32_with_nulls.parquet
 differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64.parquet 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64.parquet
new file mode 100644
index 0000000000..83576717e8
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64.parquet 
differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64_with_nulls.parquet
 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64_with_nulls.parquet
new file mode 100644
index 0000000000..e5e91eb0ab
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64_with_nulls.parquet
 differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid.parquet 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid.parquet
new file mode 100644
index 0000000000..df52946dc2
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid.parquet 
differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid_with_nulls.parquet
 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid_with_nulls.parquet
new file mode 100644
index 0000000000..577bb592bc
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid_with_nulls.parquet
 differ
diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet 
b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet
new file mode 100644
index 0000000000..edd614c66a
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet 
b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet
new file mode 100644
index 0000000000..3d4f64baa7
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet 
differ
diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/uuid.parquet 
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid.parquet
new file mode 100644
index 0000000000..3182d22757
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid.parquet differ
diff --git 
a/parquet/src/testFixtures/resources/encodings/PLAIN/uuid_with_nulls.parquet 
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid_with_nulls.parquet
new file mode 100644
index 0000000000..79803f74a4
Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid_with_nulls.parquet 
differ
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index c9d84b0cfe..2885563863 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -86,15 +86,19 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
           "RLE_DICTIONARY",
           "DELTA_BINARY_PACKED",
           "DELTA_LENGTH_BYTE_ARRAY",
-          "DELTA_BYTE_ARRAY");
+          "DELTA_BYTE_ARRAY",
+          "BYTE_STREAM_SPLIT");
   private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
-      ImmutableMap.of(
-          "string", Types.StringType.get(),
-          "float", Types.FloatType.get(),
-          "int32", Types.IntegerType.get(),
-          "int64", Types.LongType.get(),
-          "binary", Types.BinaryType.get(),
-          "boolean", Types.BooleanType.get());
+      ImmutableMap.<String, PrimitiveType>builder()
+          .put("string", Types.StringType.get())
+          .put("float", Types.FloatType.get())
+          .put("double", Types.DoubleType.get())
+          .put("int32", Types.IntegerType.get())
+          .put("int64", Types.LongType.get())
+          .put("binary", Types.BinaryType.get())
+          .put("boolean", Types.BooleanType.get())
+          .put("uuid", Types.UUIDType.get())
+          .buildOrThrow();
 
   static final Function<Record, Record> IDENTITY = record -> record;
 
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 76d6e7adc1..4eabac7ab8 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -86,15 +86,19 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
           "RLE_DICTIONARY",
           "DELTA_BINARY_PACKED",
           "DELTA_LENGTH_BYTE_ARRAY",
-          "DELTA_BYTE_ARRAY");
+          "DELTA_BYTE_ARRAY",
+          "BYTE_STREAM_SPLIT");
   private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
-      ImmutableMap.of(
-          "string", Types.StringType.get(),
-          "float", Types.FloatType.get(),
-          "int32", Types.IntegerType.get(),
-          "int64", Types.LongType.get(),
-          "binary", Types.BinaryType.get(),
-          "boolean", Types.BooleanType.get());
+      ImmutableMap.<String, PrimitiveType>builder()
+          .put("string", Types.StringType.get())
+          .put("float", Types.FloatType.get())
+          .put("double", Types.DoubleType.get())
+          .put("int32", Types.IntegerType.get())
+          .put("int64", Types.LongType.get())
+          .put("binary", Types.BinaryType.get())
+          .put("boolean", Types.BooleanType.get())
+          .put("uuid", Types.UUIDType.get())
+          .buildOrThrow();
 
   static final Function<Record, Record> IDENTITY = record -> record;
 
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 76d6e7adc1..4eabac7ab8 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -86,15 +86,19 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
           "RLE_DICTIONARY",
           "DELTA_BINARY_PACKED",
           "DELTA_LENGTH_BYTE_ARRAY",
-          "DELTA_BYTE_ARRAY");
+          "DELTA_BYTE_ARRAY",
+          "BYTE_STREAM_SPLIT");
   private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
-      ImmutableMap.of(
-          "string", Types.StringType.get(),
-          "float", Types.FloatType.get(),
-          "int32", Types.IntegerType.get(),
-          "int64", Types.LongType.get(),
-          "binary", Types.BinaryType.get(),
-          "boolean", Types.BooleanType.get());
+      ImmutableMap.<String, PrimitiveType>builder()
+          .put("string", Types.StringType.get())
+          .put("float", Types.FloatType.get())
+          .put("double", Types.DoubleType.get())
+          .put("int32", Types.IntegerType.get())
+          .put("int64", Types.LongType.get())
+          .put("binary", Types.BinaryType.get())
+          .put("boolean", Types.BooleanType.get())
+          .put("uuid", Types.UUIDType.get())
+          .buildOrThrow();
 
   static final Function<Record, Record> IDENTITY = record -> record;
 

Reply via email to