This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 08a4a0cde2 Spark, Arrow, Parquet: Add vectorized read support for parquet BYTE_STREAM_SPLIT encoding (#15373)
08a4a0cde2 is described below
commit 08a4a0cde2f4ae4f67fa776b7d46c31be287c54f
Author: jbewing <[email protected]>
AuthorDate: Wed Feb 25 11:19:57 2026 -0500
Spark, Arrow, Parquet: Add vectorized read support for parquet BYTE_STREAM_SPLIT encoding (#15373)
---
LICENSE | 1 +
.../VectorizedByteStreamSplitValuesReader.java | 151 +++++++++++++++++++++
.../vectorized/parquet/VectorizedPageIterator.java | 22 +++
.../encodings/BYTE_STREAM_SPLIT/double.parquet | Bin 0 -> 8459 bytes
.../BYTE_STREAM_SPLIT/double_with_nulls.parquet | Bin 0 -> 6986 bytes
.../encodings/BYTE_STREAM_SPLIT/float.parquet | Bin 0 -> 4427 bytes
.../BYTE_STREAM_SPLIT/float_with_nulls.parquet | Bin 0 -> 3754 bytes
.../encodings/BYTE_STREAM_SPLIT/int32.parquet | Bin 0 -> 4439 bytes
.../BYTE_STREAM_SPLIT/int32_with_nulls.parquet | Bin 0 -> 3766 bytes
.../encodings/BYTE_STREAM_SPLIT/int64.parquet | Bin 0 -> 8471 bytes
.../BYTE_STREAM_SPLIT/int64_with_nulls.parquet | Bin 0 -> 6998 bytes
.../encodings/BYTE_STREAM_SPLIT/uuid.parquet | Bin 0 -> 16643 bytes
.../BYTE_STREAM_SPLIT/uuid_with_nulls.parquet | Bin 0 -> 13570 bytes
.../resources/encodings/PLAIN/double.parquet | Bin 0 -> 8459 bytes
.../encodings/PLAIN/double_with_nulls.parquet | Bin 0 -> 6986 bytes
.../resources/encodings/PLAIN/uuid.parquet | Bin 0 -> 16643 bytes
.../encodings/PLAIN/uuid_with_nulls.parquet | Bin 0 -> 13570 bytes
.../parquet/TestParquetVectorizedReads.java | 20 +--
.../parquet/TestParquetVectorizedReads.java | 20 +--
.../parquet/TestParquetVectorizedReads.java | 20 +--
20 files changed, 210 insertions(+), 24 deletions(-)
diff --git a/LICENSE b/LICENSE
index f866893ac5..61499762d6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -229,6 +229,7 @@ This product includes code from Apache Parquet.
* DynConstructors.java
* IOUtil.java readFully and tests
* ByteBufferInputStream implementations and tests
+* implementation of VectorizedByteStreamSplitValuesReader
Copyright: 2014-2017 The Apache Software Foundation.
Home page: https://parquet.apache.org/
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java
new file mode 100644
index 0000000000..58ea38231c
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.io.EOFException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * A {@link VectorizedValuesReader} implementation for the encoding type BYTE_STREAM_SPLIT. This is
+ * adapted from Parquet's ByteStreamSplitValuesReader.
+ *
+ * @see <a
+ *     href="https://parquet.apache.org/docs/file-format/data-pages/encodings/#byte-stream-split-byte_stream_split--9">
+ * Parquet format encodings: BYTE_STREAM_SPLIT</a>
+ */
+public class VectorizedByteStreamSplitValuesReader extends ValuesReader
+ implements VectorizedValuesReader {
+
+ private final int elementSizeInBytes;
+ private int totalBytesInStream;
+ private ByteBufferInputStream dataStream;
+ private ByteBuffer decodedDataStream;
+
+ public VectorizedByteStreamSplitValuesReader(int elementSizeInBytes) {
+ this.elementSizeInBytes = elementSizeInBytes;
+ }
+
+ @Override
+ public void initFromPage(int ignoredValueCount, ByteBufferInputStream in) {
+ this.totalBytesInStream = in.available();
+ this.dataStream = in;
+ }
+
+ @Override
+ public int readInteger() {
+ ensureDecoded();
+ return decodedDataStream.getInt();
+ }
+
+ @Override
+ public long readLong() {
+ ensureDecoded();
+ return decodedDataStream.getLong();
+ }
+
+ @Override
+ public float readFloat() {
+ ensureDecoded();
+ return decodedDataStream.getFloat();
+ }
+
+ @Override
+ public double readDouble() {
+ ensureDecoded();
+ return decodedDataStream.getDouble();
+ }
+
+ @Override
+ public Binary readBinary(int len) {
+ ensureDecoded();
+ byte[] bytes = new byte[len];
+ decodedDataStream.get(bytes);
+ return Binary.fromConstantByteArray(bytes);
+ }
+
+ @Override
+ public void readIntegers(int total, FieldVector vec, int rowId) {
+ readBatch(total, vec, rowId);
+ }
+
+ @Override
+ public void readLongs(int total, FieldVector vec, int rowId) {
+ readBatch(total, vec, rowId);
+ }
+
+ @Override
+ public void readFloats(int total, FieldVector vec, int rowId) {
+ readBatch(total, vec, rowId);
+ }
+
+ @Override
+ public void readDoubles(int total, FieldVector vec, int rowId) {
+ readBatch(total, vec, rowId);
+ }
+
+ @Override
+ public void skip() {
+ throw new UnsupportedOperationException("skip is not supported");
+ }
+
+ private void readBatch(int total, FieldVector vec, int rowId) {
+ ensureDecoded();
+ int bytesToRead = total * elementSizeInBytes;
+ long destOffset = (long) rowId * elementSizeInBytes;
+ ByteBuffer slice = decodedDataStream.slice();
+ slice.limit(bytesToRead);
+ vec.getDataBuffer().setBytes(destOffset, slice);
+ decodedDataStream.position(decodedDataStream.position() + bytesToRead);
+ }
+
+ private void ensureDecoded() {
+ if (decodedDataStream == null) {
+ Preconditions.checkState(
+ totalBytesInStream % elementSizeInBytes == 0,
+ "Stream size %s is not a multiple of element size %s",
+ totalBytesInStream,
+ elementSizeInBytes);
+ this.decodedDataStream = decode(totalBytesInStream / elementSizeInBytes);
+ }
+ }
+
+ private ByteBuffer decode(int valuesCount) {
+ ByteBuffer encoded;
+ try {
+ encoded = dataStream.slice(totalBytesInStream).slice();
+ } catch (EOFException e) {
+ throw new UncheckedIOException("Failed to read bytes from stream", e);
+ }
+ byte[] decoded = new byte[encoded.limit()];
+ int destByteIndex = 0;
+ for (int srcValueIndex = 0; srcValueIndex < valuesCount; srcValueIndex++) {
+ for (int stream = 0; stream < elementSizeInBytes; stream++, destByteIndex++) {
+ decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
+ }
+ }
+ return ByteBuffer.wrap(decoded).order(ByteOrder.LITTLE_ENDIAN);
+ }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 3c743b9ad0..4a06f64b5a 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -34,6 +34,7 @@ import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
public class VectorizedPageIterator extends BasePageIterator {
private final boolean setArrowValidityVector;
@@ -106,6 +107,11 @@ public class VectorizedPageIterator extends BasePageIterator {
case DELTA_BYTE_ARRAY:
valuesReader = new VectorizedDeltaByteArrayValuesReader();
break;
+ case BYTE_STREAM_SPLIT:
+ valuesReader =
+ new VectorizedByteStreamSplitValuesReader(
+ byteStreamSplitElementSize(desc.getPrimitiveType()));
+ break;
default:
throw new UnsupportedOperationException(
"Cannot support vectorized reads for column "
@@ -377,6 +383,22 @@ public class VectorizedPageIterator extends BasePageIterator {
}
}
+ private static int byteStreamSplitElementSize(PrimitiveType type) {
+ switch (type.getPrimitiveTypeName()) {
+ case INT32:
+ case FLOAT:
+ return VectorizedValuesReader.INT_SIZE;
+ case INT64:
+ case DOUBLE:
+ return VectorizedValuesReader.LONG_SIZE;
+ case FIXED_LEN_BYTE_ARRAY:
+ return type.getTypeLength();
+ default:
+ throw new UnsupportedOperationException(
"Byte stream split encoding is not supported for type " + type.getPrimitiveTypeName());
+ }
+ }
+
private int getActualBatchSize(int expectedBatchSize) {
return Math.min(expectedBatchSize, triplesCount - triplesRead);
}
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet
new file mode 100644
index 0000000000..3e0edd2627
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet
new file mode 100644
index 0000000000..d23c1e6b73
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet
new file mode 100644
index 0000000000..8ba32a302f
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet
new file mode 100644
index 0000000000..c16e10e680
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32.parquet
new file mode 100644
index 0000000000..46132a924d
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32_with_nulls.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32_with_nulls.parquet
new file mode 100644
index 0000000000..174f7e5233
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int32_with_nulls.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64.parquet
new file mode 100644
index 0000000000..83576717e8
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64_with_nulls.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64_with_nulls.parquet
new file mode 100644
index 0000000000..e5e91eb0ab
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/int64_with_nulls.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid.parquet
new file mode 100644
index 0000000000..df52946dc2
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid.parquet
differ
diff --git
a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid_with_nulls.parquet
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid_with_nulls.parquet
new file mode 100644
index 0000000000..577bb592bc
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/uuid_with_nulls.parquet
differ
diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet
b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet
new file mode 100644
index 0000000000..edd614c66a
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet differ
diff --git
a/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet
b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet
new file mode 100644
index 0000000000..3d4f64baa7
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet
differ
diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/uuid.parquet
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid.parquet
new file mode 100644
index 0000000000..3182d22757
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid.parquet differ
diff --git
a/parquet/src/testFixtures/resources/encodings/PLAIN/uuid_with_nulls.parquet
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid_with_nulls.parquet
new file mode 100644
index 0000000000..79803f74a4
Binary files /dev/null and
b/parquet/src/testFixtures/resources/encodings/PLAIN/uuid_with_nulls.parquet
differ
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index c9d84b0cfe..2885563863 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -86,15 +86,19 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
"RLE_DICTIONARY",
"DELTA_BINARY_PACKED",
"DELTA_LENGTH_BYTE_ARRAY",
- "DELTA_BYTE_ARRAY");
+ "DELTA_BYTE_ARRAY",
+ "BYTE_STREAM_SPLIT");
private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
- ImmutableMap.of(
- "string", Types.StringType.get(),
- "float", Types.FloatType.get(),
- "int32", Types.IntegerType.get(),
- "int64", Types.LongType.get(),
- "binary", Types.BinaryType.get(),
- "boolean", Types.BooleanType.get());
+ ImmutableMap.<String, PrimitiveType>builder()
+ .put("string", Types.StringType.get())
+ .put("float", Types.FloatType.get())
+ .put("double", Types.DoubleType.get())
+ .put("int32", Types.IntegerType.get())
+ .put("int64", Types.LongType.get())
+ .put("binary", Types.BinaryType.get())
+ .put("boolean", Types.BooleanType.get())
+ .put("uuid", Types.UUIDType.get())
+ .buildOrThrow();
static final Function<Record, Record> IDENTITY = record -> record;
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 76d6e7adc1..4eabac7ab8 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -86,15 +86,19 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
"RLE_DICTIONARY",
"DELTA_BINARY_PACKED",
"DELTA_LENGTH_BYTE_ARRAY",
- "DELTA_BYTE_ARRAY");
+ "DELTA_BYTE_ARRAY",
+ "BYTE_STREAM_SPLIT");
private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
- ImmutableMap.of(
- "string", Types.StringType.get(),
- "float", Types.FloatType.get(),
- "int32", Types.IntegerType.get(),
- "int64", Types.LongType.get(),
- "binary", Types.BinaryType.get(),
- "boolean", Types.BooleanType.get());
+ ImmutableMap.<String, PrimitiveType>builder()
+ .put("string", Types.StringType.get())
+ .put("float", Types.FloatType.get())
+ .put("double", Types.DoubleType.get())
+ .put("int32", Types.IntegerType.get())
+ .put("int64", Types.LongType.get())
+ .put("binary", Types.BinaryType.get())
+ .put("boolean", Types.BooleanType.get())
+ .put("uuid", Types.UUIDType.get())
+ .buildOrThrow();
static final Function<Record, Record> IDENTITY = record -> record;
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 76d6e7adc1..4eabac7ab8 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -86,15 +86,19 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
"RLE_DICTIONARY",
"DELTA_BINARY_PACKED",
"DELTA_LENGTH_BYTE_ARRAY",
- "DELTA_BYTE_ARRAY");
+ "DELTA_BYTE_ARRAY",
+ "BYTE_STREAM_SPLIT");
private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
- ImmutableMap.of(
- "string", Types.StringType.get(),
- "float", Types.FloatType.get(),
- "int32", Types.IntegerType.get(),
- "int64", Types.LongType.get(),
- "binary", Types.BinaryType.get(),
- "boolean", Types.BooleanType.get());
+ ImmutableMap.<String, PrimitiveType>builder()
+ .put("string", Types.StringType.get())
+ .put("float", Types.FloatType.get())
+ .put("double", Types.DoubleType.get())
+ .put("int32", Types.IntegerType.get())
+ .put("int64", Types.LongType.get())
+ .put("binary", Types.BinaryType.get())
+ .put("boolean", Types.BooleanType.get())
+ .put("uuid", Types.UUIDType.get())
+ .buildOrThrow();
static final Function<Record, Record> IDENTITY = record -> record;