This is an automated email from the ASF dual-hosted git repository.

zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new e75b4a0b3 [AMORO-4186] Add support for Parquet row-group merging in 
IcebergRewriteExecutor (#4188)
e75b4a0b3 is described below

commit e75b4a0b3cdba96a24ca7af1cd86833a1167dc99
Author: WenLingzhang <[email protected]>
AuthorDate: Wed May 20 10:51:31 2026 +0800

    [AMORO-4186] Add support for Parquet row-group merging in 
IcebergRewriteExecutor (#4188)
    
    * Add support for Parquet row-group merging in IcebergRewriteExecutor
    
    * Enhance Parquet row-group merge support for Iceberg V2 tables and add 
version checks
    
    * Add cleanup for orphaned merged output files in Parquet row-group merge
    
    * Refactor ParquetFileMergeRunner to simplify ParquetWriter initialization
    
    * Implement Parquet schema context preparation for row-group merging in 
IcebergRewriteExecutor
    
    * Add minimum average row-group size threshold for Parquet row-group merge
    
    * fixup
    
    * fixup style
    
    Removed unnecessary line breaks in the class documentation.
    
    * fixup style
    
    ---------
    
    Co-authored-by: 张文领 <[email protected]>
---
 .../optimizing/AbstractRewriteFilesExecutor.java   |   2 +-
 .../amoro/optimizing/IcebergRewriteExecutor.java   | 358 ++++++++++++++++++++
 .../amoro/optimizing/ParquetFileMergeRunner.java   | 198 +++++++++++
 .../org/apache/amoro/table/TableProperties.java    |  12 +
 .../apache/iceberg/parquet/ParquetIOBridge.java    |  40 +++
 .../optimizing/IcebergRewriteExecutorTest.java     | 369 +++++++++++++++++++++
 docs/user-guides/configurations.md                 |  51 +--
 7 files changed, 1004 insertions(+), 26 deletions(-)

diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
index 4446619f9..823596416 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
@@ -145,7 +145,7 @@ public abstract class AbstractRewriteFilesExecutor
     return posDeleteWriter.result().deleteFiles();
   }
 
-  private List<DataFile> rewriterDataFiles() throws Exception {
+  protected List<DataFile> rewriterDataFiles() throws Exception {
     List<DataFile> result = Lists.newArrayList();
     TaskWriter<Record> writer = dataWriter();
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
index 606be5754..3aa05ac78 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
@@ -18,29 +18,76 @@
 
 package org.apache.amoro.optimizing;
 
+import static 
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED;
+import static 
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED_DEFAULT;
+import static 
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES;
+import static 
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_DEFAULT;
+
 import org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
 import org.apache.amoro.io.writer.GenericIcebergPartitionedFanoutWriter;
 import org.apache.amoro.io.writer.IcebergFanoutPosDeleteWriter;
 import org.apache.amoro.table.MixedTable;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.io.DeleteWriteResult;
 import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.parquet.ParquetIOBridge;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 /** OptimizingExecutor for iceberg format. */
 public class IcebergRewriteExecutor extends AbstractRewriteFilesExecutor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergRewriteExecutor.class);
+  // Table properties whose key starts with this prefix and whose value
+  // parses as true disable the row-group merge path (see hasNoBloomFilter).
+  private static final String PARQUET_BLOOM_FILTER_ENABLED_PREFIX =
+      "write.parquet.bloom-filter-enabled";
+  // Set by canParquetRowGroupMerge() and consumed (then cleared) by
+  // parquetRowGroupMergeFiles(); null when no merge has been prepared.
+  private FileSchemaContext fileSchemaContext;
+
+  /**
+   * Holder for the state gathered while validating the input parquet
+   * files: the schema shared by all inputs, the key/value footer metadata
+   * copied from the first input, the input file handles with their sizes
+   * (both lists are filled in the same order), and the total row-group
+   * count across all inputs.
+   */
+  private static class FileSchemaContext {
+    private final MessageType parquetSchema;
+    private final Map<String, String> keyValueMetadata;
+    private final List<InputFile> inputFiles;
+    private final List<Long> fileSizes;
+    private final long totalRowGroupCount;
+
+    private FileSchemaContext(
+        MessageType parquetSchema,
+        Map<String, String> keyValueMetadata,
+        List<InputFile> inputFiles,
+        List<Long> fileSizes,
+        long totalRowGroupCount) {
+      this.parquetSchema = parquetSchema;
+      this.keyValueMetadata = keyValueMetadata;
+      this.inputFiles = inputFiles;
+      this.fileSizes = fileSizes;
+      this.totalRowGroupCount = totalRowGroupCount;
+    }
+  }
+
   public IcebergRewriteExecutor(
       RewriteFilesInput input, MixedTable table, Map<String, String> 
properties) {
     super(input, table, properties);
@@ -112,4 +159,315 @@ public class IcebergRewriteExecutor extends 
AbstractRewriteFilesExecutor {
   private PartitionSpec fileSpec() {
     return table.asUnkeyedTable().specs().get(input.allFiles()[0].specId());
   }
+
+  /**
+   * Rewrites the input data files, preferring the parquet row-group merge
+   * path and falling back to the parent's row-based rewrite.
+   *
+   * <p>The merge path is attempted only when {@link
+   * #canParquetRowGroupMerge()} returns true. Any exception thrown by the
+   * merge itself is logged at WARN and the parent implementation is used
+   * instead.
+   *
+   * <p>NOTE(review): the eligibility check runs outside the try block, so
+   * an unchecked exception raised there is not turned into a fallback.
+   *
+   * @return the rewritten data files
+   * @throws Exception if the row-based rewrite fails
+   */
+  @Override
+  protected List<DataFile> rewriterDataFiles() throws Exception {
+    if (!canParquetRowGroupMerge()) {
+      return super.rewriterDataFiles();
+    }
+
+    try {
+      LOG.debug(
+          "Starting parquet row-group merge for {} files.", 
input.rewrittenDataFiles().length);
+      List<DataFile> dataFiles = parquetRowGroupMergeFiles();
+      LOG.debug("Parquet row-group merge completed successfully");
+
+      return dataFiles;
+    } catch (Exception e) {
+      LOG.warn("Parquet row-group merge failed, falling back to row-based 
rewrite", e);
+      return super.rewriterDataFiles();
+    }
+  }
+
+  /**
+   * Decides whether this task may use the parquet row-group merge path.
+   *
+   * <p>The cached schema context is reset first. The preconditions are
+   * then evaluated in order and short-circuit on the first failure. The
+   * last one, {@code canPrepareParquetSchemaContext()}, opens the input
+   * files and stores the resulting context in {@code fileSchemaContext}
+   * as a side effect.
+   *
+   * @return true if every precondition holds
+   */
+  protected boolean canParquetRowGroupMerge() {
+    fileSchemaContext = null;
+    return isParquetRowGroupMergeEnabled()
+        && isTableVersionAllowed()
+        && isTableUnsorted()
+        && isParquetFormat()
+        && hasNoEncryptedDataFiles()
+        && hasNoReadOnlyDeleteFiles()
+        && hasNoRewrittenDeleteFiles()
+        && hasNoBloomFilter()
+        && allFilesHaveCurrentSpecId()
+        && canPrepareParquetSchemaContext();
+  }
+
+  /**
+   * Checks that the table's format version permits row-group merge.
+   *
+   * @return true if the table exposes its operations and its current
+   *     format version is lower than 3
+   */
+  private boolean isTableVersionAllowed() {
+    if (!(table.asUnkeyedTable() instanceof HasTableOperations)) {
+      return checkCondition(false, "table operations are unavailable");
+    }
+
+    HasTableOperations tableWithOperations = (HasTableOperations) 
table.asUnkeyedTable();
+    int formatVersion = 
tableWithOperations.operations().current().formatVersion();
+
+    // Only format versions below 3 (that is, V1 and V2) are eligible for
+    // now. Iceberg V3 row-lineage semantics (_row_id,
+    // _last_updated_sequence_number) are still unsettled here, so V3
+    // tables fall back to row-based rewrite.
+    return checkCondition(formatVersion < 3, "table format version is " + 
formatVersion);
+  }
+
+  /**
+   * Checks that the table declares no sort order.
+   *
+   * @return true if the table's sort order is not sorted
+   */
+  private boolean isTableUnsorted() {
+    return checkCondition(!table.asUnkeyedTable().sortOrder().isSorted(), 
"table has sort order");
+  }
+
+  /**
+   * Checks that none of the data files to rewrite is encrypted. A file is
+   * treated as encrypted when its key metadata is non-null.
+   *
+   * @return true if every input data file has null key metadata
+   */
+  private boolean hasNoEncryptedDataFiles() {
+    for (DataFile file : input.rewrittenDataFiles()) {
+      if (!checkCondition(
+          file.keyMetadata() == null, "input file is encrypted: " + 
file.location())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Reads the footer of every input data file and verifies that all of
+   * them share the same Parquet schema.
+   *
+   * <p>While scanning, it collects the input file handles, their sizes
+   * and the total row-group count. The key/value metadata is copied from
+   * the first file only.
+   *
+   * @return the context describing the validated inputs; never null
+   * @throws IllegalStateException if a footer cannot be read, if the
+   *     schemas are inconsistent, or if there is no input file
+   */
+  private FileSchemaContext checkSchemaAndBuildContext() {
+    MessageType firstSchema = null;
+    Map<String, String> firstMetadata = null;
+    List<InputFile> inputFiles = new 
ArrayList<>(input.rewrittenDataFiles().length);
+    List<Long> fileSizes = new ArrayList<>(input.rewrittenDataFiles().length);
+    long totalRowGroupCount = 0;
+
+    for (DataFile file : input.rewrittenDataFiles()) {
+      InputFile inputFile = io.newInputFile(file.location(), 
file.fileSizeInBytes());
+      inputFiles.add(inputFile);
+      fileSizes.add(file.fileSizeInBytes());
+
+      try (ParquetFileReader reader = 
ParquetFileReader.open(ParquetIOBridge.file(inputFile))) {
+        totalRowGroupCount += reader.getRowGroups().size();
+        // Get parquet schema from file metadata.
+        MessageType currentSchema = reader.getFileMetaData().getSchema();
+        if (firstSchema == null) {
+          // The first file defines the reference schema and metadata.
+          firstSchema = currentSchema;
+          firstMetadata = new 
HashMap<>(reader.getFileMetaData().getKeyValueMetaData());
+          continue;
+        }
+
+        if (!firstSchema.equals(currentSchema)) {
+          throw new IllegalStateException(
+              "The input parquet files have inconsistent schemas and cannot be 
merged.");
+        }
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to read parquet file: " + 
file.location(), e);
+      }
+    }
+
+    // Still null only when the loop saw no input file at all.
+    if (firstSchema == null) {
+      throw new IllegalStateException(
+          "No valid input parquet files are available for parquet row-group 
merge.");
+    }
+
+    return new FileSchemaContext(
+        firstSchema, firstMetadata, inputFiles, fileSizes, totalRowGroupCount);
+  }
+
+  /**
+   * Merges the validated input files into one or more output parquet
+   * files by appending whole files through {@link ParquetFileMergeRunner}.
+   *
+   * <p>A new output file is started whenever adding the next input would
+   * push the accumulated input size past {@code targetSize()}. On failure,
+   * outputs that were already finalized are deleted before the exception
+   * is rethrown. Once the merge loop has been entered, the cached schema
+   * context is cleared on exit, whether the merge succeeded or failed.
+   *
+   * @return the data files produced by the merge
+   * @throws IllegalStateException if the schema context was not prepared
+   * @throws Exception if starting, appending to or finalizing an output
+   *     file fails
+   */
+  protected List<DataFile> parquetRowGroupMergeFiles() throws Exception {
+    List<DataFile> outputFiles = new ArrayList<>();
+    OutputFileFactory outputFileFactory = newRowGroupMergeOutputFileFactory();
+    if (fileSchemaContext == null) {
+      throw new IllegalStateException(
+          "Parquet row-group merge context is not prepared. Call 
canParquetRowGroupMerge() first.");
+    }
+
+    long maxOutputSize = targetSize();
+    // Sum of the input file sizes appended to the current output so far.
+    long currentOutputSize = 0L;
+    ParquetFileMergeRunner parquetFileMergeRunner = null;
+
+    try {
+      for (int i = 0; i < fileSchemaContext.inputFiles.size(); i++) {
+        InputFile inputFile = fileSchemaContext.inputFiles.get(i);
+        long fileSize = fileSchemaContext.fileSizes.get(i);
+        // Roll to a new output file when appending the next input would 
exceed target size.
+        if (currentOutputSize > 0 && currentOutputSize + fileSize > 
maxOutputSize) {
+          outputFiles.add(parquetFileMergeRunner.result());
+          parquetFileMergeRunner = null;
+        }
+        if (parquetFileMergeRunner == null) {
+          parquetFileMergeRunner =
+              new ParquetFileMergeRunner(
+                  io,
+                  fileSchemaContext.parquetSchema,
+                  fileSchemaContext.keyValueMetadata,
+                  table.spec(),
+                  partition(),
+                  parquetRowGroupSize(),
+                  metricsConfig());
+          EncryptedOutputFile outputFile = 
newRowGroupMergeOutputFile(outputFileFactory);
+          parquetFileMergeRunner.start(outputFile);
+          currentOutputSize = 0L;
+        }
+
+        parquetFileMergeRunner.appendFile(inputFile);
+        currentOutputSize += fileSize;
+      }
+
+      // Finalize the last output that is still open.
+      if (parquetFileMergeRunner != null) {
+        outputFiles.add(parquetFileMergeRunner.result());
+        parquetFileMergeRunner = null;
+      }
+
+      return outputFiles;
+    } catch (Exception e) {
+      // Clean up already finalized rollover outputs to avoid leaving orphan 
files before fallback.
+      cleanupMergedOutputFiles(outputFiles);
+      throw e;
+    } finally {
+      fileSchemaContext = null;
+      // A runner that is still set here did not hand over a result;
+      // closing it lets the runner delete its unfinished output file.
+      if (parquetFileMergeRunner != null) {
+        parquetFileMergeRunner.close();
+      }
+    }
+  }
+
+  /**
+   * Builds the schema context for the input files and applies the
+   * minimum average row-group size threshold.
+   *
+   * <p>The average is the total input file size divided by the total
+   * row-group count. It is only computed when at least one row group
+   * exists, which also avoids a division by zero.
+   *
+   * <p>NOTE(review): when the threshold check rejects the merge, the
+   * freshly built context stays assigned to {@code fileSchemaContext}
+   * until the next call to {@code canParquetRowGroupMerge()}.
+   *
+   * @return true if the context was built and the average row-group size
+   *     is not below the configured threshold; false if the threshold is
+   *     not met or building the context threw IllegalStateException
+   */
+  private boolean canPrepareParquetSchemaContext() {
+    try {
+      fileSchemaContext = checkSchemaAndBuildContext();
+      if (fileSchemaContext.totalRowGroupCount > 0) {
+        long totalInputSize = 
fileSchemaContext.fileSizes.stream().mapToLong(Long::longValue).sum();
+        long avgRowGroupSize = totalInputSize / 
fileSchemaContext.totalRowGroupCount;
+        long minAvgRowGroupSize =
+            PropertyUtil.propertyAsLong(
+                table.properties(),
+                
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES,
+                
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_DEFAULT);
+
+        if (avgRowGroupSize < minAvgRowGroupSize) {
+          LOG.debug(
+              "Skip parquet row-group merge: avg row-group size {} bytes ({} 
input files, {} row groups) "
+                  + "is below threshold {} bytes ({}), falling back to 
row-level rewrite",
+              avgRowGroupSize,
+              fileSchemaContext.inputFiles.size(),
+              fileSchemaContext.totalRowGroupCount,
+              minAvgRowGroupSize,
+              
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES);
+          return false;
+        }
+      }
+      return true;
+    } catch (IllegalStateException e) {
+      // Validation failures are reported as "not eligible", not as errors.
+      return checkCondition(false, e.getMessage());
+    }
+  }
+
+  /**
+   * Deletes the given merged output files on a best-effort basis. A
+   * RuntimeException raised while deleting one file is logged at WARN and
+   * does not stop the remaining deletions.
+   *
+   * @param outputFiles already finalized output files to remove
+   */
+  private void cleanupMergedOutputFiles(List<DataFile> outputFiles) {
+    for (DataFile outputFile : outputFiles) {
+      try {
+        io.deleteFile(outputFile.location());
+      } catch (RuntimeException e) {
+        LOG.warn(
+            "Failed to delete merged parquet output during cleanup: {}", 
outputFile.location(), e);
+      }
+    }
+  }
+
+  /**
+   * Reads the table property that switches the row-group merge path on,
+   * using the declared default when the property is absent.
+   *
+   * @return true if the feature is enabled for this table
+   */
+  private boolean isParquetRowGroupMergeEnabled() {
+    return checkCondition(
+        PropertyUtil.propertyAsBoolean(
+            table.properties(),
+            SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED,
+            
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED_DEFAULT),
+        SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED + " is not 
enabled");
+  }
+
+  /**
+   * Checks that the table's default file format is parquet and that every
+   * data file to rewrite is a parquet file. Row-group merge is possible
+   * only when both conditions are satisfied.
+   *
+   * @return true if both conditions hold
+   */
+  private boolean isParquetFormat() {
+    // 1. The table's default file format is parquet.
+    if (!checkCondition(
+        dataFileFormat() == FileFormat.PARQUET, "table default file format is 
not PARQUET")) {
+      return false;
+    }
+
+    // 2. All input files are in parquet format.
+    for (DataFile file : input.rewrittenDataFiles()) {
+      if (!checkCondition(
+          file.format() == FileFormat.PARQUET,
+          "input file format is not PARQUET: " + file.format())) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Checks that the task carries no read-only delete files.
+   *
+   * @return true if the read-only delete file array is null or empty
+   */
+  private boolean hasNoReadOnlyDeleteFiles() {
+    return checkCondition(
+        ArrayUtils.isEmpty(input.readOnlyDeleteFiles()), "has read-only delete 
files");
+  }
+
+  /**
+   * Checks that the task carries no delete files to rewrite.
+   *
+   * @return true if the rewritten delete file array is null or empty
+   */
+  private boolean hasNoRewrittenDeleteFiles() {
+    return checkCondition(
+        ArrayUtils.isEmpty(input.rewrittenDeleteFiles()), "has rewritten 
delete files");
+  }
+
+  /**
+   * Checks that no parquet bloom filter is enabled in the table
+   * properties.
+   *
+   * @return false as soon as one property with the bloom-filter prefix
+   *     has a value that parses as true; true otherwise
+   */
+  private boolean hasNoBloomFilter() {
+    // Check if bloom filter is enabled by looking for properties
+    // with the prefix 'write.parquet.bloom-filter-enabled' and value 'true'
+    // such as 'write.parquet.bloom-filter-enabled.default' or
+    // 'write.parquet.bloom-filter-enabled.column.'.
+    for (Map.Entry<String, String> entry : table.properties().entrySet()) {
+      if (entry.getKey().startsWith(PARQUET_BLOOM_FILTER_ENABLED_PREFIX)
+          && Boolean.parseBoolean(entry.getValue())) {
+        return checkCondition(false, "table has bloom filter");
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Reads the parquet row-group size from the table properties, using
+   * Iceberg's default when the property is absent.
+   *
+   * @return the configured row-group size in bytes
+   */
+  private long parquetRowGroupSize() {
+    return PropertyUtil.propertyAsLong(
+        table.properties(),
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+  }
+
+  /**
+   * Checks that every data file to rewrite was written with the table's
+   * current partition spec. The merge output is created with
+   * {@code table.spec()}, so inputs from another spec are rejected.
+   *
+   * @return true if all input files carry the current spec id
+   */
+  private boolean allFilesHaveCurrentSpecId() {
+    int currentSpecId = table.spec().specId();
+    for (DataFile file : input.rewrittenDataFiles()) {
+      if (!checkCondition(
+          file.specId() == currentSpecId,
+          "file spec id does not match current spec id: file ="
+              + file.specId()
+              + ", current ="
+              + currentSpecId)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the condition unchanged, logging the reason at DEBUG when it
+   * is false.
+   *
+   * @param condition outcome of a precondition check
+   * @param message reason reported when the condition does not hold
+   * @return the value of {@code condition}
+   */
+  private boolean checkCondition(boolean condition, String message) {
+    if (!condition) {
+      LOG.debug("Skip parquet row-group merge due to {}", message);
+    }
+    return condition;
+  }
+
+  /**
+   * Builds the metrics configuration used for the merged output files.
+   * For a keyed table it is derived from the base table, otherwise from
+   * the unkeyed table itself.
+   *
+   * @return the metrics configuration of the target table
+   */
+  private MetricsConfig metricsConfig() {
+    return table.isKeyedTable()
+        ? MetricsConfig.forTable(table.asKeyedTable().baseTable())
+        : MetricsConfig.forTable(table.asUnkeyedTable());
+  }
+
+  /**
+   * Creates the factory that names the merged parquet output files. It is
+   * built for the unkeyed table with the current spec id and 0 as the two
+   * numeric arguments, and writes through this executor's {@code io}.
+   *
+   * @return a new output file factory for parquet files
+   */
+  private OutputFileFactory newRowGroupMergeOutputFileFactory() {
+    return OutputFileFactory.builderFor(table.asUnkeyedTable(), 
table.spec().specId(), 0)
+        .format(FileFormat.PARQUET)
+        .ioSupplier(() -> io)
+        .build();
+  }
+
+  /**
+   * Allocates the next output file from the factory. An unpartitioned
+   * spec uses the no-argument form; otherwise the file is created for the
+   * current spec and this task's partition.
+   *
+   * @param outputFileFactory factory that generates the output location
+   * @return a new output file for the merge
+   */
+  private EncryptedOutputFile newRowGroupMergeOutputFile(OutputFileFactory 
outputFileFactory) {
+    return table.spec().isUnpartitioned()
+        ? outputFileFactory.newOutputFile()
+        : outputFileFactory.newOutputFile(table.spec(), partition());
+  }
 }
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java
new file mode 100644
index 000000000..2a46e535c
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetIOBridge;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This component merges one or more parquet files into a single output 
parquet file without
+ * row-level decode/encode, then builds an Iceberg {@link 
org.apache.iceberg.DataFile} for commit.
+ *
+ * <p>Lifecycle:
+ *
+ * <ol>
+ *   <li>{@link #start(org.apache.iceberg.encryption.EncryptedOutputFile)} to 
initialize writer
+ *   <li>{@link #appendFile(org.apache.iceberg.io.InputFile)} for each source 
file
+ *   <li>{@link #result()} to finalize and build output {@link 
org.apache.iceberg.DataFile}
+ *   <li>{@link #close()} for safe cleanup (also supports failure cleanup)
+ * </ol>
+ *
+ * <p>Failure behavior:
+ *
+ * <ul>
+ *   <li>If writer is not successful, {@link #close()} attempts to delete 
output file
+ *   <li>If writer close fails, the exception is propagated
+ * </ul>
+ */
+public final class ParquetFileMergeRunner implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParquetFileMergeRunner.class);
+
+  private final MessageType parquetSchema;
+  private final Map<String, String> keyValueMetadata;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+  private final MetricsConfig metricsConfig;
+  private final FileIO io;
+
+  // Non-null between start() and result()/close().
+  private ParquetFileWriter writer;
+  private EncryptedOutputFile outputFile;
+  // Set once result() has built the DataFile; close() keeps the output
+  // file only when this is true.
+  private boolean success;
+  private final long rowGroupSize;
+
+  /**
+   * Creates a runner for one output file.
+   *
+   * @param io file IO used to measure and, on failure, delete the output
+   * @param parquetSchema schema written to the output file
+   * @param keyValueMetadata footer metadata for the output; copied
+   *     defensively, null is treated as empty
+   * @param spec partition spec recorded on the resulting data file
+   * @param partition partition value recorded on the resulting data file
+   * @param rowGroupSize row-group size handed to the parquet writer
+   * @param metricsConfig configuration used to derive file metrics
+   */
+  public ParquetFileMergeRunner(
+      FileIO io,
+      MessageType parquetSchema,
+      Map<String, String> keyValueMetadata,
+      PartitionSpec spec,
+      StructLike partition,
+      long rowGroupSize,
+      MetricsConfig metricsConfig) {
+    this.io = io;
+    this.parquetSchema = parquetSchema;
+    this.keyValueMetadata =
+        keyValueMetadata == null ? new HashMap<>() : new 
HashMap<>(keyValueMetadata);
+    this.spec = spec;
+    this.partition = partition;
+    this.rowGroupSize = rowGroupSize;
+    this.metricsConfig = metricsConfig;
+  }
+
+  /**
+   * Initializes the ParquetFileWriter for row-group merge.
+   *
+   * @param output encrypted output file destination
+   * @throws java.io.IOException if writer creation or writer start fails
+   */
+  public void start(EncryptedOutputFile output) throws IOException {
+    // Remember the destination first so close() can delete it even when
+    // creating or starting the writer fails.
+    this.outputFile = output;
+    writer =
+        new ParquetFileWriter(
+            ParquetIOBridge.file(output.encryptingOutputFile()),
+            parquetSchema,
+            ParquetFileWriter.Mode.CREATE,
+            rowGroupSize,
+            // NOTE(review): presumably the max padding size - confirm
+            // against the ParquetFileWriter constructor documentation.
+            0);
+    writer.start();
+  }
+
+  /**
+   * Appends the content of one source parquet file to the output.
+   *
+   * @param inputFile source parquet file
+   * @throws IllegalStateException if start() has not been called
+   * @throws java.io.IOException if appending fails
+   */
+  public void appendFile(InputFile inputFile) throws IOException {
+    if (writer == null) {
+      throw new IllegalStateException("ParquetFileWriter not initialized. Call 
start() first.");
+    }
+
+    org.apache.parquet.io.InputFile file = ParquetIOBridge.file(inputFile);
+    writer.appendFile(file);
+  }
+
+  /**
+   * Finalizes the output file and builds the DataFile describing it.
+   * Metrics, split offsets and the record count come from the written
+   * footer; the file size is read back through {@code io}.
+   *
+   * <p>NOTE(review): a successful call sets the writer to null, so a
+   * second call fails the "not initialized" check first. The "already
+   * called" check is reached only if start() is invoked again.
+   *
+   * @return the data file for the merged output
+   * @throws IllegalStateException if the writer is not initialized or a
+   *     result was already produced
+   * @throws java.io.IOException if finalizing or closing the writer fails
+   */
+  public DataFile result() throws IOException {
+    if (writer == null) {
+      throw new IllegalStateException("ParquetFileWriter not initialized. Call 
start() first.");
+    }
+    if (success) {
+      throw new IllegalStateException("result() already called.");
+    }
+
+    writer.end(keyValueMetadata);
+    ParquetMetadata footer = writer.getFooter();
+    writer.close();
+    writer = null;
+
+    Metrics metrics = ParquetUtil.footerMetrics(footer, Stream.empty(), 
metricsConfig);
+    List<Long> splitOffsets =
+        
footer.getBlocks().stream().map(BlockMetaData::getStartingPos).collect(Collectors.toList());
+    long recordCount = 
footer.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
+    long fileSizeInBytes =
+        
io.newInputFile(outputFile.encryptingOutputFile().location()).getLength();
+
+    DataFile result =
+        DataFiles.builder(spec)
+            .withEncryptedOutputFile(outputFile)
+            .withFormat(FileFormat.PARQUET)
+            .withFileSizeInBytes(fileSizeInBytes)
+            .withMetrics(metrics)
+            .withPartition(partition)
+            .withSplitOffsets(splitOffsets)
+            .withRecordCount(recordCount)
+            .build();
+    success = true;
+    return result;
+  }
+
+  /**
+   * Closes writer and performs failure cleanup.
+   *
+   * <p>If writer did not succeed, this method attempts to delete the output 
file to avoid leaving
+   * partial artifacts. Cleanup deletion failures are logged but not thrown.
+   *
+   * @throws java.io.IOException when closing writer fails
+   */
+  @Override
+  public void close() throws IOException {
+    // Held back so the output cleanup below still runs before rethrowing.
+    IOException closeFailure = null;
+    if (writer != null) {
+      try {
+        writer.close();
+      } catch (IOException e) {
+        closeFailure = e;
+      } finally {
+        writer = null;
+      }
+    }
+
+    // Clean up output file if writer did not succeed.
+    if (!success && outputFile != null) {
+      String location = outputFile.encryptingOutputFile().location();
+      try {
+        io.deleteFile(location);
+      } catch (RuntimeException e) {
+        LOG.warn("Failed to delete output file during cleanup: {}", location, 
e);
+      }
+    }
+
+    if (closeFailure != null) {
+      throw closeFailure;
+    }
+  }
+}
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 0905acf42..6c17f6ad0 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -173,6 +173,18 @@ public class TableProperties {
   public static final long 
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT =
       30000; // 30s
 
+  /**
+   * Table property that enables the parquet row-group merge path of the
+   * Iceberg rewrite executor. Disabled by default.
+   */
+  public static final String 
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED =
+      "self-optimizing.rewrite.use-parquet-row-group-merge.enabled";
+  public static final boolean 
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED_DEFAULT =
+      false;
+
+  /**
+   * Table property holding the minimum average row-group size, in bytes,
+   * that the input files must reach for the row-group merge path to be
+   * used. Below this value the rewrite falls back to the row-level path.
+   */
+  public static final String
+      
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES
 =
+          
"self-optimizing.rewrite.use-parquet-row-group-merge.min-avg-row-group-size-bytes";
+  public static final long
+      
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_DEFAULT
 =
+          8388608; // 8 MB
+
   /**
    * The retention period for snapshots created by Flink checkpoints. 
Snapshots older than this
    * duration may be cleaned up. Avoid keeping the last flink checkpoint 
snapshot for too long, as
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java
 
b/amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java
new file mode 100644
index 000000000..e35c6cbcc
--- /dev/null
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+
+/**
+ * Bridge for reusing Iceberg's package-scoped {@link ParquetIO} conversions 
from parquet row-group
+ * merge path. This class lives in the same package as {@link ParquetIO} on 
purpose to convert
+ * Iceberg {@link org.apache.iceberg.io.InputFile}/{@link 
org.apache.iceberg.io.OutputFile}
+ * instances into Parquet file abstractions without re-implementing Iceberg's 
adapter logic.
+ */
+public final class ParquetIOBridge {
+  // Static utility holder; not meant to be instantiated.
+  private ParquetIOBridge() {}
+
+  /**
+   * Converts an Iceberg input file by delegating to {@link ParquetIO}.
+   *
+   * @param file Iceberg input file
+   * @return the Parquet input file produced by {@link ParquetIO}
+   */
+  public static org.apache.parquet.io.InputFile file(InputFile file) {
+    return ParquetIO.file(file);
+  }
+
+  /**
+   * Converts an Iceberg output file by delegating to {@link ParquetIO}.
+   *
+   * @param file Iceberg output file
+   * @return the Parquet output file produced by {@link ParquetIO}
+   */
+  public static org.apache.parquet.io.OutputFile file(OutputFile file) {
+    return ParquetIO.file(file);
+  }
+}
diff --git 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
index 37b79036c..71b4b16d2 100644
--- 
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
+++ 
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.amoro.optimizing;
 
+import static 
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED;
+
 import org.apache.amoro.BasicTableTestHelper;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.catalog.BasicCatalogTestHelper;
@@ -28,11 +30,14 @@ import 
org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.avro.Avro;
@@ -48,6 +53,7 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -57,6 +63,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -208,6 +215,368 @@ public class IcebergRewriteExecutorTest extends 
TableTestBase {
     readAllData();
   }
 
+  @Test
+  public void testParquetRowGroupMergeEnableCondition() throws IOException {
+    // parquet row-group merge should only be enabled for Parquet files.
+    if (fileFormat != FileFormat.PARQUET) {
+      Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+      return;
+    }
+
+    // All parquet row-group merge preconditions are satisfied.
+    prepareParquetRowGroupMergeTableConditions(true);
+    Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+    // parquet row-group merge should be disabled if the table property is not 
enabled.
+    prepareParquetRowGroupMergeTableConditions(false);
+    Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+    // parquet row-group merge should be disabled if there are delete files.
+    prepareParquetRowGroupMergeTableConditions(true);
+    RewriteFilesInput rewrittenDeleteInput =
+        new RewriteFilesInput(
+            dataScanTask.rewrittenDataFiles(),
+            dataScanTask.rePosDeletedDataFiles(),
+            new DeleteFile[] {},
+            new DeleteFile[] {(DeleteFile) scanTask.readOnlyDeleteFiles()[0]},
+            getMixedTable());
+    
Assert.assertFalse(newExecutor(rewrittenDeleteInput).canParquetRowGroupMerge());
+
+    rewrittenDeleteInput =
+        new RewriteFilesInput(
+            dataScanTask.rewrittenDataFiles(),
+            dataScanTask.rePosDeletedDataFiles(),
+            new DeleteFile[] {(DeleteFile) scanTask.readOnlyDeleteFiles()[0]},
+            new DeleteFile[] {},
+            getMixedTable());
+    
Assert.assertFalse(newExecutor(rewrittenDeleteInput).canParquetRowGroupMerge());
+
+    // parquet row-group merge should be disabled if the bloom filter is 
enabled.
+    resetParquetRowGroupMergeTestState();
+    Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+    getMixedTable()
+        .asUnkeyedTable()
+        .updateProperties()
+        .set(
+            
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + 
"id",
+            "true")
+        .commit();
+    Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+    // parquet row-group merge should be disabled if the global bloom filter 
switch is enabled.
+    resetParquetRowGroupMergeTestState();
+    Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+    getMixedTable()
+        .asUnkeyedTable()
+        .updateProperties()
+        .set("write.parquet.bloom-filter-enabled.default", "true")
+        .commit();
+    Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+    // parquet row-group merge should be disabled if the table spec evolves so 
that the source files
+    // no longer match the partition spec.
+    resetParquetRowGroupMergeTestState();
+    Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+    
getMixedTable().asUnkeyedTable().updateSpec().addField(Expressions.month("op_time")).commit();
+    Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+    // parquet row-group merge should be disabled if table has a sort order.
+    resetParquetRowGroupMergeTestState();
+    Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+    getMixedTable().asUnkeyedTable().replaceSortOrder().asc("id").commit();
+    Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+  }
+
+  @Test
+  public void testParquetRowGroupMergeWithEncryptedDataFile() throws 
IOException {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+    prepareParquetRowGroupMergeTableConditions(true);
+    Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+    DataFile encryptedDataFile =
+        DataFiles.builder(getMixedTable().spec())
+            .copy(dataScanTask.rewrittenDataFiles()[0])
+            .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[] {1}))
+            .build();
+
+    RewriteFilesInput encryptedInput =
+        new RewriteFilesInput(
+            new DataFile[] {encryptedDataFile},
+            new DataFile[] {},
+            new DeleteFile[] {},
+            new DeleteFile[] {},
+            getMixedTable());
+    // parquet row-group merge should be disabled if the data file is 
encrypted.
+    Assert.assertFalse(newExecutor(encryptedInput).canParquetRowGroupMerge());
+  }
+
+  @Test
+  public void testParquetRowGroupMergeDisabledForV3Table() throws IOException {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+    prepareParquetRowGroupMergeTableConditions(true);
+    Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+    HasTableOperations tableWithOperations = (HasTableOperations) 
getMixedTable().asUnkeyedTable();
+    TableMetadata current = tableWithOperations.operations().current();
+    tableWithOperations.operations().commit(current, 
current.upgradeToFormatVersion(3));
+    getMixedTable().asUnkeyedTable().refresh();
+
+    Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+  }
+
+  /**
+   * Runs a parquet row-group merge over five small data files, commits the result and checks
+   * that the table afterwards holds exactly the records that were originally written.
+   *
+   * <p>The target size is set to the combined size of the first three source files, so the merge
+   * is expected to produce two output files (three sources in the first, two in the second).
+   * Every record read back is matched against its source record by id; an unknown id, a
+   * duplicated id or a missing id fails the test with an explicit assertion message.
+   */
+  @Test
+  public void testParquetRowGroupMergeExecuteSplitAndCommitDataIntegrity() throws IOException {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+    prepareParquetRowGroupMergeTableConditions(true);
+
+    // Create 5 data files, and each data file has 3 records.
+    List<List<Record>> inputRecordGroups = parquetRowGroupMergeSourceRecordGroups();
+    List<DataFile> sourceDataFiles = Lists.newArrayListWithExpectedSize(inputRecordGroups.size());
+    long txId = 1L;
+    for (List<Record> records : inputRecordGroups) {
+      sourceDataFiles.addAll(
+          MixedDataTestHelpers.writeAndCommitBaseStore(getMixedTable(), txId++, records, false));
+    }
+
+    // Target size = size of the first three source files, which forces a 3 + 2 split.
+    long splitTargetSize =
+        sourceDataFiles.get(0).fileSizeInBytes()
+            + sourceDataFiles.get(1).fileSizeInBytes()
+            + sourceDataFiles.get(2).fileSizeInBytes();
+    getMixedTable()
+        .asUnkeyedTable()
+        .updateProperties()
+        .set(
+            org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
+            String.valueOf(splitTargetSize))
+        .commit();
+
+    // Build a rewrite input with only data files and no delete files.
+    RewriteFilesInput rewriteFilesInput = newDataOnlyInput(sourceDataFiles);
+    IcebergRewriteExecutor executor = newExecutor(rewriteFilesInput);
+
+    // Verify parquet row-group merge eligibility and execute the merge.
+    Assert.assertTrue(executor.canParquetRowGroupMerge());
+
+    RewriteFilesOutput output = executor.execute();
+    // With this target, the first 3 source files are grouped into one output and
+    // the last 2 into another.
+    Assert.assertEquals(2, output.getDataFiles().length);
+
+    // Commit the merge result so the table now references the merged files only.
+    getMixedTable()
+        .asUnkeyedTable()
+        .newRewrite()
+        .rewriteFiles(Sets.newHashSet(sourceDataFiles), Sets.newHashSet(output.getDataFiles()))
+        .commit();
+
+    // Index the source records by id (field 0) for the comparison below.
+    Map<Integer, Record> expectedRecordsById = new HashMap<>();
+    for (List<Record> recordGroup : inputRecordGroups) {
+      for (Record record : recordGroup) {
+        expectedRecordsById.put((Integer) record.get(0), record);
+      }
+    }
+
+    // Read the table back after the commit and compare it with the source records.
+    List<Record> currentRecords =
+        MixedDataTestHelpers.readBaseStore(getMixedTable(), Expressions.alwaysTrue());
+    Assert.assertEquals(15, currentRecords.size());
+    for (Record currentRecord : currentRecords) {
+      Integer id = (Integer) currentRecord.get(0);
+      // Consume the expected entry: an unknown id or a second occurrence of the same id then
+      // fails the null check below instead of surfacing as a NullPointerException.
+      Record expectedRecord = expectedRecordsById.remove(id);
+      Assert.assertNotNull("Unexpected or duplicated record id after merge: " + id, expectedRecord);
+      // Compare all fields to ensure the record is correctly merged.
+      for (int pos = 0; pos < 4; pos++) {
+        Assert.assertEquals(
+            "Field " + pos + " differs for record id " + id,
+            expectedRecord.get(pos),
+            currentRecord.get(pos));
+      }
+    }
+    // Every source record must have been matched exactly once.
+    Assert.assertTrue(
+        "Records missing after merge: " + expectedRecordsById.keySet(),
+        expectedRecordsById.isEmpty());
+  }
+
+  /**
+   * Verifies that parquet row-group merge is rejected when the source files were written under
+   * different table schemas.
+   *
+   * <p>Two scenarios are exercised, each with one file written before and one after a schema
+   * change: adding a new column, and changing the type of the {@code id} column to long.
+   */
+  @Test
+  public void testParquetRowGroupMergeWithSchemaDifferent() throws IOException {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+    prepareParquetRowGroupMergeTableConditions(true);
+
+    // Scenario 1: the second file carries an extra column that the first one lacks.
+    List<DataFile> addedColumnFiles = Lists.newArrayList();
+    addedColumnFiles.addAll(
+        MixedDataTestHelpers.writeAndCommitBaseStore(
+            getMixedTable(),
+            301L,
+            Collections.singletonList(
+                MixedDataTestHelpers.createRecord(301, "field_old", 11L, "1970-01-03T08:00:00")),
+            false));
+    // Evolve the schema between the two writes.
+    getMixedTable()
+        .asUnkeyedTable()
+        .updateSchema()
+        .addColumn("new_col", Types.StringType.get())
+        .commit();
+    addedColumnFiles.addAll(
+        MixedDataTestHelpers.writeAndCommitBaseStore(
+            getMixedTable(),
+            302L,
+            Collections.singletonList(
+                MixedDataTestHelpers.createRecord(
+                    getMixedTable().schema(), 302, "field_new", 12L, "1970-01-04T08:00:00", "x")),
+            false));
+
+    IcebergRewriteExecutor addedColumnExecutor = newExecutor(newDataOnlyInput(addedColumnFiles));
+    Assert.assertFalse(addedColumnExecutor.canParquetRowGroupMerge());
+
+    // Scenario 2: start from a fresh table, then change the type of "id" between the writes.
+    resetParquetRowGroupMergeTestState();
+    List<DataFile> changedTypeFiles = Lists.newArrayList();
+    changedTypeFiles.addAll(
+        MixedDataTestHelpers.writeAndCommitBaseStore(
+            getMixedTable(),
+            401L,
+            Collections.singletonList(
+                MixedDataTestHelpers.createRecord(401, "type_old", 21L, "1970-01-05T08:00:00")),
+            false));
+    getMixedTable()
+        .asUnkeyedTable()
+        .updateSchema()
+        .updateColumn("id", Types.LongType.get())
+        .commit();
+    changedTypeFiles.addAll(
+        MixedDataTestHelpers.writeAndCommitBaseStore(
+            getMixedTable(),
+            402L,
+            Collections.singletonList(
+                MixedDataTestHelpers.createRecord(
+                    getMixedTable().schema(), 402L, "type_new", 22L, "1970-01-06T08:00:00")),
+            false));
+
+    IcebergRewriteExecutor changedTypeExecutor = newExecutor(newDataOnlyInput(changedTypeFiles));
+    Assert.assertFalse(changedTypeExecutor.canParquetRowGroupMerge());
+  }
+
+  /**
+   * Checks the minimum-average-row-group-size gate of parquet row-group merge at its boundary:
+   * a threshold one byte above the computed average must disable the merge, and a threshold
+   * equal to the average must still allow it.
+   */
+  @Test
+  public void testParquetRowGroupMergeMinAvgRowGroupSizeThreshold() throws IOException {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+    prepareParquetRowGroupMergeTableConditions(true);
+
+    // Write two tiny source files with two records each.
+    List<DataFile> sourceDataFiles = Lists.newArrayList();
+    List<Record> firstBatch =
+        Arrays.asList(
+            MixedDataTestHelpers.createRecord(1, "a", 0L, "1970-01-01T08:00:00"),
+            MixedDataTestHelpers.createRecord(2, "b", 1L, "1970-01-01T08:00:00"));
+    sourceDataFiles.addAll(
+        MixedDataTestHelpers.writeAndCommitBaseStore(getMixedTable(), 1L, firstBatch, false));
+    List<Record> secondBatch =
+        Arrays.asList(
+            MixedDataTestHelpers.createRecord(3, "c", 2L, "1970-01-02T08:00:00"),
+            MixedDataTestHelpers.createRecord(4, "d", 3L, "1970-01-02T08:00:00"));
+    sourceDataFiles.addAll(
+        MixedDataTestHelpers.writeAndCommitBaseStore(getMixedTable(), 2L, secondBatch, false));
+
+    // Derive the average row-group size from the file sizes recorded on the data files.
+    // This assumes each small test file holds exactly one row group, so the number of row
+    // groups equals the number of files.
+    long totalFileSize = 0L;
+    for (DataFile sourceFile : sourceDataFiles) {
+      totalFileSize += sourceFile.fileSizeInBytes();
+    }
+    int totalRowGroups = sourceDataFiles.size();
+    long avgRowGroupSize = totalFileSize / totalRowGroups;
+
+    RewriteFilesInput rewriteInput = newDataOnlyInput(sourceDataFiles);
+
+    // Case 1: threshold = avgRowGroupSize + 1, i.e. average < threshold -> merge disabled.
+    setMinAvgRowGroupSizeThreshold(avgRowGroupSize + 1);
+    Assert.assertFalse(newExecutor(rewriteInput).canParquetRowGroupMerge());
+
+    // Case 2: threshold = avgRowGroupSize, i.e. average >= threshold -> merge enabled
+    // (boundary value).
+    setMinAvgRowGroupSizeThreshold(avgRowGroupSize);
+    Assert.assertTrue(newExecutor(rewriteInput).canParquetRowGroupMerge());
+  }
+
+  /**
+   * Stores the given value as the table's minimum average row-group size for parquet row-group
+   * merge and refreshes the table afterwards.
+   *
+   * @param threshold the threshold in bytes to write into the table property
+   */
+  private void setMinAvgRowGroupSizeThreshold(long threshold) {
+    String thresholdValue = Long.toString(threshold);
+    getMixedTable()
+        .asUnkeyedTable()
+        .updateProperties()
+        .set(
+            org.apache.amoro.table.TableProperties
+                .SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES,
+            thresholdValue)
+        .commit();
+    // Refresh so that the property change just committed is visible on the table.
+    getMixedTable().asUnkeyedTable().refresh();
+  }
+
+  /**
+   * Builds a rewrite input that contains only the given data files: no rePos data files and no
+   * delete files of either kind.
+   *
+   * <p>Each file is wrapped through {@code MixedDataTestHelpers.wrapIcebergDataFile} with a
+   * 1-based number that follows the file's position in the list.
+   *
+   * @param dataFiles the data files to rewrite
+   * @return a rewrite input bound to the current test table
+   */
+  private RewriteFilesInput newDataOnlyInput(List<DataFile> dataFiles) {
+    DataFile[] wrapped = new DataFile[dataFiles.size()];
+    int slot = 0;
+    long number = 1L;
+    for (DataFile dataFile : dataFiles) {
+      wrapped[slot++] = MixedDataTestHelpers.wrapIcebergDataFile(dataFile, number++);
+    }
+
+    DataFile[] noRePosDataFiles = new DataFile[] {};
+    DeleteFile[] noReadOnlyDeletes = new DeleteFile[] {};
+    DeleteFile[] noRewrittenDeletes = new DeleteFile[] {};
+    return new RewriteFilesInput(
+        wrapped, noRePosDataFiles, noReadOnlyDeletes, noRewrittenDeletes, getMixedTable());
+  }
+
+  /**
+   * Creates a rewrite executor for the given input against the current test table, passing an
+   * empty map as the third constructor argument.
+   *
+   * @param input the rewrite input to execute
+   * @return a new executor instance; nothing is cached between calls
+   */
+  private IcebergRewriteExecutor newExecutor(RewriteFilesInput input) {
+    IcebergRewriteExecutor executor =
+        new IcebergRewriteExecutor(input, getMixedTable(), Collections.emptyMap());
+    return executor;
+  }
+
+  /**
+   * Rebuilds the test fixture from scratch so that a scenario starts without table changes left
+   * behind by the previous one: drops and re-creates the table, re-initializes the test data and
+   * reader, then re-enables the parquet row-group merge table conditions.
+   *
+   * <p>NOTE(review): dropTable/setupTable/initDataAndReader are defined outside this hunk; they
+   * presumably also re-create the scan-task fields used by the tests - confirm.
+   */
+  private void resetParquetRowGroupMergeTestState() throws IOException {
+    dropTable();
+    setupTable();
+    initDataAndReader();
+    prepareParquetRowGroupMergeTableConditions(true);
+  }
+
+  /**
+   * Provides the fixture records for the merge data-integrity test: five groups of three records
+   * each (15 records in total, all with distinct ids). Each group shares one timestamp string
+   * and is meant to be written as its own source data file.
+   *
+   * @return the record groups, in the order they should be written
+   */
+  private List<List<Record>> parquetRowGroupMergeSourceRecordGroups() {
+    List<Record> firstGroup =
+        Arrays.asList(
+            MixedDataTestHelpers.createRecord(11, "john", 0, "1970-01-01T08:00:00"),
+            MixedDataTestHelpers.createRecord(21, "lily", 1, "1970-01-01T08:00:00"),
+            MixedDataTestHelpers.createRecord(31, "sam", 2, "1970-01-01T08:00:00"));
+    List<Record> secondGroup =
+        Arrays.asList(
+            MixedDataTestHelpers.createRecord(41, "tom", 3, "1970-01-02T08:00:00"),
+            MixedDataTestHelpers.createRecord(51, "lucy", 4, "1970-01-02T08:00:00"),
+            MixedDataTestHelpers.createRecord(61, "kate", 5, "1970-01-02T08:00:00"));
+    List<Record> thirdGroup =
+        Arrays.asList(
+            MixedDataTestHelpers.createRecord(71, "ben", 6, "1970-01-03T08:00:00"),
+            MixedDataTestHelpers.createRecord(81, "mia", 7, "1970-01-03T08:00:00"),
+            MixedDataTestHelpers.createRecord(91, "zoe", 8, "1970-01-03T08:00:00"));
+    List<Record> fourthGroup =
+        Arrays.asList(
+            MixedDataTestHelpers.createRecord(101, "max", 9, "1970-01-04T08:00:00"),
+            MixedDataTestHelpers.createRecord(111, "eve", 10, "1970-01-04T08:00:00"),
+            MixedDataTestHelpers.createRecord(121, "amy", 11, "1970-01-04T08:00:00"));
+    List<Record> fifthGroup =
+        Arrays.asList(
+            MixedDataTestHelpers.createRecord(131, "leo", 12, "1970-01-05T08:00:00"),
+            MixedDataTestHelpers.createRecord(141, "ivy", 13, "1970-01-05T08:00:00"),
+            MixedDataTestHelpers.createRecord(151, "jay", 14, "1970-01-05T08:00:00"));
+    return Arrays.asList(firstGroup, secondGroup, thirdGroup, fourthGroup, fifthGroup);
+  }
+
+  /**
+   * Sets only the table properties that matter for the parquet row-group merge eligibility
+   * checks.
+   *
+   * <p>The minimum average row-group size is always set to 0 so that the small files written by
+   * these tests are not rejected by the threshold gate.
+   *
+   * @param enabled the value to write into the row-group merge switch property
+   */
+  private void prepareParquetRowGroupMergeTableConditions(boolean enabled) {
+    String switchValue = Boolean.toString(enabled);
+    getMixedTable()
+        .asUnkeyedTable()
+        .updateProperties()
+        .set(SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED, switchValue)
+        .set(
+            org.apache.amoro.table.TableProperties
+                .SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES,
+            "0")
+        .commit();
+  }
+
   @Test
   public void readOnlyData() throws IOException {
     IcebergRewriteExecutor executor =
diff --git a/docs/user-guides/configurations.md 
b/docs/user-guides/configurations.md
index 843823f2f..beb092022 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -43,35 +43,36 @@ modified through [Alter 
Table](../using-tables/#modify-table) operations.
 
 Self-optimizing configurations are applicable to both Iceberg Format and Mixed 
streaming Format.
 
-| Key                                                     | Default          | 
Description                                                                     
                                                                                
                                                                                
                                                      |
-|---------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| self-optimizing.enabled                                 | true             | 
Enables Self-optimizing                                                         
                                                                                
                                                                                
                                                      |
-| self-optimizing.allow-partial-commit                    | false            | 
Whether to allow partial commit when self-optimizing fails or process is 
cancelled                                                                       
                                                                                
                                                             |
-| self-optimizing.group                                   | default          | 
Optimizer group for Self-optimizing                                             
                                                                                
                                                                                
                                                      |
-| self-optimizing.quota                                   | 0.5              | 
Quota for Self-optimizing, indicating the optimizer resources the table can 
take up                                                                         
                                                                                
                                                          |
-| self-optimizing.execute.num-retries                     | 5                | 
Number of retries after failure of Self-optimizing                              
                                                                                
                                                                                
                                                      |
-| self-optimizing.target-size                             | 134217728(128MB) | 
Target size for Self-optimizing                                                 
                                                                                
                                                                                
                                                      |
-| self-optimizing.max-file-count                          | 10000            | 
Maximum number of files processed by a Self-optimizing process                  
                                                                                
                                                                                
                                                      |
-| self-optimizing.max-task-size-bytes                     | 134217728(128MB) | 
Maximum file size bytes in a single task for splitting tasks                    
                                                                                
                                                                                
                                                      |
-| self-optimizing.fragment-ratio                          | 8                | 
The fragment file size threshold. We could divide self-optimizing.target-size 
by this ratio to get the actual fragment file size                              
                                                                                
                                                        |
-| self-optimizing.min-target-size-ratio                   | 0.75             | 
The undersized segment file size threshold. Segment files under this threshold 
will be considered for rewriting                                                
                                                                                
                                                       |
-| self-optimizing.minor.trigger.file-count                | 12               | 
The minimum number of files to trigger minor optimizing is determined by the 
sum of fragment file count and equality delete file count                       
                                                                                
                                                         |
-| self-optimizing.minor.trigger.interval                  | 3600000(1 hour)  | 
The time interval in milliseconds to trigger minor optimizing                   
                                                                                
                                                                                
                                                      |
-| self-optimizing.major.trigger.duplicate-ratio           | 0.1              | 
The ratio of duplicate data of segment files to trigger major optimizing        
                                                                                
                                                                                
                                                      |
-| self-optimizing.full.trigger.interval                   | -1(closed)       | 
The time interval in milliseconds to trigger full optimizing                    
                                                                                
                                                                                
                                                      |
-| self-optimizing.full.rewrite-all-files                  | true             | 
Whether full optimizing rewrites all files or skips files that do not need to 
be optimized                                                                    
                                                                                
                                                        |
-| self-optimizing.min-plan-interval                       | 60000            | 
The minimum time interval between two self-optimizing planning action           
                                                                                
                                                                                
                                                      |
-| self-optimizing.filter                                  | NULL             | 
Filter conditions for self-optimizing, using SQL conditional expressions, 
without supporting any functions. For the timestamp column condition, the ISO 
date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'. 
                                                              |
-| self-optimizing.refresh-table.adaptive.max-interval-ms  | 0                | 
The maximum time interval in milliseconds to refresh table metadata. 0 means 
disable adaptive refresh. When enabled, the value must be greater than 
'refresh-tables.interval' and may exceed 
'self-optimizing.minor.trigger.interval' * 4/5; if not, adaptive refresh will 
be automatically disabled. |
-| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s)       | 
The time interval increase step in milliseconds to refresh table metadata       
                                                                                
                                                                                
                                                      |
-
-## Data-cleaning configurations
-
+| Key                                                                          
    | Default          | Description                                            
                                                                                
                                                                                
                                                                              |
+|----------------------------------------------------------------------------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| self-optimizing.enabled                                                      
    | true             | Enables Self-optimizing                                
                                                                                
                                                                                
                                                                              |
+| self-optimizing.allow-partial-commit                                         
    | false            | Whether to allow partial commit when self-optimizing 
fails or process is cancelled                                                   
                                                                                
                                                                                
|
+| self-optimizing.group                                                        
    | default          | Optimizer group for Self-optimizing                    
                                                                                
                                                                                
                                                                              |
+| self-optimizing.quota                                                        
    | 0.5              | Quota for Self-optimizing, indicating the optimizer 
resources the table can take up                                                 
                                                                                
                                                                                
 |
+| self-optimizing.execute.num-retries                                          
    | 5                | Number of retries after failure of Self-optimizing     
                                                                                
                                                                                
                                                                              |
+| self-optimizing.target-size                                                  
    | 134217728(128MB) | Target size for Self-optimizing                        
                                                                                
                                                                                
                                                                              |
+| self-optimizing.max-file-count                                               
    | 10000            | Maximum number of files processed by a Self-optimizing 
process                                                                         
                                                                                
                                                                              |
+| self-optimizing.max-task-size-bytes                                          
    | 134217728(128MB) | Maximum file size bytes in a single task for splitting 
tasks                                                                           
                                                                                
                                                                              |
+| self-optimizing.fragment-ratio                                               
    | 8                | The fragment file size threshold. We could divide 
self-optimizing.target-size by this ratio to get the actual fragment file size  
                                                                                
                                                                                
   |
+| self-optimizing.min-target-size-ratio                                        
    | 0.75             | The undersized segment file size threshold. Segment 
files under this threshold will be considered for rewriting                     
                                                                                
                                                                                
 |
+| self-optimizing.minor.trigger.file-count                                     
    | 12               | The minimum number of files to trigger minor 
optimizing is determined by the sum of fragment file count and equality delete 
file count                                                                      
                                                                                
         |
+| self-optimizing.minor.trigger.interval                                       
    | 3600000(1 hour)  | The time interval in milliseconds to trigger minor 
optimizing                                                                      
                                                                                
                                                                                
  |
+| self-optimizing.major.trigger.duplicate-ratio                                
    | 0.1              | The ratio of duplicate data of segment files to 
trigger major optimizing                                                        
                                                                                
                                                                                
     |
+| self-optimizing.full.trigger.interval                                        
    | -1(closed)       | The time interval in milliseconds to trigger full 
optimizing                                                                      
                                                                                
                                                                                
   |
+| self-optimizing.full.rewrite-all-files                                       
    | true             | Whether full optimizing rewrites all files or skips 
files that do not need to be optimized                                          
                                                                                
                                                                                
 |
+| self-optimizing.min-plan-interval                                            
    | 60000            | The minimum time interval between two self-optimizing 
planning actions                                                                
                                                                                
                                                                               |
+| self-optimizing.filter                                                       
    | NULL             | Filter conditions for self-optimizing, using SQL 
conditional expressions, without supporting any functions. For the timestamp 
column condition, the ISO date-time formatter must be used. For example: 
op_time > '2007-12-03T10:15:30'.                                                
              |
+| self-optimizing.refresh-table.adaptive.max-interval-ms                       
    | 0                | The maximum time interval in milliseconds to refresh 
table metadata. 0 means disable adaptive refresh. When enabled, the value must 
be greater than 'refresh-tables.interval' and must not exceed 
'self-optimizing.minor.trigger.interval' * 4/5; if not, adaptive refresh will 
be automatically disabled. |
+| self-optimizing.refresh-table.adaptive.increase-step-ms                      
    | 30000(30s)       | The time interval increase step in milliseconds to 
refresh table metadata                                                          
                                                                                
                                                                                
  |
+| self-optimizing.rewrite.use-parquet-row-group-merge.enabled                  
    | false | Whether to enable parquet row-group merge for eligible Iceberg V2 
table rewrites of Parquet files without delete files, bloom filters, sort 
order, schema incompatibilities, or spec mismatches; otherwise falls back to 
row-based rewrite.                                                          |
+| 
self-optimizing.rewrite.use-parquet-row-group-merge.min-avg-row-group-size-bytes
 | 8388608(8MB) | The minimum average row-group size (in bytes) of input files 
for parquet row-group merge. If the average row-group size across all input 
files is below this threshold, it will fall back to row-level rewrite.          
                                                                            |
+
+## Data-cleaning configurations
 Data-cleaning configurations are applicable to both Iceberg Format and Mixed 
streaming Format.
 
 | Key                                         | Default          | Description 
                                                                                
                                                                                
                                                                                
          |
 
|---------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| table-expire.enabled                        | true             | Enables 
periodically expire table                                                       
                                                                                
                                                                                
              |
+| table-expire.enabled                        | true             | Enables 
periodically expire table                                                       
                                                                                
                                                                                
              |
 | change.data.ttl.minutes                     | 10080(7 days)    | Time to 
live in minutes for data of ChangeStore                                         
                                                                                
                                                                                
              |
 | snapshot.keep.duration                      | 720min(12 hours) | 
Table-Expiration keeps the latest snapshots within a specified duration         
                                                                                
                                                                                
                      |
 | snapshot.keep.min-count                     | 1                | Minimum 
number of snapshots retained for table expiration                               
                                                                                
                                                                                
              |

Reply via email to