This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 22d194f5d6 Parquet: Add variant array reader in Parquet (#12512)
22d194f5d6 is described below

commit 22d194f5d685fdf5bec17c6bcc92a69db4ae4957
Author: Aihua Xu <aihu...@gmail.com>
AuthorDate: Fri Apr 25 16:17:10 2025 -0700

    Parquet: Add variant array reader in Parquet (#12512)
---
 .../org/apache/iceberg/variants/ValueArray.java    | 130 ++++++
 .../java/org/apache/iceberg/variants/Variants.java |   4 +
 .../iceberg/variants/TestShreddedObject.java       |  65 +--
 .../apache/iceberg/variants/TestValueArray.java    | 166 +++++++
 .../iceberg/parquet/ParquetVariantReaders.java     |  61 +++
 .../iceberg/parquet/ParquetVariantVisitor.java     |   1 +
 .../iceberg/parquet/VariantReaderBuilder.java      |  16 +-
 .../apache/iceberg/parquet/TestVariantReaders.java | 498 ++++++++++++++++++++-
 8 files changed, 874 insertions(+), 67 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/variants/ValueArray.java 
b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java
new file mode 100644
index 0000000000..3da79bcef1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/variants/ValueArray.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.variants;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class ValueArray implements VariantArray {
+  private SerializationState serializationState = null;
+  private List<VariantValue> elements = Lists.newArrayList();
+
+  ValueArray() {}
+
+  @Override
+  public VariantValue get(int index) {
+    return elements.get(index);
+  }
+
+  @Override
+  public int numElements() {
+    return elements.size();
+  }
+
+  public void add(VariantValue value) {
+    elements.add(value);
+    this.serializationState = null;
+  }
+
+  @Override
+  public int sizeInBytes() {
+    if (null == serializationState) {
+      this.serializationState = new SerializationState(elements);
+    }
+
+    return serializationState.size();
+  }
+
+  @Override
+  public int writeTo(ByteBuffer buffer, int offset) {
+    Preconditions.checkArgument(
+        buffer.order() == ByteOrder.LITTLE_ENDIAN, "Invalid byte order: big 
endian");
+
+    if (null == serializationState) {
+      this.serializationState = new SerializationState(elements);
+    }
+
+    return serializationState.writeTo(buffer, offset);
+  }
+
+  /** Common state for {@link #size()} and {@link #writeTo(ByteBuffer, int)} */
+  private static class SerializationState {
+    private final List<VariantValue> elements;
+    private final int numElements;
+    private final boolean isLarge;
+    private final int dataSize;
+    private final int offsetSize;
+
+    private SerializationState(List<VariantValue> elements) {
+      this.elements = elements;
+      this.numElements = elements.size();
+      this.isLarge = numElements > 0xFF;
+
+      int totalDataSize = 0;
+      for (VariantValue value : elements) {
+        totalDataSize += value.sizeInBytes();
+      }
+
+      this.dataSize = totalDataSize;
+      this.offsetSize = VariantUtil.sizeOf(totalDataSize);
+    }
+
+    private int size() {
+      return 1 /* header */
+          + (isLarge ? 4 : 1) /* num elements size */
+          + (1 + numElements) * offsetSize /* offset list size */
+          + dataSize;
+    }
+
+    private int writeTo(ByteBuffer buffer, int offset) {
+      int offsetListOffset =
+          offset + 1 /* header size */ + (isLarge ? 4 : 1) /* num elements 
size */;
+      int dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
+      byte header = VariantUtil.arrayHeader(isLarge, offsetSize);
+
+      VariantUtil.writeByte(buffer, header, offset);
+      VariantUtil.writeLittleEndianUnsigned(buffer, numElements, offset + 1, 
isLarge ? 4 : 1);
+
+      // Insert element offsets
+      int nextValueOffset = 0;
+      int index = 0;
+      for (VariantValue element : elements) {
+        // write the data offset
+        VariantUtil.writeLittleEndianUnsigned(
+            buffer, nextValueOffset, offsetListOffset + (index * offsetSize), 
offsetSize);
+
+        // write the data
+        int valueSize = element.writeTo(buffer, dataOffset + nextValueOffset);
+
+        nextValueOffset += valueSize;
+        index += 1;
+      }
+
+      // write the final size of the data section
+      VariantUtil.writeLittleEndianUnsigned(
+          buffer, nextValueOffset, offsetListOffset + (index * offsetSize), 
offsetSize);
+
+      // return the total size
+      return (dataOffset - offset) + dataSize;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/variants/Variants.java 
b/core/src/main/java/org/apache/iceberg/variants/Variants.java
index d5f8cb4ae6..5591145ca6 100644
--- a/core/src/main/java/org/apache/iceberg/variants/Variants.java
+++ b/core/src/main/java/org/apache/iceberg/variants/Variants.java
@@ -121,6 +121,10 @@ public class Variants {
     return VariantUtil.readByte(valueBuffer, 0) == 0;
   }
 
+  public static ValueArray array() {
+    return new ValueArray();
+  }
+
   public static <T> VariantPrimitive<T> of(PhysicalType type, T value) {
     return new PrimitiveWrapper<>(type, value);
   }
diff --git 
a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java 
b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
index 6707ae6651..66d5c9911a 100644
--- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
+++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
@@ -217,11 +217,12 @@ public class TestShreddedObject {
         .isEqualTo(DateTimeUtil.isoDateToDays("2024-10-12"));
   }
 
-  @Test
-  public void testTwoByteOffsets() {
-    // a string larger than 255 bytes to push the value offset size above 1 
byte
-    String randomString = RandomUtil.generateString(300, random);
-    SerializedPrimitive bigString = VariantTestUtil.createString(randomString);
+  @ParameterizedTest
+  @ValueSource(ints = {300, 70_000, 16_777_300})
+  public void testMultiByteOffsets(int len) {
+    // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, 
and 4 bytes
+    String randomString = RandomUtil.generateString(len, random);
+    VariantPrimitive<String> bigString = Variants.of(randomString);
 
     Map<String, VariantValue> data = Maps.newHashMap();
     data.putAll(FIELDS);
@@ -244,60 +245,6 @@ public class TestShreddedObject {
     assertThat(object.get("big").asPrimitive().get()).isEqualTo(randomString);
   }
 
-  @Test
-  public void testThreeByteOffsets() {
-    // a string larger than 65535 bytes to push the value offset size above 2 
bytes
-    String randomString = RandomUtil.generateString(70_000, random);
-    SerializedPrimitive reallyBigString = 
VariantTestUtil.createString(randomString);
-
-    Map<String, VariantValue> data = Maps.newHashMap();
-    data.putAll(FIELDS);
-    data.put("really-big", reallyBigString);
-
-    ShreddedObject shredded = createShreddedObject(data);
-    VariantValue value = roundTripLargeBuffer(shredded, shredded.metadata());
-
-    assertThat(value.type()).isEqualTo(PhysicalType.OBJECT);
-    SerializedObject object = (SerializedObject) value;
-    assertThat(object.numFields()).isEqualTo(4);
-
-    assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT32);
-    assertThat(object.get("a").asPrimitive().get()).isEqualTo(34);
-    assertThat(object.get("b").type()).isEqualTo(PhysicalType.STRING);
-    assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg");
-    assertThat(object.get("c").type()).isEqualTo(PhysicalType.DECIMAL4);
-    assertThat(object.get("c").asPrimitive().get()).isEqualTo(new 
BigDecimal("12.21"));
-    assertThat(object.get("really-big").type()).isEqualTo(PhysicalType.STRING);
-    
assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString);
-  }
-
-  @Test
-  public void testFourByteOffsets() {
-    // a string larger than 16777215 bytes to push the value offset size above 
3 bytes
-    String randomString = RandomUtil.generateString(16_777_300, random);
-    SerializedPrimitive reallyBigString = 
VariantTestUtil.createString(randomString);
-
-    Map<String, VariantValue> data = Maps.newHashMap();
-    data.putAll(FIELDS);
-    data.put("really-big", reallyBigString);
-
-    ShreddedObject shredded = createShreddedObject(data);
-    VariantValue value = roundTripLargeBuffer(shredded, shredded.metadata());
-
-    assertThat(value.type()).isEqualTo(PhysicalType.OBJECT);
-    SerializedObject object = (SerializedObject) value;
-    assertThat(object.numFields()).isEqualTo(4);
-
-    assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT32);
-    assertThat(object.get("a").asPrimitive().get()).isEqualTo(34);
-    assertThat(object.get("b").type()).isEqualTo(PhysicalType.STRING);
-    assertThat(object.get("b").asPrimitive().get()).isEqualTo("iceberg");
-    assertThat(object.get("c").type()).isEqualTo(PhysicalType.DECIMAL4);
-    assertThat(object.get("c").asPrimitive().get()).isEqualTo(new 
BigDecimal("12.21"));
-    assertThat(object.get("really-big").type()).isEqualTo(PhysicalType.STRING);
-    
assertThat(object.get("really-big").asPrimitive().get()).isEqualTo(randomString);
-  }
-
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   @SuppressWarnings({"unchecked", "rawtypes"})
diff --git a/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java 
b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java
new file mode 100644
index 0000000000..f500f61065
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/variants/TestValueArray.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.variants;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Random;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.RandomUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestValueArray {
+  private static final VariantMetadata EMPTY_METADATA = 
Variants.emptyMetadata();
+  private static final List<VariantValue> ELEMENTS =
+      ImmutableList.of(
+          Variants.of(34), Variants.of("iceberg"), Variants.of(new 
BigDecimal("12.21")));
+
+  private final Random random = new Random(871925);
+
+  @Test
+  public void testElementAccess() {
+    ValueArray arr = createArray(ELEMENTS);
+
+    assertThat(arr.numElements()).isEqualTo(3);
+    assertThat(arr.get(0)).isInstanceOf(VariantPrimitive.class);
+    assertThat(arr.get(0).asPrimitive().get()).isEqualTo(34);
+    assertThat(arr.get(1)).isInstanceOf(VariantPrimitive.class);
+    assertThat(arr.get(1).asPrimitive().get()).isEqualTo("iceberg");
+    assertThat(arr.get(2)).isInstanceOf(VariantPrimitive.class);
+    assertThat(arr.get(2).asPrimitive().get()).isEqualTo(new 
BigDecimal("12.21"));
+  }
+
+  @Test
+  public void testSerializationMinimalBuffer() {
+    ValueArray arr = createArray(ELEMENTS);
+
+    VariantValue value = roundTripMinimalBuffer(arr);
+
+    assertThat(value).isInstanceOf(SerializedArray.class);
+    SerializedArray actual = (SerializedArray) value;
+
+    assertThat(actual.numElements()).isEqualTo(3);
+    assertThat(actual.get(0)).isInstanceOf(VariantPrimitive.class);
+    assertThat(actual.get(0).asPrimitive().get()).isEqualTo(34);
+    assertThat(actual.get(1)).isInstanceOf(VariantPrimitive.class);
+    assertThat(actual.get(1).asPrimitive().get()).isEqualTo("iceberg");
+    assertThat(actual.get(2)).isInstanceOf(VariantPrimitive.class);
+    assertThat(actual.get(2).asPrimitive().get()).isEqualTo(new 
BigDecimal("12.21"));
+  }
+
+  @Test
+  public void testSerializationLargeBuffer() {
+    ValueArray arr = createArray(ELEMENTS);
+
+    VariantValue value = roundTripLargeBuffer(arr);
+
+    assertThat(value).isInstanceOf(SerializedArray.class);
+    SerializedArray actual = (SerializedArray) value;
+
+    assertThat(actual.numElements()).isEqualTo(3);
+    assertThat(actual.get(0)).isInstanceOf(VariantPrimitive.class);
+    assertThat(actual.get(0).asPrimitive().get()).isEqualTo(34);
+    assertThat(actual.get(1)).isInstanceOf(VariantPrimitive.class);
+    assertThat(actual.get(1).asPrimitive().get()).isEqualTo("iceberg");
+    assertThat(actual.get(2)).isInstanceOf(VariantPrimitive.class);
+    assertThat(actual.get(2).asPrimitive().get()).isEqualTo(new 
BigDecimal("12.21"));
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {300, 70_000, 16_777_300})
+  public void testMultiByteOffsets(int len) {
+    // Use a string exceeding 255 bytes to test value offset sizes of 2, 3, 
and 4 bytes
+    String randomString = RandomUtil.generateString(len, random);
+    VariantPrimitive<String> bigString = Variants.of(randomString);
+
+    List<VariantValue> data = Lists.newArrayList();
+    data.addAll(ELEMENTS);
+    data.add(bigString);
+
+    ValueArray shredded = createArray(data);
+    VariantValue value = roundTripLargeBuffer(shredded);
+
+    assertThat(value.type()).isEqualTo(PhysicalType.ARRAY);
+    SerializedArray actualArray = (SerializedArray) value;
+    assertThat(actualArray.numElements()).isEqualTo(4);
+
+    assertThat(actualArray.get(0).type()).isEqualTo(PhysicalType.INT32);
+    assertThat(actualArray.get(0).asPrimitive().get()).isEqualTo(34);
+    assertThat(actualArray.get(1).type()).isEqualTo(PhysicalType.STRING);
+    assertThat(actualArray.get(1).asPrimitive().get()).isEqualTo("iceberg");
+    assertThat(actualArray.get(2).type()).isEqualTo(PhysicalType.DECIMAL4);
+    assertThat(actualArray.get(2).asPrimitive().get()).isEqualTo(new 
BigDecimal("12.21"));
+    assertThat(actualArray.get(3).type()).isEqualTo(PhysicalType.STRING);
+    assertThat(actualArray.get(3).asPrimitive().get()).isEqualTo(randomString);
+  }
+
+  @Test
+  public void testLargeArray() {
+    List<VariantValue> elements = Lists.newArrayList();
+    for (int i = 0; i < 10_000; i += 1) {
+      elements.add(Variants.of(RandomUtil.generateString(10, random)));
+    }
+
+    ValueArray arr = createArray(elements);
+    VariantValue value = roundTripLargeBuffer(arr);
+
+    assertThat(value.type()).isEqualTo(PhysicalType.ARRAY);
+    SerializedArray actualArray = (SerializedArray) value;
+    assertThat(actualArray.numElements()).isEqualTo(10_000);
+
+    for (int i = 0; i < 10_000; i++) {
+      VariantTestUtil.assertEqual(elements.get(i), actualArray.get(i));
+    }
+  }
+
+  private static VariantValue roundTripMinimalBuffer(ValueArray arr) {
+    ByteBuffer serialized = 
ByteBuffer.allocate(arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+    arr.writeTo(serialized, 0);
+
+    return Variants.value(EMPTY_METADATA, serialized);
+  }
+
+  private static VariantValue roundTripLargeBuffer(ValueArray arr) {
+    ByteBuffer serialized =
+        ByteBuffer.allocate(1000 + 
arr.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+    arr.writeTo(serialized, 300);
+
+    ByteBuffer slice = serialized.duplicate().order(ByteOrder.LITTLE_ENDIAN);
+    slice.position(300);
+    slice.limit(300 + arr.sizeInBytes());
+
+    return Variants.value(EMPTY_METADATA, slice);
+  }
+
+  private static ValueArray createArray(List<VariantValue> elements) {
+    ValueArray arr = new ValueArray();
+    for (VariantValue element : elements) {
+      arr.add(element);
+    }
+
+    return arr;
+  }
+}
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java
index 3e5635958c..40b0aeecc3 100644
--- 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java
+++ 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantReaders.java
@@ -31,6 +31,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.variants.PhysicalType;
 import org.apache.iceberg.variants.ShreddedObject;
+import org.apache.iceberg.variants.ValueArray;
 import org.apache.iceberg.variants.Variant;
 import org.apache.iceberg.variants.VariantMetadata;
 import org.apache.iceberg.variants.VariantObject;
@@ -95,6 +96,14 @@ public class ParquetVariantReaders {
         fieldReaders);
   }
 
+  public static VariantValueReader array(
+      int repeatedDefinitionLevel,
+      int repeatedRepetitionLevel,
+      ParquetValueReader<?> elementReader) {
+    return new ArrayReader(
+        repeatedDefinitionLevel, repeatedRepetitionLevel, (VariantValueReader) 
elementReader);
+  }
+
   public static VariantValueReader asVariant(PhysicalType type, 
ParquetValueReader<?> reader) {
     return new ValueAsVariantReader<>(type, reader);
   }
@@ -332,6 +341,58 @@ public class ParquetVariantReaders {
     }
   }
 
+  private static class ArrayReader implements VariantValueReader {
+    private final int definitionLevel;
+    private final int repetitionLevel;
+    private final VariantValueReader reader;
+    private final TripleIterator<?> column;
+    private final List<TripleIterator<?>> children;
+
+    protected ArrayReader(int definitionLevel, int repetitionLevel, 
VariantValueReader reader) {
+      this.definitionLevel = definitionLevel;
+      this.repetitionLevel = repetitionLevel;
+      this.reader = reader;
+      this.column = reader.column();
+      this.children = reader.columns();
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
+      reader.setPageSource(pageStore);
+    }
+
+    @Override
+    public ValueArray read(VariantMetadata metadata) {
+      ValueArray arr = Variants.array();
+      do {
+        if (column.currentDefinitionLevel() > definitionLevel) {
+          VariantValue value = reader.read(metadata);
+          arr.add(value != null ? value : Variants.ofNull());
+        } else {
+          // consume the empty list triple
+          for (TripleIterator<?> child : children) {
+            child.nextNull();
+          }
+          // if the current definition level is equal to the definition level 
of this repeated type,
+          // then the result is an empty list and the repetition level will 
always be <= rl.
+          break;
+        }
+      } while (column.currentRepetitionLevel() > repetitionLevel);
+
+      return arr;
+    }
+
+    @Override
+    public TripleIterator<?> column() {
+      return column;
+    }
+
+    @Override
+    public List<TripleIterator<?>> columns() {
+      return children;
+    }
+  }
+
   private static class VariantReader implements ParquetValueReader<Variant> {
     private final ParquetValueReader<VariantMetadata> metadataReader;
     private final VariantValueReader valueReader;
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java
index 71d2eb2662..d0ca00b193 100644
--- 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java
+++ 
b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetVariantVisitor.java
@@ -31,6 +31,7 @@ public abstract class ParquetVariantVisitor<R> {
   static final String METADATA = "metadata";
   static final String VALUE = "value";
   static final String TYPED_VALUE = "typed_value";
+  static final String LIST = "list";
 
   /**
    * Handles the root variant column group.
diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java 
b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java
index df41c5aa60..29ca900346 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/VariantReaderBuilder.java
@@ -66,8 +66,8 @@ public class VariantReaderBuilder extends 
ParquetVariantVisitor<ParquetValueRead
     return Streams.concat(Streams.stream(basePath), 
fieldNames.stream()).toArray(String[]::new);
   }
 
-  private String[] path(String name) {
-    return Streams.concat(Streams.stream(basePath), fieldNames.stream(), 
Stream.of(name))
+  private String[] path(String... names) {
+    return Streams.concat(Streams.stream(basePath), fieldNames.stream(), 
Stream.of(names))
         .toArray(String[]::new);
   }
 
@@ -162,8 +162,16 @@ public class VariantReaderBuilder extends 
ParquetVariantVisitor<ParquetValueRead
 
   @Override
   public VariantValueReader array(
-      GroupType array, ParquetValueReader<?> valueResult, 
ParquetValueReader<?> elementResult) {
-    throw new UnsupportedOperationException("Array is not yet supported");
+      GroupType array, ParquetValueReader<?> valueReader, 
ParquetValueReader<?> elementReader) {
+    int valueDL =
+        valueReader != null ? schema.getMaxDefinitionLevel(path(VALUE)) - 1 : 
Integer.MAX_VALUE;
+    int typedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE)) - 1;
+    int repeatedDL = schema.getMaxDefinitionLevel(path(TYPED_VALUE, LIST)) - 1;
+    int repeatedRL = schema.getMaxRepetitionLevel(path(TYPED_VALUE, LIST)) - 1;
+    VariantValueReader typedReader =
+        ParquetVariantReaders.array(repeatedDL, repeatedRL, elementReader);
+
+    return ParquetVariantReaders.shredded(valueDL, valueReader, typedDL, 
typedReader);
   }
 
   private static class LogicalTypeToVariantReader
diff --git 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java 
b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java
index b0299762f7..23c6e9b328 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestVariantReaders.java
@@ -48,6 +48,7 @@ import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.VariantType;
 import org.apache.iceberg.variants.PhysicalType;
 import org.apache.iceberg.variants.ShreddedObject;
+import org.apache.iceberg.variants.ValueArray;
 import org.apache.iceberg.variants.Variant;
 import org.apache.iceberg.variants.VariantMetadata;
 import org.apache.iceberg.variants.VariantObject;
@@ -57,6 +58,8 @@ import org.apache.iceberg.variants.VariantValue;
 import org.apache.iceberg.variants.Variants;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.conf.ParquetConfiguration;
+import org.apache.parquet.conf.PlainParquetConfiguration;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.schema.GroupType;
@@ -135,6 +138,15 @@ public class TestVariantReaders {
         Variants.ofUUID("f24f9b64-81fa-49d1-b74e-8c09a6e31c56"),
       };
 
+  // Required configuration to convert between Avro and Parquet schemas with 
3-level list structure
+  private static final ParquetConfiguration CONF =
+      new PlainParquetConfiguration(
+          Map.of(
+              AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
+              "false",
+              AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS,
+              "false"));
+
   private static Stream<Arguments> metadataAndValues() {
     Stream<Arguments> primitives =
         Stream.of(PRIMITIVES).map(variant -> Arguments.of(EMPTY_METADATA, 
variant));
@@ -255,7 +267,7 @@ public class TestVariantReaders {
   }
 
   @Test
-  public void testValueAndTypedValueConflict() throws IOException {
+  public void testValueAndTypedValueConflict() {
     GroupType variantType = variant("var", 2, 
shreddedPrimitive(PrimitiveTypeName.INT32));
     MessageType parquetSchema = parquetSchema(variantType);
 
@@ -885,6 +897,460 @@ public class TestVariantReaders {
     VariantTestUtil.assertEqual(expectedThree, actualThreeVariant.value());
   }
 
+  @Test
+  public void testSimpleArray() throws IOException {
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType elementType = element(shreddedType);
+    GroupType variantType = variant("var", 2, list(elementType));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    List<GenericRecord> arr =
+        List.of(
+            record(elementType, Map.of("typed_value", "comedy")),
+            record(elementType, Map.of("typed_value", "drama")));
+
+    GenericRecord var =
+        record(
+            variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), 
"typed_value", arr));
+    GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var));
+
+    ValueArray expectedArray = Variants.array();
+    expectedArray.add(Variants.of("comedy"));
+    expectedArray.add(Variants.of("drama"));
+
+    Record actual = writeAndRead(parquetSchema, row);
+
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+    Variant actualVariant = (Variant) actual.getField("var");
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+    VariantTestUtil.assertEqual(expectedArray, actualVariant.value());
+  }
+
+  @Test
+  public void testNullArray() throws IOException {
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType variantType = variant("var", 2, list(element(shreddedType)));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    GenericRecord var =
+        record(
+            variantType,
+            Map.of(
+                "metadata",
+                VariantTestUtil.emptyMetadata(),
+                "value",
+                serialize(Variants.ofNull())));
+    GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var));
+
+    Record actual = writeAndRead(parquetSchema, row);
+
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+    Variant actualVariant = (Variant) actual.getField("var");
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+    VariantTestUtil.assertEqual(Variants.ofNull(), actualVariant.value());
+  }
+
+  @Test
+  public void testEmptyArray() throws IOException {
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType variantType = variant("var", 2, list(element(shreddedType)));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    List<GenericRecord> arr = List.of();
+    GenericRecord var =
+        record(
+            variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), 
"typed_value", arr));
+    GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var));
+
+    Record actual = writeAndRead(parquetSchema, row);
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+    Variant actualVariant = (Variant) actual.getField("var");
+    assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY);
+    assertThat(actualVariant.value().asArray().numElements()).isEqualTo(0);
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+  }
+
+  @Test
+  public void testArrayWithNull() throws IOException {
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType elementType = element(shreddedType);
+    GroupType variantType = variant("var", 2, list(elementType));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    List<GenericRecord> arr =
+        List.of(
+            record(elementType, Map.of("typed_value", "comedy")),
+            record(elementType, Map.of("value", serialize(Variants.ofNull()))),
+            record(elementType, Map.of("typed_value", "drama")));
+
+    GenericRecord var =
+        record(
+            variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), 
"typed_value", arr));
+    GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var));
+
+    ValueArray expectedArray = Variants.array();
+    expectedArray.add(Variants.of("comedy"));
+    expectedArray.add(Variants.ofNull());
+    expectedArray.add(Variants.of("drama"));
+
+    Record actual = writeAndRead(parquetSchema, row);
+
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+    Variant actualVariant = (Variant) actual.getField("var");
+    assertThat(actualVariant.value().type()).isEqualTo(PhysicalType.ARRAY);
+    assertThat(actualVariant.value().asArray().numElements()).isEqualTo(3);
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+    VariantTestUtil.assertEqual(expectedArray, actualVariant.value());
+  }
+
+  @Test
+  public void testNestedArray() throws IOException {
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType elementType = element(shreddedType);
+    GroupType outerElementType = element(list(elementType));
+    GroupType variantType = variant("var", 2, list(outerElementType));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    List<GenericRecord> inner1 =
+        List.of(
+            record(elementType, Map.of("typed_value", "comedy")),
+            record(elementType, Map.of("typed_value", "drama")));
+    List<GenericRecord> outer1 =
+        List.of(
+            record(outerElementType, Map.of("typed_value", inner1)),
+            record(outerElementType, Map.of("typed_value", List.of())));
+    GenericRecord var =
+        record(
+            variantType,
+            Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", 
outer1));
+    GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var));
+
+    ValueArray expectedArray = Variants.array();
+    ValueArray expectedInner1 = Variants.array();
+    expectedInner1.add(Variants.of("comedy"));
+    expectedInner1.add(Variants.of("drama"));
+    ValueArray expectedInner2 = Variants.array();
+    expectedArray.add(expectedInner1);
+    expectedArray.add(expectedInner2);
+
+    Record actual = writeAndRead(parquetSchema, row);
+
+    // Verify
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+    Variant actualVariant = (Variant) actual.getField("var");
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+    VariantTestUtil.assertEqual(expectedArray, actualVariant.value());
+  }
+
  @Test
  public void testArrayWithNestedObject() throws IOException {
    // Schema: variant array whose elements are shredded objects with fields a (int) and b (string)
    GroupType fieldA = field("a", shreddedPrimitive(PrimitiveTypeName.INT32));
    GroupType fieldB = field("b", shreddedPrimitive(PrimitiveTypeName.BINARY, STRING));
    GroupType shreddedFields = objectFields(fieldA, fieldB);
    GroupType elementType = element(shreddedFields);
    GroupType listType = list(elementType);
    GroupType variantType = variant("var", 2, listType);
    MessageType parquetSchema = parquetSchema(variantType);

    // Row 1 with nested fully shredded object: every object field is in typed_value
    GenericRecord shredded1 =
        record(
            shreddedFields,
            Map.of(
                "a",
                record(fieldA, Map.of("typed_value", 1)),
                "b",
                record(fieldB, Map.of("typed_value", "comedy"))));
    GenericRecord shredded2 =
        record(
            shreddedFields,
            Map.of(
                "a",
                record(fieldA, Map.of("typed_value", 2)),
                "b",
                record(fieldB, Map.of("typed_value", "drama"))));
    List<GenericRecord> arr1 =
        List.of(
            record(elementType, Map.of("typed_value", shredded1)),
            record(elementType, Map.of("typed_value", shredded2)));
    GenericRecord var1 =
        record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr1));
    GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1));

    ValueArray expected1 = Variants.array();
    ShreddedObject expectedElement1 = Variants.object(TEST_METADATA);
    expectedElement1.put("a", Variants.of(1));
    expectedElement1.put("b", Variants.of("comedy"));
    expected1.add(expectedElement1);
    ShreddedObject expectedElement2 = Variants.object(TEST_METADATA);
    expectedElement2.put("a", Variants.of(2));
    expectedElement2.put("b", Variants.of("drama"));
    expected1.add(expectedElement2);

    // Row 2 with nested partially shredded object: each element also carries an extra field
    // (c or d) in its serialized value column that must be merged with the shredded fields
    GenericRecord shredded3 =
        record(
            shreddedFields,
            Map.of(
                "a",
                record(fieldA, Map.of("typed_value", 3)),
                "b",
                record(fieldB, Map.of("typed_value", "action"))));
    ShreddedObject baseObject3 = Variants.object(TEST_METADATA);
    baseObject3.put("c", Variants.of("str"));

    GenericRecord shredded4 =
        record(
            shreddedFields,
            Map.of(
                "a",
                record(fieldA, Map.of("typed_value", 4)),
                "b",
                record(fieldB, Map.of("typed_value", "horror"))));
    ShreddedObject baseObject4 = Variants.object(TEST_METADATA);
    baseObject4.put("d", Variants.ofIsoDate("2024-01-30"));

    List<GenericRecord> arr2 =
        List.of(
            record(elementType, Map.of("value", serialize(baseObject3), "typed_value", shredded3)),
            record(elementType, Map.of("value", serialize(baseObject4), "typed_value", shredded4)));
    GenericRecord var2 =
        record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr2));
    GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2));

    // Expected merge result: shredded fields (a, b) plus the unshredded field from value
    ValueArray expected2 = Variants.array();
    ShreddedObject expectedElement3 = Variants.object(TEST_METADATA);
    expectedElement3.put("a", Variants.of(3));
    expectedElement3.put("b", Variants.of("action"));
    expectedElement3.put("c", Variants.of("str"));
    expected2.add(expectedElement3);
    ShreddedObject expectedElement4 = Variants.object(TEST_METADATA);
    expectedElement4.put("a", Variants.of(4));
    expectedElement4.put("b", Variants.of("horror"));
    expectedElement4.put("d", Variants.ofIsoDate("2024-01-30"));
    expected2.add(expectedElement4);

    // verify both rows round-trip with metadata and merged element objects intact
    List<Record> actual = writeAndRead(parquetSchema, List.of(row1, row2));
    Record actual1 = actual.get(0);
    assertThat(actual1.getField("id")).isEqualTo(1);
    assertThat(actual1.getField("var")).isInstanceOf(Variant.class);

    Variant actualVariant1 = (Variant) actual1.getField("var");
    VariantTestUtil.assertEqual(TEST_METADATA, actualVariant1.metadata());
    VariantTestUtil.assertEqual(expected1, actualVariant1.value());

    Record actual2 = actual.get(1);
    assertThat(actual2.getField("id")).isEqualTo(2);
    assertThat(actual2.getField("var")).isInstanceOf(Variant.class);

    Variant actualVariant2 = (Variant) actual2.getField("var");
    VariantTestUtil.assertEqual(TEST_METADATA, actualVariant2.metadata());
    VariantTestUtil.assertEqual(expected2, actualVariant2.value());
  }
+
  @Test
  public void testArrayWithNonArray() throws IOException {
    // Mixes rows whose variant is an array with rows whose variant is a non-array value,
    // all written through the same array-shredded schema
    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
    GroupType elementType = element(shreddedType);
    GroupType variantType = variant("var", 2, list(elementType));
    MessageType parquetSchema = parquetSchema(variantType);

    // Row 1: shredded array of strings
    List<GenericRecord> arr1 =
        List.of(
            record(elementType, Map.of("typed_value", "comedy")),
            record(elementType, Map.of("typed_value", "drama")));
    GenericRecord var1 =
        record(
            variantType,
            Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", arr1));
    GenericRecord row1 = record(parquetSchema, Map.of("id", 1, "var", var1));

    ValueArray expectedArray1 = Variants.array();
    expectedArray1.add(Variants.of("comedy"));
    expectedArray1.add(Variants.of("drama"));

    // Row 2: non-array primitive (int) carried in the value column
    GenericRecord var2 =
        record(
            variantType,
            Map.of(
                "metadata", VariantTestUtil.emptyMetadata(), "value", serialize(Variants.of(34))));
    GenericRecord row2 = record(parquetSchema, Map.of("id", 2, "var", var2));

    VariantValue expectedValue2 = Variants.of(PhysicalType.INT32, 34);

    // Row 3: non-array object carried in the value column
    GenericRecord var3 =
        record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "value", TEST_OBJECT_BUFFER));
    GenericRecord row3 = record(parquetSchema, Map.of("id", 3, "var", var3));

    ShreddedObject expectedObject3 = Variants.object(TEST_METADATA);
    expectedObject3.put("a", Variants.ofNull());
    expectedObject3.put("d", Variants.of("iceberg"));

    // Test array is read properly after a non-array
    List<GenericRecord> arr4 =
        List.of(
            record(elementType, Map.of("typed_value", "action")),
            record(elementType, Map.of("typed_value", "horror")));
    GenericRecord var4 =
        record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", arr4));
    GenericRecord row4 = record(parquetSchema, Map.of("id", 4, "var", var4));

    ValueArray expectedArray4 = Variants.array();
    expectedArray4.add(Variants.of("action"));
    expectedArray4.add(Variants.of("horror"));

    List<Record> actual = writeAndRead(parquetSchema, List.of(row1, row2, row3, row4));

    // Verify each row round-trips as the shape it was written with
    Record actual1 = actual.get(0);
    assertThat(actual1.getField("id")).isEqualTo(1);
    assertThat(actual1.getField("var")).isInstanceOf(Variant.class);
    Variant actualVariant1 = (Variant) actual1.getField("var");
    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant1.metadata());
    VariantTestUtil.assertEqual(expectedArray1, actualVariant1.value());

    Record actual2 = actual.get(1);
    assertThat(actual2.getField("id")).isEqualTo(2);
    assertThat(actual2.getField("var")).isInstanceOf(Variant.class);
    Variant actualVariant2 = (Variant) actual2.getField("var");
    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant2.metadata());
    VariantTestUtil.assertEqual(expectedValue2, actualVariant2.value());

    Record actual3 = actual.get(2);
    assertThat(actual3.getField("id")).isEqualTo(3);
    assertThat(actual3.getField("var")).isInstanceOf(Variant.class);
    Variant actualVariant3 = (Variant) actual3.getField("var");
    VariantTestUtil.assertEqual(TEST_METADATA, actualVariant3.metadata());
    VariantTestUtil.assertEqual(expectedObject3, actualVariant3.value());

    Record actual4 = actual.get(3);
    assertThat(actual4.getField("id")).isEqualTo(4);
    assertThat(actual4.getField("var")).isInstanceOf(Variant.class);
    Variant actualVariant4 = (Variant) actual4.getField("var");
    VariantTestUtil.assertEqual(TEST_METADATA, actualVariant4.metadata());
    VariantTestUtil.assertEqual(expectedArray4, actualVariant4.value());
  }
+
+  @Test
+  public void testArrayMissingValueColumn() throws IOException {
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType elementType = element(shreddedType);
+    GroupType variantType =
+        Types.buildGroup(Type.Repetition.OPTIONAL)
+            .id(2)
+            .required(PrimitiveTypeName.BINARY)
+            .named("metadata")
+            .addField(list(elementType))
+            .named("var");
+
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    List<GenericRecord> arr =
+        List.of(
+            record(elementType, Map.of("typed_value", "comedy")),
+            record(elementType, Map.of("typed_value", "drama")));
+    GenericRecord var =
+        record(
+            variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), 
"typed_value", arr));
+    GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var));
+
+    ValueArray expectedArray = Variants.array();
+    expectedArray.add(Variants.of("comedy"));
+    expectedArray.add(Variants.of("drama"));
+
+    Record actual = writeAndRead(parquetSchema, row);
+
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+    Variant actualVariant = (Variant) actual.getField("var");
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+    VariantTestUtil.assertEqual(expectedArray, actualVariant.value());
+  }
+
+  @Test
+  public void testArrayMissingElementValueColumn() throws IOException {
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType elementType =
+        
Types.buildGroup(Type.Repetition.REQUIRED).addField(shreddedType).named("element");
+
+    GroupType variantType = variant("var", 2, list(elementType));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    List<GenericRecord> arr =
+        List.of(
+            record(elementType, Map.of("typed_value", "comedy")),
+            record(elementType, Map.of("typed_value", "drama")));
+    GenericRecord var =
+        record(
+            variantType, Map.of("metadata", VariantTestUtil.emptyMetadata(), 
"typed_value", arr));
+    GenericRecord row = record(parquetSchema, Map.of("id", 1, "var", var));
+
+    ValueArray expectedArray = Variants.array();
+    expectedArray.add(Variants.of("comedy"));
+    expectedArray.add(Variants.of("drama"));
+
+    Record actual = writeAndRead(parquetSchema, row);
+
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+    Variant actualVariant = (Variant) actual.getField("var");
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+    VariantTestUtil.assertEqual(expectedArray, actualVariant.value());
+  }
+
+  @Test
+  public void testArrayWithElementNullValueAndNullTypedValue() throws 
IOException {
+    // Test the invalid case that both value and typed_value of an element are 
null
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType elementType = element(shreddedType);
+    GroupType variantType = variant("var", 2, list(elementType));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    GenericRecord element = record(elementType, Map.of());
+    GenericRecord variant =
+        record(
+            variantType,
+            Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", 
List.of(element)));
+    GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", 
variant));
+
+    Record actual = writeAndRead(parquetSchema, record);
+    assertThat(actual.getField("id")).isEqualTo(1);
+    assertThat(actual.getField("var")).isInstanceOf(Variant.class);
+
+    Variant actualVariant = (Variant) actual.getField("var");
+    VariantTestUtil.assertEqual(EMPTY_METADATA, actualVariant.metadata());
+    VariantValue actualValue = actualVariant.value();
+    assertThat(actualValue.type()).isEqualTo(PhysicalType.ARRAY);
+    assertThat(actualValue.asArray().numElements()).isEqualTo(1);
+    VariantTestUtil.assertEqual(Variants.ofNull(), 
actualValue.asArray().get(0));
+  }
+
+  @Test
+  public void testArrayWithElementValueTypedValueConflict() {
+    // Test the invalid case that both value and typed_value of an element are 
not null
+    Type shreddedType = shreddedPrimitive(PrimitiveTypeName.BINARY, STRING);
+    GroupType elementType = element(shreddedType);
+    GroupType variantType = variant("var", 2, list(elementType));
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    GenericRecord element =
+        record(elementType, Map.of("value", serialize(Variants.of(3)), 
"typed_value", "comedy"));
+    GenericRecord variant =
+        record(
+            variantType,
+            Map.of("metadata", VariantTestUtil.emptyMetadata(), "typed_value", 
List.of(element)));
+    GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", 
variant));
+
+    assertThatThrownBy(() -> writeAndRead(parquetSchema, record))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid variant, conflicting value and 
typed_value");
+  }
+
   private static ByteBuffer serialize(VariantValue value) {
     ByteBuffer buffer = 
ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
     value.writeTo(buffer, 0);
@@ -943,7 +1409,7 @@ public class TestVariantReaders {
     OutputFile outputFile = new InMemoryOutputFile();
 
     try (ParquetWriter<GenericRecord> writer =
-        new TestWriterBuilder(outputFile).withFileType(parquetSchema).build()) 
{
+        new 
TestWriterBuilder(outputFile).withFileType(parquetSchema).withConf(CONF).build())
 {
       for (GenericRecord record : records) {
         writer.write(record);
       }
@@ -1104,14 +1570,38 @@ public class TestVariantReaders {
         .named(name);
   }
 
  /** Wraps a shredded type in a variant element group named "element". */
  private static GroupType element(Type shreddedType) {
    return field("element", shreddedType);
  }
+
  /** Builds an optional Parquet list named "typed_value" with the given element group. */
  private static GroupType list(GroupType elementType) {
    return Types.optionalList().element(elementType).named("typed_value");
  }
+
+  private static void checkListType(GroupType listType) {
+    // Check the list is a 3-level structure
+    Preconditions.checkArgument(
+        listType.getFieldCount() == 1
+            && 
listType.getFields().get(0).isRepetition(Type.Repetition.REPEATED),
+        "Invalid list type: does not contain single repeated field: %s",
+        listType);
+
+    GroupType repeated = listType.getFields().get(0).asGroupType();
+    Preconditions.checkArgument(
+        repeated.getFieldCount() == 1
+            && 
repeated.getFields().get(0).isRepetition(Type.Repetition.REQUIRED),
+        "Invalid list type: does not contain single required subfield: %s",
+        listType);
+  }
+
   private static org.apache.avro.Schema avroSchema(GroupType schema) {
     if (schema instanceof MessageType) {
-      return new AvroSchemaConverter().convert((MessageType) schema);
+      return new AvroSchemaConverter(CONF).convert((MessageType) schema);
 
     } else {
       MessageType wrapped = 
Types.buildMessage().addField(schema).named("table");
       org.apache.avro.Schema avro =
-          new 
AvroSchemaConverter().convert(wrapped).getFields().get(0).schema();
+          new 
AvroSchemaConverter(CONF).convert(wrapped).getFields().get(0).schema();
       switch (avro.getType()) {
         case RECORD:
           return avro;


Reply via email to