This is an automated email from the ASF dual-hosted git repository.
Fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 28593d5e4 GH-3451: Add JMH benchmark for variants (#3452)
28593d5e4 is described below
commit 28593d5e46b228c389ebac5a6ae82b46a1c1c6b8
Author: Steve Loughran <[email protected]>
AuthorDate: Mon May 11 19:52:17 2026 +0100
GH-3451: Add JMH benchmark for variants (#3452)
* GH-3451. Add a JMH benchmark for variants
Initial impl.
* WiP: add a deeper version.
* revert plans for a deeper version
* fixes to this benchmark (copilot review)
* deser to recurse down
* include uuid and bigdecimal
* reset counter on benchmark setup
* Measure parquet write/read costs.
iterations of class code and #of rows are the same
for easy compare of overheads.
* WiP, on a file read/write benchmark
Using the same structure as the iceberg tests do
* building a projection schema
* projection works
* Test to validate impact of lean schema against unshredded files.
painful.
* GH-561 ongoing work
* GH-561 profile driven review of VariantConverters.
* GH-561 Variant benchmarks
* Move under variant package to access private members
* add "variant" group to run.sh
* new benchmark VariantConverterBenchmark
* spotless
* Benchmark enhancements
* UUIDs in projection benchmark.
* col4 var column now a long string and named as such.
* builder and converter benchmarks use @OperationsPerInvocation
rather than internal iterators.
* revert production code enhancement.
including pulling a benchmark from VariantConverters.
Pull the warmup/measurement times to a shared constant.
If someone thinks the numbers should be bigger: one place to change.
---
parquet-benchmarks/pom.xml | 5 +
parquet-benchmarks/run.sh | 4 +
.../VariantBenchmarkMeasurementSettings.java | 36 +
.../parquet/variant/VariantBuilderBenchmark.java | 1045 ++++++++++++++++++++
.../parquet/variant/VariantConverterBenchmark.java | 126 +++
.../variant/VariantProjectionBenchmark.java | 642 ++++++++++++
6 files changed, 1858 insertions(+)
diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml
index 65d6dbf3e..d5a288b67 100644
--- a/parquet-benchmarks/pom.xml
+++ b/parquet-benchmarks/pom.xml
@@ -52,6 +52,11 @@
<artifactId>parquet-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-variant</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
diff --git a/parquet-benchmarks/run.sh b/parquet-benchmarks/run.sh
index 7589b55ee..d5e9b1656 100755
--- a/parquet-benchmarks/run.sh
+++ b/parquet-benchmarks/run.sh
@@ -43,6 +43,7 @@ read | Reading files with different compression, page
and block sizes.
write | Writing files.
checksum | Reading and writing with and without CRC checksums.
filter | Filtering column indexes
+variant | Variant performance
Examples:
@@ -100,6 +101,9 @@ else
"filter")
BENCHMARK_REGEX="org.apache.parquet.benchmarks.FilteringBenchmarks"
;;
+ "variant")
+ BENCHMARK_REGEX="org.apache.parquet.variant.Variant*"
+ ;;
esac
echo JMH command: java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar
$BENCHMARK_REGEX $JMH_OPTIONS
diff --git
a/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantBenchmarkMeasurementSettings.java
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantBenchmarkMeasurementSettings.java
new file mode 100644
index 000000000..21ab5c4fa
--- /dev/null
+++
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantBenchmarkMeasurementSettings.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.variant;
+
+/**
+ * Common measurement settings.
+ */
+class VariantBenchmarkMeasurementSettings {
+
+ /**
+ * Number of JMH warmup iterations for the small benchmarks; used as
+ * {@code @Warmup(iterations = ...)}. Goal: JIT compiler has done its work.
+ * If flamegraphs show compilation: increase.
+ */
+ static final int SMALL_BENCHMARK_WARMUP = 500;
+
+ /**
+ * Number of JMH measurement iterations for the small benchmarks; used as
+ * {@code @Measurement(iterations = ...)}.
+ */
+ static final int SMALL_BENCHMARK_MEASUREMENTS = 500;
+}
diff --git
a/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantBuilderBenchmark.java
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantBuilderBenchmark.java
new file mode 100644
index 000000000..65a000d82
--- /dev/null
+++
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantBuilderBenchmark.java
@@ -0,0 +1,1045 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.variant;
+
+import static
org.apache.parquet.variant.VariantBenchmarkMeasurementSettings.SMALL_BENCHMARK_MEASUREMENTS;
+import static
org.apache.parquet.variant.VariantBenchmarkMeasurementSettings.SMALL_BENCHMARK_WARMUP;
+
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMH benchmarks for {@link VariantBuilder}: construction, serialization and
deserialization of
+ * Variant objects.
+ *
+ * <p>The benchmark mirrors the structure of the Iceberg {@code
VariantSerializationBenchmark} so
+ * that results from the two projects can be compared directly. Parameters are
kept identical where
+ * the APIs permit:
+ *
+ * <ul>
+ * <li>{@link #fieldCount} – total number of top-level fields per object.
+ * <li>{@link #depth} – {@code Flat} (primitives only) or {@code Nested}
(some fields are
+ * 5-field sub-objects).
+ * </ul>
+ *
+ * <p>Unlike the Iceberg benchmark there is no "shredded percent" axis:
parquet-java's
+ * {@link VariantBuilder} constructs unshredded Variant binary directly;
shredding is handled
+ * separately by the Parquet writer layer.
+ *
+ * <p>Build and run:
+ *
+ * <pre>
+ * ./mvnw --projects parquet-benchmarks -amd -DskipTests
-Denforcer.skip=true clean package
+ * ./parquet-benchmarks/run.sh all
org.apache.parquet.variant.VariantBuilderBenchmark \
+ * -wi 5 -i 5 -f 1 -rff target/results.json
+ * </pre>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = SMALL_BENCHMARK_WARMUP)
+@Measurement(iterations = SMALL_BENCHMARK_MEASUREMENTS)
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class VariantBuilderBenchmark {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(VariantBuilderBenchmark.class);
+
+ /**
+ * Whether to include nested sub-objects in the field values.
+ * Used as the type of the {@code depth} benchmark parameter.
+ */
+ public enum Depth {
+ /** Flat structure: no nesting; {@code setupTrial()} never generates a nested field. */
+ Flat,
+ /** Nested values: {@code setupTrial()} may generate fields that hold a sub-object. */
+ Nested,
+ }
+ /**
+ * Iterations on the small benchmarks whose operations are so fast that
clocks, especially ARM clocks,
+ * can't reliably measure them.
+ */
+ private static final int ITERATIONS = 1000;
+
+ /** Number of rows written per file in the file-based write/read benchmarks.
*/
+ private static final int FILE_ROWS = ITERATIONS;
+
+ /** Total number of top-level fields in each variant object. */
+ @Param({"200" /*, "1000"*/})
+ private int fieldCount;
+
+ /** Whether to include nested variant objects as some of the field values. */
+ @Param
+ private Depth depth;
+
+ /**
+ * A counter of strings created; used to ensure limited uniqueness in
strings.
+ * Reset to 0 in {@link #setupTrial()} so each trial is reproducible.
+ */
+ private static int counter;
+
+ /**
+ * Get the next counter value, cycling through {@code 0..511}.
+ *
+ * <p>The counter wraps back to 0 after 511 so that generated strings have only limited
+ * uniqueness. Each value is handed out exactly once per cycle.
+ *
+ * @return the next value in the range {@code [0, 512)}.
+ */
+ private static int count() {
+ int c = counter;
+ // advance with wrap-around so that 0 is not returned twice in a row at the wrap point
+ counter = (c + 1) % 512;
+ return c;
+ }
+ /**
+ * Type of a field and the operations to (a) append an instance of that type to
+ * the variant builder and (b) add the matching {@code typed_value} column to a GroupBuilder.
+ *
+ * <p>Each constant supplies two lambdas: one casts the pre-generated {@code Object} value
+ * and appends it to a {@link VariantObjectBuilder}; the other adds an optional
+ * {@code "typed_value"} primitive to the group being built.
+ */
+ private enum FieldType {
+ String((o, builder) -> builder.appendString(((String) o) + count()), b ->
b.optional(PrimitiveTypeName.BINARY)
+ .as(LogicalTypeAnnotation.stringType())
+ .named("typed_value")),
+ Int((o, builder) -> builder.appendInt((Integer) o), b ->
b.optional(PrimitiveTypeName.INT32)
+ .named("typed_value")),
+ Long((o, builder) -> builder.appendLong((Long) o), b ->
b.optional(PrimitiveTypeName.INT64)
+ .named("typed_value")),
+ Float((o, builder) -> builder.appendFloat((Float) o), b ->
b.optional(PrimitiveTypeName.FLOAT)
+ .named("typed_value")),
+ Double((o, builder) -> builder.appendDouble((Double) o), b ->
b.optional(PrimitiveTypeName.DOUBLE)
+ .named("typed_value")),
+ // NOTE(review): presumably decimalType(0, 9) means scale 0 / precision 9, while setupTrial()
+ // generates values with BigDecimal.valueOf(random.nextInt()), which can have 10 digits.
+ // Confirm how values that exceed the declared precision are handled when shredding.
+ BigDecimal((o, builder) -> builder.appendDecimal((BigDecimal) o), b ->
b.optional(PrimitiveTypeName.INT32)
+ .as(LogicalTypeAnnotation.decimalType(0, 9))
+ .named("typed_value")),
+ UUID((o, builder) -> builder.appendUUID((UUID) o), b ->
b.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+ .length(16)
+ .as(LogicalTypeAnnotation.uuidType())
+ .named("typed_value")),
+ /**
+ * Nested MUST be the last in the enum: {@code setupTrial()} uses its ordinal as the count
+ * of the other types. Its append operation always throws; it adds no typed_value column.
+ */
+ Nested(
+ (o, builder) -> {
+ throw new UnsupportedOperationException("Nested object");
+ },
+ b -> {
+ /* falls back to value column */
+ });
+
+ /**
+ * Append an object during variant construction.
+ */
+ final BiConsumer<Object, VariantObjectBuilder> append;
+
+ /** Add this type's {@code typed_value} column (if any) to a group builder. */
+ final Consumer<Types.GroupBuilder<GroupType>> addTypedValue;
+
+ /**
+ * @param append operation appending a value of this type to an object builder.
+ * @param addTypedValue operation adding the typed_value column to a group builder.
+ */
+ FieldType(
+ final BiConsumer<Object, VariantObjectBuilder> append,
+ final Consumer<Types.GroupBuilder<GroupType>> addTypedValue) {
+ this.append = append;
+ this.addTypedValue = addTypedValue;
+ }
+
+ /** Invoke the append operation for this type. */
+ void append(Object o, VariantObjectBuilder builder) {
+ append.accept(o, builder);
+ }
+
+ /** Invoke the schema operation for this type. */
+ void addTypedValue(Types.GroupBuilder<GroupType> builder) {
+ addTypedValue.accept(builder);
+ }
+ }
+
+ /**
+ * Each field entry is its type and the value.
+ * Immutable holder created in {@code setupTrial()}.
+ */
+ private static final class FieldEntry {
+ /** Type of the field; selects the append and schema operations. */
+ final FieldType type;
+ /**
+ * Pre-generated value, cast by the type's append operation.
+ * {@code null} for {@code FieldType.Nested} entries; for strings this is a prefix only.
+ */
+ final Object value;
+
+ /**
+ * @param type field type.
+ * @param value pre-generated value; may be null.
+ */
+ public FieldEntry(final FieldType type, final Object value) {
+ this.type = type;
+ this.value = value;
+ }
+ }
+
+ /** Number of fields in each nested sub-object in a nested variant. */
+ private static final int NESTED_FIELD_COUNT = 5;
+
+ // ---- state built once per trial ----
+
+ /** Ordered list of top-level field names, e.g. "field_0" … "field_N-1". */
+ private List<String> fieldNames;
+ /**
+ * Values are pregenerated to keep RNG costs out of the benchmark; each is stored as a generic
+ * {@code Object} in a {@link FieldEntry} and cast to the correct type when appended.
+ */
+ private FieldEntry[] fieldValues;
+ /**
+ * Indices of fields that are strings, used when constructing nested
sub-objects so that nested
+ * fields share the top-level field-name dictionary.
+ */
+ private int[] stringFieldIndices;
+
+ /**
+ * How many string fields were generated.
+ */
+ private int stringFieldCount;
+ /**
+ * A pre-built {@link Variant} for all benchmarks which want to keep build
costs out
+ * of their measurements.
+ */
+ private Variant preBuiltVariant;
+ /**
+ * Fixed random seed for reproducibility across runs. The same seed is used
in the Iceberg
+ * benchmark.
+ */
+ private Random random;
+
+ /** Shredded schema for the variant, built from field types in setup. */
+ private GroupType shreddedSchema;
+
+ /** Unshredded schema (metadata + value only, no typed_value). */
+ private GroupType unshreddedSchema;
+
+ /** No-op RecordConsumer that discards all output. */
+ private RecordConsumer noopConsumer;
+
+ /** Pre-written shredded Parquet file bytes, used by {@link
#readFileShredded}. */
+ private byte[] shreddedFileBytes;
+
+ /** Pre-written unshredded Parquet file bytes, used by {@link
#readFileUnshredded}. */
+ private byte[] unshreddedFileBytes;
+
+ // ------------------------------------------------------------------
+ // Setup
+ // ------------------------------------------------------------------
+
+ /**
+ * Build all benchmark state. Called once per parameter combination before any iterations run.
+ *
+ * <p>Field types and values are chosen up front from a fixed-seed {@link Random}, so every
+ * trial sees the same data and value generation is kept out of the benchmark methods.
+ *
+ * <p>Steps: reset the RNG and string counter, generate field names and values, record which
+ * fields are strings, pre-build a variant, create the unshredded and shredded schemas, then
+ * pre-write both Parquet files into memory for the read benchmarks.
+ *
+ * @throws RuntimeException if pre-writing the in-memory Parquet files fails.
+ */
+ @Setup(Level.Trial)
+ public void setupTrial() {
+ random = new Random(0x1ceb1cebL);
+ counter = 0;
+
+ // --- field names ---
+ fieldNames = new ArrayList<>(fieldCount);
+ for (int i = 0; i < fieldCount; i++) {
+ fieldNames.add("field_" + i);
+ }
+
+ // --- pre-generate typed values ---
+ // Type distribution: biased towards strings.
+
+ // Nested is the last enum constant, so its ordinal is the number of non-nested types.
+ int typeCount = FieldType.Nested.ordinal();
+ if (depth == Depth.Flat) {
+ // shrink the range by one so that index 10 (nested) is never drawn
+ typeCount--;
+ }
+
+ List<Integer> stringIndices = new ArrayList<>();
+
+ fieldValues = new FieldEntry[fieldCount];
+ for (int i = 0; i < fieldCount; i++) {
+
+ // slightly more than the type count as there are extra strings:
+ // indices 0-3 are all strings, 4-9 the other primitives, 10 a nested object.
+ int typeIndex = random.nextInt(typeCount + 4);
+ // based on type, create entries.
+ FieldEntry fieldEntry;
+ switch (typeIndex) {
+ case 0:
+ fieldEntry = new FieldEntry(FieldType.String, "string-");
+ break;
+ case 1:
+ fieldEntry = new FieldEntry(FieldType.String, "longer string-");
+ break;
+ case 2:
+ fieldEntry =
+ new FieldEntry(FieldType.String, "a longer string assuming these will be more common #");
+ break;
+ case 3: // no char option here
+ fieldEntry = new FieldEntry(FieldType.String, "a");
+ break;
+ case 4:
+ fieldEntry = new FieldEntry(FieldType.Int, random.nextInt());
+ break;
+ case 5:
+ fieldEntry = new FieldEntry(FieldType.Long, random.nextLong());
+ break;
+ case 6:
+ fieldEntry = new FieldEntry(FieldType.Float, random.nextFloat());
+ break;
+ case 7:
+ fieldEntry = new FieldEntry(FieldType.Double, random.nextDouble());
+ break;
+ case 8:
+ fieldEntry = new FieldEntry(FieldType.BigDecimal, BigDecimal.valueOf(random.nextInt()));
+ break;
+ case 9:
+ // derive the UUID from the seeded RNG: UUID.randomUUID() would differ on every run
+ // and defeat the fixed seed.
+ fieldEntry = new FieldEntry(FieldType.UUID, new UUID(random.nextLong(), random.nextLong()));
+ break;
+ case 10:
+ fieldEntry = new FieldEntry(FieldType.Nested, null);
+ break;
+ default:
+ throw new AssertionError("out of range: " + typeIndex);
+ }
+ // safety check
+ Objects.requireNonNull(fieldEntry, "field entry is null");
+ fieldValues[i] = fieldEntry;
+ if (fieldEntry.type == FieldType.String) {
+ stringIndices.add(i);
+ }
+ }
+
+ // unbox the string indices into an int[] for use when building nested objects
+ stringFieldCount = stringIndices.size();
+ stringFieldIndices = new int[stringFieldCount];
+ for (int i = 0; i < stringFieldCount; i++) {
+ stringFieldIndices[i] = stringIndices.get(i);
+ }
+
+ // --- pre-built variant for deserialization benchmark ---
+ preBuiltVariant = buildVariant();
+ // for writing
+ noopConsumer = new NoopRecordConsumer();
+
+ // --- schemas for shredding benchmarks ---
+ unshreddedSchema = Types.buildGroup(Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.variantType((byte) 1))
+ .required(PrimitiveTypeName.BINARY)
+ .named("metadata")
+ .optional(PrimitiveTypeName.BINARY)
+ .named("value")
+ .named("variant_field");
+ shreddedSchema = buildShreddedSchema();
+
+ // --- pre-written Parquet files for file-based read benchmarks ---
+ try {
+ shreddedFileBytes = writeVariantsToMemory(shreddedSchema);
+ unshreddedFileBytes = writeVariantsToMemory(unshreddedSchema);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to pre-write variant files", e);
+ }
+ }
+
+ // ------------------------------------------------------------------
+ // Benchmark methods
+ // ------------------------------------------------------------------
+
+ /**
+ * Create a {@link VariantBuilder} from scratch, append all fields, call {@link
+ * VariantBuilder#build()}. Measures object construction including dictionary encoding.
+ *
+ * <p>The build is repeated {@code ITERATIONS} times per invocation.
+ * {@code @OperationsPerInvocation} only divides the measured time by the declared count;
+ * it does not repeat the body, so the loop is needed for the reported per-operation score
+ * to be correct.
+ *
+ * @param bh black hole.
+ */
+ @Benchmark
+ @OperationsPerInvocation(ITERATIONS)
+ public void buildVariant(Blackhole bh) {
+ for (int i = 0; i < ITERATIONS; i++) {
+ Variant v = buildVariant();
+ bh.consume(v.getValueBuffer());
+ bh.consume(v.getMetadataBuffer());
+ }
+ }
+
+ /**
+ * Serialize-only: re-serializes the pre-built variant value buffer. Isolates the cost of
+ * extracting the encoded bytes from a finished Variant without paying for construction.
+ *
+ * <p>Because {@link Variant#getValueBuffer()} returns the existing buffer, this benchmark
+ * primarily measures the ByteBuffer access and the Blackhole overhead.
+ *
+ * <p>The body is repeated {@code ITERATIONS} times per invocation.
+ * {@code @OperationsPerInvocation} only divides the measured time by the declared count;
+ * it does not repeat the body.
+ *
+ * @param bh black hole.
+ */
+ @Benchmark
+ @OperationsPerInvocation(ITERATIONS)
+ public void serializeVariant(Blackhole bh) {
+ for (int i = 0; i < ITERATIONS; i++) {
+ // duplicate() gives an independent position/limit on the same backing array –
+ // equivalent to the Iceberg benchmark's outputBuffer.clear() + writeTo() pattern.
+ ByteBuffer value = preBuiltVariant.getValueBuffer().duplicate();
+ ByteBuffer meta = preBuiltVariant.getMetadataBuffer().duplicate();
+ bh.consume(value);
+ bh.consume(meta);
+ }
+ }
+
+ /**
+ * Read path: iterate all fields of the pre-built variant, extracting each value. This exercises
+ * the field-name lookup and type dispatch that a query engine performs on every row. Nested
+ * objects are recursively traversed so that {@code depth=Nested} incurs the full deserialization
+ * cost of sub-objects.
+ *
+ * <p>The traversal is repeated {@code ITERATIONS} times per invocation.
+ * {@code @OperationsPerInvocation} only divides the measured time by the declared count;
+ * it does not repeat the body.
+ *
+ * @param blackhole black hole.
+ */
+ @Benchmark
+ @OperationsPerInvocation(ITERATIONS)
+ public void deserializeVariant(Blackhole blackhole) {
+ for (int i = 0; i < ITERATIONS; i++) {
+ deserializeAndConsume(preBuiltVariant, blackhole);
+ }
+ }
+
+ /**
+ * Shred the pre-built variant into a fully typed schema. Measures the cost of type dispatch,
+ * field matching, and recursive decomposition that {@link VariantValueWriter} performs.
+ * Output goes to a no-op record consumer, so no Parquet encoding is included.
+ *
+ * <p>The write is repeated {@code ITERATIONS} times per invocation.
+ * {@code @OperationsPerInvocation} only divides the measured time by the declared count;
+ * it does not repeat the body.
+ *
+ * @param blackhole black hole.
+ */
+ @Benchmark
+ @OperationsPerInvocation(ITERATIONS)
+ public void consumeRecordsShredded(Blackhole blackhole) {
+ for (int i = 0; i < ITERATIONS; i++) {
+ VariantValueWriter.write(noopConsumer, shreddedSchema, preBuiltVariant);
+ blackhole.consume(noopConsumer);
+ }
+ }
+
+ /**
+ * Write {@link #FILE_ROWS} rows of the pre-built variant to an in-memory Parquet file using the
+ * shredded schema. Measures end-to-end Parquet encoding cost including page/row-group framing.
+ * Compare with {@link #consumeRecordsShredded} to quantify the overhead over raw schema traversal.
+ *
+ * <p>This method carries no {@code @OperationsPerInvocation}, so the reported time covers
+ * the whole file rather than a single row.
+ *
+ * @param blackhole black hole.
+ * @throws IOException write failure.
+ */
+ @Benchmark
+ public void writeToMemoryFile(Blackhole blackhole) throws IOException {
+ writeToMemory(blackhole, shreddedSchema);
+ }
+
+ /**
+ * Write the pre-built variant to an unshredded schema (metadata + value only).
+ * This is the baseline: the entire variant is written as a single binary blob.
+ * Compare with {@link #consumeRecordsShredded} to see the cost of shredding.
+ *
+ * <p>The write is repeated {@code ITERATIONS} times per invocation.
+ * {@code @OperationsPerInvocation} only divides the measured time by the declared count;
+ * it does not repeat the body.
+ *
+ * @param blackhole black hole.
+ */
+ @Benchmark
+ @OperationsPerInvocation(ITERATIONS)
+ public void consumeRecordsUnshredded(Blackhole blackhole) {
+ for (int i = 0; i < ITERATIONS; i++) {
+ VariantValueWriter.write(noopConsumer, unshreddedSchema, preBuiltVariant);
+ blackhole.consume(noopConsumer);
+ }
+ }
+
+ /**
+ * Write {@link #FILE_ROWS} rows of the pre-built variant to an in-memory Parquet file using the
+ * unshredded schema (metadata + value binary blobs only). Baseline for {@link #writeToMemoryFile}.
+ *
+ * <p>This method carries no {@code @OperationsPerInvocation}, so the reported time covers
+ * the whole file rather than a single row.
+ *
+ * @param blackhole black hole.
+ * @throws IOException write failure.
+ */
+ @Benchmark
+ public void writeToMemoryUnshredded(Blackhole blackhole) throws IOException {
+ writeToMemory(blackhole, unshreddedSchema);
+ }
+
+ /**
+ * Read all rows from the pre-written shredded Parquet file in memory. Measures full Parquet
+ * decode cost including typed column decoding and Variant reassembly.
+ * The file bytes are produced once in {@code setupTrial()}; each invocation wraps them in a
+ * fresh input file and reads until the reader returns null.
+ * @param blackhole black hole.
+ * @throws IOException IO failure.
+ */
+ @Benchmark
+ public void readFileShredded(Blackhole blackhole) throws IOException {
+ final ByteArrayInputFile inputFile = new
ByteArrayInputFile(shreddedFileBytes);
+ consumeInputFile(blackhole, inputFile);
+ }
+
+ /**
+ * Read all rows from the pre-written unshredded Parquet file in memory. Baseline for
+ * {@link #readFileShredded}: measures raw binary blob read with no typed column decoding.
+ * The file bytes are produced once in {@code setupTrial()}; each invocation wraps them in a
+ * fresh input file and reads until the reader returns null.
+ * @param blackhole black hole.
+ * @throws IOException IO failure.
+ */
+ @Benchmark
+ public void readFileUnshredded(Blackhole blackhole) throws IOException {
+ consumeInputFile(blackhole, new ByteArrayInputFile(unshreddedFileBytes));
+ }
+
+ // ------------------------------------------------------------------
+ // Internal helpers
+ // ------------------------------------------------------------------
+
+ /**
+ * Build a complete Variant object from the pre-decided field types.
+ */
+ private Variant buildVariant() {
+ VariantBuilder builder = new VariantBuilder();
+ VariantObjectBuilder ob = builder.startObject();
+
+ for (int i = 0; i < fieldCount; i++) {
+ ob.appendKey(fieldNames.get(i));
+ appendFieldValue(ob, i);
+ }
+
+ builder.endObject();
+ return builder.build();
+ }
+
+ /**
+ * Append the value of the field at {@code index} to {@code ob}.
+ * Non-nested entries delegate to their type's append operation. A nested entry becomes a
+ * sub-object when nesting is enabled and at least one string field exists, otherwise a null.
+ * @param ob object builder to append to.
+ * @param index index of the field.
+ */
+ private void appendFieldValue(VariantObjectBuilder ob, int index) {
+ final FieldEntry field = fieldValues[index];
+ if (field.type != FieldType.Nested) {
+ // common case: the type knows how to append its own value
+ field.type.append(field.value, ob);
+ return;
+ }
+ final boolean canNest = depth == Depth.Nested && stringFieldCount > 0;
+ if (!canNest) {
+ // outlier.
+ ob.appendNull();
+ return;
+ }
+ appendNestedObject(ob, index);
+ }
+
+ /**
+ * Append a nested sub-object with {@link #NESTED_FIELD_COUNT} string fields. Field names are
+ * drawn at random from the set of top-level string fields.
+ *
+ * <p>NOTE(review): this draws from {@code random} every time a variant is built, including
+ * inside the timed build benchmark, so RNG cost is part of that measurement.
+ *
+ * <p>NOTE(review): names are drawn with replacement, so one nested object may receive the
+ * same key more than once; confirm the builder tolerates duplicate keys.
+ *
+ * @param parentOb parent object.
+ * @param parentIndex parent index.
+ */
+ private void appendNestedObject(VariantObjectBuilder parentOb, int
parentIndex) {
+ // VariantObjectBuilder does not expose startObject() for nesting directly;
+ // build the nested variant separately and embed it as an encoded value.
+ VariantBuilder nestedBuilder = new VariantBuilder();
+ VariantObjectBuilder nestedOb = nestedBuilder.startObject();
+
+ for (int j = 0; j < NESTED_FIELD_COUNT; j++) {
+ // pick a random top-level string field and reuse its name as the nested key
+ int nameIdx = stringFieldIndices[random.nextInt(stringFieldCount)];
+ nestedOb.appendKey(fieldNames.get(nameIdx));
+ nestedOb.appendString("nested_" + parentIndex + "_" + j);
+ }
+
+ nestedBuilder.endObject();
+ Variant nested = nestedBuilder.build();
+ // embed the nested value buffer directly
+ // NOTE(review): the nested value was encoded by a separate builder and only its value
+ // buffer is embedded; confirm its key ids resolve correctly against the parent's metadata.
+ parentOb.appendEncodedValue(nested.getValueBuffer());
+ }
+
+ /**
+ * Build a shredded schema with typed_value columns matching each field's type.
+ * Every top-level field becomes an optional group holding an optional binary {@code value}
+ * plus whatever {@code typed_value} its type adds. Nested fields add no typed_value,
+ * so their group holds only the {@code value} column.
+ * @return the group type for a shredded object.
+ */
+ private GroupType buildShreddedSchema() {
+ Types.GroupBuilder<GroupType> typedValueBuilder = Types.optionalGroup();
+
+ for (int i = 0; i < fieldCount; i++) {
+ FieldEntry entry = fieldValues[i];
+ // Each field in typed_value is a variant group: optional value + optional typed_value
+ Types.GroupBuilder<GroupType> fieldBuilder = Types.optionalGroup();
+ fieldBuilder.optional(PrimitiveTypeName.BINARY).named("value");
+ entry.type.addTypedValue(fieldBuilder);
+ typedValueBuilder.addField(fieldBuilder.named(fieldNames.get(i)));
+ }
+
+ // same outer shape as the unshredded schema, plus the typed_value group of per-field groups
+ return Types.buildGroup(Repetition.REQUIRED)
+ .as(LogicalTypeAnnotation.variantType((byte) 1))
+ .required(PrimitiveTypeName.BINARY)
+ .named("metadata")
+ .optional(PrimitiveTypeName.BINARY)
+ .named("value")
+ .addField(typedValueBuilder.named("typed_value"))
+ .named("variant_field");
+ }
+
+ /**
+ * Recursively deserialize a variant object, descending into any nested
objects.
+ *
+ * @param variant variant to deserialize.
+ * @param blackhole black hole.
+ */
+ private void deserializeAndConsume(Variant variant, Blackhole blackhole) {
+ int n = variant.numObjectElements();
+ for (int i = 0; i < n; i++) {
+ Variant.ObjectField field = variant.getFieldAtIndex(i);
+ blackhole.consume(field.key);
+ if (field.value.getType() == Variant.Type.OBJECT) {
+ deserializeAndConsume(field.value, blackhole);
+ } else {
+ blackhole.consume(field.value.getValueBuffer());
+ }
+ }
+ }
+
+ /**
+ * Write {@link #FILE_ROWS} copies of {@link #preBuiltVariant} to a fresh in-memory Parquet file
+ * using the given schema. Used both in {@link #setupTrial()} to pre-build read buffers and as the
+ * body of the write-file benchmarks.
+ *
+ * <p>The file size is logged at debug level only: this method runs inside the timed region
+ * of the write benchmarks, where an info-level log per invocation would add I/O to the
+ * measurement.
+ *
+ * @param schema group schema.
+ * @return the bytes of an in-memory parquet file.
+ * @throws IOException IO failure.
+ */
+ private byte[] writeVariantsToMemory(GroupType schema) throws IOException {
+ ByteArrayOutputFile out = new ByteArrayOutputFile();
+ // closing the writer flushes the file footer before the bytes are extracted
+ try (ParquetWriter<Variant> writer = new VariantWriterBuilder(out, schema).build()) {
+ for (int i = 0; i < FILE_ROWS; i++) {
+ writer.write(preBuiltVariant);
+ }
+ }
+ LOG.debug("Written Parquet file has size: {}", out.size());
+ return out.toByteArray();
+ }
+
+ /**
+ * Write the prebuilt variant to a memory output stream and hand the resulting bytes to the
+ * black hole.
+ * As the same variant is written, compression on the shredded variant should be good.
+ * @param blackhole black hole
+ * @param schema schema
+ * @throws IOException write failure
+ */
+ private void writeToMemory(final Blackhole blackhole, final GroupType
schema) throws IOException {
+ blackhole.consume(writeVariantsToMemory(schema));
+ }
+
+ /**
+ * Read every record of an input file and feed each one to the black hole.
+ * The reader is closed when the end of the file is reached or on failure.
+ * @param blackhole black hole
+ * @param inputFile input file
+ * @throws IOException IO failure.
+ */
+ private static void consumeInputFile(final Blackhole blackhole, final ByteArrayInputFile inputFile)
+ throws IOException {
+ try (ParquetReader<Variant> reader = new VariantReaderBuilder(inputFile).build()) {
+ // read() returns null once all records have been returned
+ for (Variant row = reader.read(); row != null; row = reader.read()) {
+ blackhole.consume(row);
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------
+ // In-memory I/O
+ // ------------------------------------------------------------------
+
+ /**
+ * An {@link OutputFile} backed by a {@link ByteArrayOutputStream}.
+ * Both create methods return a stream appending to the same buffer; the buffer is never
+ * reset, so an instance is intended for writing a single file.
+ */
+ private static final class ByteArrayOutputFile implements OutputFile {
+ private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ /** @return a copy of the bytes written so far. */
+ byte[] toByteArray() {
+ return baos.toByteArray();
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) {
+ return newStream();
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+ return newStream();
+ }
+
+ /** Create a stream which writes to the shared buffer and tracks its own position from 0. */
+ private PositionOutputStream newStream() {
+ return new PositionOutputStream() {
+ private long pos = 0;
+
+ @Override
+ public void write(int b) throws IOException {
+ baos.write(b);
+ pos++;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ baos.write(b, off, len);
+ pos += len;
+ }
+
+ @Override
+ public long getPos() {
+ return pos;
+ }
+ };
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return false;
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return 0;
+ }
+
+ /** @return number of bytes written so far. */
+ int size() {
+ return baos.size();
+ }
+ }
+
+ /**
+ * An {@link InputFile} backed by a {@code byte[]}.
+ *
+ * <p>The array is used directly, not copied. Each call to {@link #newStream()} returns an
+ * independent stream positioned at offset 0. All {@code readFully} variants signal a short
+ * read with {@link EOFException}.
+ */
+ private static final class ByteArrayInputFile implements InputFile {
+ private final byte[] data;
+
+ ByteArrayInputFile(byte[] data) {
+ this.data = data;
+ }
+
+ @Override
+ public long getLength() {
+ return data.length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ return new SeekableInputStream() {
+ private int pos = 0;
+
+ /** Fail unless {@code len} bytes remain; written to avoid int overflow of pos + len. */
+ private void requireAvailable(int len) throws EOFException {
+ if (len > data.length - pos) {
+ throw new EOFException("Unexpected end of data");
+ }
+ }
+
+ @Override
+ public int read() {
+ return pos < data.length ? (data[pos++] & 0xFF) : -1;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ int remaining = data.length - pos;
+ if (remaining <= 0) {
+ return -1;
+ }
+ int n = Math.min(len, remaining);
+ System.arraycopy(data, pos, b, off, n);
+ pos += n;
+ return n;
+ }
+
+ @Override
+ public long getPos() {
+ return pos;
+ }
+
+ @Override
+ public void seek(long newPos) {
+ pos = (int) newPos;
+ }
+
+ @Override
+ public void readFully(byte[] bytes) throws IOException {
+ readFully(bytes, 0, bytes.length);
+ }
+
+ @Override
+ public void readFully(byte[] bytes, int start, int len) throws IOException {
+ requireAvailable(len);
+ System.arraycopy(data, pos, bytes, start, len);
+ pos += len;
+ }
+
+ @Override
+ public int read(ByteBuffer buf) {
+ int len = buf.remaining();
+ int remaining = data.length - pos;
+ if (remaining <= 0) {
+ return -1;
+ }
+ int n = Math.min(len, remaining);
+ buf.put(data, pos, n);
+ pos += n;
+ return n;
+ }
+
+ @Override
+ public void readFully(ByteBuffer buf) throws IOException {
+ int len = buf.remaining();
+ // EOFException (an IOException) for consistency with readFully(byte[], int, int)
+ requireAvailable(len);
+ buf.put(data, pos, len);
+ pos += len;
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+ }
+
+ // ------------------------------------------------------------------
+ // Write support
+ // ------------------------------------------------------------------
+
+ /**
+ * {@link ParquetWriter.Builder} for {@link Variant} records using {@link VariantWriteSupport}.
+ * The variant group passed in determines whether the file is written shredded or unshredded.
+ */
+ private static final class VariantWriterBuilder extends
ParquetWriter.Builder<Variant, VariantWriterBuilder> {
+ /** Group type of the single variant column; handed to the write support. */
+ private final GroupType variantGroup;
+
+ /**
+ * @param file destination file.
+ * @param variantGroup group type of the variant column.
+ */
+ VariantWriterBuilder(OutputFile file, GroupType variantGroup) {
+ super(file);
+ this.variantGroup = variantGroup;
+ }
+
+ @Override
+ protected VariantWriterBuilder self() {
+ return this;
+ }
+
+ /** Create a new write support for the variant group; the configuration is not used. */
+ @Override
+ protected WriteSupport<Variant> getWriteSupport(Configuration conf) {
+ return new VariantWriteSupport(variantGroup);
+ }
+ }
+
+ /**
+  * {@link WriteSupport} emitting one message per {@link Variant}. Each message holds a
+  * single field, {@code "variant_field"} at index 0, whose content is written by
+  * {@link VariantValueWriter}.
+  */
+ private static final class VariantWriteSupport extends WriteSupport<Variant> {
+   private static final String FIELD_NAME = "variant_field";
+
+   /** Group type of the variant field; also the only field of the message schema. */
+   private final GroupType variantGroup;
+
+   /** Consumer supplied through {@link #prepareForWrite(RecordConsumer)}. */
+   private RecordConsumer consumer;
+
+   VariantWriteSupport(GroupType variantGroup) {
+     this.variantGroup = variantGroup;
+   }
+
+   /** Declare a message schema named {@code "message"} wrapping the variant group. */
+   @Override
+   public WriteContext init(Configuration conf) {
+     final MessageType messageSchema = new MessageType("message", variantGroup);
+     return new WriteContext(messageSchema, Collections.emptyMap());
+   }
+
+   @Override
+   public void prepareForWrite(RecordConsumer recordConsumer) {
+     consumer = recordConsumer;
+   }
+
+   /** Write one record as a message containing the variant in field 0. */
+   @Override
+   public void write(Variant record) {
+     final RecordConsumer out = consumer;
+     out.startMessage();
+     out.startField(FIELD_NAME, 0);
+     VariantValueWriter.write(out, variantGroup, record);
+     out.endField(FIELD_NAME, 0);
+     out.endMessage();
+   }
+ }
+
+ // ------------------------------------------------------------------
+ // Read support
+ // ------------------------------------------------------------------
+
+ /**
+  * Builder producing a {@link ParquetReader} of {@link Variant} records, with
+  * {@link VariantReadSupport} doing the materialization.
+  */
+ private static final class VariantReaderBuilder extends ParquetReader.Builder<Variant> {
+
+   /**
+    * @param file source to read from.
+    */
+   VariantReaderBuilder(InputFile file) {
+     super(file);
+   }
+
+   /** @return a new {@link VariantReadSupport} on every call. */
+   @Override
+   protected ReadSupport<Variant> getReadSupport() {
+     final ReadSupport<Variant> support = new VariantReadSupport();
+     return support;
+   }
+ }
+
+ /**
+  * {@link ReadSupport} which requests the whole file schema and materializes each row
+  * as a {@link Variant} taken from the {@code "variant_field"} group.
+  */
+ private static final class VariantReadSupport extends ReadSupport<Variant> {
+
+   /** Request every column: the read schema is the file schema. */
+   @Override
+   public ReadContext init(InitContext context) {
+     final MessageType everything = context.getFileSchema();
+     return new ReadContext(everything);
+   }
+
+   /** Build a materializer around the {@code "variant_field"} group of the file schema. */
+   @Override
+   public RecordMaterializer<Variant> prepareForRead(
+       Configuration conf,
+       Map<String, String> keyValueMetaData,
+       MessageType fileSchema,
+       ReadContext readContext) {
+     return new VariantRecordMaterializer(
+         fileSchema, fileSchema.getType("variant_field").asGroupType());
+   }
+ }
+
+ /**
+  * {@link RecordMaterializer} for messages holding one variant field; the record is
+  * whatever {@link Variant} the root converter last produced.
+  */
+ private static final class VariantRecordMaterializer extends RecordMaterializer<Variant> {
+   private final MessageGroupConverter root;
+
+   /**
+    * @param messageType schema of the message; not used by this implementation.
+    * @param variantGroup group type of the variant field.
+    */
+   VariantRecordMaterializer(MessageType messageType, GroupType variantGroup) {
+     root = new MessageGroupConverter(variantGroup);
+   }
+
+   @Override
+   public Variant getCurrentRecord() {
+     final Variant latest = root.getCurrentVariant();
+     return latest;
+   }
+
+   @Override
+   public GroupConverter getRootConverter() {
+     return root;
+   }
+ }
+
+ /**
+  * Message-level {@link GroupConverter}. Every field index is answered with the same
+  * {@link VariantGroupConverter}; the schema written by this benchmark has only field 0,
+  * {@code "variant_field"}.
+  */
+ private static final class MessageGroupConverter extends GroupConverter {
+   private final VariantGroupConverter variantConverter;
+
+   MessageGroupConverter(GroupType variantGroup) {
+     variantConverter = new VariantGroupConverter(variantGroup);
+   }
+
+   /** @return the variant most recently completed by the nested converter. */
+   Variant getCurrentVariant() {
+     final Variant latest = variantConverter.getCurrentVariant();
+     return latest;
+   }
+
+   /** The index is not inspected: the variant converter is returned for any value. */
+   @Override
+   public Converter getConverter(int fieldIndex) {
+     return variantConverter;
+   }
+
+   /** No per-message state to reset. */
+   @Override
+   public void start() {}
+
+   /** No per-message state to finalize. */
+   @Override
+   public void end() {}
+ }
+
+ /**
+ * {@link GroupConverter} for a variant group. Implements
+ * {@link VariantConverters.ParentConverter} so it can be used with
+ * {@link VariantConverters#newVariantConverter}.
+ * <p>All field conversion is delegated to the converter returned by that factory.
+ * This class tracks the per-record metadata and builder, and keeps the {@link Variant}
+ * built when the group ends.
+ */
+ private static final class VariantGroupConverter extends GroupConverter
+ implements VariantConverters.ParentConverter<VariantBuilder> {
+ // converter created by VariantConverters; start/end/getConverter are forwarded to it
+ private final GroupConverter wrapped;
+ // builder for the record in progress; null until first needed, reset in start()
+ private VariantBuilder builder;
+ // metadata of the record in progress; set via setMetadata(), reset in start()
+ private ImmutableMetadata metadata;
+ // result of the most recent end() call
+ private Variant currentVariant;
+
+ VariantGroupConverter(GroupType variantGroup) {
+ // this instance is passed as both the metadata callback and the parent converter
+ this.wrapped = VariantConverters.newVariantConverter(variantGroup,
this::setMetadata, this);
+ }
+
+ // Metadata callback given to newVariantConverter; presumably invoked once the
+ // record's metadata column has been read -- TODO confirm against VariantConverters.
+ private void setMetadata(ByteBuffer buf) {
+ this.metadata = new ImmutableMetadata(buf);
+ }
+
+ /**
+ * Pass the current record's builder to the consumer, creating the builder from the
+ * current metadata on first use.
+ */
+ @Override
+ public void build(Consumer<VariantBuilder> consumer) {
+ if (builder == null) {
+ builder = new VariantBuilder(metadata);
+ }
+ consumer.accept(builder);
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return wrapped.getConverter(fieldIndex);
+ }
+
+ /** Discard the previous record's builder and metadata, then start the wrapped converter. */
+ @Override
+ public void start() {
+ builder = null;
+ metadata = null;
+ wrapped.start();
+ }
+
+ /**
+ * End the wrapped converter, then build the variant for this record.
+ * If nothing requested a builder during the record, one is created here so that
+ * {@code appendNullIfEmpty()} and {@code build()} still have something to act on.
+ */
+ @Override
+ public void end() {
+ wrapped.end();
+ if (builder == null) {
+ builder = new VariantBuilder(metadata);
+ }
+ builder.appendNullIfEmpty();
+ currentVariant = builder.build();
+ }
+
+ /** @return the variant built by the most recent {@link #end()}; null before the first record. */
+ Variant getCurrentVariant() {
+ return currentVariant;
+ }
+ }
+
+ /**
+  * {@link RecordConsumer} whose every callback is empty. Writing to it exercises the
+  * shredding logic without paying for Parquet encoding or I/O.
+  */
+ private static final class NoopRecordConsumer extends RecordConsumer {
+
+   // ---- structural events: all ignored ----
+
+   @Override
+   public void startMessage() {}
+
+   @Override
+   public void endMessage() {}
+
+   @Override
+   public void startGroup() {}
+
+   @Override
+   public void endGroup() {}
+
+   @Override
+   public void startField(String field, int index) {}
+
+   @Override
+   public void endField(String field, int index) {}
+
+   // ---- values: all discarded ----
+
+   @Override
+   public void addBoolean(boolean value) {}
+
+   @Override
+   public void addInteger(int value) {}
+
+   @Override
+   public void addLong(long value) {}
+
+   @Override
+   public void addFloat(float value) {}
+
+   @Override
+   public void addDouble(double value) {}
+
+   @Override
+   public void addBinary(Binary value) {}
+ }
+}
diff --git
a/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantConverterBenchmark.java
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantConverterBenchmark.java
new file mode 100644
index 000000000..72c7ad337
--- /dev/null
+++
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantConverterBenchmark.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.variant;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static
org.apache.parquet.variant.VariantBenchmarkMeasurementSettings.SMALL_BENCHMARK_MEASUREMENTS;
+import static
org.apache.parquet.variant.VariantBenchmarkMeasurementSettings.SMALL_BENCHMARK_WARMUP;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import org.apache.parquet.io.api.Binary;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark {@link VariantConverters}. These converters are used
+ * when reconstructing shredded variants, so their performance
+ * and memory consumption is on the critical path of queries reading
+ * variants.
+ * <p>Each benchmark method appends one value to a fresh {@link VariantBuilder} and builds
+ * the {@link Variant}, repeated {@link #ITERATIONS} times per invocation.
+ * <p>Run:
+ * <pre>
+ * ./run.sh all org.apache.parquet.variant.VariantConverterBenchmark \
+ *   -f 1 -foe true -rf json -rff target/results.json
+ * </pre>
+ * <p>Profile
+ * <pre>
+ * java -jar target/parquet-benchmarks.jar VariantConverterBenchmark \
+ *   -prof "async:output=flamegraph;dir=target/perf" -rf json -rff target/results.json
+ * </pre>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = SMALL_BENCHMARK_WARMUP)
+@Measurement(iterations = SMALL_BENCHMARK_MEASUREMENTS)
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(MICROSECONDS)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class VariantConverterBenchmark {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VariantConverterBenchmark.class);
+
+  /**
+   * Number of iterations within each benchmark.
+   * This compensates for the low resolution of ARM CPUs,
+   * while also ensuring profile graphs are filled out in detail.
+   * It is also the value given to {@link OperationsPerInvocation}, so every benchmark
+   * method must perform exactly this many operations for the reported score to be right.
+   */
+  private static final int ITERATIONS = 100_000;
+
+  /** How long is the string to convert? */
+  @Param({"16", "512", "2048"})
+  int stringLength;
+
+  /**
+   * An input string, created in setup from random lower-case ASCII letters,
+   * of length {@link #stringLength}.
+   */
+  private String inputString;
+
+  /** The bytes of {@link #inputString} as a binary. */
+  private Binary inputBinary;
+
+  /**
+   * Generate the inputs. The random generator has a fixed seed, so the same
+   * {@link #stringLength} always yields the same content.
+   */
+  @Setup
+  public void setup() {
+    byte[] bytes = new byte[stringLength];
+    Random random = new Random(42);
+    for (int i = 0; i < stringLength; i++) {
+      bytes[i] = (byte) ('a' + random.nextInt(26));
+    }
+    inputString = new String(bytes, StandardCharsets.UTF_8);
+    inputBinary = Binary.fromConstantByteArray(bytes);
+    LOG.info("Setup: stringLength={}", stringLength);
+  }
+
+  /**
+   * Benchmark appending {@link #inputString} as a string value and building a {@link Variant}.
+   * The operation is repeated {@link #ITERATIONS} times, matching the
+   * {@link OperationsPerInvocation} declaration; JMH divides the measured time by that count.
+   *
+   * @param blackhole sink for the built variants.
+   */
+  @Benchmark
+  @OperationsPerInvocation(ITERATIONS)
+  public void appendStringAsString(Blackhole blackhole) {
+    for (int i = 0; i < ITERATIONS; i++) {
+      VariantBuilder builder = new VariantBuilder();
+      builder.appendString(inputString);
+      blackhole.consume(builder.build());
+    }
+  }
+
+  /**
+   * Benchmark appending {@link #inputBinary} as a binary value and building a {@link Variant}.
+   * The operation is repeated {@link #ITERATIONS} times, matching the
+   * {@link OperationsPerInvocation} declaration; JMH divides the measured time by that count.
+   *
+   * @param blackhole sink for the built variants.
+   */
+  @Benchmark
+  @OperationsPerInvocation(ITERATIONS)
+  public void appendBinary(Blackhole blackhole) {
+    for (int i = 0; i < ITERATIONS; i++) {
+      VariantBuilder builder = new VariantBuilder();
+      builder.appendBinary(inputBinary.toByteBuffer());
+      blackhole.consume(builder.build());
+    }
+  }
+}
diff --git
a/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantProjectionBenchmark.java
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantProjectionBenchmark.java
new file mode 100644
index 000000000..506fb95e1
--- /dev/null
+++
b/parquet-benchmarks/src/main/java/org/apache/parquet/variant/VariantProjectionBenchmark.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.variant;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.benchmarks.BenchmarkFiles;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Benchmark reading a Parquet file containing a variant column, with and without
+ * shredding and projection. Each row has the following structure:
+ * <pre>
+ * id: int64 :- unique per row counter
+ * category: int32 :- in range 0-19: id % 20
+ * nested: variant
+ *   .idstr: string :- unique string per row
+ *   .varid: int64 :- id
+ *   .varcategory: int32 :- category (0-19)
+ *   .longstr: string :- non-unique string per row (picked from 20 values based on category),
+ *      extended to be over the "long string" threshold.
+ *   .uuid_entry: uuid :- random UUID per row
+ * </pre>
+ * <p>Build and run:
+ *
+ * <pre>
+ * ./mvnw --projects parquet-benchmarks -amd -DskipTests -Denforcer.skip=true clean package
+ * ./parquet-benchmarks/run.sh all org.apache.parquet.variant.VariantProjectionBenchmark \
+ *   -wi 3 -i 5 -f 1 -foe true -rf json -rff target/results.json
+ * </pre>
+ *
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 5)
+@Measurement(iterations = 10)
+@BenchmarkMode(Mode.SingleShotTime)
+@OutputTimeUnit(MILLISECONDS)
+@Timeout(time = 10, timeUnit = TimeUnit.MINUTES)
+public class VariantProjectionBenchmark {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(VariantProjectionBenchmark.class);
+
+ /** Number of rows written per file. */
+ private static final int NUM_ROWS = 1_000_000;
+ /**
+ * General specification of the records doesn't declare any values within
the variant.
+ * The per-record metadata declares that.
+ */
+ public static final String UNSHREDDED_SCHEMA = "message vschema {"
+ + "required int64 id = 1;"
+ + "required int32 category = 2;"
+ + "required group nested (VARIANT(1)) = 3 {"
+ + " required binary metadata;"
+ + " required binary value;"
+ + " }"
+ + "}";
+
+ /**
+ * Detailed specification declaring all the columns as shredded variants.
+ */
+ public static final String SHREDDED_SCHEMA = "message vschema {"
+ + "required int64 id = 1;"
+ + "required int32 category = 2;"
+ + "required group nested (VARIANT(1)) = 3 {"
+ + " required binary metadata;"
+ + " optional binary value;"
+ + " optional group typed_value {"
+ + " required group idstr {"
+ + " optional binary value;"
+ + " optional binary typed_value (STRING);"
+ + " }"
+ + " required group varid {"
+ + " optional binary value;"
+ + " optional int64 typed_value;"
+ + " }"
+ + " required group varcategory {"
+ + " optional binary value;"
+ + " optional int32 typed_value;"
+ + " }"
+ + " required group longstr {"
+ + " optional binary value;"
+ + " optional binary typed_value (STRING);"
+ + " }"
+ + " required group uuid_entry {"
+ + " optional binary value;"
+ + " optional FIXED_LEN_BYTE_ARRAY(16) typed_value (UUID);"
+ + " }"
+ + " }"
+ + " }"
+ + "}";
+
+ /**
+ * The select schema is a lean subset of {@link #SHREDDED_SCHEMA}, containing
+ * only the variant column desired.
+ */
+ public static final String SELECT_SCHEMA = "message vschema {"
+ + "required int64 id = 1;"
+ + "required int32 category = 2;"
+ + "required group nested (VARIANT(1)) = 3 {"
+ + " required binary metadata;"
+ + " optional binary value;"
+ + " optional group typed_value {"
+ + " required group varcategory {"
+ + " optional binary value;"
+ + " optional int32 typed_value;"
+ + " }"
+ + " }"
+ + " }"
+ + "}";
+
+ /**
+ * Categories: limits uniqueness of category columns and longstr.
+ */
+ private static final int CATEGORIES = 20;
+
+ /** The longstr values, one per category. */
+ private static final String[] CATEGORY_STRINGS;
+
+ static {
+ CATEGORY_STRINGS = new String[CATEGORIES];
+ for (int i = 0; i < CATEGORIES; i++) {
+ CATEGORY_STRINGS[i] =
+ "longstr_category_" + i + " in a long string" +
"-0123456789-0123456789-0123456789-0123456789";
+ }
+ }
+
+ /** Table to use in benchmark. */
+ public enum TableType {
+ /** Parquet, no shredding. */
+ Unshredded,
+ /** Parquet, shredded. */
+ Shredded,
+ }
+
+ @Param({"Unshredded", "Shredded"})
+ private TableType tableType;
+
+ /** @return true iff the {@code tableType} parameter selects the shredded table. */
+ public boolean shredded() {
+   final boolean isShredded = TableType.Shredded == tableType;
+   return isShredded;
+ }
+
+ /**
+ * The record schema with the unshredded variant.
+ */
+ private final MessageType unshreddedSchema;
+
+ /**
+ * The shredded schema splits all expected variant group entries
+ * into their own columns.
+ */
+ private final MessageType shreddedSchema;
+
+ /**
+ * Select schema.
+ * A subset of the {@link #shreddedSchema}, parsed from {@link #SELECT_SCHEMA}.
+ */
+ private final MessageType selectSchema;
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path dataFile;
+
+ /**
+  * Parse the three schema declarations. Parsing in the constructor, rather than in
+  * setup, makes schema errors slightly easier to debug.
+  */
+ public VariantProjectionBenchmark() {
+   this.unshreddedSchema = parseMessageType(UNSHREDDED_SCHEMA);
+   this.shreddedSchema = parseMessageType(SHREDDED_SCHEMA);
+   this.selectSchema = parseMessageType(SELECT_SCHEMA);
+ }
+
+ /**
+  * Per-trial setup: create the local filesystem and target directory, then write the
+  * dataset for the selected table type, replacing any file left by an earlier run.
+  *
+  * @throws IOException IO failure.
+  */
+ @Setup(Level.Trial)
+ public void setupBenchmarks() throws IOException {
+   conf = new Configuration();
+   // hadoop 3.4.3 turn off CRC and use direct nio range reads.
+   conf.setBoolean("fs.file.checksum.verify", false);
+   fs = FileSystem.getLocal(conf);
+   fs.mkdirs(BenchmarkFiles.targetDir);
+
+   // distinct filenames per table type assist with manual examination of the contents.
+   final boolean useShredded = shredded();
+   final String filename = useShredded ? "shredded.parquet" : "unshredded.parquet";
+   final MessageType activeSchema = useShredded ? shreddedSchema : unshreddedSchema;
+   dataFile = new Path(BenchmarkFiles.targetDir, filename);
+
+   fs.delete(dataFile, false);
+   writeDataset(activeSchema, dataFile);
+ }
+
+ /**
+  * Per-trial teardown: recursively delete the benchmark target directory.
+  * The level is stated explicitly so it visibly pairs with the trial-level setup.
+  *
+  * @throws IOException failure to delete the directory.
+  */
+ @TearDown(Level.Trial)
+ public void tearDownBenchmark() throws IOException {
+   // use the filesystem opened in setup; a local of the same name would shadow the field.
+   // the guard covers a trial whose setup failed before the filesystem was created.
+   if (fs != null) {
+     fs.delete(BenchmarkFiles.targetDir, true);
+   }
+ }
+
+ /**
+  * Write {@code NUM_ROWS} rows to a file, then log the resulting file length.
+  * Row {@code i} has id {@code i} and category {@code i % CATEGORIES}.
+  *
+  * @param schema schema of the file to write.
+  * @param path destination path.
+  * @throws IOException IO failure.
+  */
+ private void writeDataset(final MessageType schema, final Path path) throws IOException {
+   final GroupType variantType = schema.getType("nested").asGroupType();
+   final OutputFile sink = HadoopOutputFile.fromPath(path, conf);
+   try (ParquetWriter<RowRecord> writer = new RowWriterBuilder(sink, schema, variantType).build()) {
+     int row = 0;
+     while (row < NUM_ROWS) {
+       final int category = row % CATEGORIES;
+       final Variant nested = buildVariant(row, category, CATEGORY_STRINGS[category]);
+       writer.write(new RowRecord(row, category, nested));
+       row++;
+     }
+   }
+   final FileStatus status = fs.getFileStatus(path);
+   LOG.info("Size of file {} size {}", path, status.getLen());
+ }
+
+ /**
+  * Read every row with the file schema and consume the id, the category and each of
+  * the five nested variant fields that is present.
+  *
+  * @param blackhole black hole.
+  * @throws IOException IO failure.
+  */
+ @Benchmark
+ public void readAllRecords(Blackhole blackhole) throws IOException {
+   final InputFile source = HadoopInputFile.fromPath(dataFile, conf);
+   try (ParquetReader<RowRecord> reader = new RowReaderBuilder(source, false).build()) {
+     for (RowRecord row = reader.read(); row != null; row = reader.read()) {
+       blackhole.consume(row.id);
+       blackhole.consume(row.category);
+       final Variant nested = row.variant;
+       consumeField(nested, "varid", v -> blackhole.consume(v.getLong()));
+       consumeField(nested, "varcategory", v -> blackhole.consume(v.getInt()));
+       consumeField(nested, "idstr", v -> blackhole.consume(v.getString()));
+       consumeField(nested, "longstr", v -> blackhole.consume(v.getString()));
+       consumeField(nested, "uuid_entry", v -> blackhole.consume(v.getUUID()));
+     }
+   }
+ }
+
+ /**
+  * Projected read: the reader is built with {@link #SELECT_SCHEMA} as the requested
+  * schema and only the projected fields are consumed.
+  *
+  * @param blackhole black hole.
+  * @throws IOException IO failure.
+  */
+ @Benchmark
+ public void readProjectedLeanSchema(Blackhole blackhole) throws IOException {
+   final InputFile source = HadoopInputFile.fromPath(dataFile, conf);
+   try (ParquetReader<RowRecord> reader = new RowReaderBuilder(source, true).build()) {
+     consumeProjectedFields(blackhole, reader);
+   }
+ }
+
+ /**
+ * Consume one nested field.
+ * @param nested base nested group
+ * @param key key
+ * @param consume consume operation.
+ */
+ private void consumeField(Variant nested, String key, Consumer<Variant>
consume) {
+ Variant variant = nested.getFieldByKey(key);
+ if (variant != null) {
+ consume.accept(variant);
+ }
+ }
+
+ /**
+  * Drain the reader, consuming only what the projection schema holds: the id, the
+  * category and the {@code varcategory} variant field.
+  * Other variant columns may or may not be present; they are not touched.
+  *
+  * @param blackhole black hole.
+  * @param reader reader.
+  * @throws IOException IO failure.
+  */
+ private void consumeProjectedFields(final Blackhole blackhole, final ParquetReader<RowRecord> reader)
+     throws IOException {
+   for (RowRecord row = reader.read(); row != null; row = reader.read()) {
+     blackhole.consume(row.id);
+     blackhole.consume(row.category);
+     consumeField(row.variant, "varcategory", v -> blackhole.consume(v.getInt()));
+   }
+ }
+
+ /**
+  * Consume the projected fields only, but read with the file schema
+  * rather than the leaner select schema.
+  *
+  * @param blackhole black hole.
+  * @throws IOException IO failure.
+  */
+ @Benchmark
+ public void readProjectedFileSchema(Blackhole blackhole) throws IOException {
+   final InputFile source = HadoopInputFile.fromPath(dataFile, conf);
+   try (ParquetReader<RowRecord> reader = new RowReaderBuilder(source, false).build()) {
+     consumeProjectedFields(blackhole, reader);
+   }
+ }
+
+ /**
+ * Build the nested variant structure.
+ *
+ * @param id row ID
+ * @param category category
+ * @param longstr string for column 4
+ *
+ * @return a variant
+ */
+ private static Variant buildVariant(long id, int category, String longstr) {
+ VariantBuilder builder = new VariantBuilder();
+ VariantObjectBuilder obj = builder.startObject();
+ obj.appendKey("idstr");
+ obj.appendString("item_" + id);
+ obj.appendKey("varid");
+ obj.appendLong(id);
+ obj.appendKey("varcategory");
+ obj.appendInt(category);
+ obj.appendKey("longstr");
+ obj.appendString(longstr);
+ obj.appendKey("uuid_entry");
+ obj.appendUUID(UUID.randomUUID());
+ builder.endObject();
+ return builder.build();
+ }
+
+ // ------------------------------------------------------------------
+ // Row record
+ // ------------------------------------------------------------------
+
+ /**
+  * Immutable holder for one row: the {@code id} and {@code category} columns
+  * plus the nested variant.
+  */
+ private static final class RowRecord {
+   /** Value of the {@code id} column. */
+   final long id;
+
+   /** Value of the {@code category} column. */
+   final int category;
+
+   /** Value of the {@code nested} variant column. */
+   final Variant variant;
+
+   RowRecord(long id, int category, Variant variant) {
+     this.variant = variant;
+     this.category = category;
+     this.id = id;
+   }
+ }
+
+ // ------------------------------------------------------------------
+ // Write support
+ // ------------------------------------------------------------------
+
+ /**
+  * Builder producing a {@link ParquetWriter} of {@link RowRecord} values, with
+  * {@link RowWriteSupport} doing the per-record writing.
+  */
+ private static final class RowWriterBuilder extends ParquetWriter.Builder<RowRecord, RowWriterBuilder> {
+   /** Message schema handed to the write support. */
+   private final MessageType schema;
+
+   /** Group type of the {@code nested} variant column, handed to the write support. */
+   private final GroupType nestedGroup;
+
+   /**
+    * @param file destination of the written data.
+    * @param schema message schema.
+    * @param nestedGroup group type of the variant column.
+    */
+   RowWriterBuilder(OutputFile file, MessageType schema, GroupType nestedGroup) {
+     super(file);
+     this.nestedGroup = nestedGroup;
+     this.schema = schema;
+   }
+
+   @Override
+   protected WriteSupport<RowRecord> getWriteSupport(Configuration conf) {
+     final WriteSupport<RowRecord> support = new RowWriteSupport(schema, nestedGroup);
+     return support;
+   }
+
+   @Override
+   protected RowWriterBuilder self() {
+     return this;
+   }
+ }
+
+ /**
+  * {@link WriteSupport} for {@link RowRecord}. Every message has three fields, written
+  * in order: {@code id} (index 0, long), {@code category} (index 1, int) and
+  * {@code nested} (index 2, written by {@link VariantValueWriter}).
+  */
+ private static final class RowWriteSupport extends WriteSupport<RowRecord> {
+   private final MessageType schema;
+   private final GroupType nestedGroup;
+
+   /** Consumer supplied through {@link #prepareForWrite(RecordConsumer)}. */
+   private RecordConsumer consumer;
+
+   RowWriteSupport(MessageType schema, GroupType nestedGroup) {
+     this.nestedGroup = nestedGroup;
+     this.schema = schema;
+   }
+
+   /** Declare the schema given at construction, with no extra metadata. */
+   @Override
+   public WriteContext init(Configuration conf) {
+     final MessageType declared = schema;
+     return new WriteContext(declared, Collections.emptyMap());
+   }
+
+   @Override
+   public void prepareForWrite(RecordConsumer recordConsumer) {
+     consumer = recordConsumer;
+   }
+
+   /** Write one row as a single message. */
+   @Override
+   public void write(RowRecord record) {
+     final RecordConsumer out = consumer;
+     out.startMessage();
+
+     out.startField("id", 0);
+     out.addLong(record.id);
+     out.endField("id", 0);
+
+     out.startField("category", 1);
+     out.addInteger(record.category);
+     out.endField("category", 1);
+
+     out.startField("nested", 2);
+     VariantValueWriter.write(out, nestedGroup, record.variant);
+     out.endField("nested", 2);
+
+     out.endMessage();
+   }
+ }
+
+ // ------------------------------------------------------------------
+ // Read support
+ // ------------------------------------------------------------------
+
+ /**
+  * Builder producing a {@link ParquetReader} of {@link RowRecord} values.
+  * The projection choice made at construction is passed to each {@link RowReadSupport}.
+  */
+ private final class RowReaderBuilder extends ParquetReader.Builder<RowRecord> {
+   /** Whether reads should be projected with the select schema. */
+   private final boolean useSelectSchema;
+
+   /**
+    * Row reader builder.
+    *
+    * @param file file to read.
+    * @param useSelectSchema true to project using {@link #selectSchema};
+    *     false to use the full file schema.
+    */
+   RowReaderBuilder(InputFile file, boolean useSelectSchema) {
+     super(file);
+     this.useSelectSchema = useSelectSchema;
+   }
+
+   @Override
+   protected ReadSupport<RowRecord> getReadSupport() {
+     final boolean projected = useSelectSchema;
+     return new RowReadSupport(projected);
+   }
+ }
+
+ /**
+  * {@link ReadSupport} that materializes each row as a {@link RowRecord}.
+  * When {@code useSelectSchema} is true the requested schema is {@link #selectSchema};
+  * otherwise it is the schema of the file being read. The contents of the file are not
+  * inspected when making that choice.
+  */
+ private final class RowReadSupport extends ReadSupport<RowRecord> {
+   private final boolean useSelectSchema;
+
+   RowReadSupport(boolean useSelectSchema) {
+     this.useSelectSchema = useSelectSchema;
+   }
+
+   /** Choose the requested schema for the read. */
+   @Override
+   public ReadContext init(InitContext context) {
+     if (useSelectSchema) {
+       return new ReadContext(selectSchema);
+     }
+     return new ReadContext(context.getFileSchema());
+   }
+
+   /** Build the materializer from the requested schema chosen in {@link #init}. */
+   @Override
+   public RecordMaterializer<RowRecord> prepareForRead(
+       Configuration conf,
+       Map<String, String> keyValueMetaData,
+       MessageType fileSchema,
+       ReadContext readContext) {
+     final MessageType requested = readContext.getRequestedSchema();
+     return new RowRecordMaterializer(requested, requested.getType("nested").asGroupType());
+   }
+ }
+
+ /**
+  * {@link RecordMaterializer} producing a {@link RowRecord} per message; all conversion
+  * work is done by the root {@link MessageConverter}.
+  */
+ private static final class RowRecordMaterializer extends RecordMaterializer<RowRecord> {
+   private final MessageConverter root;
+
+   /**
+    * @param schema message schema being read.
+    * @param nestedGroup group type of the {@code nested} variant column.
+    */
+   RowRecordMaterializer(MessageType schema, GroupType nestedGroup) {
+     root = new MessageConverter(schema, nestedGroup);
+   }
+
+   @Override
+   public RowRecord getCurrentRecord() {
+     final RowRecord latest = root.getCurrentRecord();
+     return latest;
+   }
+
+   @Override
+   public GroupConverter getRootConverter() {
+     return root;
+   }
+ }
+
+ /**
+  * Root {@link GroupConverter} for a message containing {@code id}, {@code category}, and
+  * {@code nested}. Field indices are looked up by name in the schema given at construction,
+  * so the converter does not depend on the order in which the schema declares them.
+  */
+ private static final class MessageConverter extends GroupConverter {
+
+   private final PrimitiveConverter idConverter;
+   private final PrimitiveConverter categoryConverter;
+   private final RowVariantGroupConverter variantConverter;
+
+   /** Index of the {@code id} field in the schema. */
+   private final int idIndex;
+
+   /** Index of the {@code category} field in the schema. */
+   private final int categoryIndex;
+
+   /** Index of the {@code nested} field in the schema. */
+   private final int nestedIndex;
+
+   /** {@code id} of the row being read; -1 until the column value arrives. */
+   private long id;
+
+   /** {@code category} of the row being read; -1 until the column value arrives. */
+   private int category;
+
+   /**
+    * @param schema message schema being read; must declare {@code id}, {@code category}
+    *     and {@code nested}, otherwise the index lookup fails.
+    * @param nestedGroup group type of the {@code nested} variant column.
+    */
+   MessageConverter(MessageType schema, GroupType nestedGroup) {
+     idIndex = schema.getFieldIndex("id");
+     categoryIndex = schema.getFieldIndex("category");
+     nestedIndex = schema.getFieldIndex("nested");
+     idConverter = new PrimitiveConverter() {
+       @Override
+       public void addLong(long value) {
+         id = value;
+       }
+     };
+     categoryConverter = new PrimitiveConverter() {
+       @Override
+       public void addInt(int value) {
+         category = value;
+       }
+     };
+     variantConverter = new RowVariantGroupConverter(nestedGroup);
+   }
+
+   /**
+    * @param fieldIndex index of a field in the schema given at construction.
+    * @return the converter for that field.
+    * @throws IllegalArgumentException if the index is not one of the three known fields.
+    */
+   @Override
+   public Converter getConverter(int fieldIndex) {
+     if (fieldIndex == idIndex) {
+       return idConverter;
+     }
+     if (fieldIndex == categoryIndex) {
+       return categoryConverter;
+     }
+     if (fieldIndex == nestedIndex) {
+       return variantConverter;
+     }
+     throw new IllegalArgumentException("Unknown field index: " + fieldIndex);
+   }
+
+   /** Reset the primitive columns to -1 at the start of each message. */
+   @Override
+   public void start() {
+     id = -1;
+     category = -1;
+   }
+
+   @Override
+   public void end() {}
+
+   /** @return a new record built from the values collected for the current message. */
+   RowRecord getCurrentRecord() {
+     return new RowRecord(id, category, variantConverter.getCurrentVariant());
+   }
+ }
+
+ /**
+ * {@link GroupConverter} for the {@code nested} variant field.
+ * <p>All field conversion is delegated to the converter returned by
+ * {@link VariantConverters#newVariantConverter}. This class tracks the per-record
+ * metadata and builder, and keeps the {@link Variant} built when the group ends.
+ */
+ private static final class RowVariantGroupConverter extends GroupConverter
+ implements VariantConverters.ParentConverter<VariantBuilder> {
+ // converter created by VariantConverters; start/end/getConverter are forwarded to it
+ private final GroupConverter wrapped;
+ // builder for the record in progress; null until first needed, reset in start()
+ private VariantBuilder builder;
+ // metadata of the record in progress; set via setMetadata(), reset in start()
+ private ImmutableMetadata metadata;
+ // result of the most recent end() call
+ private Variant currentVariant;
+
+ RowVariantGroupConverter(GroupType variantGroup) {
+ // this instance is passed as both the metadata callback and the parent converter
+ this.wrapped = VariantConverters.newVariantConverter(variantGroup,
this::setMetadata, this);
+ }
+
+ // Metadata callback given to newVariantConverter; presumably invoked once the
+ // record's metadata column has been read -- TODO confirm against VariantConverters.
+ private void setMetadata(ByteBuffer buf) {
+ this.metadata = new ImmutableMetadata(buf);
+ }
+
+ /**
+ * Pass the current record's builder to the consumer, creating the builder from the
+ * current metadata on first use.
+ */
+ @Override
+ public void build(Consumer<VariantBuilder> consumer) {
+ if (builder == null) {
+ builder = new VariantBuilder(metadata);
+ }
+ consumer.accept(builder);
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return wrapped.getConverter(fieldIndex);
+ }
+
+ /** Discard the previous record's builder and metadata, then start the wrapped converter. */
+ @Override
+ public void start() {
+ builder = null;
+ metadata = null;
+ wrapped.start();
+ }
+
+ /**
+ * End the wrapped converter, then build the variant for this record.
+ * If nothing requested a builder during the record, one is created here so that
+ * {@code appendNullIfEmpty()} and {@code build()} still have something to act on.
+ */
+ @Override
+ public void end() {
+ wrapped.end();
+ if (builder == null) {
+ builder = new VariantBuilder(metadata);
+ }
+ builder.appendNullIfEmpty();
+ currentVariant = builder.build();
+ }
+
+ /** @return the variant built by the most recent {@link #end()}; null before the first record. */
+ Variant getCurrentVariant() {
+ return currentVariant;
+ }
+ }
+}