This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new 22d194f5d6  Parquet: Add variant array reader in Parquet (#12512)
22d194f5d6 is described below

commit 22d194f5d685fdf5bec17c6bcc92a69db4ae4957
Author: Aihua Xu <aihu...@gmail.com>
AuthorDate: Fri Apr 25 16:17:10 2025 -0700

    Parquet: Add variant array reader in Parquet (#12512)
---
 .../org/apache/iceberg/variants/ValueArray.java    | 130 ++++++
 .../java/org/apache/iceberg/variants/Variants.java |   4 +
 .../iceberg/variants/TestShreddedObject.java       |  65 +--
 .../apache/iceberg/variants/TestValueArray.java    | 166 +++++++
 .../iceberg/parquet/ParquetVariantReaders.java     |  61 +++
 .../iceberg/parquet/ParquetVariantVisitor.java     |   1 +
 .../iceberg/parquet/VariantReaderBuilder.java      |  16 +-
 .../apache/iceberg/parquet/TestVariantReaders.java | 498 ++++++++++++++++++++-
 8 files changed, 874 insertions(+), 67 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/variants/ValueArray.java b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java
new file mode 100644
index 0000000000..3da79bcef1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.variants;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class ValueArray implements VariantArray {
+  private SerializationState serializationState = null;
+  private List<VariantValue> elements = Lists.newArrayList();
+
+  ValueArray() {}
+
+  @Override
+  public VariantValue get(int index) {
+    return elements.get(index);
+  }
+
+  @Override
+  public int numElements() {
+    return elements.size();
+  }
+
+  public void add(VariantValue value) {
+    elements.add(value);
+    this.serializationState = null;
+  }
+
+  @Override
+  public int sizeInBytes() {
+    if (null == serializationState) {
+      this.serializationState = new SerializationState(elements);
+    }
+
+    return serializationState.size();
+  }
+
+  @Override
+  public int writeTo(ByteBuffer buffer, int offset) {
+    Preconditions.checkArgument(
+        buffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big endian");
+
+    if (null == serializationState) {
+      this.serializationState = new SerializationState(elements);
+    }
+
+    return serializationState.writeTo(buffer, offset);
+  }
+
+  /** Common state for {@link #size()} and {@link #writeTo(ByteBuffer, int)} */
+  private static class SerializationState {
+    private final List<VariantValue> elements;
+    private final int numElements;
+    private final boolean isLarge;
+    private final int dataSize;
+    private final int offsetSize;
+
+    private SerializationState(List<VariantValue> elements) {
+      this.elements = elements;
+      this.numElements = elements.size();
+      this.isLarge = numElements > 0xFF;
+
+      int totalDataSize = 0;
+      for (VariantValue value : elements) {
+        totalDataSize += value.sizeInBytes();
+      }
+
+      this.dataSize = totalDataSize;
+      this.offsetSize = VariantUtil.sizeOf(totalDataSize);
+    }
+
+    private int size() {
+      return 1 /* header */
+          + (isLarge ? 4 : 1) /* num elements size */
+          + (1 + numElements) * offsetSize /* offset list size */
+          + dataSize;
+    }
+
+    private int writeTo(ByteBuffer buffer, int offset) {
+      int offsetListOffset =
+          offset + 1 /* header size */ + (isLarge ? 4 : 1) /* num elements size */;
+      int dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
+      byte header = VariantUtil.arrayHeader(isLarge, offsetSize);
+
+      VariantUtil.writeByte(buffer, header, offset);
+      VariantUtil.writeLittleEndianUnsigned(buffer, numElements, offset + 1, isLarge ? 4 : 1);
+
+      // Insert element offsets
+      int nextValueOffset = 0;
+      int index = 0;
+      for (VariantValue element : elements) {
+        // write the data offset
+        VariantUtil.writeLittleEndianUnsigned(
+            buffer, nextValueOffset, offsetListOffset + (index * offsetSize), offsetSize);
+
+        // write the data
+        int valueSize = element.writeTo(buffer, dataOffset + nextValueOffset);
+
+        nextValueOffset += valueSize;
+        index += 1;
+      }
+
+      // write the final size of the data section
+      VariantUtil.writeLittleEndianUnsigned(
+          buffer, nextValueOffset, offsetListOffset + (index * offsetSize), offsetSize);
+
+      // return the total size
+      return (dataOffset - offset) + dataSize;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java b/core/src/main/java/org/apache/iceberg/variants/Variants.java
index d5f8cb4ae6..5591145ca6 100644
--- a/core/src/main/java/org/apache/iceberg/variants/Variants.java
+++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java
@@ -121,6 +121,10 @@ public class Variants {
     return VariantUtil.readByte(valueBuffer, 0) == 0;
   }
 
+  public static ValueArray array() {
+    return new ValueArray();
+  }
+
   public static <T> VariantPrimitive<T> of(PhysicalType type, T value) {
     return new PrimitiveWrapper<>(type, value);
   }
diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
index 6707ae6651..66d5c9911a 100644
--- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
+++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
@@ -217,11 +218,12 @@ public class TestShreddedObject {
         .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12"));
   }
 
-  @Test
-  public void testTwoByteOffsets() {
-    // a string larger than 255 bytes to push the value offset size above 1 byte
-    String randomString = RandomUtil.generateString(300, random);
-    SerializedPrimitive bigString = VariantTestUtil.createString(randomString);
+  @ParameterizedTest
+  @ValueSource(ints = {300, 70_000, 16_777_300})
+  public void testMultiByteOffsets(int len) {
+    // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, and 4 bytes
+    String randomString = RandomUtil.generateString(len, random);
+    VariantPrimitive<String> bigString = Variants.of(randomString);
 
     Map<String, VariantValue> data = Maps.newHashMap();
     data.putAll(FIELDS);
@@ -244,60 +245,6 @@ public class TestShreddedObject {
     assertThat(object.get("big").asPrimitive().get()).isEqualTo(randomString);
   }
 
-  @Test
-  public void testThreeByteOffsets() {
-    // a string larger than 65535 bytes to push the value offset size above 2 bytes
-    String randomString = RandomUtil.generateString(70_000, random);
-    SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString);
-
-    Map<String, VariantValue> data = Maps.newHashMap();
-    data.putAll(FIELDS);
-    data.put("really-big", reallyBigString);
-
-    ShreddedObject shredded = createShreddedObject(data);
-    VariantValue value = roundTripLargeBuffer(shredded, shredded.metadata());
-
-    assertThat(value.type()).isEqualTo(PhysicalType.OBJECT);
-    SerializedObject object = (SerializedObject) value;
-    assertThat(object.numFields()).isEqualTo(4);
-
-    assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT32);
-    assertThat(object.get("a").asPrimitive().get()).isEqualTo(34);
-    assertThat(object.get("b").type()).isEqualTo(PhysicalType.STRING);
-    assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg");
-
assertThat(object.get("c").type()).isEqualTo(PhysicalType.DECIMAL4); - assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); - assertThat(object.get("really-big").type()).isEqualTo(PhysicalType.STRING); - assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString); - } - - @Test - public void testFourByteOffsets() { - // a string larger than 16777215 bytes to push the value offset size above 3 bytes - String randomString = RandomUtil.generateString(16_777_300, random); - SerializedPrimitive reallyBigString = VariantTestUtil.createString(randomString); - - Map<String, VariantValue> data = Maps.newHashMap(); - data.putAll(FIELDS); - data.put("really-big", reallyBigString); - - ShreddedObject shredded = createShreddedObject(data); - VariantValue value = roundTripLargeBuffer(shredded, shredded.metadata()); - - assertThat(value.type()).isEqualTo(PhysicalType.OBJECT); - SerializedObject object = (SerializedObject) value; - assertThat(object.numFields()).isEqualTo(4); - - assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT32); - assertThat(object.get("a").asPrimitive().get()).isEqualTo(34); - assertThat(object.get("b").type()).isEqualTo(PhysicalType.STRING); - assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg"); - assertThat(object.get("c").type()).isEqualTo(PhysicalType.DECIMAL4); - assertThat(object.get("c").asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); - assertThat(object.get("really-big").type()).isEqualTo(PhysicalType.STRING); - assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java new file mode 100644 index 0000000000..f500f61065 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.List; +import java.util.Random; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.RandomUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestValueArray { + private static final VariantMetadata EMPTY_METADATA = Variants.emptyMetadata(); + private static final List<VariantValue> ELEMENTS = + ImmutableList.of( + Variants.of(34), Variants.of("iceberg"), Variants.of(new BigDecimal("12.21"))); + + private final Random random = new Random(871925); + + @Test + public void testElementAccess() { + ValueArray arr = createArray(ELEMENTS); + + assertThat(arr.numElements()).isEqualTo(3); + assertThat(arr.get(0)).isInstanceOf(VariantPrimitive.class); + assertThat(arr.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(arr.get(1)).isInstanceOf(VariantPrimitive.class); + assertThat(arr.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(arr.get(2)).isInstanceOf(VariantPrimitive.class); + assertThat(arr.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testSerializationMinimalBuffer() { + ValueArray arr = createArray(ELEMENTS); + + VariantValue value = roundTripMinimalBuffer(arr); + + assertThat(value).isInstanceOf(SerializedArray.class); + SerializedArray actual = (SerializedArray) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get(0)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(actual.get(1)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get(2)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @Test + public void testSerializationLargeBuffer() { + ValueArray arr = createArray(ELEMENTS); + + VariantValue value = roundTripLargeBuffer(arr); + + assertThat(value).isInstanceOf(SerializedArray.class); + SerializedArray actual = (SerializedArray) value; + + assertThat(actual.numElements()).isEqualTo(3); + assertThat(actual.get(0)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(0).asPrimitive().get()).isEqualTo(34); + assertThat(actual.get(1)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(1).asPrimitive().get()).isEqualTo("iceberg"); + assertThat(actual.get(2)).isInstanceOf(VariantPrimitive.class); + assertThat(actual.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21")); + } + + @ParameterizedTest + @ValueSource(ints = {300, 70_000, 16_777_300}) + public void testMultiByteOffsets(int len) { + // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, and 4 bytes + String randomString = RandomUtil.generateString(len, random); + VariantPrimitive<String> bigString = Variants.of(randomString); + + List<VariantValue> data = Lists.newArrayList(); + data.addAll(ELEMENTS); + data.add(bigString); + + ValueArray shredded = createArray(data); + VariantValue value = roundTripLargeBuffer(shredded); + + assertThat(value.type()).isEqualTo(PhysicalType.ARRAY); + SerializedArray actualArray = (SerializedArray) 
value;
+    assertThat(actualArray.numElements()).isEqualTo(4);
+
+    assertThat(actualArray.get(0).type()).isEqualTo(PhysicalType.INT32);
+    assertThat(actualArray.get(0).asPrimitive().get()).isEqualTo(34);
+    assertThat(actualArray.get(1).type()).isEqualTo(PhysicalType.STRING);
+    assertThat(actualArray.get(1).asPrimitive().get()).isEqualTo("iceberg");
+    assertThat(actualArray.get(2).type()).isEqualTo(PhysicalType.DECIMAL4);
+    assertThat(actualArray.get(2).asPrimitive().get()).isEqualTo(new BigDecimal("12.21"));
+    assertThat(actualArray.get(3).type()).isEqualTo(PhysicalType.STRING);
+    assertThat(actualArray.get(3).asPrimitive().get()).isEqualTo(randomString);
+  }
+
+  @Test
+  public void testLargeArray() {
+    List<VariantValue> elements = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i += 1) {
+      elements.add(Variants.of(RandomUtil.generateString(10, random)));
+    }
+
+    ValueArray arr = createArray(elements);
+    VariantValue value = roundTripLargeBuffer(arr);
+
+    assertThat(value.type()).isEqualTo(PhysicalType.ARRAY);
+    SerializedArray actualArray = (SerializedArray) value;
+    assertThat(actualArray.numElements()).isEqualTo(10_000);
+
+    for (int i = 0; i < 10_000; i++) {
+      VariantTestUtil.assertEqual(elements.get(i), actualArray.get(i));
+    }
+  }
+
+  private static VariantValue roundTripMinimalBuffer(ValueArray arr) {
+    ByteBuffer serialized = ByteBuffer.allocate(arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+    arr.writeTo(serialized, 0);
+
+    return Variants.value(EMPTY_METADATA, serialized);
+  }
+
+  private static VariantValue roundTripLargeBuffer(ValueArray arr) {
+    ByteBuffer serialized =
+        ByteBuffer.allocate(1000 + arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+    arr.writeTo(serialized, 300);
+
+    ByteBuffer slice = serialized.duplicate().order(ByteOrder.LITTLE_ENDIAN);
+    slice.position(300);
+    slice.limit(300 + arr.sizeInBytes());
+
+    return Variants.value(EMPTY_METADATA, slice);
+  }
+
+  private static ValueArray createArray(List<VariantValue> elements) {
+    ValueArray arr = new ValueArray();
+    for (VariantValue element : elements) {
+      arr.add(element);
+    }
+
+    return arr;
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java
index 3e5635958c..40b0aeecc3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.variants.PhysicalType;
 import org.apache.iceberg.variants.ShreddedObject;
+import org.apache.iceberg.variants.ValueArray;
 import org.apache.iceberg.variants.Variant;
 import org.apache.iceberg.variants.VariantMetadata;
 import org.apache.iceberg.variants.VariantObject;
@@ -95,6 +96,14 @@ public class ParquetVariantReaders {
         fieldReaders);
   }
 
+  public static VariantValueReader array(
+      int repeatedDefinitionLevel,
+      int repeatedRepetitionLevel,
+      ParquetValueReader<?> elementReader) {
+    return new ArrayReader(
+        repeatedDefinitionLevel, repeatedRepetitionLevel, (VariantValueReader) elementReader);
+  }
+
   public static VariantValueReader asVariant(PhysicalType type, ParquetValueReader<?> reader) {
     return new ValueAsVariantReader<>(type, reader);
   }
@@ -332,6 +341,58 @@ public class ParquetVariantReaders {
     }
   }
 
+  private static class ArrayReader implements VariantValueReader {
+    private final int definitionLevel;
+    private final int repetitionLevel;
+    private final VariantValueReader reader;
+    private final TripleIterator<?> column;
+    private final List<TripleIterator<?>> children;
+
+    protected ArrayReader(int definitionLevel, int repetitionLevel, VariantValueReader reader) {
+      this.definitionLevel = definitionLevel;
+      this.repetitionLevel = repetitionLevel;
+      this.reader = reader;
+      this.column = reader.column();
+      this.children = reader.columns();
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
+      reader.setPageSource(pageStore);
+    }
+
+    @Override
+    public ValueArray read(VariantMetadata metadata) {
+      ValueArray arr = Variants.array();
+      do {
+        if (column.currentDefinitionLevel() > definitionLevel) {
+          VariantValue value = reader.read(metadata);
+          arr.add(value != null ? value : Variants.ofNull());
+        } else {
+          // consume the empty list triple
+          for (TripleIterator<?> child : children) {
+            child.nextNull();
+          }
+          // if the current definition level is equal to the definition level of this repeated
+          // type, then the result is an empty list and the repetition level will always be <= rl.
+          break;
+        }
+      } while (column.currentRepetitionLevel() > repetitionLevel);
+
+      return arr;
+    }
+
+    @Override
+    public TripleIterator<?> column() {
+      return column;
+    }
+
+    @Override
+    public List<TripleIterator<?>> columns() {
+      return children;
+    }
+  }
+
   private static class VariantReader implements ParquetValueReader<Variant> {
     private final ParquetValueReader<VariantMetadata> metadataReader;
     private final VariantValueReader valueReader;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java
index 71d2eb2662..d0ca00b193 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java
@@ -31,6 +31,7 @@ public abstract class ParquetVariantVisitor<R> {
   static final String METADATA = "metadata";
   static final String VALUE = "value";
   static final String TYPED_VALUE = "typed_value";
+  static final String LIST = "list";
 
   /**
    * Handles the root variant column group.
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java
index df41c5aa60..29ca900346 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java
@@ -66,8 +66,8 @@ public class VariantReaderBuilder extends ParquetVariantVisitor<ParquetValueRead
     return Streams.concat(Streams.stream(basePath), fieldNames.stream()).toArray(String[]::new);
   }
 
-  private String[] path(String name) {
-    return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(name))
+  private String[] path(String...
names) { + return Streams.concat(Streams.stream(basePath), fieldNames.stream(), Stream.of(names)) .toArray(String[]::new); } @@ -162,8 +162,16 @@ public class VariantReaderBuilder extends ParquetVariantVisitor<ParquetValueRead @Override public VariantValueReader array( - GroupType array, ParquetValueReader<?> valueResult, ParquetValueReader<?> elementResult) { - throw new UnsupportedOperationException("Array is not yet supported"); + GroupType array, ParquetValueReader<?> valueReader, ParquetValueReader<?> elementReader) { + int valueDL = + valueReader != null ? schema.getMaxDefinitionLevel(path(VALUE)) - 1 : Integer.MAX_VALUE; + int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1; + int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST)) - 1; + int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST)) - 1; + VariantValueReader typedReader = + ParquetVariantReaders.array(repeatedDL, repeatedRL, elementReader); + + return ParquetVariantReaders.shredded(valueDL, valueReader, typedDL, typedReader); } private static class LogicalTypeToVariantReader diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java index b0299762f7..23c6e9b328 100644 --- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java @@ -48,6 +48,7 @@ import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.VariantType; import org.apache.iceberg.variants.PhysicalType; import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.ValueArray; import org.apache.iceberg.variants.Variant; import org.apache.iceberg.variants.VariantMetadata; import org.apache.iceberg.variants.VariantObject; @@ -57,6 +58,8 @@ import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.variants.Variants; import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.avro.AvroWriteSupport; +import org.apache.parquet.conf.ParquetConfiguration; +import org.apache.parquet.conf.PlainParquetConfiguration; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.schema.GroupType; @@ -135,6 +138,15 @@ public class TestVariantReaders { Variants.ofUUID("f24f9b64-81fa-49d1-b74e-8c09a6e31c56"), }; + // Required configuration to convert between Avro and Parquet schemas with 3-level list structure + private static final ParquetConfiguration CONF = + new PlainParquetConfiguration( + Map.of( + AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, + "false", + AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, + "false")); + private static Stream<Arguments> metadataAndValues() { Stream<Arguments> primitives = Stream.of(PRIMITIVES).map(variant -> Arguments.of(EMPTY_METADATA, variant)); @@ -255,7 +267,7 @@ public class TestVariantReaders { } @Test - public void testValueAndTypedValueConflict() throws IOException { + public void testValueAndTypedValueConflict() { GroupType variantType = variant("var", 2, shreddedPrimitive(PrimitiveTypeName.INT32)); MessageType parquetSchema = parquetSchema(variantType); @@ -885,6 +897,460 @@ public class TestVariantReaders { VariantTestUtil.assertEqual(expectedThree, actualThreeVariant.value()); } + @Test + public void testSimpleArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = 
element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + List<GenericRecord> arr = + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); + + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testNullArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType variantType = variant("var", 2, list(element(shreddedType))); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord var = + record( + variantType, + Map.of( + "metadata", + VariantTestUtil.emptyMetadata(), + "value", + serialize(Variants.ofNull()))); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(Variants.ofNull(), actualVariant.value()); + } + + @Test + public void testEmptyArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType variantType = variant("var", 2, list(element(shreddedType))); + MessageType parquetSchema = parquetSchema(variantType); + + List<GenericRecord> arr = List.of(); + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + Record actual = writeAndRead(parquetSchema, row); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); + assertThat(actualVariant.value().asArray().numElements()).isEqualTo(0); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + } + + @Test + public void testArrayWithNull() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + List<GenericRecord> arr = + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("value", serialize(Variants.ofNull()))), + record(elementType, Map.of("typed_value", "drama"))); + + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, 
Map.of("id", 1, "var", var)); + + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.ofNull()); + expectedArray.add(Variants.of("drama")); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY); + assertThat(actualVariant.value().asArray().numElements()).isEqualTo(3); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testNestedArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType outerElementType = element(list(elementType)); + GroupType variantType = variant("var", 2, list(outerElementType)); + MessageType parquetSchema = parquetSchema(variantType); + + List<GenericRecord> inner1 = + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); + List<GenericRecord> outer1 = + List.of( + record(outerElementType, Map.of("typed_value", inner1)), + record(outerElementType, Map.of("typed_value", List.of()))); + GenericRecord var = + record( + variantType, + Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", outer1)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + ValueArray expectedArray = Variants.array(); + ValueArray expectedInner1 = Variants.array(); + expectedInner1.add(Variants.of("comedy")); + expectedInner1.add(Variants.of("drama")); + ValueArray expectedInner2 = Variants.array(); + expectedArray.add(expectedInner1); + expectedArray.add(expectedInner2); + + Record actual = writeAndRead(parquetSchema, row); + + // Verify + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testArrayWithNestedObject() throws IOException { + GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32)); + GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING)); + GroupType shreddedFields = objectFields(fieldA, fieldB); + GroupType elementType = element(shreddedFields); + GroupType listType = list(elementType); + GroupType variantType = variant("var", 2, listType); + MessageType parquetSchema = parquetSchema(variantType); + + // Row 1 with nested fully shredded object + GenericRecord shredded1 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 1)), + "b", + record(fieldB, Map.of("typed_value", "comedy")))); + GenericRecord shredded2 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 2)), + "b", + record(fieldB, Map.of("typed_value", "drama")))); + List<GenericRecord> arr1 = + List.of( + record(elementType, Map.of("typed_value", shredded1)), + record(elementType, Map.of("typed_value", shredded2))); + GenericRecord var1 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr1)); + GenericRecord row1 = record(parquetSchema, Map.of("id", 1, 
"var", var1)); + + ValueArray expected1 = Variants.array(); + ShreddedObject expectedElement1 = Variants.object(TEST_METADATA); + expectedElement1.put("a", Variants.of(1)); + expectedElement1.put("b", Variants.of("comedy")); + expected1.add(expectedElement1); + ShreddedObject expectedElement2 = Variants.object(TEST_METADATA); + expectedElement2.put("a", Variants.of(2)); + expectedElement2.put("b", Variants.of("drama")); + expected1.add(expectedElement2); + + // Row 2 with nested partially shredded object + GenericRecord shredded3 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 3)), + "b", + record(fieldB, Map.of("typed_value", "action")))); + ShreddedObject baseObject3 = Variants.object(TEST_METADATA); + baseObject3.put("c", Variants.of("str")); + + GenericRecord shredded4 = + record( + shreddedFields, + Map.of( + "a", + record(fieldA, Map.of("typed_value", 4)), + "b", + record(fieldB, Map.of("typed_value", "horror")))); + ShreddedObject baseObject4 = Variants.object(TEST_METADATA); + baseObject4.put("d", Variants.ofIsoDate("2024-01-30")); + + List<GenericRecord> arr2 = + List.of( + record(elementType, Map.of("value", serialize(baseObject3), "typed_value", shredded3)), + record(elementType, Map.of("value", serialize(baseObject4), "typed_value", shredded4))); + GenericRecord var2 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr2)); + GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); + + ValueArray expected2 = Variants.array(); + ShreddedObject expectedElement3 = Variants.object(TEST_METADATA); + expectedElement3.put("a", Variants.of(3)); + expectedElement3.put("b", Variants.of("action")); + expectedElement3.put("c", Variants.of("str")); + expected2.add(expectedElement3); + ShreddedObject expectedElement4 = Variants.object(TEST_METADATA); + expectedElement4.put("a", Variants.of(4)); + expectedElement4.put("b", Variants.of("horror")); + expectedElement4.put("d", Variants.ofIsoDate("2024-01-30")); + expected2.add(expectedElement4); + + // verify + List<Record> actual = writeAndRead(parquetSchema, List.of(row1, row2)); + Record actual1 = actual.get(0); + assertThat(actual1.getField("id")).isEqualTo(1); + assertThat(actual1.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant1 = (Variant) actual1.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant1.metadata()); + VariantTestUtil.assertEqual(expected1, actualVariant1.value()); + + Record actual2 = actual.get(1); + assertThat(actual2.getField("id")).isEqualTo(2); + assertThat(actual2.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant2 = (Variant) actual2.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant2.metadata()); + VariantTestUtil.assertEqual(expected2, actualVariant2.value()); + } + + @Test + public void testArrayWithNonArray() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + List<GenericRecord> arr1 = + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); + GenericRecord var1 = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr1)); + GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1)); + + ValueArray 
expectedArray1 = Variants.array(); + expectedArray1.add(Variants.of("comedy")); + expectedArray1.add(Variants.of("drama")); + + GenericRecord var2 = + record( + variantType, + Map.of( + "metadata", VariantTestUtil.emptyMetadata(), "value", serialize(Variants.of(34)))); + GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2)); + + VariantValue expectedValue2 = Variants.of(PhysicalType.INT32, 34); + + GenericRecord var3 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "value", TEST_OBJECT_BUFFER)); + GenericRecord row3 = record(parquetSchema, Map.of("id", 3, "var", var3)); + + ShreddedObject expectedObject3 = Variants.object(TEST_METADATA); + expectedObject3.put("a", Variants.ofNull()); + expectedObject3.put("d", Variants.of("iceberg")); + + // Test array is read properly after a non-array + List<GenericRecord> arr4 = + List.of( + record(elementType, Map.of("typed_value", "action")), + record(elementType, Map.of("typed_value", "horror"))); + GenericRecord var4 = + record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr4)); + GenericRecord row4 = record(parquetSchema, Map.of("id", 4, "var", var4)); + + ValueArray expectedArray4 = Variants.array(); + expectedArray4.add(Variants.of("action")); + expectedArray4.add(Variants.of("horror")); + + List<Record> actual = writeAndRead(parquetSchema, List.of(row1, row2, row3, row4)); + + // Verify + Record actual1 = actual.get(0); + assertThat(actual1.getField("id")).isEqualTo(1); + assertThat(actual1.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant1 = (Variant) actual1.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant1.metadata()); + VariantTestUtil.assertEqual(expectedArray1, actualVariant1.value()); + + Record actual2 = actual.get(1); + assertThat(actual2.getField("id")).isEqualTo(2); + assertThat(actual2.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant2 = (Variant) actual2.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant2.metadata()); + VariantTestUtil.assertEqual(expectedValue2, actualVariant2.value()); + + Record actual3 = actual.get(2); + assertThat(actual3.getField("id")).isEqualTo(3); + assertThat(actual3.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant3 = (Variant) actual3.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant3.metadata()); + VariantTestUtil.assertEqual(expectedObject3, actualVariant3.value()); + + Record actual4 = actual.get(3); + assertThat(actual4.getField("id")).isEqualTo(4); + assertThat(actual4.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant4 = (Variant) actual4.getField("var"); + VariantTestUtil.assertEqual(TEST_METADATA, actualVariant4.metadata()); + VariantTestUtil.assertEqual(expectedArray4, actualVariant4.value()); + } + + @Test + public void testArrayMissingValueColumn() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = + Types.buildGroup(Type.Repetition.OPTIONAL) + .id(2) + .required(PrimitiveTypeName.BINARY) + .named("metadata") + .addField(list(elementType)) + .named("var"); + + MessageType parquetSchema = parquetSchema(variantType); + + List<GenericRecord> arr = + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); + GenericRecord var = + record( + variantType, Map.of("metadata", 
VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testArrayMissingElementValueColumn() throws IOException { + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = + Types.buildGroup(Type.Repetition.REQUIRED).addField(shreddedType).named("element"); + + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + List<GenericRecord> arr = + List.of( + record(elementType, Map.of("typed_value", "comedy")), + record(elementType, Map.of("typed_value", "drama"))); + GenericRecord var = + record( + variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr)); + GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var)); + + ValueArray expectedArray = Variants.array(); + expectedArray.add(Variants.of("comedy")); + expectedArray.add(Variants.of("drama")); + + Record actual = writeAndRead(parquetSchema, row); + + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantTestUtil.assertEqual(expectedArray, actualVariant.value()); + } + + @Test + public void testArrayWithElementNullValueAndNullTypedValue() throws IOException { + // Test the invalid case that both value and typed_value of an element are null + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = parquetSchema(variantType); + + GenericRecord element = record(elementType, Map.of()); + GenericRecord variant = + record( + variantType, + Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", List.of(element))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + Record actual = writeAndRead(parquetSchema, record); + assertThat(actual.getField("id")).isEqualTo(1); + assertThat(actual.getField("var")).isInstanceOf(Variant.class); + + Variant actualVariant = (Variant) actual.getField("var"); + VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata()); + VariantValue actualValue = actualVariant.value(); + assertThat(actualValue.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(actualValue.asArray().numElements()).isEqualTo(1); + VariantTestUtil.assertEqual(Variants.ofNull(), actualValue.asArray().get(0)); + } + + @Test + public void testArrayWithElementValueTypedValueConflict() { + // Test the invalid case that both value and typed_value of an element are not null + Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING); + GroupType elementType = element(shreddedType); + GroupType variantType = variant("var", 2, list(elementType)); + MessageType parquetSchema = 
parquetSchema(variantType); + + GenericRecord element = + record(elementType, Map.of("value", serialize(Variants.of(3)), "typed_value", "comedy")); + GenericRecord variant = + record( + variantType, + Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", List.of(element))); + GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant)); + + assertThatThrownBy(() -> writeAndRead(parquetSchema, record)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Invalid variant, conflicting value and typed_value"); + } + private static ByteBuffer serialize(VariantValue value) { ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); value.writeTo(buffer, 0); @@ -943,7 +1409,7 @@ public class TestVariantReaders { OutputFile outputFile = new InMemoryOutputFile(); try (ParquetWriter<GenericRecord> writer = - new TestWriterBuilder(outputFile).withFileType(parquetSchema).build()) { + new TestWriterBuilder(outputFile).withFileType(parquetSchema).withConf(CONF).build()) { for (GenericRecord record : records) { writer.write(record); } @@ -1104,14 +1570,38 @@ public class TestVariantReaders { .named(name); } + private static GroupType element(Type shreddedType) { + return field("element", shreddedType); + } + + private static GroupType list(GroupType elementType) { + return Types.optionalList().element(elementType).named("typed_value"); + } + + private static void checkListType(GroupType listType) { + // Check the list is a 3-level structure + Preconditions.checkArgument( + listType.getFieldCount() == 1 + && listType.getFields().get(0).isRepetition(Type.Repetition.REPEATED), + "Invalid list type: does not contain single repeated field: %s", + listType); + + GroupType repeated = listType.getFields().get(0).asGroupType(); + Preconditions.checkArgument( + repeated.getFieldCount() == 1 + && repeated.getFields().get(0).isRepetition(Type.Repetition.REQUIRED), + "Invalid list type: does not contain single required subfield: %s", + listType); + } + private static org.apache.avro.Schema avroSchema(GroupType schema) { if (schema instanceof MessageType) { - return new AvroSchemaConverter().convert((MessageType) schema); + return new AvroSchemaConverter(CONF).convert((MessageType) schema); } else { MessageType wrapped = Types.buildMessage().addField(schema).named("table"); org.apache.avro.Schema avro = - new AvroSchemaConverter().convert(wrapped).getFields().get(0).schema(); + new AvroSchemaConverter(CONF).convert(wrapped).getFields().get(0).schema(); switch (avro.getType()) { case RECORD: return avro;
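
A note on the serialized form for readers following the ValueArray code above: SerializationState lays an array out as one header byte, the element count (one byte, or four when the array holds more than 0xFF elements), an offset list of numElements + 1 entries of offsetSize bytes each, and finally the concatenated element data, so size() is exactly 1 + (isLarge ? 4 : 1) + (1 + numElements) * offsetSize + dataSize. The extra trailing offset is why writeTo() emits one more offset after the element loop: it records the total size of the data section.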
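A minimal round-trip sketch of the new public API, adapted from roundTripMinimalBuffer in TestValueArray above (the wrapper class and main method are illustrative, not part of the commit):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.variants.Variants;

public class ValueArrayRoundTrip {
  public static void main(String[] args) {
    // build an in-memory variant array with the new Variants.array() factory
    ValueArray arr = Variants.array();
    arr.add(Variants.of(34));
    arr.add(Variants.of("iceberg"));

    // writeTo requires a little-endian buffer and writes at an absolute offset
    ByteBuffer buffer = ByteBuffer.allocate(arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
    arr.writeTo(buffer, 0);

    // re-read the serialized bytes as a variant value; empty metadata suffices
    // because arrays, unlike objects, carry no field-name references
    VariantValue value = Variants.value(Variants.emptyMetadata(), buffer);
    System.out.println(value.type()); // prints ARRAY
  }
}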
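On the read side, the level arithmetic is the subtle part: VariantReaderBuilder.array() derives the repeated group's definition and repetition levels from the typed_value/list path, and ArrayReader.read() appends an element whenever the column's current definition level is greater than that repeated definition level. When the two are equal, the row holds an empty but non-null array, so the reader consumes one null triple from each child column and returns an empty ValueArray; a genuinely null typed_value never reaches ArrayReader, because the shredded reader falls back to the serialized value column, as testNullArray above exercises.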
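The tests also depend on parquet-avro writing three-level lists, which is what the WRITE_OLD_LIST_STRUCTURE and ADD_LIST_ELEMENT_RECORDS settings in CONF force and what checkListType verifies. As a rough sketch of the shape involved, the shredded schema that the variant/list/element helpers assemble for testSimpleArray looks approximately like the following, built with the same parquet Types API the tests use; the exact field layout of the element group is inferred from the helpers above and may differ in minor details:

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class ShreddedArraySchemaSketch {
  public static void main(String[] args) {
    // each array element is a group that can hold a serialized value,
    // a shredded typed_value, or both (the conflict case is rejected)
    GroupType element =
        Types.buildGroup(Type.Repetition.REQUIRED)
            .optional(PrimitiveTypeName.BINARY)
            .named("value")
            .optional(PrimitiveTypeName.BINARY)
            .as(LogicalTypeAnnotation.stringType())
            .named("typed_value")
            .named("element");

    // the variant column: metadata, an optional fallback value, and
    // typed_value as a 3-level list of element groups
    GroupType variant =
        Types.buildGroup(Type.Repetition.OPTIONAL)
            .id(2)
            .required(PrimitiveTypeName.BINARY)
            .named("metadata")
            .optional(PrimitiveTypeName.BINARY)
            .named("value")
            .addField(Types.optionalList().element(element).named("typed_value"))
            .named("var");

    MessageType schema =
        Types.buildMessage()
            .required(PrimitiveTypeName.INT32)
            .named("id")
            .addField(variant)
            .named("table");

    // prints the optional group typed_value (LIST) { repeated group list { ... } } nesting
    System.out.println(schema);
  }
}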