pvary commented on code in PR #14435:
URL: https://github.com/apache/iceberg/pull/14435#discussion_r2606470110


##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFileMerger.java:
##########
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import static java.util.Collections.emptyMap;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.LongUnaryOperator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.ValuesWriter;
+import 
org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+/**
+ * Utility class for performing strict schema validation and merging of 
Parquet files at the
+ * row-group level.
+ *
+ * <p>This class ensures that all input files have identical Parquet schemas 
before merging. The
+ * merge operation is performed by copying row groups directly without
+ * serialization/deserialization, providing significant performance benefits 
over traditional
+ * read-rewrite approaches.
+ *
+ * <p>This class works with any Iceberg FileIO implementation (HadoopFileIO, 
S3FileIO, GCSFileIO,
+ * etc.), making it cloud-agnostic.
+ *
+ * <p>TODO: Encrypted tables are not supported
+ *
+ * <p>Key features:
+ *
+ * <ul>
+ *   <li>Row group merging without deserialization using {@link 
ParquetFileWriter#appendFile}
+ *   <li>Strict schema validation - all files must have identical {@link 
MessageType}
+ *   <li>Metadata merging for Iceberg-specific footer data
+ *   <li>Works with any FileIO implementation (local, S3, GCS, Azure, etc.)
+ * </ul>
+ *
+ * <p>Restrictions:
+ *
+ * <ul>
+ *   <li>All files must have compatible schemas (identical {@link MessageType})
+ *   <li>Files must not be encrypted
+ *   <li>Files must not have associated delete files or delete vectors
+ *   <li>Table must not have a sort order (including z-ordered tables)
+ * </ul>
+ *
+ * <p>Typical usage:
+ *
+ * <pre>
+ * ValidationResult result = 
ParquetFileMerger.readAndValidateSchema(inputFiles);
+ * if (result != null) {
+ *   ParquetFileMerger.mergeFiles(
+ *       inputFiles, encryptedOutputFile, result.schema(), firstRowIds,
+ *       rowGroupSize, columnIndexTruncateLength, result.metadata());
+ * }
+ * </pre>
+ */
+public class ParquetFileMerger {
+  // Default buffer sizes for DeltaBinaryPackingValuesWriter
+  private static final int DEFAULT_INITIAL_BUFFER_SIZE = 64 * 1024; // 64KB
+  private static final int DEFAULT_PAGE_SIZE_FOR_ENCODING = 64 * 1024; // 64KB
+
+  private ParquetFileMerger() {
+    // Utility class - prevent instantiation
+  }
+
+  @VisibleForTesting
+  static MessageType canMergeAndGetSchema(List<InputFile> inputFiles) {
+    try {
+      if (inputFiles == null || inputFiles.isEmpty()) {
+        return null;
+      }
+
+      // Read schema from the first file
+      MessageType firstSchema = readSchema(inputFiles.get(0));
+
+      // Check if schema has physical row lineage columns
+      boolean hasRowIdColumn = 
firstSchema.containsField(MetadataColumns.ROW_ID.name());
+      boolean hasSeqNumColumn =
+          
firstSchema.containsField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
+
+      // Validate all files have the same schema
+      for (int i = 1; i < inputFiles.size(); i++) {
+        MessageType currentSchema = readSchema(inputFiles.get(i));
+
+        if (!firstSchema.equals(currentSchema)) {
+          return null;
+        }
+      }
+
+      // If there are physical row lineage columns, validate no nulls
+      if (hasRowIdColumn || hasSeqNumColumn) {
+        validateRowLineageColumnsHaveNoNulls(inputFiles);
+      }
+
+      return firstSchema;
+    } catch (RuntimeException | IOException e) {
+      // Returns null for:
+      // - Non-Parquet files (IOException when reading Parquet footer)
+      // - Encrypted files (ParquetCryptoRuntimeException extends 
RuntimeException)
+      // - Files with null row lineage values (IllegalArgumentException from
+      // validateRowLineageColumnsHaveNoNulls)
+      // - Any other validation failures
+      return null;
+    }
+  }
+
+  /**
+   * Validates that DataFiles can be merged and returns the Parquet schema if 
validation succeeds.
+   *
+   * <p>This method validates:
+   *
+   * <ul>
+   *   <li>All Parquet-specific requirements (via {@link 
#canMergeAndGetSchema(List)})
+   *   <li>All files have the same partition spec
+   *   <li>No files exceed the target output size (not splitting large files)
+   * </ul>
+   *
+   * <p>This validation is useful for compaction operations in Spark, Flink, 
or other engines that
+   * need to ensure files can be safely merged. The returned MessageType can 
be passed to {@link
+   * #mergeFiles} to avoid re-reading the schema.
+   *
+   * @param dataFiles List of DataFiles to validate
+   * @param fileIO FileIO to use for reading files
+   * @param targetOutputSize Maximum size for output file (files larger than 
this cannot be merged)
+   * @return MessageType schema if files can be merged, null otherwise
+   */
+  public static MessageType canMergeAndGetSchema(
+      List<DataFile> dataFiles, FileIO fileIO, long targetOutputSize) {
+    if (dataFiles == null || dataFiles.isEmpty()) {
+      return null;
+    }
+
+    // Single loop to check partition spec consistency, file sizes, and build 
InputFile list
+    int firstSpecId = dataFiles.get(0).specId();
+    List<InputFile> inputFiles = 
Lists.newArrayListWithCapacity(dataFiles.size());
+    for (DataFile dataFile : dataFiles) {
+      // Check partition spec consistency - all files must have the same spec
+      if (dataFile.specId() != firstSpecId) {
+        return null;
+      }
+
+      // Check file sizes - don't merge if splitting large files
+      if (dataFile.fileSizeInBytes() > targetOutputSize) {
+        return null;
+      }
+
+      inputFiles.add(fileIO.newInputFile(dataFile.path().toString()));
+    }
+
+    return canMergeAndGetSchema(inputFiles);
+  }
+
+  /**
+   * Reads the Parquet schema from an Iceberg InputFile.
+   *
+   * @param inputFile Iceberg input file to read schema from
+   * @return MessageType schema of the Parquet file
+   * @throws IOException if reading fails
+   */
+  private static MessageType readSchema(InputFile inputFile) throws 
IOException {
+    return ParquetFileReader.open(ParquetIO.file(inputFile))
+        .getFooter()
+        .getFileMetaData()
+        .getSchema();
+  }
+
+  /**
+   * Validates that all row lineage column values are non-null in the input 
files.
+   *
+   * <p>When files already have physical row lineage columns and we're doing 
row lineage processing,
+   * we cannot automatically calculate null values during binary merge. This 
method ensures all
+   * values in both _row_id and _last_updated_sequence_number columns are 
present.
+   *
+   * @param inputFiles List of input files to validate
+   * @throws IllegalArgumentException if any row lineage column contains null 
values
+   * @throws IOException if reading file metadata fails
+   */
+  private static void validateRowLineageColumnsHaveNoNulls(List<InputFile> 
inputFiles)
+      throws IOException {
+    for (InputFile inputFile : inputFiles) {
+      try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIO.file(inputFile))) {
+        List<BlockMetaData> rowGroups = reader.getFooter().getBlocks();
+
+        for (BlockMetaData rowGroup : rowGroups) {
+          for (ColumnChunkMetaData columnChunk : rowGroup.getColumns()) {
+            String columnPath = columnChunk.getPath().toDotString();
+
+            // Check if this is the _row_id column
+            if (columnPath.equals(MetadataColumns.ROW_ID.name())) {
+              Statistics<?> stats = columnChunk.getStatistics();
+              if (stats != null && stats.getNumNulls() > 0) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        Locale.ROOT,
+                        "File %s contains null values in _row_id column (row 
group has %d nulls). "
+                            + "Cannot merge files with null _row_id values 
using binary copy.",
+                        inputFile.location(),
+                        stats.getNumNulls()));
+              }
+            }
+
+            // Check if this is the _last_updated_sequence_number column
+            if 
(columnPath.equals(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name())) {
+              Statistics<?> stats = columnChunk.getStatistics();
+              if (stats != null && stats.getNumNulls() > 0) {
+                throw new IllegalArgumentException(
+                    String.format(
+                        Locale.ROOT,
+                        "File %s contains null values in 
_last_updated_sequence_number column "
+                            + "(row group has %d nulls). Cannot merge files 
with null "
+                            + "_last_updated_sequence_number values using 
binary copy.",
+                        inputFile.location(),
+                        stats.getNumNulls()));
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /** Internal method to merge files when schema is already known. */
+  private static void mergeFilesWithSchema(
+      List<InputFile> inputFiles,
+      OutputFile outputFile,
+      MessageType schema,
+      long rowGroupSize,
+      int columnIndexTruncateLength)
+      throws IOException {
+    try (ParquetFileWriter writer =
+        new ParquetFileWriter(
+            ParquetIO.file(outputFile),
+            schema,
+            ParquetFileWriter.Mode.CREATE,
+            rowGroupSize,
+            0, // maxPaddingSize - hardcoded to 0 (same as ParquetWriter)
+            columnIndexTruncateLength,
+            ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+            ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+            (InternalFileEncryptor) null)) {
+
+      Map<String, String> extraMetadata = null;
+      writer.start();
+      for (InputFile inputFile : inputFiles) {
+        try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIO.file(inputFile))) {
+          // Read metadata from first file
+          if (extraMetadata == null) {
+            extraMetadata = 
reader.getFooter().getFileMetaData().getKeyValueMetaData();
+          }
+
+          reader.appendTo(writer);
+        }
+      }
+
+      writer.end(extraMetadata != null ? extraMetadata : emptyMap());
+    }
+  }
+
+  /**
+   * Internal method to merge files with row lineage columns when base schema 
is already known. Adds
+   * both _row_id and _last_updated_sequence_number columns to the output file.
+   */
+  private static void mergeFilesWithRowLineageAndSchema(
+      List<InputFile> inputFiles,
+      OutputFile outputFile,
+      List<Long> firstRowIds,
+      List<Long> dataSequenceNumbers,
+      MessageType baseSchema,
+      long rowGroupSize,
+      int columnIndexTruncateLength)
+      throws IOException {
+    // Extend schema to include _row_id and _last_updated_sequence_number 
columns
+    MessageType extendedSchema = addRowLineageColumns(baseSchema);
+
+    // Create output writer with extended schema
+    try (ParquetFileWriter writer =
+        new ParquetFileWriter(
+            ParquetIO.file(outputFile),
+            extendedSchema,
+            ParquetFileWriter.Mode.CREATE,
+            rowGroupSize,
+            0, // maxPaddingSize - hardcoded to 0 (same as ParquetWriter)
+            columnIndexTruncateLength,
+            ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
+            ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
+            (InternalFileEncryptor) null)) {
+
+      writer.start();
+
+      // Get column descriptors for row lineage columns from extended schema
+      ColumnDescriptor rowIdDescriptor =
+          extendedSchema.getColumnDescription(new String[] 
{MetadataColumns.ROW_ID.name()});
+      ColumnDescriptor seqNumDescriptor =
+          extendedSchema.getColumnDescription(
+              new String[] 
{MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()});
+
+      Map<String, String> extraMetadata = null;
+
+      // Process each input file
+      for (int fileIdx = 0; fileIdx < inputFiles.size(); fileIdx++) {
+        InputFile inputFile = inputFiles.get(fileIdx);
+        long currentRowId = firstRowIds.get(fileIdx);
+        long dataSequenceNumber = dataSequenceNumbers.get(fileIdx);
+
+        try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIO.file(inputFile))) {
+          // Read metadata from first file
+          if (extraMetadata == null) {
+            extraMetadata = 
reader.getFooter().getFileMetaData().getKeyValueMetaData();
+          }
+
+          List<BlockMetaData> rowGroups = reader.getFooter().getBlocks();
+
+          for (BlockMetaData rowGroup : rowGroups) {
+            long rowCount = rowGroup.getRowCount();
+            writer.startBlock(rowCount);
+
+            // Copy all existing column chunks (binary copy)
+            copyColumnChunks(writer, baseSchema, inputFile, rowGroup);
+
+            // Extract compression codec from existing columns to use for row 
lineage columns
+            CompressionCodecName codec = 
rowGroup.getColumns().get(0).getCodec();
+
+            // Write new _row_id column chunk (DELTA_BINARY_PACKED encoded, 
then compressed with
+            // codec)
+            // Write sequential values: currentRowId, currentRowId+1, 
currentRowId+2, ...
+            long startRowId = currentRowId;
+            writeLongColumnChunk(
+                writer,
+                rowIdDescriptor,
+                rowCount,
+                codec,
+                startRowId,
+                startRowId + rowCount - 1,
+                i -> startRowId + i);
+            currentRowId += rowCount;
+
+            // Write new _last_updated_sequence_number column chunk
+            // Write constant value for all rows: dataSequenceNumber, 
dataSequenceNumber, ...
+            writeLongColumnChunk(
+                writer,
+                seqNumDescriptor,
+                rowCount,
+                codec,
+                dataSequenceNumber,
+                dataSequenceNumber,
+                i -> dataSequenceNumber);
+
+            writer.endBlock();
+          }
+        }
+      }
+
+      writer.end(extraMetadata != null ? extraMetadata : emptyMap());
+    }
+  }
+
+  /**
+   * Merges multiple Parquet data files with optional row lineage preservation.
+   *
+   * <p>This method intelligently handles row lineage based on the input files 
and metadata:
+   *
+   * <ul>
+   *   <li>If DataFiles have null firstRowId/dataSequenceNumber: performs 
simple binary copy merge
+   *   <li>If files already have physical row lineage columns: performs simple 
binary copy merge
+   *   <li>Otherwise: synthesizes physical row lineage columns from DataFile 
metadata
+   * </ul>
+   *
+   * <p>Row lineage consists of two columns:
+   *
+   * <ul>
+   *   <li>_row_id: unique identifier for each row, synthesized from 
firstRowId + row position
+   *   <li>_last_updated_sequence_number: data sequence number when row was 
last updated
+   * </ul>
+   *
+   * <p>The provided schema parameter should be obtained from {@link 
#canMergeAndGetSchema(List,
+   * FileIO, long)} to avoid redundant file reads. All files must have 
compatible schemas (as
+   * validated by {@link #canMergeAndGetSchema}).
+   *
+   * @param dataFiles List of Iceberg DataFiles to merge
+   * @param fileIO FileIO to use for reading input files
+   * @param outputFile Output file for the merged result (caller handles 
encryption if needed)
+   * @param schema Parquet schema from canMergeAndGetSchema (avoids re-reading 
first file)
+   * @param rowGroupSize Target row group size in bytes
+   * @param spec PartitionSpec for the output file
+   * @param partition Partition data for the output file (null for 
unpartitioned tables)
+   * @return DataFile representing the merged output file with complete 
metadata
+   * @throws IOException if I/O error occurs during merge operation
+   */
+  public static DataFile mergeFiles(
+      List<DataFile> dataFiles,
+      FileIO fileIO,
+      OutputFile outputFile,
+      MessageType schema,
+      long rowGroupSize,
+      PartitionSpec spec,
+      StructLike partition)
+      throws IOException {
+    // Convert DataFiles to InputFiles and extract row lineage metadata
+    List<InputFile> inputFiles = 
Lists.newArrayListWithCapacity(dataFiles.size());
+    List<Long> firstRowIds = Lists.newArrayListWithCapacity(dataFiles.size());
+    List<Long> dataSequenceNumbers = 
Lists.newArrayListWithCapacity(dataFiles.size());
+    boolean hasRowLineage = false;
+
+    for (DataFile dataFile : dataFiles) {
+      inputFiles.add(fileIO.newInputFile(dataFile.path().toString()));
+
+      Long firstRowId = dataFile.firstRowId();
+      Long dataSequenceNumber = dataFile.dataSequenceNumber();
+      firstRowIds.add(firstRowId);
+      dataSequenceNumbers.add(dataSequenceNumber);
+
+      if (firstRowId != null && dataSequenceNumber != null) {
+        hasRowLineage = true;
+      }
+    }
+
+    // Initialize columnIndexTruncateLength following the same pattern as 
Parquet.java
+    Configuration conf =
+        outputFile instanceof HadoopOutputFile
+            ? new Configuration(((HadoopOutputFile) outputFile).getConf())
+            : new Configuration();
+    int columnIndexTruncateLength =
+        conf.getInt(
+            ParquetOutputFormat.COLUMN_INDEX_TRUNCATE_LENGTH,
+            ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+
+    // Check if we need to synthesize physical row lineage columns from 
virtual metadata
+    boolean shouldSynthesizeRowLineage =
+        hasRowLineage
+            && !schema.containsField(MetadataColumns.ROW_ID.name())
+            && 
!schema.containsField(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name());
+
+    if (shouldSynthesizeRowLineage) {
+      // Files have virtual row lineage - synthesize physical columns
+      mergeFilesWithRowLineageAndSchema(
+          inputFiles,
+          outputFile,
+          firstRowIds,
+          dataSequenceNumbers,
+          schema,
+          rowGroupSize,
+          columnIndexTruncateLength);
+    } else {
+      // Use simple binary copy (either no row lineage, or files already have 
physical columns)
+      mergeFilesWithSchema(inputFiles, outputFile, schema, rowGroupSize, 
columnIndexTruncateLength);
+    }
+
+    // Build DataFile with metrics and metadata
+    String outputPath = outputFile.location();
+    long fileSize = fileIO.newInputFile(outputPath).getLength();
+
+    // Extract metrics from the merged Parquet file
+    MetricsConfig metricsConfig = MetricsConfig.getDefault();
+    Metrics metrics = ParquetUtil.fileMetrics(fileIO.newInputFile(outputPath), 
metricsConfig);
+
+    // Build the DataFile
+    DataFiles.Builder builder =
+        DataFiles.builder(spec)
+            .withPath(outputPath)
+            .withFormat(FileFormat.PARQUET)
+            .withFileSizeInBytes(fileSize)
+            .withMetrics(metrics);
+
+    // Add partition if present
+    if (partition != null) {
+      builder.withPartition(partition);
+    }
+
+    // Extract firstRowId from Parquet column statistics (for V3+ tables with 
row lineage)
+    // The min value of _row_id column becomes firstRowId
+    if (metrics.lowerBounds() != null) {
+      ByteBuffer rowIdLowerBound = 
metrics.lowerBounds().get(MetadataColumns.ROW_ID.fieldId());
+      if (rowIdLowerBound != null) {
+        Long firstRowId = Conversions.fromByteBuffer(LongType.get(), 
rowIdLowerBound);
+        builder.withFirstRowId(firstRowId);
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Extends a Parquet schema by adding the row lineage metadata columns.
+   *
+   * <p>Row lineage consists of two columns:
+   *
+   * <ul>
+   *   <li>_row_id: unique identifier for each row
+   *   <li>_last_updated_sequence_number: data sequence number when row was 
last updated
+   * </ul>
+   *
+   * @param baseSchema Original Parquet schema
+   * @return Extended schema with row lineage columns added
+   */

Review Comment:
   remove, or simplify to something like this:
   ```
     /** Extends a Parquet schema by adding the row lineage metadata columns: 
_row_id, ._last_updated_sequence_number */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to