MehulBatra commented on code in PR #1552:
URL: https://github.com/apache/fluss/pull/1552#discussion_r2291878341
##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
}
}
+ private void scheduleCompactionIfNeeded(int bucketId) {
+ if (!autoMaintenanceEnabled || compactionExecutor == null) {
+ return;
+ }
+
+ if (bucketPartitionFieldName == null) {
+ return;
+ }
+
+ try {
+ // Scan files for this bucket
+ List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+ // Check if compaction needed
+ if (shouldCompact(bucketFiles)) {
+
+ compactionFuture =
+ CompletableFuture.supplyAsync(
+ () -> compactFiles(bucketFiles, bucketId), compactionExecutor);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e);
+ }
+ }
+
+ private List<DataFile> scanBucketFiles(int bucketId) {
+ List<DataFile> bucketFiles = new ArrayList<>();
+
+ try {
+ if (bucketPartitionFieldName == null) {
+ return bucketFiles;
+ }
+ // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id)
+ CloseableIterable<FileScanTask> tasks =
+ icebergTable
+ .newScan()
+ .filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+ .planFiles();
+
+ try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+ for (FileScanTask task : scanTasks) {
+ bucketFiles.add(task.file());
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e);
+ }
+
+ return bucketFiles;
+ }
+
+ private boolean shouldCompact(List<DataFile> files) {
+ if (files.isEmpty()) {
+ return false;
+ }
+ if (files.size() > MIN_FILES_TO_COMPACT) {
+ return true;
+ }
+
+ // Also compact if any file is smaller than threshold
+ boolean hasSmallFiles =
+ files.stream().anyMatch(f -> f.fileSizeInBytes() < SMALL_FILE_THRESHOLD);
Review Comment:
Iceberg doesn’t expose a fixed “SMALL_FILE_THRESHOLD” knob. I kept a local
SMALL_FILE_THRESHOLD as a pre-filter for deciding when to trigger a rewrite for
a bucket, independent of Iceberg’s internal heuristics, since we were already
keeping the MinimumFilesToCompact buffer. But yes, we can remove it.
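
To make the trade-off concrete, here is a minimal sketch of the pre-filter with and without the small-file check. It works on plain file sizes rather than Iceberg DataFile objects. The class name, the constant values, and the assumption that the method ends by returning hasSmallFiles are all hypothetical, because the hunk above is cut off before the end of shouldCompact.

```java
import java.util.List;

public class CompactionPreFilter {
    // Hypothetical values; the PR's actual constants are not visible in this hunk.
    static final int MIN_FILES_TO_COMPACT = 3;
    static final long SMALL_FILE_THRESHOLD = 50L * 1024 * 1024; // 50 MiB, assumed

    /**
     * Two-stage check modeled on the diff: compact when there are more than
     * MIN_FILES_TO_COMPACT files, or when any file is below the size threshold.
     * Assumes the truncated method ends with "return hasSmallFiles".
     */
    static boolean shouldCompact(List<Long> fileSizesInBytes) {
        if (fileSizesInBytes.isEmpty()) {
            return false;
        }
        if (fileSizesInBytes.size() > MIN_FILES_TO_COMPACT) {
            return true;
        }
        return fileSizesInBytes.stream().anyMatch(size -> size < SMALL_FILE_THRESHOLD);
    }

    /** Variant with the small-file pre-filter removed: the file count alone decides. */
    static boolean shouldCompactByCountOnly(List<Long> fileSizesInBytes) {
        return fileSizesInBytes.size() > MIN_FILES_TO_COMPACT;
    }
}
```

Under these assumptions, a bucket holding a single small file triggers a rewrite in the first variant but not in the second, which is the behavioral difference that removing the threshold would introduce.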