pvary commented on code in PR #14435:
URL: https://github.com/apache/iceberg/pull/14435#discussion_r2675580977
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java:
##########
@@ -2657,4 +2682,389 @@ public boolean matches(RewriteFileGroup argument) {
return groupIDs.contains(argument.info().globalIndex());
}
}
+
+ @TestTemplate
+ public void testBinPackUsesCorrectRunnerBasedOnOption() {
+ assumeThat(useParquetFileMerger).isTrue();
+
+ Table table = createTable(4);
+ shouldHaveFiles(table, 4);
+
+ // Test that binPack() respects the configuration option
+ // When enabled, should use SparkParquetFileMergeRunner
+ RewriteDataFiles.Result resultWithMerger =
+ basicRewrite(table)
+ .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
+ .binPack()
+ .execute();
+
+ assertThat(resultWithMerger.rewrittenDataFilesCount()).isEqualTo(4);
+ assertThat(resultWithMerger.addedDataFilesCount()).isGreaterThan(0);
+
+ // Write more data to the table so we can test again
+ writeRecords(100, SCALE);
+
+ // When disabled, should use SparkBinPackFileRewriteRunner
+ RewriteDataFiles.Result resultWithoutMerger =
+ basicRewrite(table)
+ .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "false")
+ .binPack()
+ .execute();
+
+ // Should rewrite the newly added files
+ assertThat(resultWithoutMerger.rewrittenDataFilesCount()).isGreaterThan(0);
+ }
+
+ @TestTemplate
+  public void testParquetFileMergerProducesConsistentRowLineageWithBinPackMerger()
+      throws IOException {
+    // Test that both binpack and ParquetFileMerger convert virtual row IDs to physical
+ // and produce equivalent results for row lineage preservation
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+ assumeThat(useParquetFileMerger).isTrue();
+
+ // Test binpack approach
+ Table binpackTable = createTable(4);
+ shouldHaveFiles(binpackTable, 4);
+ verifyInitialVirtualRowIds(binpackTable);
+ long binpackCountBefore = currentData().size();
+
+ RewriteDataFiles.Result binpackResult =
+ basicRewrite(binpackTable)
+ .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "false")
+ .binPack()
+ .execute();
+
+ assertThat(binpackResult.rewrittenDataFilesCount()).isEqualTo(4);
+ assertThat(binpackResult.addedDataFilesCount()).isGreaterThan(0);
+ assertThat(currentData()).hasSize((int) binpackCountBefore);
+ verifyPhysicalRowIdsAfterMerge(binpackTable, "Binpack");
+
+ // Test ParquetFileMerger approach with a different table location
+ String originalTableLocation = tableLocation;
+ tableLocation = new File(tableDir, "merger-table").toURI().toString();
+ Table mergerTable = createTable(4);
+ shouldHaveFiles(mergerTable, 4);
+ verifyInitialVirtualRowIds(mergerTable);
+ long mergerCountBefore = currentData().size();
+
+ RewriteDataFiles.Result mergerResult =
+ basicRewrite(mergerTable)
+ .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
+ .binPack()
+ .execute();
+
+ assertThat(mergerResult.rewrittenDataFilesCount()).isEqualTo(4);
+ assertThat(mergerResult.addedDataFilesCount()).isGreaterThan(0);
+ assertThat(currentData()).hasSize((int) mergerCountBefore);
+ verifyPhysicalRowIdsAfterMerge(mergerTable, "ParquetFileMerger");
+
+ // Restore original table location
+ tableLocation = originalTableLocation;
+
+ // Verify both approaches produce equivalent results
+ assertThat(binpackCountBefore)
+ .as("Both tables should have same initial record count")
+ .isEqualTo(mergerCountBefore);
+ assertThat(binpackResult.addedDataFilesCount())
+ .as("Both approaches should produce same number of output files")
+ .isEqualTo(mergerResult.addedDataFilesCount());
+ }
+
+ private void verifyInitialVirtualRowIds(Table table) throws IOException {
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ assertThat(dataFiles).isNotEmpty();
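+    // With virtual row lineage, a data file has no materialized _row_id column; it only carries
+    // first_row_id in metadata and readers derive each row's _row_id as first_row_id + position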
+ for (DataFile dataFile : dataFiles) {
+ assertThat(dataFile.firstRowId())
+ .as("Files should have virtual row IDs (first_row_id != null)")
+ .isNotNull();
+
+      // Verify files don't have physical _row_id column; try-with-resources closes the reader
+      // even if an assertion fails
+      try (ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()))) {
+        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+        assertThat(schema.containsField("_row_id"))
+            .as("Files should not have physical _row_id column before merge")
+            .isFalse();
+      }
+ }
+ }
+
+  private void verifyPhysicalRowIdsAfterMerge(Table table, String approach) throws IOException {
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ assertThat(dataFiles).isNotEmpty();
+ for (DataFile dataFile : dataFiles) {
+ assertThat(dataFile.firstRowId())
+          .as(approach + " should extract firstRowId from min(_row_id) column statistics")
+ .isNotNull();
+
+      // Verify files have physical _row_id column
+      try (ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()))) {
+        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+        assertThat(schema.containsField("_row_id"))
+            .as(approach + " should write physical _row_id column")
+            .isTrue();
+      }
+ }
+ }
+
+ @TestTemplate
+  public void testParquetFileMergerPreservesPhysicalRowIds() throws IOException {
+ // Test scenario 2: Tables with physical _row_id column
+ // After merging, the physical _row_id should be preserved (not changed)
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    // Create a V3+ table and do an initial merge to create files with physical row IDs
+ Table table = createTable(4);
+ shouldHaveFiles(table, 4);
+
+ // First merge: converts virtual row IDs to physical
+ RewriteDataFiles.Result firstMerge =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE,
+                String.valueOf(useParquetFileMerger))
+ .binPack()
+ .execute();
+
+ assertThat(firstMerge.rewrittenDataFilesCount()).isEqualTo(4);
+ assertThat(firstMerge.addedDataFilesCount()).isGreaterThan(0);
+
+ // Verify files have physical _row_id column and first_row_id set
+    // Note: For V3+ tables, firstRowId is required in metadata even with physical _row_id column
+ List<DataFile> dataFilesAfterFirstMerge = TestHelpers.dataFiles(table);
+ assertThat(dataFilesAfterFirstMerge).isNotEmpty();
+ for (DataFile dataFile : dataFilesAfterFirstMerge) {
+ assertThat(dataFile.firstRowId())
+ .as("Files should have first_row_id set after first merge (required
for V3+)")
+ .isNotNull();
+
+      try (ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()))) {
+        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+        assertThat(schema.containsField("_row_id"))
+            .as("Files should have physical _row_id column after first merge")
+            .isTrue();
+      }
+ }
+
+ // Add more data to create additional files
+ writeRecords(2, SCALE);
+ shouldHaveFiles(table, dataFilesAfterFirstMerge.size() + 2);
+
+ long countBefore = currentData().size();
+
+ // Second merge: should preserve physical row IDs via binary copy
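+    // (binary copy: row groups from the input files are appended without decoding and
+    // re-encoding pages, so existing _row_id values are carried over unchanged)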
+ RewriteDataFiles.Result secondMerge =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE,
+                String.valueOf(useParquetFileMerger))
+ .binPack()
+ .execute();
+
+ assertThat(secondMerge.rewrittenDataFilesCount()).isGreaterThan(0);
+ assertThat(secondMerge.addedDataFilesCount()).isGreaterThan(0);
+ assertThat(currentData()).hasSize((int) countBefore);
+
+    // Verify merged files still have physical _row_id column and firstRowId from statistics
+    // Same as binpack approach: firstRowId is extracted from min(_row_id) column statistics
+ List<DataFile> dataFilesAfterSecondMerge = TestHelpers.dataFiles(table);
+ assertThat(dataFilesAfterSecondMerge).isNotEmpty();
+ for (DataFile dataFile : dataFilesAfterSecondMerge) {
+ assertThat(dataFile.firstRowId())
+          .as(
+              "Merged files should have firstRowId extracted from _row_id column statistics"
+                  + " (same as binpack)")
+ .isNotNull();
+
+      // Verify files still have physical _row_id column
+      try (ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()))) {
+        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+        assertThat(schema.containsField("_row_id"))
+            .as("Merged files should preserve physical _row_id column")
+            .isTrue();
+      }
+ }
+ }
+
+ @TestTemplate
+ public void testRowLineageWithPartitionedTable() throws IOException {
+    // Test that row lineage preservation works correctly with partitioned tables
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ // Create a partitioned table with multiple files per partition
+    Table table = createTablePartitioned(4, 2); // 4 partitions, 2 files per partition = 8 files
+ shouldHaveFiles(table, 8);
+ verifyInitialVirtualRowIds(table);
+
+ long countBefore = currentData().size();
+
+ // Merge using ParquetFileMerger - should handle all partitions correctly
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE,
+                String.valueOf(useParquetFileMerger))
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenDataFilesCount()).isEqualTo(8);
+    assertThat(result.addedDataFilesCount()).isEqualTo(4); // One file per partition
+ assertThat(currentData()).hasSize((int) countBefore);
+
+ // Verify row lineage is preserved across all partitions
+ verifyPhysicalRowIdsAfterMerge(table, "ParquetFileMerger with partitions");
+
+ // Verify each partition has exactly one file with physical row IDs
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ assertThat(dataFiles).hasSize(4);
+ for (DataFile dataFile : dataFiles) {
+ assertThat(dataFile.firstRowId()).isNotNull();
+ }
+ }
+
+ @TestTemplate
+ public void testRowLineageWithMultipleOutputFiles() throws IOException {
+ // Test that row lineage works when merge produces multiple output files
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ Table table = createTable(10); // 10 input files
+ shouldHaveFiles(table, 10);
+ verifyInitialVirtualRowIds(table);
+
+ long countBefore = currentData().size();
+ long totalSize = testDataSize(table);
+    // Set target size to force multiple output files (roughly 3-4 output files)
+ long targetFileSize = totalSize / 3;
+
+ // Merge with target file size that produces multiple outputs
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
+                String.valueOf(targetFileSize))
+ .option(
+ RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE,
+                String.valueOf(useParquetFileMerger))
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenDataFilesCount()).isEqualTo(10);
+ assertThat(result.addedDataFilesCount())
+ .as("Should produce multiple output files")
+ .isGreaterThan(1);
+ assertThat(currentData()).hasSize((int) countBefore);
+
+ // Verify all output files have physical row IDs
+    verifyPhysicalRowIdsAfterMerge(table, "ParquetFileMerger with multiple outputs");
+
+    // Verify the output files start at distinct row IDs; duplicate firstRowIds would mean
+    // overlapping row ID ranges
+    List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+    List<Long> firstRowIds =
+        dataFiles.stream().map(DataFile::firstRowId).collect(Collectors.toList());
+
+    assertThat(firstRowIds).doesNotHaveDuplicates();
+ }
+
+ @TestTemplate
+ public void testRowLineageWithLargerScale() throws IOException {
+ // Test row lineage preservation with larger number of files
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ Table table = createTable(20); // 20 input files
+ shouldHaveFiles(table, 20);
+
+ long countBefore = currentData().size();
+
+ // Merge all files
+ RewriteDataFiles.Result result =
+ basicRewrite(table)
+ .option(
+ RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE,
+                String.valueOf(useParquetFileMerger))
+ .binPack()
+ .execute();
+
+ assertThat(result.rewrittenDataFilesCount()).isEqualTo(20);
+ assertThat(result.addedDataFilesCount()).isGreaterThan(0);
+ assertThat(currentData()).hasSize((int) countBefore);
+
+ // Verify row lineage is preserved at scale
+ List<DataFile> dataFiles = TestHelpers.dataFiles(table);
+ for (DataFile dataFile : dataFiles) {
+ assertThat(dataFile.firstRowId())
+ .as("Large scale merge should preserve row lineage")
+ .isNotNull();
+
+      // Verify physical _row_id column exists
+      try (ParquetFileReader reader =
+          ParquetFileReader.open(
+              HadoopInputFile.fromPath(
+                  new Path(dataFile.path().toString()), spark.sessionState().newHadoopConf()))) {
+        MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+        assertThat(schema.containsField("_row_id"))
+            .as("Large scale merge should write physical _row_id column")
+            .isTrue();
+      }
+ }
+ }
+
+ @TestTemplate
+  public void testRowLineageConsistencyAcrossMultipleMerges() throws IOException {
+    // Test that row lineage (row IDs) is preserved across multiple merge operations
Review Comment:
I think it is enough to test:
- Generation of the _row_id and related columns
- Copy of the _row_id and related columns
- Copy of the data columns if _row_id is not needed
This kind of test seems like duplication that spends time and resources for limited
coverage gain; a minimal sketch of the reduced coverage follows.
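
As a rough sketch, reusing the helpers already defined in this diff (createTable,
basicRewrite, writeRecords, verifyInitialVirtualRowIds, verifyPhysicalRowIdsAfterMerge);
the test names and the pre-V3 assumption for the data-column-only case are just
illustrative, not part of the PR:

```java
  // Case 1 and 2: generation of the _row_id column (virtual -> physical),
  // then copy of the existing physical _row_id column on a second merge
  @TestTemplate
  public void testRowIdGenerationAndCopy() throws IOException {
    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
    assumeThat(useParquetFileMerger).isTrue();

    Table table = createTable(4);
    shouldHaveFiles(table, 4);
    verifyInitialVirtualRowIds(table);

    // First merge should generate the physical _row_id column
    basicRewrite(table)
        .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
        .binPack()
        .execute();
    verifyPhysicalRowIdsAfterMerge(table, "ParquetFileMerger (generate)");

    // Second merge should copy the already-physical _row_id column
    writeRecords(2, SCALE);
    basicRewrite(table)
        .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
        .binPack()
        .execute();
    verifyPhysicalRowIdsAfterMerge(table, "ParquetFileMerger (copy)");
  }

  // Case 3: copy of the data columns when _row_id is not needed
  // (assuming no row lineage before V3, so no _row_id column is involved)
  @TestTemplate
  public void testDataColumnCopyWithoutRowLineage() {
    assumeThat(formatVersion).isLessThan(3);
    assumeThat(useParquetFileMerger).isTrue();

    Table table = createTable(4);
    shouldHaveFiles(table, 4);
    int expectedCount = currentData().size();

    basicRewrite(table)
        .option(RewriteDataFiles.USE_PARQUET_ROW_GROUP_MERGE, "true")
        .binPack()
        .execute();

    assertThat(currentData()).hasSize(expectedCount);
  }
```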
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]