luoyuxia commented on code in PR #1552:
URL: https://github.com/apache/fluss/pull/1552#discussion_r2290334811


##########
.aiassistant/rules/Fluss Rules.md:
##########
@@ -0,0 +1,136 @@
+---
+apply: always

Review Comment:
   What is this file for? Can we remove it?



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/RewriteDataFiles.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.actions;
+
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.expressions.Expression;
+
+/**
+ * Interface for rewriting data files in Iceberg tables. Simplified version adapted for Fluss's use
+ * case.
+ */
+public interface RewriteDataFiles {

Review Comment:
   Let's remove this interface; we don't need it currently. Let's keep things simple for now.



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            .filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e);
+        }
+
+        return bucketFiles;
+    }
+
+    private boolean shouldCompact(List<DataFile> files) {
+        if (files.isEmpty()) {
+            return false;
+        }
+        if (files.size() > MIN_FILES_TO_COMPACT) {
+            return true;
+        }
+
+        // Also compact if any file is smaller than threshold
+        boolean hasSmallFiles =
+                files.stream().anyMatch(f -> f.fileSizeInBytes() < SMALL_FILE_THRESHOLD);
+
+        return hasSmallFiles;
+    }
+
+    private RewriteDataFilesActionResult compactFiles(List<DataFile> files, int bucketId) {
+        if (files.size() < 2) {
+            // No effective rewrite necessary
+            return null;
+        }
+
+        try {
+            // Use IcebergRewriteDataFiles to perform the actual compaction
+            IcebergRewriteDataFiles rewriter = new IcebergRewriteDataFiles(icebergTable);
+
+            RewriteDataFilesActionResult result =
+                    rewriter.filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .targetSizeInBytes(COMPACTION_TARGET_SIZE)

Review Comment:
   With this, the target size is always `COMPACTION_TARGET_SIZE` (128 MB), and users have no way to configure it.
   As Iceberg's `BaseRewriteDataFilesAction` does, please use
   ```
   long splitSize =
           PropertyUtil.propertyAsLong(
                   table.properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
   long targetFileSize =
           PropertyUtil.propertyAsLong(
                   table.properties(),
                   TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
                   TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
   this.targetSizeInBytes = Math.min(splitSize, targetFileSize);
   ```
   to derive the target size, so that it aligns with Iceberg's strategy.
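
To make the intent concrete, here is a minimal, dependency-free sketch of that lookup. A plain `Map` stands in for `table.properties()`, and `propertyAsLong` is a stand-in for Iceberg's `PropertyUtil.propertyAsLong`. The property keys and defaults are assumed to mirror Iceberg's `TableProperties` constants, so please verify them against the Iceberg version in use. The class name `CompactionTargetSize` is hypothetical.

```java
import java.util.Map;

/** Sketch: resolve the compaction target size from table properties instead of a constant. */
class CompactionTargetSize {
    // Assumption: these mirror Iceberg's TableProperties keys and defaults.
    static final String SPLIT_SIZE = "read.split.target-size";
    static final long SPLIT_SIZE_DEFAULT = 128L * 1024 * 1024;
    static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes";
    static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = 512L * 1024 * 1024;

    /** Stand-in for PropertyUtil.propertyAsLong: parse the value if present, else use the default. */
    static long propertyAsLong(Map<String, String> props, String key, long defaultValue) {
        String value = props.get(key);
        return value == null ? defaultValue : Long.parseLong(value);
    }

    /** Target size is the smaller of the read split size and the write target file size. */
    static long resolve(Map<String, String> tableProperties) {
        long splitSize = propertyAsLong(tableProperties, SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
        long targetFileSize =
                propertyAsLong(
                        tableProperties,
                        WRITE_TARGET_FILE_SIZE_BYTES,
                        WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
        return Math.min(splitSize, targetFileSize);
    }
}
```

With no properties set, this sketch falls back to the smaller default; a user who sets either property on the table can lower the target size without a code change.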



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -45,19 +59,49 @@
 /** Implementation of {@link LakeWriter} for Iceberg. */
 public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
 
+    private static final String AUTO_MAINTENANCE_KEY = "table.datalake.auto-maintenance";
+    private static final int MIN_FILES_TO_COMPACT = 5;
+    private static final long SMALL_FILE_THRESHOLD = 50 * 1024 * 1024; // 50MB
+    private static final long COMPACTION_TARGET_SIZE = 128 * 1024 * 1024; // 128MB
+
     private final Catalog icebergCatalog;
     private final Table icebergTable;
     private final RecordWriter recordWriter;
+    private final TableBucket tableBucket;
+    private final boolean autoMaintenanceEnabled;
+    @Nullable private final String bucketPartitionFieldName;
+
+    @Nullable private final ExecutorService compactionExecutor;
+    @Nullable private CompletableFuture<RewriteDataFilesActionResult> compactionFuture;
 
     public IcebergLakeWriter(
             IcebergCatalogProvider icebergCatalogProvider, WriterInitContext writerInitContext)
             throws IOException {
         this.icebergCatalog = icebergCatalogProvider.get();
         this.icebergTable = getTable(writerInitContext.tablePath());
+        this.tableBucket = writerInitContext.tableBucket();
+        this.bucketPartitionFieldName = resolveBucketPartitionFieldName(icebergTable);
+
+        // Check auto-maintenance from table properties
+        this.autoMaintenanceEnabled =
+                Boolean.parseBoolean(
+                        icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false"));
 
-        // Create record writer based on table type
-        // For now, only supporting non-partitioned append-only tables
+        // Create a record writer
         this.recordWriter = createRecordWriter(writerInitContext);
+
+        if (autoMaintenanceEnabled) {
+            this.compactionExecutor =
+                    Executors.newSingleThreadExecutor(

Review Comment:
   nit: use
   ```
   this.compactionExecutor =
           Executors.newSingleThreadExecutor(
                   new ExecutorThreadFactory("iceberg-compact-" + tableBucket));
   ```
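
For readers unfamiliar with the pattern, the point of passing a thread factory is that compaction threads get a recognizable name in thread dumps and logs. Below is a small stand-alone sketch of a naming thread factory. `NamedThreadFactory` is a hypothetical stand-in written for illustration, not the project's `ExecutorThreadFactory`, and the `-thread-N` suffix is an assumption of this sketch.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a naming thread factory; not the project's ExecutorThreadFactory. */
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Each thread gets "<prefix>-thread-<n>" so it is easy to spot in a thread dump.
        Thread thread = new Thread(task, prefix + "-thread-" + counter.getAndIncrement());
        // Daemon threads do not block JVM shutdown if the executor is never closed.
        thread.setDaemon(true);
        return thread;
    }
}
```

Usage would look like `Executors.newSingleThreadExecutor(new NamedThreadFactory("iceberg-compact-" + tableBucket))`.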



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {

Review Comment:
   There is no need for this check:
   ```
   if (!autoMaintenanceEnabled || compactionExecutor == null)
   ```



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            .filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e);
+        }
+
+        return bucketFiles;
+    }
+
+    private boolean shouldCompact(List<DataFile> files) {
+        if (files.isEmpty()) {
+            return false;
+        }
+        if (files.size() > MIN_FILES_TO_COMPACT) {
+            return true;
+        }
+
+        // Also compact if any file is smaller than threshold
+        boolean hasSmallFiles =
+                files.stream().anyMatch(f -> f.fileSizeInBytes() < SMALL_FILE_THRESHOLD);

Review Comment:
   Does Iceberg also have a `SMALL_FILE_THRESHOLD`? If not, why do we need this `SMALL_FILE_THRESHOLD` logic?



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            .filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e);
+        }
+
+        return bucketFiles;
+    }
+
+    private boolean shouldCompact(List<DataFile> files) {
+        if (files.isEmpty()) {
+            return false;
+        }
+        if (files.size() > MIN_FILES_TO_COMPACT) {

Review Comment:
   Why shouldn't it compact when `files.size() <= MIN_FILES_TO_COMPACT`? IIUC, Iceberg compacts if `files.size() > 1`;
   see `BaseRewriteDataFilesAction` in Iceberg.
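
A minimal sketch of the simplified trigger this would lead to, assuming the "more than one file" rule is indeed what Iceberg applies (that is my reading, so please double-check). The class name `CompactionTrigger` and the use of a list of file sizes in place of `List<DataFile>` are simplifications for illustration.

```java
import java.util.List;

/** Sketch: compact whenever there is more than one file to merge. */
class CompactionTrigger {
    /**
     * @param fileSizesInBytes sizes of the data files in one bucket (stand-in for List<DataFile>)
     * @return true if merging could reduce the file count, i.e. at least two files exist
     */
    static boolean shouldCompact(List<Long> fileSizesInBytes) {
        return fileSizesInBytes.size() > 1;
    }
}
```

This also removes the need for the separate `files.size() < 2` early return in `compactFiles`, since the trigger already guarantees at least two files.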



##########
fluss-lake/fluss-lake-iceberg/pom.xml:
##########
@@ -140,6 +140,18 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>

Review Comment:
   What will happen if we don't include `parquet-column` and `parquet-hadoop`?



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            .filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e);
+        }
+
+        return bucketFiles;
+    }
+
+    private boolean shouldCompact(List<DataFile> files) {
+        if (files.isEmpty()) {
+            return false;
+        }
+        if (files.size() > MIN_FILES_TO_COMPACT) {
+            return true;
+        }
+
+        // Also compact if any file is smaller than threshold
+        boolean hasSmallFiles =
+                files.stream().anyMatch(f -> f.fileSizeInBytes() < SMALL_FILE_THRESHOLD);
+
+        return hasSmallFiles;
+    }
+
+    private RewriteDataFilesActionResult compactFiles(List<DataFile> files, int bucketId) {
+        if (files.size() < 2) {
+            // No effective rewrite necessary
+            return null;
+        }
+
+        try {
+            // Use IcebergRewriteDataFiles to perform the actual compaction
+            IcebergRewriteDataFiles rewriter = new IcebergRewriteDataFiles(icebergTable);
+
+            RewriteDataFilesActionResult result =
+                    rewriter.filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .targetSizeInBytes(COMPACTION_TARGET_SIZE)
+                            .binPack()
+                            .execute();
+
+            return result;
+
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
+    @Nullable
+    private static String resolveBucketPartitionFieldName(Table table) {
+        try {
+            for (PartitionField f : table.spec().fields()) {
+                String name = f.name();
+                // Heuristic: Iceberg uses "bucket_<N>_<sourceCol>" by default

Review Comment:
   Only primary key tables use "bucket_<N>_<sourceCol>"; log tables use an identity partition on the `__bucket` column. Since this PR only considers non-partitioned tables, you can just pick the name of the first `PartitionField` as the bucket partition field name.
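
A minimal sketch of that resolution, using a list of field names in place of `table.spec().fields()` so it runs without Iceberg on the classpath. `BucketPartitionField` is a hypothetical helper name, and the sketch assumes (as stated above) that the bucket field is the first field of the spec.

```java
import java.util.List;

/** Sketch: take the first partition field as the bucket field instead of matching on its name. */
class BucketPartitionField {
    /**
     * @param partitionFieldNames names of the spec's partition fields, in spec order
     * @return the first field name, or null when the spec has no partition fields
     */
    static String resolve(List<String> partitionFieldNames) {
        return partitionFieldNames.isEmpty() ? null : partitionFieldNames.get(0);
    }
}
```

This works for both naming schemes, since it never inspects the name itself.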
   
   



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -98,6 +142,16 @@ public void write(LogRecord record) throws IOException {
     public IcebergWriteResult complete() throws IOException {
         try {
             WriteResult writeResult = recordWriter.complete();
+
+            if (compactionFuture != null && !compactionFuture.isDone()) {

Review Comment:
   nit: change it to the following; 30s is too short and will cause exceptions to be thrown frequently.
   ```
   RewriteDataFilesActionResult rewriteDataFilesActionResult;
   try {
       rewriteDataFilesActionResult = compactionFuture.get();
   } catch (CancellationException e) {
       rewriteDataFilesActionResult = null;
   }
   ```
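
As a self-contained illustration of the same idea: wait without a timeout and treat a cancelled compaction as "no result". The helper name `CompactionWait.awaitOrNull` is hypothetical, and wrapping the checked exceptions in `RuntimeException` is a choice made for this sketch only; the real method may prefer to rethrow them as `IOException`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Sketch: block until compaction finishes; a cancelled compaction yields null. */
class CompactionWait {
    static <T> T awaitOrNull(CompletableFuture<T> compactionFuture) {
        if (compactionFuture == null) {
            // No compaction was scheduled for this writer.
            return null;
        }
        try {
            // No timeout: compaction may legitimately take longer than 30 seconds.
            return compactionFuture.get();
        } catch (CancellationException e) {
            // Cancelled compaction is not an error; there is simply nothing to report.
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for compaction", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Compaction failed", e.getCause());
        }
    }
}
```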



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.actions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg integration. Handles
+ * bin-packing compaction of small files into larger ones.
+ */
+public class IcebergRewriteDataFiles implements RewriteDataFiles {
+
+    private final Table table;
+    private Expression filter = Expressions.alwaysTrue();
+    private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+
+    public IcebergRewriteDataFiles(Table table) {
+        this.table = table;
+    }
+
+    @Override
+    public RewriteDataFiles filter(Expression expression) {
+        this.filter = expression;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles targetSizeInBytes(long targetSize) {
+        this.targetSizeInBytes = targetSize;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles binPack() {
+        return this;
+    }
+
+    @Override
+    public RewriteDataFilesActionResult execute() {
+        try {
+            // Scan files matching the filter
+            List<DataFile> filesToRewrite = scanFilesToRewrite();
+
+            if (filesToRewrite.size() < 2) {
+                return new RewriteDataFilesActionResult(new ArrayList<>(), new ArrayList<>());
+            }
+
+            // Group files using bin-packing strategy
+            List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite);
+
+            List<DataFile> allNewFiles = new ArrayList<>();
+            List<DataFile> allOldFiles = new ArrayList<>();
+
+            // Rewrite each group
+            for (List<DataFile> group : fileGroups) {
+                if (group.size() < 2) {
+                    continue;
+                }
+
+                DataFile newFile = rewriteFileGroup(group);
+                if (newFile != null) {
+                    allNewFiles.add(newFile);
+                    allOldFiles.addAll(group);
+                }
+            }
+
+            if (!allNewFiles.isEmpty()) {
+                commitChanges(allOldFiles, allNewFiles);
+            }
+
+            return new RewriteDataFilesActionResult(allNewFiles, allOldFiles);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Compaction failed", e);
+        }
+    }
+
+    private List<DataFile> scanFilesToRewrite() throws IOException {
+        List<DataFile> files = new ArrayList<>();
+
+        try (CloseableIterable<FileScanTask> tasks = table.newScan().filter(filter).planFiles()) {
+
+            for (FileScanTask task : tasks) {
+                files.add(task.file());
+            }
+        } catch (IOException e) {
+            throw new IOException("Failed to scan files", e);
+        }
+        return files;
+    }
+
+    private List<List<DataFile>> binPackFiles(List<DataFile> files) {
+        List<List<DataFile>> groups = new ArrayList<>();
+        List<DataFile> currentGroup = new ArrayList<>();
+        long currentSize = 0;
+
+        // Sort files by size for better bin-packing (smallest first)
+        files.sort((f1, f2) -> Long.compare(f1.fileSizeInBytes(), f2.fileSizeInBytes()));
+
+        for (DataFile file : files) {
+            if (currentSize + file.fileSizeInBytes() > targetSizeInBytes
+                    && !currentGroup.isEmpty()) {
+                // Start a new group
+                groups.add(new ArrayList<>(currentGroup));
+                currentGroup.clear();
+                currentSize = 0;
+            }
+            currentGroup.add(file);
+            currentSize += file.fileSizeInBytes();
+        }
+
+        // Add the last group
+        if (!currentGroup.isEmpty()) {
+            groups.add(currentGroup);
+        }
+        return groups;
+    }
+
+    private DataFile rewriteFileGroup(List<DataFile> group) {
+        try {
+            // Generate output file path
+            String fileName = String.format("%s.parquet", UUID.randomUUID());

Review Comment:
   Don't hard-code the Parquet format. The table may be an ORC table.
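
One possible direction, sketched without Iceberg on the classpath: derive the extension from the table's configured write format rather than a literal. The key `write.format.default` and its `parquet` default are assumed to mirror Iceberg's `TableProperties.DEFAULT_FILE_FORMAT`; `CompactedFileName` is a hypothetical helper. In the real code, I believe Iceberg's `FileFormat` enum can be used for the same purpose, and the same format would also have to drive the reader, the writer, and `withFormat(...)`.

```java
import java.util.Locale;
import java.util.Map;
import java.util.UUID;

/** Sketch: build the compacted file name from the table's write format, not a hard-coded one. */
class CompactedFileName {
    // Assumption: mirrors Iceberg's TableProperties.DEFAULT_FILE_FORMAT and its default value.
    static final String DEFAULT_FILE_FORMAT = "write.format.default";
    static final String DEFAULT_FILE_FORMAT_DEFAULT = "parquet";

    /** Returns "<uuid>.<ext>", where ext follows the table's configured write format. */
    static String newFileName(Map<String, String> tableProperties) {
        String format =
                tableProperties
                        .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
                        .toLowerCase(Locale.ROOT);
        return UUID.randomUUID() + "." + format;
    }
}
```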



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.actions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg integration. Handles
+ * bin-packing compaction of small files into larger ones.
+ */
+public class IcebergRewriteDataFiles implements RewriteDataFiles {
+
+    private final Table table;
+    private Expression filter = Expressions.alwaysTrue();
+    private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+
+    public IcebergRewriteDataFiles(Table table) {
+        this.table = table;
+    }
+
+    @Override
+    public RewriteDataFiles filter(Expression expression) {
+        this.filter = expression;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles targetSizeInBytes(long targetSize) {
+        this.targetSizeInBytes = targetSize;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles binPack() {
+        return this;
+    }
+
+    @Override
+    public RewriteDataFilesActionResult execute() {
+        try {
+            // Scan files matching the filter
+            List<DataFile> filesToRewrite = scanFilesToRewrite();
+
+            if (filesToRewrite.size() < 2) {
+                return new RewriteDataFilesActionResult(new ArrayList<>(), new ArrayList<>());
+            }
+
+            // Group files using bin-packing strategy
+            List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite);
+
+            List<DataFile> allNewFiles = new ArrayList<>();
+            List<DataFile> allOldFiles = new ArrayList<>();
+
+            // Rewrite each group
+            for (List<DataFile> group : fileGroups) {
+                if (group.size() < 2) {
+                    continue;
+                }
+
+                DataFile newFile = rewriteFileGroup(group);
+                if (newFile != null) {
+                    allNewFiles.add(newFile);
+                    allOldFiles.addAll(group);
+                }
+            }
+
+            if (!allNewFiles.isEmpty()) {
+                commitChanges(allOldFiles, allNewFiles);
+            }
+
+            return new RewriteDataFilesActionResult(allNewFiles, allOldFiles);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Compaction failed", e);
+        }
+    }
+
+    private List<DataFile> scanFilesToRewrite() throws IOException {
+        List<DataFile> files = new ArrayList<>();
+
+        try (CloseableIterable<FileScanTask> tasks = table.newScan().filter(filter).planFiles()) {
+
+            for (FileScanTask task : tasks) {
+                files.add(task.file());
+            }
+        } catch (IOException e) {
+            throw new IOException("Failed to scan files", e);
+        }
+        return files;
+    }
+
+    private List<List<DataFile>> binPackFiles(List<DataFile> files) {
+        List<List<DataFile>> groups = new ArrayList<>();
+        List<DataFile> currentGroup = new ArrayList<>();
+        long currentSize = 0;
+
+        // Sort files by size for better bin-packing (smallest first)
+        files.sort((f1, f2) -> Long.compare(f1.fileSizeInBytes(), f2.fileSizeInBytes()));
+
+        for (DataFile file : files) {
+            if (currentSize + file.fileSizeInBytes() > targetSizeInBytes
+                    && !currentGroup.isEmpty()) {
+                // Start a new group
+                groups.add(new ArrayList<>(currentGroup));
+                currentGroup.clear();
+                currentSize = 0;
+            }
+            currentGroup.add(file);
+            currentSize += file.fileSizeInBytes();
+        }
+
+        // Add the last group
+        if (!currentGroup.isEmpty()) {
+            groups.add(currentGroup);
+        }
+        return groups;
+    }
+
+    private DataFile rewriteFileGroup(List<DataFile> group) {
+        try {
+            // Generate output file path
+            String fileName = String.format("%s.parquet", UUID.randomUUID());
+            OutputFile outputFile =
+                    table.io().newOutputFile(table.locationProvider().newDataLocation(fileName));
+
+            // Create Parquet writer
+            FileAppenderFactory<Record> appenderFactory =
+                    new GenericAppenderFactory(table.schema());
+
+            Parquet.WriteBuilder writeBuilder =
+                    Parquet.write(outputFile)
+                            .schema(table.schema())
+                            .createWriterFunc(GenericParquetWriter::buildWriter)
+                            .overwrite();
+
+            // Use Iceberg FileAppender to write, then build a DataFile manually
+            Metrics metrics;
+            long recordCount = 0L;
+            org.apache.iceberg.io.FileAppender<Record> writer = null;
+            try {
+                writer = writeBuilder.build();
+                for (DataFile file : group) {
+                    try (CloseableIterable<Record> records = readDataFile(file)) {
+                        for (Record record : records) {
+                            writer.add(record);
+                            recordCount++;
+                        }
+                    }
+                }
+            } finally {
+                if (writer != null) {
+                    writer.close();
+                }
+            }
+            metrics = writer.metrics();
+
+            // Assumes all files in group share the same partition
+            PartitionSpec spec = table.spec();
+            DataFile sample = group.get(0);
+
+            String location = outputFile.location();
+            InputFile inputFile = table.io().newInputFile(location);
+            long fileSizeInBytes = inputFile.getLength();
+
+            DataFile newFile =
+                    DataFiles.builder(spec)
+                            .withPath(location)
+                            .withFileSizeInBytes(fileSizeInBytes)
+                            .withFormat(FileFormat.PARQUET)
+                            .withRecordCount(recordCount)
+                            .withMetrics(metrics)
+                            .withPartition(sample.partition()) // no-op for 
unpartitioned specs
+                            .build();
+
+            return newFile;
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to rewrite file group", e);
+        }
+    }
+
+    private CloseableIterable<Record> readDataFile(DataFile file) throws 
IOException {
+        return Parquet.read(table.io().newInputFile(file.path().toString()))
+                .project(table.schema())
+                .createReaderFunc(
+                        fileSchema -> 
GenericParquetReaders.buildReader(table.schema(), fileSchema))
+                .build();
+    }
+
+    private void commitChanges(List<DataFile> oldFiles, List<DataFile> 
newFiles) {

Review Comment:
   Don't commit here; the commit should happen in `CommitterOpertor`. By design, 
only one task, the one running in `CommitterOpertor`, can commit these changes to Iceberg.



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -98,6 +142,16 @@ public void write(LogRecord record) throws IOException {
     public IcebergWriteResult complete() throws IOException {
         try {
             WriteResult writeResult = recordWriter.complete();
+
+            if (compactionFuture != null && !compactionFuture.isDone()) {
+                try {
+                    compactionFuture.get(30, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            "Failed to wait for compaction result for bucket: 
" + tableBucket, e);
+                }
+            }
+
             return new IcebergWriteResult(writeResult);

Review Comment:
   You'll need to include `rewriteDataFilesActionResult` in 
`IcebergWriteResult` so that the committer can commit the rewritten files to 
Iceberg. Otherwise, the rewritten files are useless.
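
A minimal sketch of the hand-off this comment asks for, assuming plain file names can stand in for Iceberg `DataFile`s. `WriteResultSketch` and `CommitterSketch` are hypothetical names, not Fluss or Iceberg classes. The writer only reports what compaction produced, and a single committer applies it; in the real code the committer would presumably use Iceberg's `RewriteFiles`, which the diff already imports.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical write result that also carries the output of compaction. */
final class WriteResultSketch {
    final List<String> appendedFiles;
    final List<String> rewrittenOldFiles; // files that compaction replaced
    final List<String> rewrittenNewFiles; // files that compaction produced

    WriteResultSketch(List<String> appended, List<String> oldFiles, List<String> newFiles) {
        this.appendedFiles = Collections.unmodifiableList(new ArrayList<>(appended));
        this.rewrittenOldFiles = Collections.unmodifiableList(new ArrayList<>(oldFiles));
        this.rewrittenNewFiles = Collections.unmodifiableList(new ArrayList<>(newFiles));
    }
}

/** Hypothetical single committer: the only place where table state changes. */
final class CommitterSketch {
    private final List<String> liveFiles = new ArrayList<>();

    void commit(List<WriteResultSketch> results) {
        for (WriteResultSketch result : results) {
            liveFiles.addAll(result.appendedFiles);
            // Swap old files for new ones only when compaction produced output.
            if (!result.rewrittenNewFiles.isEmpty()) {
                liveFiles.removeAll(result.rewrittenOldFiles);
                liveFiles.addAll(result.rewrittenNewFiles);
            }
        }
    }

    List<String> liveFiles() {
        return new ArrayList<>(liveFiles);
    }
}
```

With this shape, writers never touch table metadata, so concurrent writer tasks cannot race on commits.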



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.actions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg 
integration. Handles
+ * bin-packing compaction of small files into larger ones.
+ */
+public class IcebergRewriteDataFiles implements RewriteDataFiles {
+
+    private final Table table;
+    private Expression filter = Expressions.alwaysTrue();
+    private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+
+    public IcebergRewriteDataFiles(Table table) {
+        this.table = table;
+    }
+
+    @Override
+    public RewriteDataFiles filter(Expression expression) {
+        this.filter = expression;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles targetSizeInBytes(long targetSize) {
+        this.targetSizeInBytes = targetSize;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles binPack() {
+        return this;
+    }
+
+    @Override
+    public RewriteDataFilesActionResult execute() {
+        try {
+            // Scan files matching the filter
+            List<DataFile> filesToRewrite = scanFilesToRewrite();
+
+            if (filesToRewrite.size() < 2) {
+                return new RewriteDataFilesActionResult(new ArrayList<>(), new 
ArrayList<>());

Review Comment:
   nit:
   ```suggestion
                   return new RewriteDataFilesActionResult(
                           Collections.emptyList(), Collections.emptyList());
   ```



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws 
IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), 
compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for 
bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., 
bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            
.filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());

Review Comment:
   But it's fine to consider only append-only tables in this PR.



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.actions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg 
integration. Handles
+ * bin-packing compaction of small files into larger ones.
+ */
+public class IcebergRewriteDataFiles implements RewriteDataFiles {
+
+    private final Table table;
+    private Expression filter = Expressions.alwaysTrue();
+    private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+
+    public IcebergRewriteDataFiles(Table table) {
+        this.table = table;
+    }
+
+    @Override
+    public RewriteDataFiles filter(Expression expression) {
+        this.filter = expression;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles targetSizeInBytes(long targetSize) {
+        this.targetSizeInBytes = targetSize;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles binPack() {
+        return this;
+    }
+
+    @Override
+    public RewriteDataFilesActionResult execute() {
+        try {
+            // Scan files matching the filter
+            List<DataFile> filesToRewrite = scanFilesToRewrite();
+
+            if (filesToRewrite.size() < 2) {
+                return new RewriteDataFilesActionResult(new ArrayList<>(), new 
ArrayList<>());
+            }
+
+            // Group files using bin-packing strategy
+            List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite);
+
+            List<DataFile> allNewFiles = new ArrayList<>();
+            List<DataFile> allOldFiles = new ArrayList<>();
+
+            // Rewrite each group
+            for (List<DataFile> group : fileGroups) {
+                if (group.size() < 2) {
+                    continue;
+                }
+
+                DataFile newFile = rewriteFileGroup(group);
+                if (newFile != null) {
+                    allNewFiles.add(newFile);
+                    allOldFiles.addAll(group);
+                }
+            }
+
+            if (!allNewFiles.isEmpty()) {
+                commitChanges(allOldFiles, allNewFiles);
+            }
+
+            return new RewriteDataFilesActionResult(allNewFiles, allOldFiles);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Compaction failed", e);
+        }
+    }
+
+    private List<DataFile> scanFilesToRewrite() throws IOException {
+        List<DataFile> files = new ArrayList<>();
+
+        try (CloseableIterable<FileScanTask> tasks = 
table.newScan().filter(filter).planFiles()) {
+
+            for (FileScanTask task : tasks) {
+                files.add(task.file());
+            }
+        } catch (IOException e) {
+            throw new IOException("Failed to scan files", e);
+        }
+        return files;
+    }
+
+    private List<List<DataFile>> binPackFiles(List<DataFile> files) {
+        List<List<DataFile>> groups = new ArrayList<>();
+        List<DataFile> currentGroup = new ArrayList<>();
+        long currentSize = 0;
+
+        // Sort files by size for better bin-packing (smallest first)
+        files.sort((f1, f2) -> Long.compare(f1.fileSizeInBytes(), 
f2.fileSizeInBytes()));
+
+        for (DataFile file : files) {
+            if (currentSize + file.fileSizeInBytes() > targetSizeInBytes
+                    && !currentGroup.isEmpty()) {
+                // Start a new group
+                groups.add(new ArrayList<>(currentGroup));
+                currentGroup.clear();
+                currentSize = 0;
+            }
+            currentGroup.add(file);
+            currentSize += file.fileSizeInBytes();
+        }
+
+        // Add the last group
+        if (!currentGroup.isEmpty()) {
+            groups.add(currentGroup);
+        }
+        return groups;
+    }
+
+    private DataFile rewriteFileGroup(List<DataFile> group) {
+        try {
+            // Generate output file path
+            String fileName = String.format("%s.parquet", UUID.randomUUID());
+            OutputFile outputFile =
+                    
table.io().newOutputFile(table.locationProvider().newDataLocation(fileName));
+
+            // Create Parquet writer
+            FileAppenderFactory<Record> appenderFactory =

Review Comment:
   For writing records, consider using `GenericRecordAppendOnlyWriter`, 
which already handles the different file formats.



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws 
IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);

Review Comment:
   `scanBucketFiles` should also be done in the `compactionExecutor`
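
A minimal sketch of that change, assuming the scan, the check, and the compaction can each be passed in as a function. `AsyncCompactionSketch` and its parameter names are hypothetical; only `CompletableFuture.supplyAsync` is taken from the diff. The scan moves inside the supplier, so the calling thread only submits work and never blocks on the scan.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class AsyncCompactionSketch {

    /**
     * Runs scan, check, and compaction on the given executor. The returned future
     * completes with null when no compaction is needed.
     */
    static <F, R> CompletableFuture<R> schedule(
            Supplier<List<F>> scan,
            Predicate<List<F>> shouldCompact,
            Function<List<F>, R> compact,
            Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    // The potentially slow file scan now runs on the executor thread.
                    List<F> files = scan.get();
                    return shouldCompact.test(files) ? compact.apply(files) : null;
                },
                executor);
    }
}
```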



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws 
IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), 
compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for 
bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., 
bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            
.filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());

Review Comment:
   For now, this only works for append-only tables, since it doesn't take delete files into account.



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.actions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewriteDataFilesActionResult;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg 
integration. Handles
+ * bin-packing compaction of small files into larger ones.
+ */
+public class IcebergRewriteDataFiles implements RewriteDataFiles {
+
+    private final Table table;
+    private Expression filter = Expressions.alwaysTrue();
+    private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+
+    public IcebergRewriteDataFiles(Table table) {
+        this.table = table;
+    }
+
+    @Override
+    public RewriteDataFiles filter(Expression expression) {
+        this.filter = expression;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles targetSizeInBytes(long targetSize) {
+        this.targetSizeInBytes = targetSize;
+        return this;
+    }
+
+    @Override
+    public RewriteDataFiles binPack() {
+        return this;
+    }
+
+    @Override
+    public RewriteDataFilesActionResult execute() {
+        try {
+            // Scan files matching the filter
+            List<DataFile> filesToRewrite = scanFilesToRewrite();
+
+            if (filesToRewrite.size() < 2) {
+                return new RewriteDataFilesActionResult(new ArrayList<>(), new 
ArrayList<>());
+            }
+
+            // Group files using bin-packing strategy
+            List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite);
+
+            List<DataFile> allNewFiles = new ArrayList<>();
+            List<DataFile> allOldFiles = new ArrayList<>();
+
+            // Rewrite each group
+            for (List<DataFile> group : fileGroups) {
+                if (group.size() < 2) {
+                    continue;
+                }
+
+                DataFile newFile = rewriteFileGroup(group);
+                if (newFile != null) {
+                    allNewFiles.add(newFile);
+                    allOldFiles.addAll(group);
+                }
+            }
+
+            if (!allNewFiles.isEmpty()) {
+                commitChanges(allOldFiles, allNewFiles);
+            }
+
+            return new RewriteDataFilesActionResult(allNewFiles, allOldFiles);
+
+        } catch (Exception e) {
+            throw new RuntimeException("Compaction failed", e);
+        }
+    }
+
+    private List<DataFile> scanFilesToRewrite() throws IOException {
+        List<DataFile> files = new ArrayList<>();
+
+        try (CloseableIterable<FileScanTask> tasks = 
table.newScan().filter(filter).planFiles()) {
+
+            for (FileScanTask task : tasks) {
+                files.add(task.file());
+            }
+        } catch (IOException e) {
+            throw new IOException("Failed to scan files", e);
+        }
+        return files;
+    }
+
+    private List<List<DataFile>> binPackFiles(List<DataFile> files) {
+        List<List<DataFile>> groups = new ArrayList<>();
+        List<DataFile> currentGroup = new ArrayList<>();
+        long currentSize = 0;
+
+        // Sort files by size for better bin-packing (smallest first)
+        files.sort((f1, f2) -> Long.compare(f1.fileSizeInBytes(), 
f2.fileSizeInBytes()));
+
+        for (DataFile file : files) {
+            if (currentSize + file.fileSizeInBytes() > targetSizeInBytes
+                    && !currentGroup.isEmpty()) {
+                // Start a new group
+                groups.add(new ArrayList<>(currentGroup));
+                currentGroup.clear();
+                currentSize = 0;
+            }
+            currentGroup.add(file);
+            currentSize += file.fileSizeInBytes();
+        }
+
+        // Add the last group
+        if (!currentGroup.isEmpty()) {
+            groups.add(currentGroup);
+        }
+        return groups;
+    }
+
+    private DataFile rewriteFileGroup(List<DataFile> group) {
+        try {
+            // Generate output file path
+            String fileName = String.format("%s.parquet", UUID.randomUUID());
+            OutputFile outputFile =
+                    
table.io().newOutputFile(table.locationProvider().newDataLocation(fileName));
+
+            // Create Parquet writer
+            FileAppenderFactory<Record> appenderFactory =
+                    new GenericAppenderFactory(table.schema());
+
+            Parquet.WriteBuilder writeBuilder =
+                    Parquet.write(outputFile)
+                            .schema(table.schema())
+                            
.createWriterFunc(GenericParquetWriter::buildWriter)
+                            .overwrite();
+
+            // Use Iceberg FileAppender to write, then build a DataFile 
manually
+            Metrics metrics;
+            long recordCount = 0L;
+            org.apache.iceberg.io.FileAppender<Record> writer = null;
+            try {
+                writer = writeBuilder.build();
+                for (DataFile file : group) {
+                    try (CloseableIterable<Record> records = 
readDataFile(file)) {
+                        for (Record record : records) {
+                            writer.add(record);
+                            recordCount++;
+                        }
+                    }
+                }
+            } finally {
+                if (writer != null) {
+                    writer.close();
+                }
+            }
+            metrics = writer.metrics();
+
+            // Assumes all files in group share the same partition
+            PartitionSpec spec = table.spec();
+            DataFile sample = group.get(0);
+
+            String location = outputFile.location();
+            InputFile inputFile = table.io().newInputFile(location);
+            long fileSizeInBytes = inputFile.getLength();
+
+            DataFile newFile =
+                    DataFiles.builder(spec)
+                            .withPath(location)
+                            .withFileSizeInBytes(fileSizeInBytes)
+                            .withFormat(FileFormat.PARQUET)
+                            .withRecordCount(recordCount)
+                            .withMetrics(metrics)
+                            .withPartition(sample.partition()) // no-op for 
unpartitioned specs
+                            .build();
+
+            return newFile;
+
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to rewrite file group", e);
+        }
+    }
+
+    private CloseableIterable<Record> readDataFile(DataFile file) throws 
IOException {
+        return Parquet.read(table.io().newInputFile(file.path().toString()))

Review Comment:
   This can only handle Parquet files, but some tables may use the ORC format. You 
can use `org.apache.iceberg.data.GenericReader` to read the records. However, 
`GenericReader` is package-private, so we can't access it from another 
package. We therefore need to create the same package in the Fluss project and add an 
`IcebergGenericReader` class that extends `GenericReader`.
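   ```
   package org.apache.iceberg.data;
   
   import org.apache.iceberg.TableScan;
   
   /** Exposes the package-private GenericReader to code outside org.apache.iceberg.data. */
   public class IcebergGenericReader extends GenericReader {
   
       // public so that callers in Fluss packages can construct the reader
       public IcebergGenericReader(TableScan scan, boolean reuseContainers) {
           super(scan, reuseContainers);
       }
   }
   ```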
   Then you can call `IcebergGenericReader#open(FileScanTask task)` to 
read the records.
   



##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -107,6 +161,15 @@ public IcebergWriteResult complete() throws IOException {
     @Override
     public void close() throws IOException {
         try {
+            if (compactionFuture != null && !compactionFuture.isDone()) {
+                compactionFuture.cancel(true);
+            }
+
+            if (compactionExecutor != null) {
+                compactionExecutor.shutdownNow();
+                compactionExecutor.awaitTermination(5, TimeUnit.SECONDS);

Review Comment:
   nit:
   ```
   if (!compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
       LOG.warn("Fail to close compactionExecutor.");
   }
   ```
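
A minimal sketch of the full shutdown sequence around that check, assuming a small helper is acceptable. `ExecutorShutdownSketch` and `shutdownAndAwait` are hypothetical names; the caller would log the warning when the helper returns false. It also restores the interrupt flag, which the snippet above does not cover.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdownSketch {

    /** Returns true if the executor terminated within the timeout. */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        // Interrupts running tasks and discards tasks that are still queued.
        executor.shutdownNow();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt so callers further up can react to it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```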



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
