This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new df01d88c93 Parquet: Expose variantShreddingFunc() in Parquet.DataWriteBuilder (#14153)
df01d88c93 is described below

commit df01d88c9360cda398c40493eb4135181a8976c5
Author: Denys Kuzmenko <dkuzme...@cloudera.com>
AuthorDate: Thu Sep 25 21:44:45 2025 +0200

    Parquet: Expose variantShreddingFunc() in Parquet.DataWriteBuilder (#14153)
---
 .../java/org/apache/iceberg/parquet/Parquet.java   |  5 ++
 .../iceberg/parquet/TestParquetDataWriter.java     | 81 +++++++++++++++++++---
 2 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 6f68fbe150..0178ad5fc5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -827,6 +827,11 @@ public class Parquet {
       return this;
     }
 
+    public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
+      appenderBuilder.variantShreddingFunc(func);
+      return this;
+    }
+
     public DataWriteBuilder withSpec(PartitionSpec newSpec) {
       this.spec = newSpec;
       return this;
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
index bbad38f518..3918fdc630 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java
@@ -25,10 +25,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.List;
+import java.util.Optional;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalTestHelpers;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -47,6 +49,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantTestUtil;
+import org.apache.iceberg.variants.Variants;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -78,25 +87,29 @@ public class TestParquetDataWriter {
 
   @Test
   public void testDataWriter() throws IOException {
+    testDataWriter(SCHEMA, (id, name) -> null);
+  }
+
+  private void testDataWriter(Schema schema, VariantShreddingFunction variantShreddingFunc)
+      throws IOException {
     OutputFile file = Files.localOutput(createTempFile(temp));
 
-    SortOrder sortOrder = SortOrder.builderFor(SCHEMA).withOrderId(10).asc("id").build();
+    SortOrder sortOrder = SortOrder.builderFor(schema).withOrderId(10).asc("id").build();
     DataWriter<Record> dataWriter =
         Parquet.writeData(file)
-            .schema(SCHEMA)
+            .schema(schema)
            .createWriterFunc(GenericParquetWriter::create)
+            .variantShreddingFunc(variantShreddingFunc)
             .overwrite()
             .withSpec(PartitionSpec.unpartitioned())
             .withSortOrder(sortOrder)
             .build();
 
-    try {
+    try (dataWriter) {
       for (Record record : records) {
         dataWriter.write(record);
       }
-    } finally {
-      dataWriter.close();
     }
 
     DataFile dataFile = dataWriter.toDataFile();
@@ -113,13 +126,32 @@ public class TestParquetDataWriter {
     List<Record> writtenRecords;
     try (CloseableIterable<Record> reader =
         Parquet.read(file.toInputFile())
-            .project(SCHEMA)
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(SCHEMA, fileSchema))
+            .project(schema)
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
             .build()) {
       writtenRecords = Lists.newArrayList(reader);
     }
 
-    assertThat(writtenRecords).as("Written records should match").isEqualTo(records);
+    assertThat(writtenRecords).hasSameSizeAs(records);
+
+    for (int i = 0; i < records.size(); i++) {
+      InternalTestHelpers.assertEquals(schema.asStruct(), records.get(i), writtenRecords.get(i));
+    }
+
+    // Check physical Parquet schema if variant shredding function is provided
+    Optional<Types.NestedField> variantField =
+        schema.columns().stream()
+            .filter(field -> field.type().equals(Types.VariantType.get()))
+            .findFirst();
+
+    if (variantField.isPresent() && variantShreddingFunc != null) {
+      try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(file.toInputFile()))) {
+        MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+        GroupType variantType = parquetSchema.getType(variantField.get().name()).asGroupType();
+
+        assertThat(variantType.containsField("typed_value")).isTrue();
+      }
+    }
   }
 
   @SuppressWarnings("checkstyle:AvoidEscapedUnicodeCharacters")
@@ -266,4 +298,37 @@ public class TestParquetDataWriter {
     assertThat(dataFile.lowerBounds()).as("Should have a valid lower bound").containsKey(3);
     assertThat(dataFile.upperBounds()).as("Should have a null upper bound").doesNotContainKey(3);
   }
+
+  @Test
+  public void testDataWriterWithVariantShredding() throws IOException {
+    Schema variantSchema =
+        new Schema(
+            ImmutableList.<Types.NestedField>builder()
+                .addAll(SCHEMA.columns())
+                .add(Types.NestedField.optional(4, "variant", Types.VariantType.get()))
+                .build());
+
+    ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true);
+    VariantMetadata metadata = Variants.metadata(metadataBuffer);
+
+    ByteBuffer objectBuffer =
+        VariantTestUtil.createObject(
+            metadataBuffer,
+            ImmutableMap.of(
+                "a", Variants.of(123456789),
+                "b", Variants.of("string")));
+
+    Variant variant = Variant.of(metadata, Variants.value(metadata, objectBuffer));
+
+    // Create records with variant data
+    GenericRecord record = GenericRecord.create(variantSchema);
+
+    records =
+        ImmutableList.of(
+            record.copy(ImmutableMap.of("id", 1L, "variant", variant)),
+            record.copy(ImmutableMap.of("id", 2L, "variant", variant)));
+
+    testDataWriter(
+        variantSchema, (id, name) -> ParquetVariantUtil.toParquetSchema(variant.value()));
+  }
 }
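
For anyone trying the new hook, here is a minimal usage sketch. It is not part of
this commit: "file" and "sampleVariant" are assumed caller inputs, the example
class and method names are made up, and the shredding lambda simply mirrors the
one in testDataWriterWithVariantShredding(). The package declaration is only
there so the package-level ParquetVariantUtil helper resolves.

    package org.apache.iceberg.parquet; // keeps ParquetVariantUtil accessible

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetWriter;
    import org.apache.iceberg.io.DataWriter;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.variants.Variant;

    class VariantShreddingExample {
      // Builds a data writer that shreds the "variant" column using a Parquet
      // schema derived from a representative value.
      static DataWriter<Record> newShreddedWriter(OutputFile file, Variant sampleVariant)
          throws java.io.IOException {
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "variant", Types.VariantType.get()));

        return Parquet.writeData(file)
            .schema(schema)
            .createWriterFunc(GenericParquetWriter::create)
            // Derive a typed ("shredded") Parquet schema for the variant column
            // from a sample value; returning null would leave the column
            // unshredded, as in the default testDataWriter() path.
            .variantShreddingFunc(
                (fieldId, name) -> ParquetVariantUtil.toParquetSchema(sampleVariant.value()))
            .overwrite()
            .withSpec(PartitionSpec.unpartitioned())
            .build();
      }
    }

With a shredding function supplied, the physical Parquet schema for the variant
column gains a "typed_value" group, which is what the new schema assertion in
the test verifies.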