MehulBatra commented on code in PR #1552:
URL: https://github.com/apache/fluss/pull/1552#discussion_r2291866008


##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
         }
     }
 
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            .filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e);
+        }
+
+        return bucketFiles;
+    }
+
+    private boolean shouldCompact(List<DataFile> files) {
+        if (files.isEmpty()) {
+            return false;
+        }
+        if (files.size() > MIN_FILES_TO_COMPACT) {

Review Comment:
   It's a deliberate choice: we trade fewer, larger rewrites for lower operational overhead.
Iceberg can rewrite whenever a bucket has more than one file, but compacting that aggressively
causes unnecessary churn (more rewrite jobs and more snapshots) without meaningful benefit.
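   To make the trade-off concrete, here is a minimal, self-contained sketch of a count-based
threshold check. It is a simplified stand-in for the `shouldCompact` method under review, not
the PR's code: file sizes (as `Long`) replace Iceberg `DataFile` objects so it runs without
Iceberg, and the class name `CompactionThresholdSketch` and the threshold value 5 are
hypothetical, since the actual `MIN_FILES_TO_COMPACT` value is not visible in this hunk.

```java
import java.util.List;

/**
 * Hypothetical, simplified stand-in for the shouldCompact check under review.
 * File sizes in bytes replace Iceberg DataFile objects so the sketch runs
 * without Iceberg on the classpath.
 */
public final class CompactionThresholdSketch {

    // Hypothetical threshold; the real MIN_FILES_TO_COMPACT value is not shown in the PR hunk.
    static final int MIN_FILES_TO_COMPACT = 5;

    private CompactionThresholdSketch() {}

    /** Returns true only when the bucket holds strictly more files than the threshold. */
    static boolean shouldCompact(List<Long> fileSizes, int minFilesToCompact) {
        if (fileSizes.isEmpty()) {
            return false;
        }
        return fileSizes.size() > minFilesToCompact;
    }

    public static void main(String[] args) {
        List<Long> threeSmallFiles = List.of(1_000L, 2_000L, 3_000L);

        // Aggressive policy (rewrite whenever there are >1 files): triggers a rewrite.
        System.out.println(shouldCompact(threeSmallFiles, 1));

        // Conservative policy: waits until more files accumulate, then does one larger rewrite.
        System.out.println(shouldCompact(threeSmallFiles, MIN_FILES_TO_COMPACT));
    }
}
```

   With three small files, the aggressive setting schedules a rewrite (prints `true`) while the
conservative one defers it (prints `false`), so each eventual rewrite covers more files and
fewer compaction commits are produced overall.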
   