This is an automated email from the ASF dual-hosted git repository.
Fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 7be05b470 Optimizing Variant read path with lazy caching (#3481)
7be05b470 is described below
commit 7be05b4702df78ae0c0c6b44adc6b7b7af2d931f
Author: Neelesh Salian <[email protected]>
AuthorDate: Tue May 19 03:15:05 2026 -0700
Optimizing Variant read path with lazy caching (#3481)
* Fix thread-safety in Variant lazy caches and add comments
Co-authored-by: Steve Loughran <[email protected]>
* Remove unnecessary volatile fields and fix PR comments
* Add readUnsignedLittleEndian for bulk ByteBuffer reads, and add concurrency javadoc
* Address PR comments
---------
Co-authored-by: Steve Loughran <[email protected]>
---
.../java/org/apache/parquet/variant/Variant.java | 211 ++++++++++++++-------
.../org/apache/parquet/variant/VariantBuilder.java | 20 +-
.../apache/parquet/variant/VariantConverters.java | 4 +-
.../org/apache/parquet/variant/VariantUtil.java | 23 +++
.../apache/parquet/variant/TestVariantObject.java | 30 +++
5 files changed, 212 insertions(+), 76 deletions(-)
diff --git
a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java
b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java
index 0ccb8359b..3fdfc0060 100644
--- a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java
+++ b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java
@@ -20,10 +20,16 @@ package org.apache.parquet.variant;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.UUID;
+import org.apache.parquet.Preconditions;
/**
* This Variant class holds the Variant-encoded value and metadata binary
values.
+ *
+ * <p>Concurrency: the byte buffers are read-only and all lazy caches are idempotent,
+ * so concurrent reads are safe - the worst outcome is a redundant decode. The metadata
+ * dictionary cache is {@code volatile} for safe publication to child Variants.
*/
public final class Variant {
/**
@@ -36,6 +42,26 @@ public final class Variant {
*/
final ByteBuffer metadata;
+ /**
+  * Number of entries in the metadata dictionary, fixed at construction: either read
+  * from the metadata header (0 when the buffer holds only the header byte) or
+  * inherited from the parent Variant. Used to size and bounds-check
+  * {@link #metadataCache}.
+  */
+ private final int dictSize;
+
+ /**
+  * Lazy cache for metadata dictionary strings, indexed by dictionary ID and shared
+  * with child Variants. The array is created on demand and stays {@code null} until
+  * then; the field is {@code volatile} so the array reference is published safely.
+  */
+ private volatile String[] metadataCache;
+
+ /**
+  * Lazy cache for the parsed object header; {@code null} until it is first needed.
+  * Deliberately not {@code volatile}: a concurrent reader may simply parse the
+  * header again.
+  * NOTE(review): this presumably relies on ObjectInfo being immutable - confirm.
+  */
+ private VariantUtil.ObjectInfo cachedObjectInfo;
+
+ /**
+  * Lazy cache for the parsed array header; {@code null} until it is first needed.
+  * Deliberately not {@code volatile}: a concurrent reader may simply parse the
+  * header again.
+  * NOTE(review): this presumably relies on ArrayInfo being immutable - confirm.
+  */
+ private VariantUtil.ArrayInfo cachedArrayInfo;
+
/**
* The threshold to switch from linear search to binary search when looking
up a field by key in
* an object. This is a performance optimization to avoid the overhead of
binary search for a
@@ -56,10 +82,8 @@ public final class Variant {
}
public Variant(ByteBuffer value, ByteBuffer metadata) {
- // The buffers are read a single-byte at a time, so the endianness of the
input buffers
- // is not important.
- this.value = value.asReadOnlyBuffer();
- this.metadata = metadata.asReadOnlyBuffer();
+ this.value = value.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
+ this.metadata = metadata.asReadOnlyBuffer().order(ByteOrder.LITTLE_ENDIAN);
// There is currently only one allowed version.
if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) !=
VariantUtil.VERSION) {
@@ -67,6 +91,33 @@ public final class Variant {
"Unsupported variant metadata version: %d",
metadata.get(metadata.position()) & VariantUtil.VERSION_MASK));
}
+
+ // Pre-compute dictionary size for lazy metadata cache allocation.
+ int pos = this.metadata.position();
+ int metaOffsetSize = ((this.metadata.get(pos) >> 6) & 0x3) + 1;
+ if (this.metadata.remaining() > 1) {
+ Preconditions.checkArgument(
+ this.metadata.remaining() >= 1 + metaOffsetSize,
+ "variant metadata truncated: offsetSize=" + metaOffsetSize);
+ this.dictSize = VariantUtil.readUnsignedLittleEndian(this.metadata, pos
+ 1, metaOffsetSize);
+ long dictTableEnd = 1L + metaOffsetSize + ((long) this.dictSize + 1) *
metaOffsetSize;
+ Preconditions.checkArgument(
+ dictTableEnd <= this.metadata.remaining(),
+ "variant metadata dictionary extends past buffer: dictSize=" +
this.dictSize);
+ } else {
+ this.dictSize = 0;
+ }
+ this.metadataCache = null;
+ }
+
+ /**
+  * Package-private constructor used for nested values: it takes over the dictionary
+  * size and dictionary cache a parent Variant has already worked out, and performs
+  * none of the metadata header checks itself.
+  *
+  * @param value encoded value of the nested Variant
+  * @param metadata metadata buffer of the parent Variant
+  * @param metadataCache dictionary string cache to share, may be {@code null}
+  * @param dictSize number of metadata dictionary entries
+  */
+ Variant(ByteBuffer value, ByteBuffer metadata, String[] metadataCache, int dictSize) {
+   // Inherited state first, then the read-only little-endian buffer views.
+   this.dictSize = dictSize;
+   this.metadataCache = metadataCache;
+   ByteBuffer readOnlyValue = value.asReadOnlyBuffer();
+   this.value = readOnlyValue.order(ByteOrder.LITTLE_ENDIAN);
+   ByteBuffer readOnlyMetadata = metadata.asReadOnlyBuffer();
+   this.metadata = readOnlyMetadata.order(ByteOrder.LITTLE_ENDIAN);
}
public ByteBuffer getValueBuffer() {
@@ -194,7 +245,7 @@ public final class Variant {
* @throws IllegalArgumentException if `getType()` does not return
`Type.OBJECT`
*/
public int numObjectElements() {
- return VariantUtil.getObjectInfo(value).numElements;
+ return objectInfo().numElements;
}
/**
@@ -206,22 +257,19 @@ public final class Variant {
* @throws IllegalArgumentException if `getType()` does not return
`Type.OBJECT`
*/
public Variant getFieldByKey(String key) {
- VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value);
- // Use linear search for a short list. Switch to binary search when the
length reaches
- // `BINARY_SEARCH_THRESHOLD`.
+ VariantUtil.ObjectInfo info = objectInfo();
+ int idStart = value.position() + info.idStartOffset;
+ int offsetStart = value.position() + info.offsetStartOffset;
+ int dataStart = value.position() + info.dataStartOffset;
+
if (info.numElements < BINARY_SEARCH_THRESHOLD) {
for (int i = 0; i < info.numElements; ++i) {
- ObjectField field = getFieldAtIndex(
- i,
- value,
- metadata,
- info.idSize,
- info.offsetSize,
- value.position() + info.idStartOffset,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
- if (field.key.equals(key)) {
- return field.value;
+ int id = VariantUtil.readUnsignedLittleEndian(value, idStart +
info.idSize * i, info.idSize);
+ String fieldKey = getMetadataKeyCached(id);
+ if (fieldKey.equals(key)) {
+ int offset = VariantUtil.readUnsignedLittleEndian(
+ value, offsetStart + info.offsetSize * i, info.offsetSize);
+ return childVariant(VariantUtil.slice(value, dataStart + offset));
}
}
} else {
@@ -232,22 +280,17 @@ public final class Variant {
// performance optimization, because it can properly handle the case
where `low + high`
// overflows int.
int mid = (low + high) >>> 1;
- ObjectField field = getFieldAtIndex(
- mid,
- value,
- metadata,
- info.idSize,
- info.offsetSize,
- value.position() + info.idStartOffset,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
- int cmp = field.key.compareTo(key);
+ int midId = VariantUtil.readUnsignedLittleEndian(value, idStart +
info.idSize * mid, info.idSize);
+ String midKey = getMetadataKeyCached(midId);
+ int cmp = midKey.compareTo(key);
if (cmp < 0) {
low = mid + 1;
} else if (cmp > 0) {
high = mid - 1;
} else {
- return field.value;
+ int offset = VariantUtil.readUnsignedLittleEndian(
+ value, offsetStart + info.offsetSize * mid, info.offsetSize);
+ return childVariant(VariantUtil.slice(value, dataStart + offset));
}
}
}
@@ -275,35 +318,14 @@ public final class Variant {
* @throws IllegalArgumentException if `getType()` does not return
`Type.OBJECT`
*/
public ObjectField getFieldAtIndex(int idx) {
- VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value);
- // Use linear search for a short list. Switch to binary search when the
length reaches
- // `BINARY_SEARCH_THRESHOLD`.
- ObjectField field = getFieldAtIndex(
- idx,
- value,
- metadata,
- info.idSize,
- info.offsetSize,
- value.position() + info.idStartOffset,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
- return field;
- }
-
- static ObjectField getFieldAtIndex(
- int index,
- ByteBuffer value,
- ByteBuffer metadata,
- int idSize,
- int offsetSize,
- int idStart,
- int offsetStart,
- int dataStart) {
- // idStart, offsetStart, and dataStart are absolute positions in the
`value` buffer.
- int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize);
- int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize *
index, offsetSize);
- String key = VariantUtil.getMetadataKey(metadata, id);
- Variant v = new Variant(VariantUtil.slice(value, dataStart + offset),
metadata);
+ VariantUtil.ObjectInfo info = objectInfo();
+ int idStart = value.position() + info.idStartOffset;
+ int offsetStart = value.position() + info.offsetStartOffset;
+ int dataStart = value.position() + info.dataStartOffset;
+ int id = VariantUtil.readUnsignedLittleEndian(value, idStart + info.idSize
* idx, info.idSize);
+ int offset = VariantUtil.readUnsignedLittleEndian(value, offsetStart +
info.offsetSize * idx, info.offsetSize);
+ String key = getMetadataKeyCached(id);
+ Variant v = childVariant(VariantUtil.slice(value, dataStart + offset));
return new ObjectField(key, v);
}
@@ -312,7 +334,7 @@ public final class Variant {
* @throws IllegalArgumentException if `getType()` does not return
`Type.ARRAY`
*/
public int numArrayElements() {
- return VariantUtil.getArrayInfo(value).numElements;
+ return arrayInfo().numElements;
}
/**
@@ -324,23 +346,66 @@ public final class Variant {
* @throws IllegalArgumentException if `getType()` does not return
`Type.ARRAY`
*/
public Variant getElementAtIndex(int index) {
- VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(value);
+ VariantUtil.ArrayInfo info = arrayInfo();
if (index < 0 || index >= info.numElements) {
return null;
}
- return getElementAtIndex(
- index,
- value,
- metadata,
- info.offsetSize,
- value.position() + info.offsetStartOffset,
- value.position() + info.dataStartOffset);
+ int offsetStart = value.position() + info.offsetStartOffset;
+ int dataStart = value.position() + info.dataStartOffset;
+ int offset =
+ VariantUtil.readUnsignedLittleEndian(value, offsetStart +
info.offsetSize * index, info.offsetSize);
+ return childVariant(VariantUtil.slice(value, dataStart + offset));
+ }
+
+ /**
+  * Creates a child Variant over {@code childValue} that shares this instance's
+  * metadata buffer and metadata dictionary cache.
+  *
+  * <p>The cache array is created here when it does not exist yet. Passing a
+  * {@code null} cache down would make every child allocate and fill its own private
+  * array, so keys decoded for one element (for example an object inside an array)
+  * would never be reused by its siblings or by this instance.
+  *
+  * @param childValue buffer holding the encoded value of the nested Variant
+  * @return a Variant for the nested value
+  */
+ private Variant childVariant(ByteBuffer childValue) {
+   String[] cache = metadataCache;
+   if (cache == null && dictSize > 0) {
+     // An empty dictionary never consults the cache, so skip the allocation then.
+     cache = new String[dictSize];
+     metadataCache = cache;
+   }
+   return new Variant(childValue, metadata, cache, dictSize);
}
- private static Variant getElementAtIndex(
- int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int
offsetStart, int dataStart) {
- // offsetStart and dataStart are absolute positions in the `value` buffer.
- int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize *
index, offsetSize);
- return new Variant(VariantUtil.slice(value, dataStart + offset), metadata);
+ /**
+  * Looks up the metadata dictionary string for {@code id}, remembering the decoded
+  * string so later lookups of the same ID skip the decode.
+  *
+  * @param id dictionary ID of the key
+  * @return the dictionary string for {@code id}
+  */
+ String getMetadataKeyCached(int id) {
+   boolean cacheable = id >= 0 && id < dictSize;
+   if (!cacheable) {
+     // IDs outside the dictionary bypass the cache and go straight to the plain lookup.
+     return VariantUtil.getMetadataKey(metadata, id);
+   }
+   // Read the volatile field once; create the shared array on first use.
+   String[] keys = metadataCache;
+   if (keys == null) {
+     keys = new String[dictSize];
+     metadataCache = keys;
+   }
+   String cached = keys[id];
+   if (cached != null) {
+     return cached;
+   }
+   String decoded = VariantUtil.getMetadataKey(metadata, id);
+   keys[id] = decoded;
+   return decoded;
+ }
+
+ /**
+  * Gives the parsed object header of {@code value}, parsing it only when no cached
+  * copy exists yet.
+  *
+  * @return the object header information
+  */
+ private VariantUtil.ObjectInfo objectInfo() {
+   // Single read of the field so the null check and the return see the same value.
+   VariantUtil.ObjectInfo parsed = cachedObjectInfo;
+   if (parsed != null) {
+     return parsed;
+   }
+   parsed = VariantUtil.getObjectInfo(value);
+   cachedObjectInfo = parsed;
+   return parsed;
+ }
+
+ /**
+  * Gives the parsed array header of {@code value}, parsing it only when no cached
+  * copy exists yet.
+  *
+  * @return the array header information
+  */
+ private VariantUtil.ArrayInfo arrayInfo() {
+   // Single read of the field so the null check and the return see the same value.
+   VariantUtil.ArrayInfo parsed = cachedArrayInfo;
+   if (parsed != null) {
+     return parsed;
+   }
+   parsed = VariantUtil.getArrayInfo(value);
+   cachedArrayInfo = parsed;
+   return parsed;
}
}
diff --git
a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java
b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java
index bf42b0c44..4e166190d 100644
---
a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java
+++
b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
+import org.apache.parquet.io.api.Binary;
/**
* Builder for creating Variant value and metadata.
@@ -109,7 +110,14 @@ public class VariantBuilder {
*/
public void appendString(String str) {
onAppend();
- byte[] data = str.getBytes(StandardCharsets.UTF_8);
+ writeUTF8bytes(str.getBytes(StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Write bytes as a UTF8 string.
+ * @param data data to write; this is not modified.
+ */
+ private void writeUTF8bytes(final byte[] data) {
boolean longStr = data.length > VariantUtil.MAX_SHORT_STR_SIZE;
checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + data.length);
if (longStr) {
@@ -125,6 +133,16 @@ public class VariantBuilder {
writePos += data.length;
}
+ /**
+ * Given a Binary, append it to the variant as a string.
+ * Avoids intermediate String creation when unmarshalling from shredded
string columns.
+ * @param binary source data.
+ */
+ void appendAsString(Binary binary) {
+ onAppend();
+ writeUTF8bytes(binary.getBytesUnsafe());
+ }
+
/**
* Appends a null value to the Variant builder.
*/
diff --git
a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java
b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java
index 6d0986c2b..bda088c55 100644
---
a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java
+++
b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantConverters.java
@@ -233,6 +233,7 @@ public class VariantConverters {
PartiallyShreddedFieldsConverter(GroupType fieldsType,
ParentConverter<VariantBuilder> parent) {
this.converters = new Converter[fieldsType.getFieldCount()];
this.parent = parent;
+ ParentConverter<VariantObjectBuilder> newParent = converter ->
converter.accept(objectBuilder);
for (int index = 0; index < fieldsType.getFieldCount(); index += 1) {
Type field = fieldsType.getType(index);
@@ -240,7 +241,6 @@ public class VariantConverters {
String name = field.getName();
shreddedFieldNames.add(name);
- ParentConverter<VariantObjectBuilder> newParent = converter ->
converter.accept(objectBuilder);
converters[index] = new FieldValueConverter(name, field.asGroupType(),
newParent);
}
}
@@ -501,7 +501,7 @@ public class VariantConverters {
@Override
public void addBinary(Binary value) {
- parent.build(builder -> builder.appendString(value.toStringUsingUTF8()));
+ parent.build(builder -> builder.appendAsString(value));
}
}
diff --git
a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java
b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java
index ef1168583..7ad867e0f 100644
--- a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java
+++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.HashMap;
+import org.apache.parquet.Preconditions;
/**
* This class defines constants related to the Variant format and provides
functions for
@@ -299,6 +300,28 @@ class VariantUtil {
return result;
}
+ /**
+  * Fast little-endian unsigned read using bulk ByteBuffer operations.
+  * Requires the buffer to have {@link java.nio.ByteOrder#LITTLE_ENDIAN} byte order.
+  * Uses absolute reads, so the buffer's position is left unchanged.
+  * Adapted from Apache Iceberg's VariantUtil.readLittleEndianUnsigned.
+  *
+  * @param buffer buffer to read from; its byte order must be little-endian
+  * @param pos absolute index of the first (least significant) byte
+  * @param numBytes width of the integer in bytes, 1 to 4
+  * @return the decoded value as a non-negative int
+  * @throws IllegalArgumentException if {@code numBytes} is not in 1..4, or a 4-byte
+  *     value does not fit in a non-negative int
+  */
+ static int readUnsignedLittleEndian(ByteBuffer buffer, int pos, int numBytes) {
+   switch (numBytes) {
+     case 1:
+       return buffer.get(pos) & U8_MAX;
+     case 2:
+       return buffer.getShort(pos) & U16_MAX;
+     case 3:
+       // Low two bytes in one read, then the third byte shifted into bits 16-23.
+       return (buffer.getShort(pos) & U16_MAX) | ((buffer.get(pos + 2) & U8_MAX) << 16);
+     case 4:
+       int v = buffer.getInt(pos);
+       // numBytes is always 4 on this path, so the message is a constant. Building it
+       // by concatenation would create a new String on every successful read.
+       Preconditions.checkArgument(v >= 0, "Failed to read unsigned int. numBytes: 4");
+       return v;
+     default:
+       throw new IllegalArgumentException(String.format("Invalid numBytes: %d", numBytes));
+   }
+ }
+
/**
* Returns the value type of Variant value `value[pos...]`. It is only legal
to call `get*` if
* `getType` returns the corresponding type. For example, it is only legal
to call
diff --git
a/parquet-variant/src/test/java/org/apache/parquet/variant/TestVariantObject.java
b/parquet-variant/src/test/java/org/apache/parquet/variant/TestVariantObject.java
index ddcf9f7fd..1c823bd76 100644
---
a/parquet-variant/src/test/java/org/apache/parquet/variant/TestVariantObject.java
+++
b/parquet-variant/src/test/java/org/apache/parquet/variant/TestVariantObject.java
@@ -342,6 +342,36 @@ public class TestVariantObject {
}
}
+ @Test
+ public void testMalformedMetadataDictSize() {
+ // Metadata header: version=1, offsetSize=1. Declares dictSize=200, but the
+ // buffer is only 3 bytes, so the offset table cannot fit.
+ byte[] metadata = new byte[] {0x01, (byte) 200, 0x00};
+ byte[] value = new byte[] {0x00};
+ Assert.assertThrows(
+ IllegalArgumentException.class, () -> new
Variant(ByteBuffer.wrap(value), ByteBuffer.wrap(metadata)));
+ }
+
+ @Test
+ public void testMalformedMetadataLargeDictSize() {
+ // Header byte 0xC1: offsetSize=4, version=1. Declares
dictSize=Integer.MAX_VALUE
+ // to guard against int overflow in the bound check arithmetic.
+ byte[] metadata = new byte[] {(byte) 0xC1, (byte) 0xFF, (byte) 0xFF,
(byte) 0xFF, (byte) 0x7F};
+ byte[] value = new byte[] {0x00};
+ Assert.assertThrows(
+ IllegalArgumentException.class, () -> new
Variant(ByteBuffer.wrap(value), ByteBuffer.wrap(metadata)));
+ }
+
+ @Test
+ public void testMalformedMetadataTruncated() {
+ // Header byte 0xC1 declares offsetSize=4, but only 3 bytes total so the
+ // dictSize field itself can't be read.
+ byte[] metadata = new byte[] {(byte) 0xC1, 0x00, 0x00};
+ byte[] value = new byte[] {0x00};
+ Assert.assertThrows(
+ IllegalArgumentException.class, () -> new
Variant(ByteBuffer.wrap(value), ByteBuffer.wrap(metadata)));
+ }
+
@Test
public void testMetadataWithNonZeroPositionReadOnly() {
// Build a variant with object fields to populate the metadata dictionary