This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new df01d88c93 Parquet: Expose variantShreddingFunc() in Parquet.DataWriteBuilder (#14153)
df01d88c93 is described below

commit df01d88c9360cda398c40493eb4135181a8976c5
Author: Denys Kuzmenko <dkuzme...@cloudera.com>
AuthorDate: Thu Sep 25 21:44:45 2025 +0200

    Parquet: Expose variantShreddingFunc() in Parquet.DataWriteBuilder (#14153)
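    
    A variant shredding function can now be supplied directly on the
    data-write builder, which forwards it to the underlying appender builder.
    A minimal usage sketch modeled on the new test below; file, schema, and
    shreddedType are illustrative names, and (as in the base test) a function
    that returns null leaves a variant column unshredded:
    
        DataWriter<Record> writer =
            Parquet.writeData(file)
                .schema(schema)
                .createWriterFunc(GenericParquetWriter::create)
                // choose, per variant column (by field id and name), the
                // shredded Parquet type to write; returning null skips shredding
                .variantShreddingFunc((id, name) -> shreddedType)
                .overwrite()
                .withSpec(PartitionSpec.unpartitioned())
                .build();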
---
 .../java/org/apache/iceberg/parquet/Parquet.java   |  5 ++
 .../iceberg/parquet/TestParquetDataWriter.java     | 81 +++++++++++++++++++---
 2 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 6f68fbe150..0178ad5fc5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -827,6 +827,11 @@ public class Parquet {
       return this;
     }
 
+    public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
+      appenderBuilder.variantShreddingFunc(func);
+      return this;
+    }
+
     public DataWriteBuilder withSpec(PartitionSpec newSpec) {
       this.spec = newSpec;
       return this;
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
index bbad38f518..3918fdc630 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
@@ -25,10 +25,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.Optional;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalTestHelpers;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -47,6 +49,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -78,25 +87,29 @@ public class TestParquetDataWriter {
 
   @Test
   public void testDataWriter() throws IOException {
+    testDataWriter(SCHEMA, (id, name) -> null);
+  }
+
+  private void testDataWriter(Schema schema, VariantShreddingFunction variantShreddingFunc)
+      throws IOException {
     OutputFile file = Files.localOutput(createTempFile(temp));
 
-    SortOrder sortOrder = SortOrder.builderFor(SCHEMA).withOrderId(10).asc("id").build();
+    SortOrder sortOrder = SortOrder.builderFor(schema).withOrderId(10).asc("id").build();
 
     DataWriter<Record> dataWriter =
         Parquet.writeData(file)
-            .schema(SCHEMA)
+            .schema(schema)
             .createWriterFunc(GenericParquetWriter::create)
+            .variantShreddingFunc(variantShreddingFunc)
             .overwrite()
             .withSpec(PartitionSpec.unpartitioned())
             .withSortOrder(sortOrder)
             .build();
 
-    try {
+    try (dataWriter) {
       for (Record record : records) {
         dataWriter.write(record);
       }
-    } finally {
-      dataWriter.close();
     }
 
     DataFile dataFile = dataWriter.toDataFile();
@@ -113,13 +126,32 @@ public class TestParquetDataWriter {
     List<Record> writtenRecords;
     try (CloseableIterable<Record> reader =
         Parquet.read(file.toInputFile())
-            .project(SCHEMA)
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(SCHEMA, fileSchema))
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
             .build()) {
       writtenRecords = Lists.newArrayList(reader);
     }
 
-    assertThat(writtenRecords).as("Written records should match").isEqualTo(records);
+    assertThat(writtenRecords).hasSameSizeAs(records);
+
+    for (int i = 0; i < records.size(); i++) {
+      InternalTestHelpers.assertEquals(schema.asStruct(), records.get(i), writtenRecords.get(i));
+    }
+
+    // Check physical Parquet schema if variant shredding function is provided
+    Optional<Types.NestedField> variantField =
+        schema.columns().stream()
+            .filter(field -> field.type().equals(Types.VariantType.get()))
+            .findFirst();
+
+    if (variantField.isPresent() && variantShreddingFunc != null) {
+      try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file.toInputFile()))) {
+        MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+        GroupType variantType = parquetSchema.getType(variantField.get().name()).asGroupType();
+
+        assertThat(variantType.containsField("typed_value")).isTrue();
+      }
+    }
   }
 
   @SuppressWarnings("checkstyle:AvoidEscapedUnicodeCharacters")
@@ -266,4 +298,37 @@ public class TestParquetDataWriter {
     assertThat(dataFile.lowerBounds()).as("Should have a valid lower bound").containsKey(3);
     assertThat(dataFile.upperBounds()).as("Should have a null upper bound").doesNotContainKey(3);
   }
+
+  @Test
+  public void testDataWriterWithVariantShredding() throws IOException {
+    Schema variantSchema =
+        new Schema(
+            ImmutableList.<Types.NestedField>builder()
+                .addAll(SCHEMA.columns())
+                .add(Types.NestedField.optional(4, "variant", Types.VariantType.get()))
+                .build());
+
+    ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
+    VariantMetadata metadata = Variants.metadata(metadataBuffer);
+
+    ByteBuffer objectBuffer =
+        VariantTestUtil.createObject(
+            metadataBuffer,
+            ImmutableMap.of(
+                "a", Variants.of(123456789),
+                "b", Variants.of("string")));
+
+    Variant variant = Variant.of(metadata, Variants.value(metadata, objectBuffer));
+
+    // Create records with variant data
+    GenericRecord record = GenericRecord.create(variantSchema);
+
+    records =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("id", 1L, "variant", variant)),
+            record.copy(ImmutableMap.of("id", 2L, "variant", variant)));
+
+    testDataWriter(
+        variantSchema, (id, name) -> ParquetVariantUtil.toParquetSchema(variant.value()));
+  }
 }
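
For readers checking a written file by hand: when a shredding function is
supplied, the variant column's physical Parquet group gains a "typed_value"
field next to the usual metadata/value pair, which is what the new test
asserts via the footer. A sketch of the same check outside the test, assuming
a local path p and a variant column named "variant" (both illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;

    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path(p), new Configuration()))) {
      // the footer records the physical schema that was actually written
      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
      GroupType variantType = parquetSchema.getType("variant").asGroupType();
      System.out.println(variantType.containsField("typed_value"));
    }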
