This is an automated email from the ASF dual-hosted git repository.
zhoujinsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new e75b4a0b3 [AMORO-4186] Add support for Parquet row-group merging in
IcebergRewriteExecutor (#4188)
e75b4a0b3 is described below
commit e75b4a0b3cdba96a24ca7af1cd86833a1167dc99
Author: WenLingzhang <[email protected]>
AuthorDate: Wed May 20 10:51:31 2026 +0800
[AMORO-4186] Add support for Parquet row-group merging in
IcebergRewriteExecutor (#4188)
* Add support for Parquet row-group merging in IcebergRewriteExecutor
* Enhance Parquet row-group merge support for Iceberg V2 tables and add
version checks
* Add cleanup for orphaned merged output files in Parquet row-group merge
* Refactor ParquetFileMergeRunner to simplify ParquetWriter initialization
* Implement Parquet schema context preparation for row-group merging in
IcebergRewriteExecutor
* Add minimum average row-group size threshold for Parquet row-group merge
* fixup
* fixup style
Removed unnecessary line breaks in the class documentation.
* fixup style
---------
Co-authored-by: 张文领 <[email protected]>
---
.../optimizing/AbstractRewriteFilesExecutor.java | 2 +-
.../amoro/optimizing/IcebergRewriteExecutor.java | 358 ++++++++++++++++++++
.../amoro/optimizing/ParquetFileMergeRunner.java | 198 +++++++++++
.../org/apache/amoro/table/TableProperties.java | 12 +
.../apache/iceberg/parquet/ParquetIOBridge.java | 40 +++
.../optimizing/IcebergRewriteExecutorTest.java | 369 +++++++++++++++++++++
docs/user-guides/configurations.md | 51 +--
7 files changed, 1004 insertions(+), 26 deletions(-)
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
index 4446619f9..823596416 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/AbstractRewriteFilesExecutor.java
@@ -145,7 +145,7 @@ public abstract class AbstractRewriteFilesExecutor
return posDeleteWriter.result().deleteFiles();
}
- private List<DataFile> rewriterDataFiles() throws Exception {
+ protected List<DataFile> rewriterDataFiles() throws Exception {
List<DataFile> result = Lists.newArrayList();
TaskWriter<Record> writer = dataWriter();
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
index 606be5754..3aa05ac78 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/IcebergRewriteExecutor.java
@@ -18,29 +18,76 @@
package org.apache.amoro.optimizing;
+import static
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED;
+import static
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED_DEFAULT;
+import static
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES;
+import static
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_DEFAULT;
+
import org.apache.amoro.io.reader.GenericCombinedIcebergDataReader;
import org.apache.amoro.io.writer.GenericIcebergPartitionedFanoutWriter;
import org.apache.amoro.io.writer.IcebergFanoutPosDeleteWriter;
import org.apache.amoro.table.MixedTable;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriter;
+import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.parquet.ParquetIOBridge;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
/** OptimizingExecutor for iceberg format. */
public class IcebergRewriteExecutor extends AbstractRewriteFilesExecutor {
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergRewriteExecutor.class);
+ private static final String PARQUET_BLOOM_FILTER_ENABLED_PREFIX =
+ "write.parquet.bloom-filter-enabled";
+ private FileSchemaContext fileSchemaContext;
+
+ /**
+ * Immutable holder for what the eligibility probe learned about the input files, kept so the
+ * merge step does not have to re-open every footer.
+ *
+ * <p>{@code inputFiles} and {@code fileSizes} are parallel lists in the order of
+ * {@code input.rewrittenDataFiles()}; {@code parquetSchema} and {@code keyValueMetadata} come
+ * from the first input file's footer; {@code totalRowGroupCount} is the sum over all inputs.
+ */
+ private static class FileSchemaContext {
+ private final MessageType parquetSchema;
+ private final Map<String, String> keyValueMetadata;
+ private final List<InputFile> inputFiles;
+ private final List<Long> fileSizes;
+ private final long totalRowGroupCount;
+
+ private FileSchemaContext(
+ MessageType parquetSchema,
+ Map<String, String> keyValueMetadata,
+ List<InputFile> inputFiles,
+ List<Long> fileSizes,
+ long totalRowGroupCount) {
+ this.parquetSchema = parquetSchema;
+ this.keyValueMetadata = keyValueMetadata;
+ this.inputFiles = inputFiles;
+ this.fileSizes = fileSizes;
+ this.totalRowGroupCount = totalRowGroupCount;
+ }
+ }
+
public IcebergRewriteExecutor(
RewriteFilesInput input, MixedTable table, Map<String, String>
properties) {
super(input, table, properties);
@@ -112,4 +159,315 @@ public class IcebergRewriteExecutor extends
AbstractRewriteFilesExecutor {
private PartitionSpec fileSpec() {
return table.asUnkeyedTable().specs().get(input.allFiles()[0].specId());
}
+
+ /**
+ * Rewrites the input data files, preferring parquet row-group merge when it is eligible.
+ *
+ * <p>If {@link #canParquetRowGroupMerge()} returns false the inherited row-based rewrite is
+ * used. If the merge itself throws, the failure is logged at WARN and the row-based rewrite is
+ * used instead. Note that the eligibility check runs before the try block, so an exception
+ * escaping it is not covered by this fallback.
+ *
+ * @return the data files produced by whichever rewrite path ran
+ * @throws Exception if the row-based rewrite fails
+ */
+ @Override
+ protected List<DataFile> rewriterDataFiles() throws Exception {
+ if (!canParquetRowGroupMerge()) {
+ return super.rewriterDataFiles();
+ }
+
+ try {
+ LOG.debug(
+ "Starting parquet row-group merge for {} files.",
input.rewrittenDataFiles().length);
+ List<DataFile> dataFiles = parquetRowGroupMergeFiles();
+ LOG.debug("Parquet row-group merge completed successfully");
+
+ return dataFiles;
+ } catch (Exception e) {
+ LOG.warn("Parquet row-group merge failed, falling back to row-based
rewrite", e);
+ return super.rewriterDataFiles();
+ }
+ }
+
+ /**
+ * Evaluates every precondition for parquet row-group merge.
+ *
+ * <p>Any previously prepared {@code fileSchemaContext} is discarded first. The checks are
+ * chained with {@code &&}, so evaluation stops at the first failing one; the final check opens
+ * the input files and stores a fresh {@code fileSchemaContext} for
+ * {@link #parquetRowGroupMergeFiles()}.
+ *
+ * @return true only if all preconditions hold
+ */
+ protected boolean canParquetRowGroupMerge() {
+ fileSchemaContext = null;
+ return isParquetRowGroupMergeEnabled()
+ && isTableVersionAllowed()
+ && isTableUnsorted()
+ && isParquetFormat()
+ && hasNoEncryptedDataFiles()
+ && hasNoReadOnlyDeleteFiles()
+ && hasNoRewrittenDeleteFiles()
+ && hasNoBloomFilter()
+ && allFilesHaveCurrentSpecId()
+ && canPrepareParquetSchemaContext();
+ }
+
+ /**
+ * Checks that the table exposes its operations and that its format version is below 3.
+ *
+ * @return false if the unkeyed table is not a {@code HasTableOperations} or its format version
+ * is 3 or higher
+ */
+ private boolean isTableVersionAllowed() {
+ if (!(table.asUnkeyedTable() instanceof HasTableOperations)) {
+ return checkCondition(false, "table operations are unavailable");
+ }
+
+ HasTableOperations tableWithOperations = (HasTableOperations)
table.asUnkeyedTable();
+ int formatVersion =
tableWithOperations.operations().current().formatVersion();
+
+ // Only format versions below 3 pass this check (i.e. V1 and V2). Iceberg V3 row-lineage
+ // semantics (_row_id, _last_updated_sequence_number) are still unsettled here,
+ // so V3 tables fall back to row-based rewrite.
+ return checkCondition(formatVersion < 3, "table format version is " +
formatVersion);
+ }
+
+ /** Returns true when the table's sort order reports itself as not sorted. */
+ private boolean isTableUnsorted() {
+ return checkCondition(!table.asUnkeyedTable().sortOrder().isSorted(),
"table has sort order");
+ }
+
+ /**
+ * Returns true when no rewritten data file carries key metadata. A non-null
+ * {@code keyMetadata()} is treated as the marker of an encrypted file; the first such file
+ * stops the scan.
+ */
+ private boolean hasNoEncryptedDataFiles() {
+ for (DataFile file : input.rewrittenDataFiles()) {
+ if (!checkCondition(
+ file.keyMetadata() == null, "input file is encrypted: " +
file.location())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Opens the footer of every rewritten data file, verifies that all Parquet schemas are equal
+ * to the first file's schema, and builds the {@link FileSchemaContext} used by the merge.
+ *
+ * <p>The key-value metadata of the first file is copied into the context; metadata of later
+ * files is not compared or merged.
+ *
+ * @return the populated context (never null)
+ * @throws IllegalStateException if a footer cannot be read, if any schema differs from the
+ * first one, or if there are no input files
+ */
+ private FileSchemaContext checkSchemaAndBuildContext() {
+ MessageType firstSchema = null;
+ Map<String, String> firstMetadata = null;
+ List<InputFile> inputFiles = new
ArrayList<>(input.rewrittenDataFiles().length);
+ List<Long> fileSizes = new ArrayList<>(input.rewrittenDataFiles().length);
+ long totalRowGroupCount = 0;
+
+ for (DataFile file : input.rewrittenDataFiles()) {
+ InputFile inputFile = io.newInputFile(file.location(),
file.fileSizeInBytes());
+ inputFiles.add(inputFile);
+ fileSizes.add(file.fileSizeInBytes());
+
+ try (ParquetFileReader reader =
ParquetFileReader.open(ParquetIOBridge.file(inputFile))) {
+ totalRowGroupCount += reader.getRowGroups().size();
+ // Get parquet schema from file metadata.
+ MessageType currentSchema = reader.getFileMetaData().getSchema();
+ if (firstSchema == null) {
+ // The first file defines the reference schema and the metadata carried to the output.
+ firstSchema = currentSchema;
+ firstMetadata = new
HashMap<>(reader.getFileMetaData().getKeyValueMetaData());
+ continue;
+ }
+
+ if (!firstSchema.equals(currentSchema)) {
+ throw new IllegalStateException(
+ "The input parquet files have inconsistent schemas and cannot be
merged.");
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to read parquet file: " +
file.location(), e);
+ }
+ }
+
+ // Only reachable with a null schema when the loop never ran, i.e. there were no inputs.
+ if (firstSchema == null) {
+ throw new IllegalStateException(
+ "No valid input parquet files are available for parquet row-group
merge.");
+ }
+
+ return new FileSchemaContext(
+ firstSchema, firstMetadata, inputFiles, fileSizes, totalRowGroupCount);
+ }
+
+ /**
+ * Merges the prepared input files into one or more output parquet files by appending whole
+ * input files through {@link ParquetFileMergeRunner}.
+ *
+ * <p>Inputs are appended in order. A new output file is started whenever adding the next
+ * input's size to the current output's accumulated input size would exceed
+ * {@code targetSize()}; a single input larger than the target still gets its own output.
+ *
+ * <p>On failure, outputs that were already finalized are deleted and the exception is
+ * rethrown. In all cases the prepared context is cleared and any runner still open is closed,
+ * which deletes its unfinished output.
+ *
+ * @return the data files describing the merged outputs
+ * @throws IllegalStateException if {@link #canParquetRowGroupMerge()} has not prepared the
+ * context
+ * @throws Exception if writing, finalizing or closing an output fails
+ */
+ protected List<DataFile> parquetRowGroupMergeFiles() throws Exception {
+ List<DataFile> outputFiles = new ArrayList<>();
+ OutputFileFactory outputFileFactory = newRowGroupMergeOutputFileFactory();
+ if (fileSchemaContext == null) {
+ throw new IllegalStateException(
+ "Parquet row-group merge context is not prepared. Call
canParquetRowGroupMerge() first.");
+ }
+
+ long maxOutputSize = targetSize();
+ long currentOutputSize = 0L;
+ ParquetFileMergeRunner parquetFileMergeRunner = null;
+
+ try {
+ for (int i = 0; i < fileSchemaContext.inputFiles.size(); i++) {
+ InputFile inputFile = fileSchemaContext.inputFiles.get(i);
+ long fileSize = fileSchemaContext.fileSizes.get(i);
+ // Roll to a new output file when appending the next input would
exceed target size.
+ if (currentOutputSize > 0 && currentOutputSize + fileSize >
maxOutputSize) {
+ outputFiles.add(parquetFileMergeRunner.result());
+ parquetFileMergeRunner = null;
+ }
+ // Lazily open a runner for the first input and after every rollover.
+ if (parquetFileMergeRunner == null) {
+ parquetFileMergeRunner =
+ new ParquetFileMergeRunner(
+ io,
+ fileSchemaContext.parquetSchema,
+ fileSchemaContext.keyValueMetadata,
+ table.spec(),
+ partition(),
+ parquetRowGroupSize(),
+ metricsConfig());
+ EncryptedOutputFile outputFile =
newRowGroupMergeOutputFile(outputFileFactory);
+ parquetFileMergeRunner.start(outputFile);
+ currentOutputSize = 0L;
+ }
+
+ parquetFileMergeRunner.appendFile(inputFile);
+ currentOutputSize += fileSize;
+ }
+
+ // Finalize the last, still open output.
+ if (parquetFileMergeRunner != null) {
+ outputFiles.add(parquetFileMergeRunner.result());
+ parquetFileMergeRunner = null;
+ }
+
+ return outputFiles;
+ } catch (Exception e) {
+ // Clean up already finalized rollover outputs to avoid leaving orphan
files before fallback.
+ cleanupMergedOutputFiles(outputFiles);
+ throw e;
+ } finally {
+ // The context is single-use; a runner that is still non-null here never produced a result.
+ fileSchemaContext = null;
+ if (parquetFileMergeRunner != null) {
+ parquetFileMergeRunner.close();
+ }
+ }
+ }
+
+  /**
+   * Probes the input files and decides whether parquet row-group merge should be attempted.
+   *
+   * <p>The built {@link FileSchemaContext} is stored in {@code fileSchemaContext} for {@link
+   * #parquetRowGroupMergeFiles()}. The merge is declined when the average row-group size of the
+   * inputs (total input bytes divided by total row-group count) is below the configured minimum.
+   * When the inputs contain no row groups the threshold is not applied.
+   *
+   * <p>Any unchecked exception raised while probing is treated as "not eligible" rather than
+   * propagated: this method is invoked outside the fallback try/catch of {@link
+   * #rewriterDataFiles()}, so an escaping exception would abort the rewrite instead of falling
+   * back to the row-based path.
+   *
+   * @return {@code true} if the context was prepared and the inputs pass the size threshold
+   */
+  private boolean canPrepareParquetSchemaContext() {
+    try {
+      fileSchemaContext = checkSchemaAndBuildContext();
+    } catch (RuntimeException e) {
+      // Covers IllegalStateException thrown by checkSchemaAndBuildContext() as well as other
+      // unchecked failures from the FileIO or the parquet footer reader.
+      String reason = e.getMessage() != null ? e.getMessage() : e.toString();
+      return checkCondition(false, reason);
+    }
+
+    if (fileSchemaContext.totalRowGroupCount <= 0) {
+      // Nothing to average over; do not apply the threshold.
+      return true;
+    }
+
+    long totalInputSize = fileSchemaContext.fileSizes.stream().mapToLong(Long::longValue).sum();
+    long avgRowGroupSize = totalInputSize / fileSchemaContext.totalRowGroupCount;
+    long minAvgRowGroupSize =
+        PropertyUtil.propertyAsLong(
+            table.properties(),
+            SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES,
+            SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_DEFAULT);
+
+    if (avgRowGroupSize < minAvgRowGroupSize) {
+      LOG.debug(
+          "Skip parquet row-group merge: avg row-group size {} bytes ({} input files, {} row groups) "
+              + "is below threshold {} bytes ({}), falling back to row-level rewrite",
+          avgRowGroupSize,
+          fileSchemaContext.inputFiles.size(),
+          fileSchemaContext.totalRowGroupCount,
+          minAvgRowGroupSize,
+          SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES);
+      return false;
+    }
+    return true;
+  }
+
+ /**
+ * Best-effort deletion of already finalized merge outputs. A failure to delete one file is
+ * logged at WARN and does not stop the remaining deletions.
+ *
+ * @param outputFiles data files whose locations should be deleted
+ */
+ private void cleanupMergedOutputFiles(List<DataFile> outputFiles) {
+ for (DataFile outputFile : outputFiles) {
+ try {
+ io.deleteFile(outputFile.location());
+ } catch (RuntimeException e) {
+ LOG.warn(
+ "Failed to delete merged parquet output during cleanup: {}",
outputFile.location(), e);
+ }
+ }
+ }
+
+ /**
+ * Reads the row-group-merge switch from the table properties, using the declared default when
+ * the property is absent.
+ */
+ private boolean isParquetRowGroupMergeEnabled() {
+ return checkCondition(
+ PropertyUtil.propertyAsBoolean(
+ table.properties(),
+ SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED,
+
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED_DEFAULT),
+ SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED + " is not
enabled");
+ }
+
+ /**
+ * Checks that the table's default file format is Parquet and that every rewritten data file
+ * is in Parquet format. Parquet row-group merge is possible only when both conditions hold.
+ *
+ * @return true if both the table default format and all input files are Parquet
+ */
+ private boolean isParquetFormat() {
+ // 1. The table's default file format must be Parquet.
+ if (!checkCondition(
+ dataFileFormat() == FileFormat.PARQUET, "table default file format is
not PARQUET")) {
+ return false;
+ }
+
+ // 2. Every input file must be in Parquet format.
+ for (DataFile file : input.rewrittenDataFiles()) {
+ if (!checkCondition(
+ file.format() == FileFormat.PARQUET,
+ "input file format is not PARQUET: " + file.format())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /** Returns true when the input's read-only delete file array is null or empty. */
+ private boolean hasNoReadOnlyDeleteFiles() {
+ return checkCondition(
+ ArrayUtils.isEmpty(input.readOnlyDeleteFiles()), "has read-only delete
files");
+ }
+
+ /** Returns true when the input's rewritten delete file array is null or empty. */
+ private boolean hasNoRewrittenDeleteFiles() {
+ return checkCondition(
+ ArrayUtils.isEmpty(input.rewrittenDeleteFiles()), "has rewritten
delete files");
+ }
+
+ /**
+ * Returns true when no table property enables a Parquet bloom filter.
+ *
+ * <p>A bloom filter counts as enabled if any property key starts with
+ * {@code write.parquet.bloom-filter-enabled} and its value parses as boolean {@code true}.
+ */
+ private boolean hasNoBloomFilter() {
+ // Check if bloom filter is enabled by looking for properties
+ // with the prefix 'write.parquet.bloom-filter-enabled' and value 'true'
+ // such as 'write.parquet.bloom-filter-enabled.default' or
+ // 'write.parquet.bloom-filter-enabled.column.'.
+ for (Map.Entry<String, String> entry : table.properties().entrySet()) {
+ if (entry.getKey().startsWith(PARQUET_BLOOM_FILTER_ENABLED_PREFIX)
+ && Boolean.parseBoolean(entry.getValue())) {
+ return checkCondition(false, "table has bloom filter");
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Returns the Parquet row-group size in bytes from the table properties, or Iceberg's default
+ * when the property is not set. The value is handed to the merge writer.
+ */
+ private long parquetRowGroupSize() {
+ return PropertyUtil.propertyAsLong(
+ table.properties(),
+ TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+ TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+ }
+
+ /**
+ * Returns true when every rewritten data file was written with the table's current partition
+ * spec. The merge outputs are built with the current spec, so files from another spec are
+ * rejected; the scan stops at the first mismatch.
+ */
+ private boolean allFilesHaveCurrentSpecId() {
+ int currentSpecId = table.spec().specId();
+ for (DataFile file : input.rewrittenDataFiles()) {
+ if (!checkCondition(
+ file.specId() == currentSpecId,
+ "file spec id does not match current spec id: file ="
+ + file.specId()
+ + ", current ="
+ + currentSpecId)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Passes {@code condition} through unchanged, logging the skip reason at DEBUG when it is
+ * false.
+ *
+ * @param condition the precondition result
+ * @param message the reason logged when the precondition does not hold
+ * @return {@code condition}
+ */
+ private boolean checkCondition(boolean condition, String message) {
+ if (!condition) {
+ LOG.debug("Skip parquet row-group merge due to {}", message);
+ }
+ return condition;
+ }
+
+ /**
+ * Builds the metrics configuration for the output files: from the base table for a keyed
+ * table, otherwise from the unkeyed table itself.
+ */
+ private MetricsConfig metricsConfig() {
+ return table.isKeyedTable()
+ ? MetricsConfig.forTable(table.asKeyedTable().baseTable())
+ : MetricsConfig.forTable(table.asUnkeyedTable());
+ }
+
+ /**
+ * Creates the factory that names and opens merge output files: Parquet format, using this
+ * executor's {@code io}.
+ *
+ * <p>NOTE(review): the current spec id and the constant 0 are passed as the builder's two
+ * numeric arguments; confirm these are the intended values for those parameters.
+ */
+ private OutputFileFactory newRowGroupMergeOutputFileFactory() {
+ return OutputFileFactory.builderFor(table.asUnkeyedTable(),
table.spec().specId(), 0)
+ .format(FileFormat.PARQUET)
+ .ioSupplier(() -> io)
+ .build();
+ }
+
+ /**
+ * Allocates a new output file from the factory: an unpartitioned location when the current
+ * spec is unpartitioned, otherwise a location for the current spec and {@code partition()}.
+ *
+ * @param outputFileFactory factory created by {@code newRowGroupMergeOutputFileFactory()}
+ */
+ private EncryptedOutputFile newRowGroupMergeOutputFile(OutputFileFactory
outputFileFactory) {
+ return table.spec().isUnpartitioned()
+ ? outputFileFactory.newOutputFile()
+ : outputFileFactory.newOutputFile(table.spec(), partition());
+ }
}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java
new file mode 100644
index 000000000..2a46e535c
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/optimizing/ParquetFileMergeRunner.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizing;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.ParquetIOBridge;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This component merges one or more parquet files into a single output parquet file without
+ * row-level decode/encode, then builds an Iceberg {@link org.apache.iceberg.DataFile} for commit.
+ *
+ * <p>Lifecycle:
+ *
+ * <ol>
+ *   <li>{@link #start(org.apache.iceberg.encryption.EncryptedOutputFile)} to initialize writer
+ *   <li>{@link #appendFile(org.apache.iceberg.io.InputFile)} for each source file
+ *   <li>{@link #result()} to finalize and build output {@link org.apache.iceberg.DataFile}
+ *   <li>{@link #close()} for safe cleanup (also supports failure cleanup)
+ * </ol>
+ *
+ * <p>An instance produces at most one output file: {@code start} and {@code result} may each be
+ * called once.
+ *
+ * <p>Failure behavior:
+ *
+ * <ul>
+ *   <li>If writer is not successful, {@link #close()} attempts to delete output file
+ *   <li>If writer close fails, the exception is propagated
+ * </ul>
+ */
+public final class ParquetFileMergeRunner implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetFileMergeRunner.class);
+
+  private final MessageType parquetSchema;
+  private final Map<String, String> keyValueMetadata;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+  private final MetricsConfig metricsConfig;
+  private final FileIO io;
+  private final long rowGroupSize;
+
+  // Non-null between a successful start() and the end of result()/close().
+  private ParquetFileWriter writer;
+  private EncryptedOutputFile outputFile;
+  // Set once result() has built the DataFile; suppresses output deletion in close().
+  private boolean success;
+
+  /**
+   * @param io file IO used to read the final output length and to delete the output on failure
+   * @param parquetSchema schema written to the output file
+   * @param keyValueMetadata footer key-value metadata for the output; copied, null means empty
+   * @param spec partition spec recorded on the resulting data file
+   * @param partition partition value recorded on the resulting data file
+   * @param rowGroupSize row-group size passed to the parquet writer
+   * @param metricsConfig controls which column metrics are collected from the footer
+   */
+  public ParquetFileMergeRunner(
+      FileIO io,
+      MessageType parquetSchema,
+      Map<String, String> keyValueMetadata,
+      PartitionSpec spec,
+      StructLike partition,
+      long rowGroupSize,
+      MetricsConfig metricsConfig) {
+    this.io = io;
+    this.parquetSchema = parquetSchema;
+    this.keyValueMetadata =
+        keyValueMetadata == null ? new HashMap<>() : new HashMap<>(keyValueMetadata);
+    this.spec = spec;
+    this.partition = partition;
+    this.rowGroupSize = rowGroupSize;
+    this.metricsConfig = metricsConfig;
+  }
+
+  /**
+   * Initializes the ParquetFileWriter for row-group merge.
+   *
+   * @param output encrypted output file destination
+   * @throws java.io.IOException if writer creation or writer start fails
+   * @throws IllegalStateException if the runner was already started or has produced its result
+   */
+  public void start(EncryptedOutputFile output) throws IOException {
+    // A second start would drop the reference to the open writer and orphan its output file.
+    if (writer != null || success) {
+      throw new IllegalStateException("start() already called.");
+    }
+
+    this.outputFile = output;
+    writer =
+        new ParquetFileWriter(
+            ParquetIOBridge.file(output.encryptingOutputFile()),
+            parquetSchema,
+            ParquetFileWriter.Mode.CREATE,
+            rowGroupSize,
+            0);
+    writer.start();
+  }
+
+  /**
+   * Appends all row groups of {@code inputFile} to the output.
+   *
+   * @param inputFile source parquet file
+   * @throws java.io.IOException if the append fails
+   * @throws IllegalStateException if no writer is open
+   */
+  public void appendFile(InputFile inputFile) throws IOException {
+    if (writer == null) {
+      throw new IllegalStateException("ParquetFileWriter not initialized. Call start() first.");
+    }
+
+    org.apache.parquet.io.InputFile file = ParquetIOBridge.file(inputFile);
+    writer.appendFile(file);
+  }
+
+  /**
+   * Finalizes the output file and builds the {@link DataFile} describing it.
+   *
+   * @return the data file for the merged output
+   * @throws java.io.IOException if finishing or closing the writer fails
+   * @throws IllegalStateException if called twice, or before {@link #start}
+   */
+  public DataFile result() throws IOException {
+    // Check this first: after a successful result() the writer is null as well, so testing the
+    // writer first would report "not initialized" and never reach this guard.
+    if (success) {
+      throw new IllegalStateException("result() already called.");
+    }
+    if (writer == null) {
+      throw new IllegalStateException("ParquetFileWriter not initialized. Call start() first.");
+    }
+
+    writer.end(keyValueMetadata);
+    ParquetMetadata footer = writer.getFooter();
+    writer.close();
+    writer = null;
+
+    Metrics metrics = ParquetUtil.footerMetrics(footer, Stream.empty(), metricsConfig);
+    List<Long> splitOffsets =
+        footer.getBlocks().stream().map(BlockMetaData::getStartingPos).collect(Collectors.toList());
+    long recordCount = footer.getBlocks().stream().mapToLong(BlockMetaData::getRowCount).sum();
+    // Ask the FileIO for the length of the file as written.
+    long fileSizeInBytes =
+        io.newInputFile(outputFile.encryptingOutputFile().location()).getLength();
+
+    DataFile result =
+        DataFiles.builder(spec)
+            .withEncryptedOutputFile(outputFile)
+            .withFormat(FileFormat.PARQUET)
+            .withFileSizeInBytes(fileSizeInBytes)
+            .withMetrics(metrics)
+            .withPartition(partition)
+            .withSplitOffsets(splitOffsets)
+            .withRecordCount(recordCount)
+            .build();
+    success = true;
+    return result;
+  }
+
+  /**
+   * Closes writer and performs failure cleanup.
+   *
+   * <p>If writer did not succeed, this method attempts to delete the output file to avoid leaving
+   * partial artifacts. Cleanup deletion failures are logged but not thrown.
+   *
+   * @throws java.io.IOException when closing writer fails
+   */
+  @Override
+  public void close() throws IOException {
+    IOException closeFailure = null;
+    if (writer != null) {
+      try {
+        writer.close();
+      } catch (IOException e) {
+        // Remember the failure so output cleanup below still runs before it is rethrown.
+        closeFailure = e;
+      } finally {
+        writer = null;
+      }
+    }
+
+    // Clean up output file if writer did not succeed.
+    if (!success && outputFile != null) {
+      String location = outputFile.encryptingOutputFile().location();
+      try {
+        io.deleteFile(location);
+      } catch (RuntimeException e) {
+        LOG.warn("Failed to delete output file during cleanup: {}", location, e);
+      }
+    }
+
+    if (closeFailure != null) {
+      throw closeFailure;
+    }
+  }
+}
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
index 0905acf42..6c17f6ad0 100644
---
a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
+++
b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java
@@ -173,6 +173,18 @@ public class TableProperties {
public static final long
SELF_OPTIMIZING_REFRESH_TABLE_ADAPTIVE_INCREASE_STEP_MS_DEFAULT =
30000; // 30s
+ /**
+ * Switch for parquet row-group merge in the Iceberg rewrite executor. When false (the
+ * default) the executor always uses the row-based rewrite.
+ */
+ public static final String
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED =
+ "self-optimizing.rewrite.use-parquet-row-group-merge.enabled";
+ public static final boolean
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED_DEFAULT =
+ false;
+
+ /**
+ * Minimum average row-group size, in bytes, of the input files (total input bytes divided by
+ * total row-group count). Below this value row-group merge is skipped in favour of the
+ * row-based rewrite.
+ */
+ public static final String
+
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES
=
+
"self-optimizing.rewrite.use-parquet-row-group-merge.min-avg-row-group-size-bytes";
+ public static final long
+
SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_DEFAULT
=
+ 8388608; // 8 MiB (8 * 1024 * 1024 bytes)
+
/**
* The retention period for snapshots created by Flink checkpoints.
Snapshots older than this
* duration may be cleaned up. Avoid keeping the last flink checkpoint
snapshot for too long, as
diff --git
a/amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java
b/amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java
new file mode 100644
index 000000000..e35c6cbcc
--- /dev/null
+++
b/amoro-format-iceberg/src/main/java/org/apache/iceberg/parquet/ParquetIOBridge.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+
+/**
+ * Bridge for reusing Iceberg's package-scoped {@link ParquetIO} conversions
from parquet row-group
+ * merge path. This class lives in the same package as {@link ParquetIO} on
purpose to convert
+ * Iceberg {@link org.apache.iceberg.io.InputFile}/{@link
org.apache.iceberg.io.OutputFile}
+ * instances into Parquet file abstractions without re-implementing Iceberg's
adapter logic.
+ */
+public final class ParquetIOBridge {
+ // Static utility holder; not instantiable.
+ private ParquetIOBridge() {}
+
+ /**
+ * Converts an Iceberg input file into a Parquet input file by delegating to
+ * {@code ParquetIO.file}.
+ */
+ public static org.apache.parquet.io.InputFile file(InputFile file) {
+ return ParquetIO.file(file);
+ }
+
+ /**
+ * Converts an Iceberg output file into a Parquet output file by delegating to
+ * {@code ParquetIO.file}.
+ */
+ public static org.apache.parquet.io.OutputFile file(OutputFile file) {
+ return ParquetIO.file(file);
+ }
+}
diff --git
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
index 37b79036c..71b4b16d2 100644
---
a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
+++
b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java
@@ -18,6 +18,8 @@
package org.apache.amoro.optimizing;
+import static
org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED;
+
import org.apache.amoro.BasicTableTestHelper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.catalog.BasicCatalogTestHelper;
@@ -28,11 +30,14 @@ import
org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.avro.Avro;
@@ -48,6 +53,7 @@ import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.junit.Assert;
import org.junit.Assume;
@@ -57,6 +63,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -208,6 +215,368 @@ public class IcebergRewriteExecutorTest extends
TableTestBase {
readAllData();
}
+ @Test
+ public void testParquetRowGroupMergeEnableCondition() throws IOException {
+ // parquet row-group merge should only be enabled for Parquet files.
+ if (fileFormat != FileFormat.PARQUET) {
+ Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+ return;
+ }
+
+ // All parquet row-group merge preconditions are satisfied.
+ prepareParquetRowGroupMergeTableConditions(true);
+ Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+ // parquet row-group merge should be disabled if the table property is not
enabled.
+ prepareParquetRowGroupMergeTableConditions(false);
+ Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+ // parquet row-group merge should be disabled if there are delete files.
+ prepareParquetRowGroupMergeTableConditions(true);
+ RewriteFilesInput rewrittenDeleteInput =
+ new RewriteFilesInput(
+ dataScanTask.rewrittenDataFiles(),
+ dataScanTask.rePosDeletedDataFiles(),
+ new DeleteFile[] {},
+ new DeleteFile[] {(DeleteFile) scanTask.readOnlyDeleteFiles()[0]},
+ getMixedTable());
+
Assert.assertFalse(newExecutor(rewrittenDeleteInput).canParquetRowGroupMerge());
+
+ rewrittenDeleteInput =
+ new RewriteFilesInput(
+ dataScanTask.rewrittenDataFiles(),
+ dataScanTask.rePosDeletedDataFiles(),
+ new DeleteFile[] {(DeleteFile) scanTask.readOnlyDeleteFiles()[0]},
+ new DeleteFile[] {},
+ getMixedTable());
+
Assert.assertFalse(newExecutor(rewrittenDeleteInput).canParquetRowGroupMerge());
+
+ // parquet row-group merge should be disabled if the bloom filter is
enabled.
+ resetParquetRowGroupMergeTestState();
+ Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+ getMixedTable()
+ .asUnkeyedTable()
+ .updateProperties()
+ .set(
+
org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX +
"id",
+ "true")
+ .commit();
+ Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+ // parquet row-group merge should be disabled if the global bloom filter
switch is enabled.
+ resetParquetRowGroupMergeTestState();
+ Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+ getMixedTable()
+ .asUnkeyedTable()
+ .updateProperties()
+ .set("write.parquet.bloom-filter-enabled.default", "true")
+ .commit();
+ Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+ // parquet row-group merge should be disabled if the table spec evolves so
that the source files
+ // no longer match the partition spec.
+ resetParquetRowGroupMergeTestState();
+ Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
getMixedTable().asUnkeyedTable().updateSpec().addField(Expressions.month("op_time")).commit();
+ Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+ // parquet row-group merge should be disabled if table has a sort order.
+ resetParquetRowGroupMergeTestState();
+ Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+ getMixedTable().asUnkeyedTable().replaceSortOrder().asc("id").commit();
+ Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+ }
+
+ @Test
+ public void testParquetRowGroupMergeWithEncryptedDataFile() throws
IOException {
+ Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+ prepareParquetRowGroupMergeTableConditions(true);
+ Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+ DataFile encryptedDataFile =
+ DataFiles.builder(getMixedTable().spec())
+ .copy(dataScanTask.rewrittenDataFiles()[0])
+ .withEncryptionKeyMetadata(ByteBuffer.wrap(new byte[] {1}))
+ .build();
+
+ RewriteFilesInput encryptedInput =
+ new RewriteFilesInput(
+ new DataFile[] {encryptedDataFile},
+ new DataFile[] {},
+ new DeleteFile[] {},
+ new DeleteFile[] {},
+ getMixedTable());
+ // parquet row-group merge should be disabled if the data file is
encrypted.
+ Assert.assertFalse(newExecutor(encryptedInput).canParquetRowGroupMerge());
+ }
+
+ @Test
+ public void testParquetRowGroupMergeDisabledForV3Table() throws IOException {
+ Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+ prepareParquetRowGroupMergeTableConditions(true);
+ Assert.assertTrue(newExecutor(dataScanTask).canParquetRowGroupMerge());
+
+ HasTableOperations tableWithOperations = (HasTableOperations)
getMixedTable().asUnkeyedTable();
+ TableMetadata current = tableWithOperations.operations().current();
+ tableWithOperations.operations().commit(current,
current.upgradeToFormatVersion(3));
+ getMixedTable().asUnkeyedTable().refresh();
+
+ Assert.assertFalse(newExecutor(dataScanTask).canParquetRowGroupMerge());
+ }
+
+ @Test
+ public void testParquetRowGroupMergeExecuteSplitAndCommitDataIntegrity()
throws IOException {
+ Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+ prepareParquetRowGroupMergeTableConditions(true);
+
+ // Create 5 data files, and each data file has 3 records.
+ List<List<Record>> inputRecordGroups =
parquetRowGroupMergeSourceRecordGroups();
+ List<DataFile> sourceDataFiles =
Lists.newArrayListWithExpectedSize(inputRecordGroups.size());
+ long txId = 1L;
+ for (List<Record> records : inputRecordGroups) {
+ sourceDataFiles.addAll(
+ MixedDataTestHelpers.writeAndCommitBaseStore(getMixedTable(),
txId++, records, false));
+ }
+
+ // Set the target size to the combined size of the first three source files.
+ long splitTargetSize =
+ sourceDataFiles.get(0).fileSizeInBytes()
+ + sourceDataFiles.get(1).fileSizeInBytes()
+ + sourceDataFiles.get(2).fileSizeInBytes();
+ getMixedTable()
+ .asUnkeyedTable()
+ .updateProperties()
+ .set(
+ org.apache.amoro.table.TableProperties.SELF_OPTIMIZING_TARGET_SIZE,
+ String.valueOf(splitTargetSize))
+ .commit();
+
+ // Build a rewrite input with only data files and no delete files.
+ RewriteFilesInput rewriteFilesInput = newDataOnlyInput(sourceDataFiles);
+ IcebergRewriteExecutor executor = newExecutor(rewriteFilesInput);
+
+ // Verify parquet row-group merge eligibility and execute the merge.
+ Assert.assertTrue(executor.canParquetRowGroupMerge());
+
+ RewriteFilesOutput output = executor.execute();
+ // With this target, the first 3 source files are grouped into one output
and
+ // the last 2 into another.
+ Assert.assertEquals(2, output.getDataFiles().length);
+
+ // Commit merge result and validate the iceberg table data contains all
records from the
+ // source files.
+ getMixedTable()
+ .asUnkeyedTable()
+ .newRewrite()
+ .rewriteFiles(Sets.newHashSet(sourceDataFiles),
Sets.newHashSet(output.getDataFiles()))
+ .commit();
+
+ // Get the source records
+ Map<Integer, Record> expectedRecordsById = new HashMap<>();
+ for (List<Record> recordGroup : inputRecordGroups) {
+ for (Record record : recordGroup) {
+ expectedRecordsById.put((Integer) record.get(0), record);
+ }
+ }
+
+ // Get the current records after merge and commit, and compare with the
source records.
+ List<Record> currentRecords =
+ MixedDataTestHelpers.readBaseStore(getMixedTable(),
Expressions.alwaysTrue());
+ Assert.assertEquals(15, currentRecords.size());
+ for (Record currentRecord : currentRecords) {
+ Integer id = (Integer) currentRecord.get(0);
+ Record expectedRecord = expectedRecordsById.get(id);
+ // Compare all fields to ensure the record is correctly merged.
+ Assert.assertEquals(expectedRecord.get(0), currentRecord.get(0));
+ Assert.assertEquals(expectedRecord.get(1), currentRecord.get(1));
+ Assert.assertEquals(expectedRecord.get(2), currentRecord.get(2));
+ Assert.assertEquals(expectedRecord.get(3), currentRecord.get(3));
+ }
+ }
+
+ @Test
+ public void testParquetRowGroupMergeWithSchemaDifferent() throws IOException
{
+ Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+ prepareParquetRowGroupMergeTableConditions(true);
+
+ // Scenario 1: schema differs by adding new field.
+ List<DataFile> sourceDataFiles = Lists.newArrayList();
+ sourceDataFiles.addAll(
+ MixedDataTestHelpers.writeAndCommitBaseStore(
+ getMixedTable(),
+ 301L,
+ Collections.singletonList(
+ MixedDataTestHelpers.createRecord(301, "field_old", 11L,
"1970-01-03T08:00:00")),
+ false));
+ // add new field
+ getMixedTable()
+ .asUnkeyedTable()
+ .updateSchema()
+ .addColumn("new_col", Types.StringType.get())
+ .commit();
+ sourceDataFiles.addAll(
+ MixedDataTestHelpers.writeAndCommitBaseStore(
+ getMixedTable(),
+ 302L,
+ Collections.singletonList(
+ MixedDataTestHelpers.createRecord(
+ getMixedTable().schema(), 302, "field_new", 12L,
"1970-01-04T08:00:00", "x")),
+ false));
+
+ RewriteFilesInput rewriteInput = newDataOnlyInput(sourceDataFiles);
+ IcebergRewriteExecutor executor = newExecutor(rewriteInput);
+
+ Assert.assertFalse(executor.canParquetRowGroupMerge());
+
+ // Scenario 2: schema differs by type promotion.
+ resetParquetRowGroupMergeTestState();
+ sourceDataFiles = Lists.newArrayList();
+ sourceDataFiles.addAll(
+ MixedDataTestHelpers.writeAndCommitBaseStore(
+ getMixedTable(),
+ 401L,
+ Collections.singletonList(
+ MixedDataTestHelpers.createRecord(401, "type_old", 21L,
"1970-01-05T08:00:00")),
+ false));
+ // change the type
+ getMixedTable()
+ .asUnkeyedTable()
+ .updateSchema()
+ .updateColumn("id", Types.LongType.get())
+ .commit();
+ sourceDataFiles.addAll(
+ MixedDataTestHelpers.writeAndCommitBaseStore(
+ getMixedTable(),
+ 402L,
+ Collections.singletonList(
+ MixedDataTestHelpers.createRecord(
+ getMixedTable().schema(), 402L, "type_new", 22L,
"1970-01-06T08:00:00")),
+ false));
+
+ rewriteInput = newDataOnlyInput(sourceDataFiles);
+ executor = newExecutor(rewriteInput);
+
+ Assert.assertFalse(executor.canParquetRowGroupMerge());
+ }
+
+ @Test
+ public void testParquetRowGroupMergeMinAvgRowGroupSizeThreshold() throws
IOException {
+ Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+ prepareParquetRowGroupMergeTableConditions(true);
+
+ List<DataFile> sourceDataFiles = Lists.newArrayList();
+ sourceDataFiles.addAll(
+ MixedDataTestHelpers.writeAndCommitBaseStore(
+ getMixedTable(),
+ 1L,
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(1, "a", 0L,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(2, "b", 1L,
"1970-01-01T08:00:00")),
+ false));
+ sourceDataFiles.addAll(
+ MixedDataTestHelpers.writeAndCommitBaseStore(
+ getMixedTable(),
+ 2L,
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(3, "c", 2L,
"1970-01-02T08:00:00"),
+ MixedDataTestHelpers.createRecord(4, "d", 3L,
"1970-01-02T08:00:00")),
+ false));
+
+ // We compute the actual avg row-group size from file metadata and use it
to set thresholds.
+ // Each small test file has exactly 1 row group, so avgRowGroupSize =
totalFileSize / 2.
+ long totalFileSize =
sourceDataFiles.stream().mapToLong(DataFile::fileSizeInBytes).sum();
+ int totalRowGroups = sourceDataFiles.size();
+ long avgRowGroupSize = totalFileSize / totalRowGroups;
+
+ RewriteFilesInput rewriteInput = newDataOnlyInput(sourceDataFiles);
+ // Case 1: threshold = avgRowGroupSize + 1, so avgRowGroupSize < threshold → merge disabled.
+ setMinAvgRowGroupSizeThreshold(avgRowGroupSize + 1);
+ boolean result = newExecutor(rewriteInput).canParquetRowGroupMerge();
+ Assert.assertFalse(result);
+
+ // Case 2: threshold = avgRowGroupSize, so avgRowGroupSize >= threshold → merge enabled
(boundary).
+ setMinAvgRowGroupSizeThreshold(avgRowGroupSize);
+ result = newExecutor(rewriteInput).canParquetRowGroupMerge();
+ Assert.assertTrue(result);
+ }
+
+ private void setMinAvgRowGroupSizeThreshold(long threshold) {
+ getMixedTable()
+ .asUnkeyedTable()
+ .updateProperties()
+ .set(
+ org.apache.amoro.table.TableProperties
+
.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES,
+ String.valueOf(threshold))
+ .commit();
+ getMixedTable().asUnkeyedTable().refresh();
+ }
+
+ private RewriteFilesInput newDataOnlyInput(List<DataFile> dataFiles) {
+ DataFile[] wrappedDataFiles = new DataFile[dataFiles.size()];
+ for (int i = 0; i < dataFiles.size(); i++) {
+ wrappedDataFiles[i] =
+ MixedDataTestHelpers.wrapIcebergDataFile(dataFiles.get(i), (long) i
+ 1);
+ }
+
+ return new RewriteFilesInput(
+ wrappedDataFiles,
+ new DataFile[] {},
+ new DeleteFile[] {},
+ new DeleteFile[] {},
+ getMixedTable());
+ }
+
+ private IcebergRewriteExecutor newExecutor(RewriteFilesInput input) {
+ return new IcebergRewriteExecutor(input, getMixedTable(),
Collections.emptyMap());
+ }
+
+ private void resetParquetRowGroupMergeTestState() throws IOException {
+ dropTable();
+ setupTable();
+ initDataAndReader();
+ prepareParquetRowGroupMergeTableConditions(true);
+ }
+
+ private List<List<Record>> parquetRowGroupMergeSourceRecordGroups() {
+ return Arrays.asList(
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(11, "john", 0,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(21, "lily", 1,
"1970-01-01T08:00:00"),
+ MixedDataTestHelpers.createRecord(31, "sam", 2,
"1970-01-01T08:00:00")),
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(41, "tom", 3,
"1970-01-02T08:00:00"),
+ MixedDataTestHelpers.createRecord(51, "lucy", 4,
"1970-01-02T08:00:00"),
+ MixedDataTestHelpers.createRecord(61, "kate", 5,
"1970-01-02T08:00:00")),
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(71, "ben", 6,
"1970-01-03T08:00:00"),
+ MixedDataTestHelpers.createRecord(81, "mia", 7,
"1970-01-03T08:00:00"),
+ MixedDataTestHelpers.createRecord(91, "zoe", 8,
"1970-01-03T08:00:00")),
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(101, "max", 9,
"1970-01-04T08:00:00"),
+ MixedDataTestHelpers.createRecord(111, "eve", 10,
"1970-01-04T08:00:00"),
+ MixedDataTestHelpers.createRecord(121, "amy", 11,
"1970-01-04T08:00:00")),
+ Arrays.asList(
+ MixedDataTestHelpers.createRecord(131, "leo", 12,
"1970-01-05T08:00:00"),
+ MixedDataTestHelpers.createRecord(141, "ivy", 13,
"1970-01-05T08:00:00"),
+ MixedDataTestHelpers.createRecord(151, "jay", 14,
"1970-01-05T08:00:00")));
+ }
+
+ // Configure only the table properties that are relevant to parquet
row-group merge eligibility
+ // checks. The min-avg-row-group-size is set to 0 so that small test files
are not filtered
+ // out by the threshold gate.
+ private void prepareParquetRowGroupMergeTableConditions(boolean enabled) {
+ getMixedTable()
+ .asUnkeyedTable()
+ .updateProperties()
+ .set(SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_ENABLED,
String.valueOf(enabled))
+ .set(
+ org.apache.amoro.table.TableProperties
+
.SELF_OPTIMIZING_REWRITE_USE_PARQUET_ROW_GROUP_MERGE_MIN_AVG_ROW_GROUP_SIZE_BYTES,
+ "0")
+ .commit();
+ }
+
@Test
public void readOnlyData() throws IOException {
IcebergRewriteExecutor executor =
diff --git a/docs/user-guides/configurations.md
b/docs/user-guides/configurations.md
index 843823f2f..beb092022 100644
--- a/docs/user-guides/configurations.md
+++ b/docs/user-guides/configurations.md
@@ -43,35 +43,36 @@ modified through [Alter
Table](../using-tables/#modify-table) operations.
Self-optimizing configurations are applicable to both Iceberg Format and Mixed
streaming Format.
-| Key | Default |
Description
|
-|---------------------------------------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| self-optimizing.enabled | true |
Enables Self-optimizing
|
-| self-optimizing.allow-partial-commit | false |
Whether to allow partial commit when self-optimizing fails or process is
cancelled
|
-| self-optimizing.group | default |
Optimizer group for Self-optimizing
|
-| self-optimizing.quota | 0.5 |
Quota for Self-optimizing, indicating the optimizer resources the table can
take up
|
-| self-optimizing.execute.num-retries | 5 |
Number of retries after failure of Self-optimizing
|
-| self-optimizing.target-size | 134217728(128MB) |
Target size for Self-optimizing
|
-| self-optimizing.max-file-count | 10000 |
Maximum number of files processed by a Self-optimizing process
|
-| self-optimizing.max-task-size-bytes | 134217728(128MB) |
Maximum file size bytes in a single task for splitting tasks
|
-| self-optimizing.fragment-ratio | 8 |
The fragment file size threshold. We could divide self-optimizing.target-size
by this ratio to get the actual fragment file size
|
-| self-optimizing.min-target-size-ratio | 0.75 |
The undersized segment file size threshold. Segment files under this threshold
will be considered for rewriting
|
-| self-optimizing.minor.trigger.file-count | 12 |
The minimum number of files to trigger minor optimizing is determined by the
sum of fragment file count and equality delete file count
|
-| self-optimizing.minor.trigger.interval | 3600000(1 hour) |
The time interval in milliseconds to trigger minor optimizing
|
-| self-optimizing.major.trigger.duplicate-ratio | 0.1 |
The ratio of duplicate data of segment files to trigger major optimizing
|
-| self-optimizing.full.trigger.interval | -1(closed) |
The time interval in milliseconds to trigger full optimizing
|
-| self-optimizing.full.rewrite-all-files | true |
Whether full optimizing rewrites all files or skips files that do not need to
be optimized
|
-| self-optimizing.min-plan-interval | 60000 |
The minimum time interval between two self-optimizing planning action
|
-| self-optimizing.filter | NULL |
Filter conditions for self-optimizing, using SQL conditional expressions,
without supporting any functions. For the timestamp column condition, the ISO
date-time formatter must be used. For example: op_time > '2007-12-03T10:15:30'.
|
-| self-optimizing.refresh-table.adaptive.max-interval-ms | 0 |
The maximum time interval in milliseconds to refresh table metadata. 0 means
disable adaptive refresh. When enabled, the value must be greater than
'refresh-tables.interval' and may exceed
'self-optimizing.minor.trigger.interval' * 4/5; if not, adaptive refresh will
be automatically disabled. |
-| self-optimizing.refresh-table.adaptive.increase-step-ms | 30000(30s) |
The time interval increase step in milliseconds to refresh table metadata
|
-
-## Data-cleaning configurations
-
+| Key
| Default | Description
|
+|----------------------------------------------------------------------------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| self-optimizing.enabled
| true | Enables Self-optimizing
|
+| self-optimizing.allow-partial-commit
| false | Whether to allow partial commit when self-optimizing
fails or process is cancelled
|
+| self-optimizing.group
| default | Optimizer group for Self-optimizing
|
+| self-optimizing.quota
| 0.5 | Quota for Self-optimizing, indicating the optimizer
resources the table can take up
|
+| self-optimizing.execute.num-retries
| 5 | Number of retries after failure of Self-optimizing
|
+| self-optimizing.target-size
| 134217728(128MB) | Target size for Self-optimizing
|
+| self-optimizing.max-file-count
| 10000 | Maximum number of files processed by a Self-optimizing
process
|
+| self-optimizing.max-task-size-bytes
| 134217728(128MB) | Maximum file size bytes in a single task for splitting
tasks
|
+| self-optimizing.fragment-ratio
| 8 | The fragment file size threshold. We could divide
self-optimizing.target-size by this ratio to get the actual fragment file size
|
+| self-optimizing.min-target-size-ratio
| 0.75 | The undersized segment file size threshold. Segment
files under this threshold will be considered for rewriting
|
+| self-optimizing.minor.trigger.file-count
| 12 | The minimum number of files to trigger minor
optimizing is determined by the sum of fragment file count and equality delete
file count
|
+| self-optimizing.minor.trigger.interval
| 3600000(1 hour) | The time interval in milliseconds to trigger minor
optimizing
|
+| self-optimizing.major.trigger.duplicate-ratio
| 0.1 | The ratio of duplicate data of segment files to
trigger major optimizing
|
+| self-optimizing.full.trigger.interval
| -1(closed) | The time interval in milliseconds to trigger full
optimizing
|
+| self-optimizing.full.rewrite-all-files
| true | Whether full optimizing rewrites all files or skips
files that do not need to be optimized
|
+| self-optimizing.min-plan-interval
| 60000 | The minimum time interval between two self-optimizing
planning action
|
+| self-optimizing.filter
| NULL | Filter conditions for self-optimizing, using SQL
conditional expressions, without supporting any functions. For the timestamp
column condition, the ISO date-time formatter must be used. For example:
op_time > '2007-12-03T10:15:30'.
|
+| self-optimizing.refresh-table.adaptive.max-interval-ms
| 0 | The maximum time interval in milliseconds to refresh
table metadata. 0 means disable adaptive refresh. When enabled, the value must
be greater than 'refresh-tables.interval' and may exceed
'self-optimizing.minor.trigger.interval' * 4/5; if not, adaptive refresh will
be automatically disabled. |
+| self-optimizing.refresh-table.adaptive.increase-step-ms
| 30000(30s) | The time interval increase step in milliseconds to
refresh table metadata
|
+| self-optimizing.rewrite.use-parquet-row-group-merge.enabled
| false | Whether to enable parquet row-group merge for eligible Iceberg V2
table rewrites of Parquet files without delete files, bloom filters, sort
order, schema incompatibilities, or spec mismatches; otherwise falls back to
row-based rewrite. |
+|
self-optimizing.rewrite.use-parquet-row-group-merge.min-avg-row-group-size-bytes
| 8388608(8MB) | The minimum average row-group size (in bytes) of input files
for parquet row-group merge. If the average row-group size across all input
files is below this threshold, it will fall back to row-level rewrite.
|
+
+## Data-cleaning configurations
Data-cleaning configurations are applicable to both Iceberg Format and Mixed
streaming Format.
| Key | Default | Description
|
|---------------------------------------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| table-expire.enabled | true | Enables
periodically expire table
|
+| table-expire.enabled | true | Enables
periodically expire table
|
| change.data.ttl.minutes | 10080(7 days) | Time to
live in minutes for data of ChangeStore
|
| snapshot.keep.duration | 720min(12 hours) |
Table-Expiration keeps the latest snapshots within a specified duration
|
| snapshot.keep.min-count | 1 | Minimum
number of snapshots retained for table expiration
|