pvary commented on code in PR #14435:
URL: https://github.com/apache/iceberg/pull/14435#discussion_r2606554179

##########
parquet/src/test/java/org/apache/iceberg/parquet/TestParquetFileMerger.java:
##########
@@ -0,0 +1,1069 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import static org.apache.iceberg.parquet.Parquet.writeData;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestParquetFileMerger {
+
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+  private static final PartitionSpec UNPARTITIONED_SPEC = PartitionSpec.unpartitioned();
+  private static final long DEFAULT_ROW_GROUP_SIZE =
+      TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+  @TempDir private Path temp;
+
+  private FileIO fileIO;
+
+  @BeforeEach
+  public void setupFileIO() {
+    this.fileIO = new TestTables.LocalFileIO();
+  }
+
+  @Test
+  public void testCanMergeReturnsFalseForEmptyList() {
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(Collections.emptyList());
+    assertThat(result).isNull();
+  }
+
+  @Test
+  public void testCanMergeReturnsFalseForNullInput() {
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(null);
+    assertThat(result).isNull();
+  }
+
+  @Test
+  public void testCanMergeReturnsFalseForNonParquetFile() throws IOException {
+    // Create a non-Parquet file (just a text file)
+    File textFile = createTempFile(temp);
+    textFile.getParentFile().mkdirs(); // Ensure directory exists
+    java.nio.file.Files.write(textFile.toPath(), "This is not a Parquet file".getBytes());
+
+    InputFile inputFile = Files.localInput(textFile);
+    List<InputFile> inputFiles = Lists.newArrayList(inputFile);
+
+    // Should return null because file is not valid Parquet
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(inputFiles);
+    assertThat(result).isNull();
+  }
+
+  @Test
+  public void testCanMergeReturnsFalseForDifferentSchemas() throws IOException {
+    // Create first Parquet file with schema1
+    Schema icebergSchema1 = SCHEMA;
+
+    File parquetFile1 = createTempFile(temp);
+    OutputFile outputFile1 = Files.localOutput(parquetFile1);
+    createParquetFileWithSchema(outputFile1, icebergSchema1);
+
+    // Create second Parquet file with different schema
+    Schema icebergSchema2 =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(3, "other", Types.LongType.get())); // Different field
+
+    File parquetFile2 = createTempFile(temp);
+    OutputFile outputFile2 = Files.localOutput(parquetFile2);
+    createParquetFileWithSchema(outputFile2, icebergSchema2);
+
+    // Try to validate - should return null due to different schemas
+    InputFile inputFile1 = Files.localInput(parquetFile1);
+    InputFile inputFile2 = Files.localInput(parquetFile2);
+    List<InputFile> inputFiles = Arrays.asList(inputFile1, inputFile2);
+
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(inputFiles);
+    assertThat(result).isNull();
+  }
+
+  @Test
+  public void testCanMergeReturnsTrueForIdenticalSchemas() throws IOException {
+    // Create two Parquet files with the same schema and some data
+    File parquetFile1 = createTempFile(temp);
+    writeRecordsToFile(parquetFile1, Arrays.asList(createRecord(1, "a")));
+
+    File parquetFile2 = createTempFile(temp);
+    writeRecordsToFile(parquetFile2, Arrays.asList(createRecord(2, "b")));
+
+    // Should return non-null MessageType for identical schemas
+    InputFile inputFile1 = Files.localInput(parquetFile1);
+    InputFile inputFile2 = Files.localInput(parquetFile2);
+    List<InputFile> inputFiles = Arrays.asList(inputFile1, inputFile2);
+
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(inputFiles);
+    assertThat(result).isNotNull();
+  }
+
+  @Test
+  public void testCanMergeReturnsTrueForSingleFile() throws IOException {
+    // Create a single Parquet file with some data
+    File parquetFile = createTempFile(temp);
+    writeRecordsToFile(parquetFile, Arrays.asList(createRecord(1, "a")));
+
+    // Should return non-null MessageType for single file
+    InputFile inputFile = Files.localInput(parquetFile);
+    List<InputFile> inputFiles = Lists.newArrayList(inputFile);
+
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(inputFiles);
+    assertThat(result).isNotNull();
+  }
+
+  @Test
+  public void testMergeFilesSynthesizesRowLineageColumns() throws IOException {
+    // Test that merging files with virtual row lineage synthesizes physical _row_id and
+    // _last_updated_sequence_number columns
+
+    // Create two files with test data
+    File file1 = createTempFile(temp);
+    writeRecordsToFile(
+        file1, Arrays.asList(createRecord(1, "a"), createRecord(2, "b"), createRecord(3, "c")));
+
+    File file2 = createTempFile(temp);
+    writeRecordsToFile(file2, Arrays.asList(createRecord(4, "d"), createRecord(5, "e")));
+
+    // Create DataFiles with row lineage metadata
+    List<DataFile> dataFiles =
+        Arrays.asList(
+            createDataFile(file1.getAbsolutePath(), 100L, 5L),
+            createDataFile(file2.getAbsolutePath(), 103L, 5L));
+
+    File mergedFile = createTempFile(temp);
+    OutputFile mergedOutput = Files.localOutput(mergedFile);
+
+    // Perform merge
+    mergeFilesHelper(dataFiles, mergedOutput, DEFAULT_ROW_GROUP_SIZE, UNPARTITIONED_SPEC, null);
+
+    // Verify the merged file has both row lineage columns
+    InputFile mergedInput = Files.localInput(mergedFile);
+    MessageType mergedSchema;
+    try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(mergedInput))) {
+      mergedSchema = reader.getFooter().getFileMetaData().getSchema();
+    }
+    assertThat(mergedSchema.containsField(MetadataColumns.ROW_ID.name()))
+        .as("Merged file should have _row_id column")
+        .isTrue();
+    assertThat(mergedSchema.containsField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+        .as("Merged file should have _last_updated_sequence_number column")
+        .isTrue();
+
+    // Verify row lineage values are correct
+    List<RowLineageRecord> records = readRowLineageData(mergedInput);
+    assertThat(records).hasSize(5);
+
+    // Verify _row_id values are sequential starting from firstRowId
+    // File1 records: 100, 101, 102
+    assertThat(records.get(0).rowId).isEqualTo(100L);
+    assertThat(records.get(1).rowId).isEqualTo(101L);
+    assertThat(records.get(2).rowId).isEqualTo(102L);
+
+    // File2 records: 103, 104
+    assertThat(records.get(3).rowId).isEqualTo(103L);
+    assertThat(records.get(4).rowId).isEqualTo(104L);
+
+    // Verify _last_updated_sequence_number is constant (5L) for all records
+    for (RowLineageRecord record : records) {
+      assertThat(record.seqNum).isEqualTo(5L);
+    }
+
+    // Verify original data is preserved
+    assertThat(records.get(0).id).isEqualTo(1);
+    assertThat(records.get(0).data).isEqualTo("a");
+    assertThat(records.get(1).id).isEqualTo(2);
+    assertThat(records.get(4).data).isEqualTo("e");
+  }
+
+  @Test
+  public void testMergeFilesWithMultipleRowGroups() throws IOException {
+    // Test row lineage synthesis works correctly across multiple row groups
+
+    // Create file with multiple row groups by setting small row group size
+    File file1 = createTempFile(temp);
+    OutputFile output1 = Files.localOutput(file1);
+
+    // Create enough records to span multiple row groups
+    List<Record> records = Lists.newArrayList();
+    for (int i = 0; i < 100; i++) {
+      records.add(createRecord(i, "data" + i));
+    }
+    createParquetFileWithData(output1, SCHEMA, records, 1024); // Small row group size
+
+    // Merge with row lineage
+    List<DataFile> dataFiles = Arrays.asList(createDataFile(file1.getAbsolutePath(), 1000L, 7L));
+
+    File mergedFile = createTempFile(temp);
+    OutputFile mergedOutput = Files.localOutput(mergedFile);
+
+    mergeFilesHelper(dataFiles, mergedOutput, DEFAULT_ROW_GROUP_SIZE, UNPARTITIONED_SPEC, null);
+
+    // Verify row IDs are sequential across row groups
+    List<RowLineageRecord> mergedRecords = readRowLineageData(Files.localInput(mergedFile));
+    assertThat(mergedRecords).hasSize(100);
+
+    for (int i = 0; i < 100; i++) {
+      assertThat(mergedRecords.get(i).rowId).isEqualTo(1000L + i);
+      assertThat(mergedRecords.get(i).seqNum).isEqualTo(7L);
+    }
+  }
+
+  @Test
+  public void testMergeFilesWithDifferentDataSequenceNumbers() throws IOException {
+    // Test that each file's dataSequenceNumber is correctly applied to its rows
+
+    // Create three files
+    File file1 = createTempFile(temp);
+    writeRecordsToFile(file1, Arrays.asList(createRecord(1, "a"), createRecord(2, "b")));
+
+    File file2 = createTempFile(temp);
+    writeRecordsToFile(file2, Arrays.asList(createRecord(3, "c")));
+
+    File file3 = createTempFile(temp);
+    writeRecordsToFile(
+        file3, Arrays.asList(createRecord(4, "d"), createRecord(5, "e"), createRecord(6, "f")));
+
+    // Merge with different sequence numbers for each file
+    List<DataFile> dataFiles =
+        Arrays.asList(
+            createDataFile(file1.getAbsolutePath(), 0L, 10L),
+            createDataFile(file2.getAbsolutePath(), 2L, 20L),
+            createDataFile(file3.getAbsolutePath(), 3L, 30L));
+
+    File mergedFile = createTempFile(temp);
+
+    mergeFilesHelper(
+        dataFiles, Files.localOutput(mergedFile), DEFAULT_ROW_GROUP_SIZE, UNPARTITIONED_SPEC, null);
+
+    // Verify sequence numbers transition correctly between files
+    List<RowLineageRecord> records = readRowLineageData(Files.localInput(mergedFile));
+    assertThat(records).hasSize(6);
+
+    // File1 records (2 records, seqNum=10)
+    assertThat(records.get(0).seqNum).isEqualTo(10L);
+    assertThat(records.get(1).seqNum).isEqualTo(10L);
+
+    // File2 records (1 record, seqNum=20)
+    assertThat(records.get(2).seqNum).isEqualTo(20L);
+
+    // File3 records (3 records, seqNum=30)
+    assertThat(records.get(3).seqNum).isEqualTo(30L);
+    assertThat(records.get(4).seqNum).isEqualTo(30L);
+    assertThat(records.get(5).seqNum).isEqualTo(30L);
+  }
+
+  @Test
+  public void testMergeFilesWithoutRowLineage() throws IOException {
+    // Test that merging without firstRowIds/dataSequenceNumbers works (no row lineage columns)
+
+    File file1 = createTempFile(temp);
+    writeRecordsToFile(file1, Arrays.asList(createRecord(1, "a"), createRecord(2, "b")));
+
+    File file2 = createTempFile(temp);
+    writeRecordsToFile(file2, Arrays.asList(createRecord(3, "c")));
+
+    // Merge without row lineage (null firstRowIds and dataSequenceNumbers)
+    List<DataFile> dataFiles =
+        Arrays.asList(
+            createDataFile(file1.getAbsolutePath(), null, null),
+            createDataFile(file2.getAbsolutePath(), null, null));
+
+    File mergedFile = createTempFile(temp);
+
+    mergeFilesHelper(
+        dataFiles, Files.localOutput(mergedFile), DEFAULT_ROW_GROUP_SIZE, UNPARTITIONED_SPEC, null);
+
+    // Verify merged file does NOT have row lineage columns
+    MessageType mergedSchema;
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(ParquetIO.file(Files.localInput(mergedFile)))) {
+      mergedSchema = reader.getFooter().getFileMetaData().getSchema();
+    }
+    assertThat(mergedSchema.containsField(MetadataColumns.ROW_ID.name()))
+        .as("Merged file should not have _row_id column when no lineage provided")
+        .isFalse();
+    assertThat(mergedSchema.containsField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()))
+        .as(
+            "Merged file should not have _last_updated_sequence_number column when no lineage provided")
+        .isFalse();
+
+    // Verify data is still correct
+    List<Record> records = readData(Files.localInput(mergedFile), SCHEMA);
+    assertThat(records).hasSize(3);
+    assertThat(records.get(0).getField("id")).isEqualTo(1);
+    assertThat(records.get(1).getField("id")).isEqualTo(2);
+    assertThat(records.get(2).getField("id")).isEqualTo(3);
+  }
+
+  @Test
+  public void testMergeFilesWithPhysicalRowLineageColumns() throws IOException {
+    // Test that files already having physical row lineage columns are preserved via binary copy
+
+    // Create schema with row lineage columns
+    Schema schemaWithLineage =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()),
+            Types.NestedField.required(
+                MetadataColumns.ROW_ID.fieldId(),
+                MetadataColumns.ROW_ID.name(),
+                Types.LongType.get()),
+            Types.NestedField.required(
+                MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
+                MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(),
+                Types.LongType.get()));
+
+    // Create files with physical row lineage columns
+    File file1 = createTempFile(temp);
+    createParquetFileWithRowLineage(
+        Files.localOutput(file1),
+        schemaWithLineage,
+        Arrays.asList(
+            createRecordWithLineage(schemaWithLineage, 1, "a", 100L, 5L),
+            createRecordWithLineage(schemaWithLineage, 2, "b", 101L, 5L)));
+
+    File file2 = createTempFile(temp);
+    createParquetFileWithRowLineage(
+        Files.localOutput(file2),
+        schemaWithLineage,
+        Arrays.asList(createRecordWithLineage(schemaWithLineage, 3, "c", 102L, 5L)));
+
+    // Merge with firstRowIds/dataSequenceNumbers (should use binary copy path)
+    List<DataFile> dataFiles =
+        Arrays.asList(
+            createDataFile(file1.getAbsolutePath(), 100L, 5L),
+            createDataFile(file2.getAbsolutePath(), 102L, 5L));
+
+    File mergedFile = createTempFile(temp);
+
+    mergeFilesHelper(
+        dataFiles, Files.localOutput(mergedFile), DEFAULT_ROW_GROUP_SIZE, UNPARTITIONED_SPEC, null);
+
+    // Verify physical columns are preserved
+    List<RowLineageRecord> records = readRowLineageData(Files.localInput(mergedFile));
+    assertThat(records).hasSize(3);
+
+    // Values should match original files (binary copy preserves them)
+    assertThat(records.get(0).rowId).isEqualTo(100L);
+    assertThat(records.get(0).seqNum).isEqualTo(5L);
+    assertThat(records.get(1).rowId).isEqualTo(101L);
+    assertThat(records.get(1).seqNum).isEqualTo(5L);
+    assertThat(records.get(2).rowId).isEqualTo(102L);
+    assertThat(records.get(2).seqNum).isEqualTo(5L);
+  }
+
+  @Test
+  public void testCanMergeReturnsFalseForPhysicalRowLineageWithNulls() throws IOException {
+    // Test that validation catches files with null values in physical row lineage columns
+    Schema schemaWithLineage =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()),
+            // OPTIONAL - allows nulls
+            Types.NestedField.optional(
+                MetadataColumns.ROW_ID.fieldId(),
+                MetadataColumns.ROW_ID.name(),
+                Types.LongType.get()),
+            Types.NestedField.required(
+                MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
+                MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name(),
+                Types.LongType.get()));
+
+    File file1 = createTempFile(temp);
+    // Create file with null in _row_id column (this simulates corrupted data)
+    createParquetFileWithRowLineage(
+        Files.localOutput(file1),
+        schemaWithLineage,
+        Arrays.asList(
+            createRecordWithLineage(schemaWithLineage, 1, "a", null, 5L), // null _row_id
+            createRecordWithLineage(schemaWithLineage, 2, "b", 101L, 5L)));
+
+    List<InputFile> inputFiles = Arrays.asList(Files.localInput(file1));
+
+    // canMergeAndGetSchema should return null due to null values
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(inputFiles);
+    assertThat(result)
+        .as("canMergeAndGetSchema should return null for files with null row lineage values")
+        .isNull();
+  }
+
+  @Test
+  public void testMergeFilesVerifiesExactRowIdValues() throws IOException {
+    // Test that verifies exact row ID and sequence number values in merged output
+
+    // Create file 1 with 3 records, firstRowId=100, dataSequenceNumber=5
+    File file1 = createTempFile(temp);
+    writeRecordsToFile(
+        file1, Arrays.asList(createRecord(1, "a"), createRecord(2, "b"), createRecord(3, "c")));
+
+    // Create file 2 with 2 records, firstRowId=200, dataSequenceNumber=7
+    File file2 = createTempFile(temp);
+    writeRecordsToFile(file2, Arrays.asList(createRecord(4, "d"), createRecord(5, "e")));
+
+    // Create file 3 with 4 records, firstRowId=300, dataSequenceNumber=10
+    File file3 = createTempFile(temp);
+    writeRecordsToFile(
+        file3,
+        Arrays.asList(
+            createRecord(6, "f"),
+            createRecord(7, "g"),
+            createRecord(8, "h"),
+            createRecord(9, "i")));
+
+    List<DataFile> dataFiles =
+        Arrays.asList(
+            createDataFile(file1.getAbsolutePath(), 100L, 5L),
+            createDataFile(file2.getAbsolutePath(), 200L, 7L),
+            createDataFile(file3.getAbsolutePath(), 300L, 10L));
+
+    // Merge files with row lineage preservation
+    File outputFile = createTempFile(temp);
+
+    mergeFilesHelper(
+        dataFiles, Files.localOutput(outputFile), DEFAULT_ROW_GROUP_SIZE, UNPARTITIONED_SPEC, null);
+
+    // Read back the merged data and verify exact row ID values
+    List<RowLineageRecord> records = readRowLineageData(Files.localInput(outputFile));
+
+    // Verify we have all 9 records (3 + 2 + 4)
+    assertThat(records).hasSize(9);
+
+    // Verify exact row ID and sequence number values for file 1 (3 records)
+    assertThat(records.get(0).rowId).isEqualTo(100L); // firstRowId=100, offset=0
+    assertThat(records.get(0).seqNum).isEqualTo(5L);
+    assertThat(records.get(1).rowId).isEqualTo(101L); // firstRowId=100, offset=1
+    assertThat(records.get(1).seqNum).isEqualTo(5L);
+    assertThat(records.get(2).rowId).isEqualTo(102L); // firstRowId=100, offset=2
+    assertThat(records.get(2).seqNum).isEqualTo(5L);
+
+    // Verify exact row ID and sequence number values for file 2 (2 records)
+    assertThat(records.get(3).rowId).isEqualTo(200L); // firstRowId=200, offset=0
+    assertThat(records.get(3).seqNum).isEqualTo(7L);
+    assertThat(records.get(4).rowId).isEqualTo(201L); // firstRowId=200, offset=1
+    assertThat(records.get(4).seqNum).isEqualTo(7L);
+
+    // Verify exact row ID and sequence number values for file 3 (4 records)
+    assertThat(records.get(5).rowId).isEqualTo(300L); // firstRowId=300, offset=0
+    assertThat(records.get(5).seqNum).isEqualTo(10L);
+    assertThat(records.get(6).rowId).isEqualTo(301L); // firstRowId=300, offset=1
+    assertThat(records.get(6).seqNum).isEqualTo(10L);
+    assertThat(records.get(7).rowId).isEqualTo(302L); // firstRowId=300, offset=2
+    assertThat(records.get(7).seqNum).isEqualTo(10L);
+    assertThat(records.get(8).rowId).isEqualTo(303L); // firstRowId=300, offset=3
+    assertThat(records.get(8).seqNum).isEqualTo(10L);
+  }
+
+  @Test
+  public void testCanMergeReturnsFalseForSameFieldNameDifferentType() throws IOException {
+    // Test schema validation edge case: same field name but different types
+
+    // Create first file with 'data' as StringType
+    Schema schema1 =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    File file1 = createTempFile(temp);
+    createParquetFileWithSchema(Files.localOutput(file1), schema1);
+
+    // Create second file with 'data' as LongType (incompatible)
+    Schema schema2 =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.LongType.get()));
+
+    File file2 = createTempFile(temp);
+    createParquetFileWithSchema(Files.localOutput(file2), schema2);
+
+    // Validation should fail due to type mismatch
+    List<InputFile> inputFiles = Arrays.asList(Files.localInput(file1), Files.localInput(file2));
+
+    MessageType result = ParquetFileMerger.canMergeAndGetSchema(inputFiles);
+    assertThat(result)
+        .as("canMergeAndGetSchema should return null for same field name with different types")
+        .isNull();
+  }
+
+  @Test
+  public void testCanMergeReturnsTrueForDifferentCompressionCodecs() throws IOException {
+    // Test that files with different compression codecs can still be merged
+    // (compression is at the row group level, not schema level)
+
+    // Create file 1 with SNAPPY compression
+    File file1 = createTempFile(temp);
+    var writer1 =
+        writeData(Files.localOutput(file1))
+            .schema(SCHEMA)
+            .withSpec(UNPARTITIONED_SPEC)
+            .createWriterFunc(GenericParquetWriter::create)
+            .set(TableProperties.PARQUET_COMPRESSION, "snappy")

Review Comment:
   Maybe add a compression parameter to `createParquetFileWithData`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
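A minimal sketch of what that suggestion could look like, assuming the test's existing `createParquetFileWithData(OutputFile, Schema, List<Record>, long rowGroupSize)` helper simply gains a codec argument. The overload, its `compression` parameter, and the row-group-size property setting below are illustrative assumptions (plus an `org.apache.iceberg.io.DataWriter` import), not code from this PR:

```java
// Hypothetical overload of the test's createParquetFileWithData helper that also takes a
// Parquet compression codec name; shape assumed for illustration only.
private void createParquetFileWithData(
    OutputFile outputFile,
    Schema schema,
    List<Record> records,
    long rowGroupSize,
    String compression)
    throws IOException {
  try (DataWriter<Record> writer =
      writeData(outputFile)
          .schema(schema)
          .withSpec(UNPARTITIONED_SPEC)
          .createWriterFunc(GenericParquetWriter::create)
          .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(rowGroupSize))
          .set(TableProperties.PARQUET_COMPRESSION, compression) // e.g. "snappy"
          .overwrite()
          .build()) {
    // write all test records into the single output file
    for (Record record : records) {
      writer.write(record);
    }
  }
}
```

Each per-codec input file in the test could then be produced through this one helper instead of repeating the `writeData(...)` builder chain inline.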
