MehulBatra commented on code in PR #1552:
URL: https://github.com/apache/fluss/pull/1552#discussion_r2291887228
##########
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java:
##########
@@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException {
         }
     }
+    private void scheduleCompactionIfNeeded(int bucketId) {
+        if (!autoMaintenanceEnabled || compactionExecutor == null) {
+            return;
+        }
+
+        if (bucketPartitionFieldName == null) {
+            return;
+        }
+
+        try {
+            // Scan files for this bucket
+            List<DataFile> bucketFiles = scanBucketFiles(bucketId);
+
+            // Check if compaction needed
+            if (shouldCompact(bucketFiles)) {
+
+                compactionFuture =
+                        CompletableFuture.supplyAsync(
+                                () -> compactFiles(bucketFiles, bucketId), compactionExecutor);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e);
+        }
+    }
+
+    private List<DataFile> scanBucketFiles(int bucketId) {
+        List<DataFile> bucketFiles = new ArrayList<>();
+
+        try {
+            if (bucketPartitionFieldName == null) {
+                return bucketFiles;
+            }
+            // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id)
+            CloseableIterable<FileScanTask> tasks =
+                    icebergTable
+                            .newScan()
+                            .filter(Expressions.equal(bucketPartitionFieldName, bucketId))
+                            .planFiles();
+
+            try (CloseableIterable<FileScanTask> scanTasks = tasks) {
+                for (FileScanTask task : scanTasks) {
+                    bucketFiles.add(task.file());
Review Comment:
Sure, I can create a separate issue to handle primary key tables and address delete files as well.
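For the primary key table follow-up, the bucket scan would need to pick up delete files alongside data files. Below is a minimal sketch of that collection step, assuming Iceberg's behaviour that `FileScanTask.deletes()` lists the delete files applying to a task's data file. The `ContentFile`, `ScanTask` and `BucketFiles` classes and the `collect` method are hypothetical stand-ins (not Iceberg or Fluss types) so the example runs without Iceberg on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these classes stand in for Iceberg's DataFile, DeleteFile
// and FileScanTask so that the example compiles without Iceberg dependencies.
public class BucketFileCollector {

    /** Hypothetical stand-in for a data or delete file, identified by its path. */
    static final class ContentFile {
        final String path;

        ContentFile(String path) {
            this.path = path;
        }
    }

    /** Hypothetical stand-in for FileScanTask: one data file plus its applicable deletes. */
    static final class ScanTask {
        final ContentFile file; // analogous to FileScanTask.file()
        final List<ContentFile> deletes; // analogous to FileScanTask.deletes()

        ScanTask(ContentFile file, List<ContentFile> deletes) {
            this.file = file;
            this.deletes = deletes;
        }
    }

    /** Data files of a bucket and the de-duplicated delete files that apply to them. */
    static final class BucketFiles {
        final List<ContentFile> dataFiles = new ArrayList<>();
        final List<ContentFile> deleteFiles = new ArrayList<>();
    }

    static BucketFiles collect(Iterable<ScanTask> tasks) {
        BucketFiles result = new BucketFiles();
        // A single delete file can be attached to several tasks, so key it by path
        // and keep the first-seen order.
        Map<String, ContentFile> deletesByPath = new LinkedHashMap<>();
        for (ScanTask task : tasks) {
            result.dataFiles.add(task.file);
            for (ContentFile delete : task.deletes) {
                deletesByPath.putIfAbsent(delete.path, delete);
            }
        }
        result.deleteFiles.addAll(deletesByPath.values());
        return result;
    }

    public static void main(String[] args) {
        ContentFile eqDelete = new ContentFile("bucket=3/eq-delete-1.parquet");
        List<ScanTask> tasks = new ArrayList<>();
        tasks.add(new ScanTask(new ContentFile("bucket=3/data-1.parquet"),
                Collections.singletonList(eqDelete)));
        tasks.add(new ScanTask(new ContentFile("bucket=3/data-2.parquet"),
                Collections.singletonList(eqDelete)));
        BucketFiles files = collect(tasks);
        // Prints "2 data, 1 delete": the shared delete file is counted once.
        System.out.println(files.dataFiles.size() + " data, " + files.deleteFiles.size() + " delete");
    }
}
```

In the real `scanBucketFiles`, the loop would read `task.file()` and `task.deletes()` from each `FileScanTask` returned by `planFiles()`. De-duplicating by path matters because one equality-delete file can apply to several data files in the same partition. Whether compaction should apply those deletes or only carry them along is left to the separate issue.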