luoyuxia commented on code in PR #1552: URL: https://github.com/apache/fluss/pull/1552#discussion_r2290334811
########## .aiassistant/rules/Fluss Rules.md: ########## @@ -0,0 +1,136 @@ +--- +apply: always Review Comment: What is the file for? Can we remove this file? ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/RewriteDataFiles.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.iceberg.actions; + +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.expressions.Expression; + +/** + * Interface for rewriting data files in Iceberg tables. Simplified version adapted for Fluss's use + * case. + */ +public interface RewriteDataFiles { Review Comment: Let's remove this interface, we don't need currently. Let's keep it simple now. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { + return; + } + + if (bucketPartitionFieldName == null) { + return; + } + + try { + // Scan files for this bucket + List<DataFile> bucketFiles = scanBucketFiles(bucketId); + + // Check if compaction needed + if (shouldCompact(bucketFiles)) { + + compactionFuture = + CompletableFuture.supplyAsync( + () -> compactFiles(bucketFiles, bucketId), compactionExecutor); + } + } catch (Exception e) { + throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e); + } + } + + private List<DataFile> scanBucketFiles(int bucketId) { + List<DataFile> bucketFiles = new ArrayList<>(); + + try { + if (bucketPartitionFieldName == null) { + return bucketFiles; + } + // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id) + CloseableIterable<FileScanTask> tasks = + icebergTable + .newScan() + .filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .planFiles(); + + try (CloseableIterable<FileScanTask> scanTasks = tasks) { + for (FileScanTask task : scanTasks) { + bucketFiles.add(task.file()); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e); + } + + return bucketFiles; + } + + private boolean shouldCompact(List<DataFile> files) { + if (files.isEmpty()) { + return false; + } + if (files.size() > MIN_FILES_TO_COMPACT) { + return true; + } + + // Also compact if any file is smaller than threshold + boolean hasSmallFiles = + files.stream().anyMatch(f -> f.fileSizeInBytes() < SMALL_FILE_THRESHOLD); + + return hasSmallFiles; + } + + private RewriteDataFilesActionResult compactFiles(List<DataFile> files, int bucketId) { + if (files.size() < 2) { + 
// No effective rewrite necessary + return null; + } + + try { + // Use IcebergRewriteDataFiles to perform the actual compaction + IcebergRewriteDataFiles rewriter = new IcebergRewriteDataFiles(icebergTable); + + RewriteDataFilesActionResult result = + rewriter.filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .targetSizeInBytes(COMPACTION_TARGET_SIZE) Review Comment: Then it'll always be `COMPACTION_TARGET_SIZE`, which is 128 MB, and the user has no way to configure it. As in Iceberg's `BaseRewriteDataFilesAction`, please use ``` long splitSize = PropertyUtil.propertyAsLong( table.properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT); long targetFileSize = PropertyUtil.propertyAsLong( table.properties(), TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); this.targetSizeInBytes = Math.min(splitSize, targetFileSize); ``` to get the target size, so that it aligns with Iceberg's strategy. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -45,19 +59,49 @@ /** Implementation of {@link LakeWriter} for Iceberg. */ public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> { + private static final String AUTO_MAINTENANCE_KEY = "table.datalake.auto-maintenance"; + private static final int MIN_FILES_TO_COMPACT = 5; + private static final long SMALL_FILE_THRESHOLD = 50 * 1024 * 1024; // 50MB + private static final long COMPACTION_TARGET_SIZE = 128 * 1024 * 1024; // 128MB + private final Catalog icebergCatalog; private final Table icebergTable; private final RecordWriter recordWriter; + private final TableBucket tableBucket; + private final boolean autoMaintenanceEnabled; + @Nullable private final String bucketPartitionFieldName; + + @Nullable private final ExecutorService compactionExecutor; + @Nullable private CompletableFuture<RewriteDataFilesActionResult> compactionFuture; public IcebergLakeWriter( IcebergCatalogProvider icebergCatalogProvider, WriterInitContext writerInitContext) throws IOException { this.icebergCatalog = icebergCatalogProvider.get(); this.icebergTable = getTable(writerInitContext.tablePath()); + this.tableBucket = writerInitContext.tableBucket(); + this.bucketPartitionFieldName = resolveBucketPartitionFieldName(icebergTable); + + // Check auto-maintenance from table properties + this.autoMaintenanceEnabled = + Boolean.parseBoolean( + icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false")); - // Create record writer based on table type - // For now, only supporting non-partitioned append-only tables + // Create a record writer this.recordWriter = createRecordWriter(writerInitContext); + + if (autoMaintenanceEnabled) { + this.compactionExecutor = + Executors.newSingleThreadExecutor( Review Comment: nit: use ``` this.compactionExecutor = Executors.newSingleThreadExecutor( new ExecutorThreadFactory("iceberg-compact-" + tableBucket)); ``` ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { Review Comment: No need to use ``` if (!autoMaintenanceEnabled || compactionExecutor == null) ``` ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@
private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { + return; + } + + if (bucketPartitionFieldName == null) { + return; + } + + try { + // Scan files for this bucket + List<DataFile> bucketFiles = scanBucketFiles(bucketId); + + // Check if compaction needed + if (shouldCompact(bucketFiles)) { + + compactionFuture = + CompletableFuture.supplyAsync( + () -> compactFiles(bucketFiles, bucketId), compactionExecutor); + } + } catch (Exception e) { + throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e); + } + } + + private List<DataFile> scanBucketFiles(int bucketId) { + List<DataFile> bucketFiles = new ArrayList<>(); + + try { + if (bucketPartitionFieldName == null) { + return bucketFiles; + } + // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id) + CloseableIterable<FileScanTask> tasks = + icebergTable + .newScan() + .filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .planFiles(); + + try (CloseableIterable<FileScanTask> scanTasks = tasks) { + for (FileScanTask task : scanTasks) { + bucketFiles.add(task.file()); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e); + } + + return bucketFiles; + } + + private boolean shouldCompact(List<DataFile> files) { + if (files.isEmpty()) { + return false; + } + if (files.size() > MIN_FILES_TO_COMPACT) { + return true; + } + + // Also compact if any file is smaller than threshold + boolean hasSmallFiles = + files.stream().anyMatch(f -> f.fileSizeInBytes() < SMALL_FILE_THRESHOLD); Review Comment: does iceberg also have the `SMALL_FILE_THRESHOLD`? if not, why we need this SMALL_FILE_THRESHOLD logic? 
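To make the points about the hard-coded thresholds concrete, here is a minimal, standalone sketch (not part of the PR) of how the compaction trigger and target size could follow Iceberg's own `BaseRewriteDataFilesAction` strategy instead of `MIN_FILES_TO_COMPACT`, `SMALL_FILE_THRESHOLD` and `COMPACTION_TARGET_SIZE`; the `CompactionPlanner` helper name is illustrative only:

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.util.PropertyUtil;

import java.util.List;

// Illustrative helper aligning the compaction decision with Iceberg's
// BaseRewriteDataFilesAction rather than Fluss-specific constants.
final class CompactionPlanner {

    /** Target size resolved from the table properties, as BaseRewriteDataFilesAction does. */
    static long targetSizeInBytes(Table table) {
        long splitSize =
                PropertyUtil.propertyAsLong(
                        table.properties(),
                        TableProperties.SPLIT_SIZE,
                        TableProperties.SPLIT_SIZE_DEFAULT);
        long targetFileSize =
                PropertyUtil.propertyAsLong(
                        table.properties(),
                        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
                        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
        return Math.min(splitSize, targetFileSize);
    }

    /** Iceberg rewrites whenever more than one file matches the filter. */
    static boolean shouldCompact(List<DataFile> files) {
        return files.size() > 1;
    }
}
```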
########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { + return; + } + + if (bucketPartitionFieldName == null) { + return; + } + + try { + // Scan files for this bucket + List<DataFile> bucketFiles = scanBucketFiles(bucketId); + + // Check if compaction needed + if (shouldCompact(bucketFiles)) { + + compactionFuture = + CompletableFuture.supplyAsync( + () -> compactFiles(bucketFiles, bucketId), compactionExecutor); + } + } catch (Exception e) { + throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e); + } + } + + private List<DataFile> scanBucketFiles(int bucketId) { + List<DataFile> bucketFiles = new ArrayList<>(); + + try { + if (bucketPartitionFieldName == null) { + return bucketFiles; + } + // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id) + CloseableIterable<FileScanTask> tasks = + icebergTable + .newScan() + .filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .planFiles(); + + try (CloseableIterable<FileScanTask> scanTasks = tasks) { + for (FileScanTask task : scanTasks) { + bucketFiles.add(task.file()); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e); + } + + return bucketFiles; + } + + private boolean shouldCompact(List<DataFile> files) { + if (files.isEmpty()) { + return false; + } + if (files.size() > MIN_FILES_TO_COMPACT) { Review Comment: why it shouldn't compact when `files.size()` <= MIN_FILES_TO_COMPACT, IIUC, iceberg compact if `files.size()` > 1, see `BaseRewriteDataFilesAction` in iceberg. ########## fluss-lake/fluss-lake-iceberg/pom.xml: ########## @@ -140,6 +140,18 @@ <scope>test</scope> </dependency> + <dependency> Review Comment: What will happen if not include `parquet-column` and `parquet-hadoop`. 
########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { + return; + } + + if (bucketPartitionFieldName == null) { + return; + } + + try { + // Scan files for this bucket + List<DataFile> bucketFiles = scanBucketFiles(bucketId); + + // Check if compaction needed + if (shouldCompact(bucketFiles)) { + + compactionFuture = + CompletableFuture.supplyAsync( + () -> compactFiles(bucketFiles, bucketId), compactionExecutor); + } + } catch (Exception e) { + throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e); + } + } + + private List<DataFile> scanBucketFiles(int bucketId) { + List<DataFile> bucketFiles = new ArrayList<>(); + + try { + if (bucketPartitionFieldName == null) { + return bucketFiles; + } + // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id) + CloseableIterable<FileScanTask> tasks = + icebergTable + .newScan() + .filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .planFiles(); + + try (CloseableIterable<FileScanTask> scanTasks = tasks) { + for (FileScanTask task : scanTasks) { + bucketFiles.add(task.file()); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to scan files for bucket: " + bucketId, e); + } + + return bucketFiles; + } + + private boolean shouldCompact(List<DataFile> files) { + if (files.isEmpty()) { + return false; + } + if (files.size() > MIN_FILES_TO_COMPACT) { + return true; + } + + // Also compact if any file is smaller than threshold + boolean hasSmallFiles = + files.stream().anyMatch(f -> f.fileSizeInBytes() < SMALL_FILE_THRESHOLD); + + return hasSmallFiles; + } + + private RewriteDataFilesActionResult compactFiles(List<DataFile> files, int bucketId) { + if (files.size() < 2) { + // No effective rewrite necessary + return null; + } + + try { + // Use IcebergRewriteDataFiles to perform the actual compaction + IcebergRewriteDataFiles rewriter = new IcebergRewriteDataFiles(icebergTable); + + RewriteDataFilesActionResult result = + rewriter.filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .targetSizeInBytes(COMPACTION_TARGET_SIZE) + .binPack() + .execute(); + + return result; + + } catch (Exception e) { + return null; + } + } + + @Nullable + private static String resolveBucketPartitionFieldName(Table table) { + try { + for (PartitionField f : table.spec().fields()) { + String name = f.name(); + // Heuristic: Iceberg uses "bucket_<N>_<sourceCol>" by default Review Comment: Only primary key tables use "bucket_<N>_<sourceCol>", log tables use identify `__bucket` column as partition. Since this pr only consider non-partitioned table, you can just pick the first `PartitionField` name as partition name. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -98,6 +142,16 @@ public void write(LogRecord record) throws IOException { public IcebergWriteResult complete() throws IOException { try { WriteResult writeResult = recordWriter.complete(); + + if (compactionFuture != null && !compactionFuture.isDone()) { Review Comment: nit: change to this, 30s is to short which will cause exception thrown frequently. 
``` RewriteDataFilesActionResult rewriteDataFilesActionResult; try { rewriteDataFilesActionResult = compactionFuture.get(); } catch (CancellationException e) { rewriteDataFilesActionResult = null; } ``` ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.iceberg.actions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg integration. Handles + * bin-packing compaction of small files into larger ones. 
+ */ +public class IcebergRewriteDataFiles implements RewriteDataFiles { + + private final Table table; + private Expression filter = Expressions.alwaysTrue(); + private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default + + public IcebergRewriteDataFiles(Table table) { + this.table = table; + } + + @Override + public RewriteDataFiles filter(Expression expression) { + this.filter = expression; + return this; + } + + @Override + public RewriteDataFiles targetSizeInBytes(long targetSize) { + this.targetSizeInBytes = targetSize; + return this; + } + + @Override + public RewriteDataFiles binPack() { + return this; + } + + @Override + public RewriteDataFilesActionResult execute() { + try { + // Scan files matching the filter + List<DataFile> filesToRewrite = scanFilesToRewrite(); + + if (filesToRewrite.size() < 2) { + return new RewriteDataFilesActionResult(new ArrayList<>(), new ArrayList<>()); + } + + // Group files using bin-packing strategy + List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite); + + List<DataFile> allNewFiles = new ArrayList<>(); + List<DataFile> allOldFiles = new ArrayList<>(); + + // Rewrite each group + for (List<DataFile> group : fileGroups) { + if (group.size() < 2) { + continue; + } + + DataFile newFile = rewriteFileGroup(group); + if (newFile != null) { + allNewFiles.add(newFile); + allOldFiles.addAll(group); + } + } + + if (!allNewFiles.isEmpty()) { + commitChanges(allOldFiles, allNewFiles); + } + + return new RewriteDataFilesActionResult(allNewFiles, allOldFiles); + + } catch (Exception e) { + throw new RuntimeException("Compaction failed", e); + } + } + + private List<DataFile> scanFilesToRewrite() throws IOException { + List<DataFile> files = new ArrayList<>(); + + try (CloseableIterable<FileScanTask> tasks = table.newScan().filter(filter).planFiles()) { + + for (FileScanTask task : tasks) { + files.add(task.file()); + } + } catch (IOException e) { + throw new IOException("Failed to scan files", e); + } + return files; + } + + private List<List<DataFile>> binPackFiles(List<DataFile> files) { + List<List<DataFile>> groups = new ArrayList<>(); + List<DataFile> currentGroup = new ArrayList<>(); + long currentSize = 0; + + // Sort files by size for better bin-packing (smallest first) + files.sort((f1, f2) -> Long.compare(f1.fileSizeInBytes(), f2.fileSizeInBytes())); + + for (DataFile file : files) { + if (currentSize + file.fileSizeInBytes() > targetSizeInBytes + && !currentGroup.isEmpty()) { + // Start a new group + groups.add(new ArrayList<>(currentGroup)); + currentGroup.clear(); + currentSize = 0; + } + currentGroup.add(file); + currentSize += file.fileSizeInBytes(); + } + + // Add the last group + if (!currentGroup.isEmpty()) { + groups.add(currentGroup); + } + return groups; + } + + private DataFile rewriteFileGroup(List<DataFile> group) { + try { + // Generate output file path + String fileName = String.format("%s.parquet", UUID.randomUUID()); Review Comment: Not to hard code to parquet format. The table may be a orc table. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.iceberg.actions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg integration. Handles + * bin-packing compaction of small files into larger ones. 
+ */ +public class IcebergRewriteDataFiles implements RewriteDataFiles { + + private final Table table; + private Expression filter = Expressions.alwaysTrue(); + private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default + + public IcebergRewriteDataFiles(Table table) { + this.table = table; + } + + @Override + public RewriteDataFiles filter(Expression expression) { + this.filter = expression; + return this; + } + + @Override + public RewriteDataFiles targetSizeInBytes(long targetSize) { + this.targetSizeInBytes = targetSize; + return this; + } + + @Override + public RewriteDataFiles binPack() { + return this; + } + + @Override + public RewriteDataFilesActionResult execute() { + try { + // Scan files matching the filter + List<DataFile> filesToRewrite = scanFilesToRewrite(); + + if (filesToRewrite.size() < 2) { + return new RewriteDataFilesActionResult(new ArrayList<>(), new ArrayList<>()); + } + + // Group files using bin-packing strategy + List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite); + + List<DataFile> allNewFiles = new ArrayList<>(); + List<DataFile> allOldFiles = new ArrayList<>(); + + // Rewrite each group + for (List<DataFile> group : fileGroups) { + if (group.size() < 2) { + continue; + } + + DataFile newFile = rewriteFileGroup(group); + if (newFile != null) { + allNewFiles.add(newFile); + allOldFiles.addAll(group); + } + } + + if (!allNewFiles.isEmpty()) { + commitChanges(allOldFiles, allNewFiles); + } + + return new RewriteDataFilesActionResult(allNewFiles, allOldFiles); + + } catch (Exception e) { + throw new RuntimeException("Compaction failed", e); + } + } + + private List<DataFile> scanFilesToRewrite() throws IOException { + List<DataFile> files = new ArrayList<>(); + + try (CloseableIterable<FileScanTask> tasks = table.newScan().filter(filter).planFiles()) { + + for (FileScanTask task : tasks) { + files.add(task.file()); + } + } catch (IOException e) { + throw new IOException("Failed to scan files", e); + } + return files; + } + + private List<List<DataFile>> binPackFiles(List<DataFile> files) { + List<List<DataFile>> groups = new ArrayList<>(); + List<DataFile> currentGroup = new ArrayList<>(); + long currentSize = 0; + + // Sort files by size for better bin-packing (smallest first) + files.sort((f1, f2) -> Long.compare(f1.fileSizeInBytes(), f2.fileSizeInBytes())); + + for (DataFile file : files) { + if (currentSize + file.fileSizeInBytes() > targetSizeInBytes + && !currentGroup.isEmpty()) { + // Start a new group + groups.add(new ArrayList<>(currentGroup)); + currentGroup.clear(); + currentSize = 0; + } + currentGroup.add(file); + currentSize += file.fileSizeInBytes(); + } + + // Add the last group + if (!currentGroup.isEmpty()) { + groups.add(currentGroup); + } + return groups; + } + + private DataFile rewriteFileGroup(List<DataFile> group) { + try { + // Generate output file path + String fileName = String.format("%s.parquet", UUID.randomUUID()); + OutputFile outputFile = + table.io().newOutputFile(table.locationProvider().newDataLocation(fileName)); + + // Create Parquet writer + FileAppenderFactory<Record> appenderFactory = + new GenericAppenderFactory(table.schema()); + + Parquet.WriteBuilder writeBuilder = + Parquet.write(outputFile) + .schema(table.schema()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite(); + + // Use Iceberg FileAppender to write, then build a DataFile manually + Metrics metrics; + long recordCount = 0L; + org.apache.iceberg.io.FileAppender<Record> writer = null; + try { + writer = 
writeBuilder.build(); + for (DataFile file : group) { + try (CloseableIterable<Record> records = readDataFile(file)) { + for (Record record : records) { + writer.add(record); + recordCount++; + } + } + } + } finally { + if (writer != null) { + writer.close(); + } + } + metrics = writer.metrics(); + + // Assumes all files in group share the same partition + PartitionSpec spec = table.spec(); + DataFile sample = group.get(0); + + String location = outputFile.location(); + InputFile inputFile = table.io().newInputFile(location); + long fileSizeInBytes = inputFile.getLength(); + + DataFile newFile = + DataFiles.builder(spec) + .withPath(location) + .withFileSizeInBytes(fileSizeInBytes) + .withFormat(FileFormat.PARQUET) + .withRecordCount(recordCount) + .withMetrics(metrics) + .withPartition(sample.partition()) // no-op for unpartitioned specs + .build(); + + return newFile; + + } catch (Exception e) { + throw new RuntimeException("Failed to rewrite file group", e); + } + } + + private CloseableIterable<Record> readDataFile(DataFile file) throws IOException { + return Parquet.read(table.io().newInputFile(file.path().toString())) + .project(table.schema()) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(table.schema(), fileSchema)) + .build(); + } + + private void commitChanges(List<DataFile> oldFiles, List<DataFile> newFiles) { Review Comment: Don't commit here; we should commit in `CommitterOperator`. By design, only the single task in `CommitterOperator` can commit these changes to Iceberg. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -98,6 +142,16 @@ public void write(LogRecord record) throws IOException { public IcebergWriteResult complete() throws IOException { try { WriteResult writeResult = recordWriter.complete(); + + if (compactionFuture != null && !compactionFuture.isDone()) { + try { + compactionFuture.get(30, TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException( + "Failed to wait for compaction result for bucket: " + tableBucket, e); + } + } + return new IcebergWriteResult(writeResult); Review Comment: You'll need to include `rewriteDataFilesActionResult` in `IcebergWriteResult` so that the committer can commit the rewritten files to Iceberg. Otherwise, the rewritten files are useless. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.alibaba.fluss.lake.iceberg.actions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg integration. Handles + * bin-packing compaction of small files into larger ones. + */ +public class IcebergRewriteDataFiles implements RewriteDataFiles { + + private final Table table; + private Expression filter = Expressions.alwaysTrue(); + private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default + + public IcebergRewriteDataFiles(Table table) { + this.table = table; + } + + @Override + public RewriteDataFiles filter(Expression expression) { + this.filter = expression; + return this; + } + + @Override + public RewriteDataFiles targetSizeInBytes(long targetSize) { + this.targetSizeInBytes = targetSize; + return this; + } + + @Override + public RewriteDataFiles binPack() { + return this; + } + + @Override + public RewriteDataFilesActionResult execute() { + try { + // Scan files matching the filter + List<DataFile> filesToRewrite = scanFilesToRewrite(); + + if (filesToRewrite.size() < 2) { + return new RewriteDataFilesActionResult(new ArrayList<>(), new ArrayList<>()); Review Comment: nit: ```suggestion return new RewriteDataFilesActionResult( Collections.emptyList(), Collections.emptyList()); ``` ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { + return; + } + + if (bucketPartitionFieldName == null) { + return; + } + + try { + // Scan files for this bucket + List<DataFile> bucketFiles = scanBucketFiles(bucketId); + + // Check if compaction needed + if (shouldCompact(bucketFiles)) { + + compactionFuture = + CompletableFuture.supplyAsync( + () -> compactFiles(bucketFiles, bucketId), compactionExecutor); + } + } catch (Exception e) { + throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e); + } + } + + private List<DataFile> scanBucketFiles(int bucketId) { + List<DataFile> bucketFiles = new ArrayList<>(); + + try { + if (bucketPartitionFieldName == null) { + return bucketFiles; + } + // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id) + CloseableIterable<FileScanTask> tasks = + icebergTable + .newScan() + .filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .planFiles(); 
+ + try (CloseableIterable<FileScanTask> scanTasks = tasks) { + for (FileScanTask task : scanTasks) { + bucketFiles.add(task.file()); Review Comment: But it's fine to only consider append-only table in this pr. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.lake.iceberg.actions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg integration. Handles + * bin-packing compaction of small files into larger ones. 
+ */ +public class IcebergRewriteDataFiles implements RewriteDataFiles { + + private final Table table; + private Expression filter = Expressions.alwaysTrue(); + private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default + + public IcebergRewriteDataFiles(Table table) { + this.table = table; + } + + @Override + public RewriteDataFiles filter(Expression expression) { + this.filter = expression; + return this; + } + + @Override + public RewriteDataFiles targetSizeInBytes(long targetSize) { + this.targetSizeInBytes = targetSize; + return this; + } + + @Override + public RewriteDataFiles binPack() { + return this; + } + + @Override + public RewriteDataFilesActionResult execute() { + try { + // Scan files matching the filter + List<DataFile> filesToRewrite = scanFilesToRewrite(); + + if (filesToRewrite.size() < 2) { + return new RewriteDataFilesActionResult(new ArrayList<>(), new ArrayList<>()); + } + + // Group files using bin-packing strategy + List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite); + + List<DataFile> allNewFiles = new ArrayList<>(); + List<DataFile> allOldFiles = new ArrayList<>(); + + // Rewrite each group + for (List<DataFile> group : fileGroups) { + if (group.size() < 2) { + continue; + } + + DataFile newFile = rewriteFileGroup(group); + if (newFile != null) { + allNewFiles.add(newFile); + allOldFiles.addAll(group); + } + } + + if (!allNewFiles.isEmpty()) { + commitChanges(allOldFiles, allNewFiles); + } + + return new RewriteDataFilesActionResult(allNewFiles, allOldFiles); + + } catch (Exception e) { + throw new RuntimeException("Compaction failed", e); + } + } + + private List<DataFile> scanFilesToRewrite() throws IOException { + List<DataFile> files = new ArrayList<>(); + + try (CloseableIterable<FileScanTask> tasks = table.newScan().filter(filter).planFiles()) { + + for (FileScanTask task : tasks) { + files.add(task.file()); + } + } catch (IOException e) { + throw new IOException("Failed to scan files", e); + } + return files; + } + + private List<List<DataFile>> binPackFiles(List<DataFile> files) { + List<List<DataFile>> groups = new ArrayList<>(); + List<DataFile> currentGroup = new ArrayList<>(); + long currentSize = 0; + + // Sort files by size for better bin-packing (smallest first) + files.sort((f1, f2) -> Long.compare(f1.fileSizeInBytes(), f2.fileSizeInBytes())); + + for (DataFile file : files) { + if (currentSize + file.fileSizeInBytes() > targetSizeInBytes + && !currentGroup.isEmpty()) { + // Start a new group + groups.add(new ArrayList<>(currentGroup)); + currentGroup.clear(); + currentSize = 0; + } + currentGroup.add(file); + currentSize += file.fileSizeInBytes(); + } + + // Add the last group + if (!currentGroup.isEmpty()) { + groups.add(currentGroup); + } + return groups; + } + + private DataFile rewriteFileGroup(List<DataFile> group) { + try { + // Generate output file path + String fileName = String.format("%s.parquet", UUID.randomUUID()); + OutputFile outputFile = + table.io().newOutputFile(table.locationProvider().newDataLocation(fileName)); + + // Create Parquet writer + FileAppenderFactory<Record> appenderFactory = Review Comment: For writing record, you can consider use `GenericRecordAppendOnlyWriter` which already consider different formats. 
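As a rough illustration of the format-aware direction suggested above, the sketch below avoids hard-coding Parquet by deriving the file format from the table's `write.format.default` property and letting Iceberg's `GenericAppenderFactory` build the appender. It deliberately does not use Fluss's `GenericRecordAppendOnlyWriter`, and the `FormatAwareAppenders` helper name is hypothetical:

```java
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

import java.util.Locale;

// Illustrative helper: choose the appender from the table's configured
// file format instead of hard-coding Parquet.
final class FormatAwareAppenders {

    static FileAppender<Record> newAppender(Table table, String fileName) {
        // write.format.default, defaulting to parquet
        String formatName =
                table.properties()
                        .getOrDefault(
                                TableProperties.DEFAULT_FILE_FORMAT,
                                TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
        FileFormat format = FileFormat.valueOf(formatName.toUpperCase(Locale.ROOT));

        OutputFile outputFile =
                table.io()
                        .newOutputFile(
                                table.locationProvider()
                                        .newDataLocation(format.addExtension(fileName)));

        // GenericAppenderFactory supports Parquet, ORC and Avro internally.
        return new GenericAppenderFactory(table.schema()).newAppender(outputFile, format);
    }
}
```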
########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { + return; + } + + if (bucketPartitionFieldName == null) { + return; + } + + try { + // Scan files for this bucket + List<DataFile> bucketFiles = scanBucketFiles(bucketId); Review Comment: `scanBucketFiles` should also be done in the `compactionExecutor` ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -126,6 +189,112 @@ private Table getTable(TablePath tablePath) throws IOException { } } + private void scheduleCompactionIfNeeded(int bucketId) { + if (!autoMaintenanceEnabled || compactionExecutor == null) { + return; + } + + if (bucketPartitionFieldName == null) { + return; + } + + try { + // Scan files for this bucket + List<DataFile> bucketFiles = scanBucketFiles(bucketId); + + // Check if compaction needed + if (shouldCompact(bucketFiles)) { + + compactionFuture = + CompletableFuture.supplyAsync( + () -> compactFiles(bucketFiles, bucketId), compactionExecutor); + } + } catch (Exception e) { + throw new RuntimeException("Failed to schedule compaction for bucket: " + bucketId, e); + } + } + + private List<DataFile> scanBucketFiles(int bucketId) { + List<DataFile> bucketFiles = new ArrayList<>(); + + try { + if (bucketPartitionFieldName == null) { + return bucketFiles; + } + // Scan files filtered by Iceberg bucket partition field (e.g., bucket_3_order_id) + CloseableIterable<FileScanTask> tasks = + icebergTable + .newScan() + .filter(Expressions.equal(bucketPartitionFieldName, bucketId)) + .planFiles(); + + try (CloseableIterable<FileScanTask> scanTasks = tasks) { + for (FileScanTask task : scanTasks) { + bucketFiles.add(task.file()); Review Comment: Now, it only works in append-only table since it didn't consider delete file. ########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/actions/IcebergRewriteDataFiles.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.iceberg.actions; + +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RewriteFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.data.parquet.GenericParquetWriter; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.parquet.Parquet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Concrete implementation of {@link RewriteDataFiles} for Fluss's Iceberg integration. Handles + * bin-packing compaction of small files into larger ones. + */ +public class IcebergRewriteDataFiles implements RewriteDataFiles { + + private final Table table; + private Expression filter = Expressions.alwaysTrue(); + private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default + + public IcebergRewriteDataFiles(Table table) { + this.table = table; + } + + @Override + public RewriteDataFiles filter(Expression expression) { + this.filter = expression; + return this; + } + + @Override + public RewriteDataFiles targetSizeInBytes(long targetSize) { + this.targetSizeInBytes = targetSize; + return this; + } + + @Override + public RewriteDataFiles binPack() { + return this; + } + + @Override + public RewriteDataFilesActionResult execute() { + try { + // Scan files matching the filter + List<DataFile> filesToRewrite = scanFilesToRewrite(); + + if (filesToRewrite.size() < 2) { + return new RewriteDataFilesActionResult(new ArrayList<>(), new ArrayList<>()); + } + + // Group files using bin-packing strategy + List<List<DataFile>> fileGroups = binPackFiles(filesToRewrite); + + List<DataFile> allNewFiles = new ArrayList<>(); + List<DataFile> allOldFiles = new ArrayList<>(); + + // Rewrite each group + for (List<DataFile> group : fileGroups) { + if (group.size() < 2) { + continue; + } + + DataFile newFile = rewriteFileGroup(group); + if (newFile != null) { + allNewFiles.add(newFile); + allOldFiles.addAll(group); + } + } + + if (!allNewFiles.isEmpty()) { + commitChanges(allOldFiles, allNewFiles); + } + + return new RewriteDataFilesActionResult(allNewFiles, allOldFiles); + + } catch (Exception e) { + throw new RuntimeException("Compaction failed", e); + } + } + + private List<DataFile> scanFilesToRewrite() throws IOException { + List<DataFile> files = new ArrayList<>(); + + try (CloseableIterable<FileScanTask> tasks = table.newScan().filter(filter).planFiles()) { + + for (FileScanTask task : tasks) { + files.add(task.file()); + } + } catch (IOException e) { + throw new IOException("Failed to scan files", e); + } + return files; + } + + private List<List<DataFile>> binPackFiles(List<DataFile> files) { + List<List<DataFile>> groups = new ArrayList<>(); + List<DataFile> currentGroup = new ArrayList<>(); + long currentSize = 0; + + // Sort files by size for better bin-packing (smallest first) + files.sort((f1, f2) -> 
Long.compare(f1.fileSizeInBytes(), f2.fileSizeInBytes())); + + for (DataFile file : files) { + if (currentSize + file.fileSizeInBytes() > targetSizeInBytes + && !currentGroup.isEmpty()) { + // Start a new group + groups.add(new ArrayList<>(currentGroup)); + currentGroup.clear(); + currentSize = 0; + } + currentGroup.add(file); + currentSize += file.fileSizeInBytes(); + } + + // Add the last group + if (!currentGroup.isEmpty()) { + groups.add(currentGroup); + } + return groups; + } + + private DataFile rewriteFileGroup(List<DataFile> group) { + try { + // Generate output file path + String fileName = String.format("%s.parquet", UUID.randomUUID()); + OutputFile outputFile = + table.io().newOutputFile(table.locationProvider().newDataLocation(fileName)); + + // Create Parquet writer + FileAppenderFactory<Record> appenderFactory = + new GenericAppenderFactory(table.schema()); + + Parquet.WriteBuilder writeBuilder = + Parquet.write(outputFile) + .schema(table.schema()) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite(); + + // Use Iceberg FileAppender to write, then build a DataFile manually + Metrics metrics; + long recordCount = 0L; + org.apache.iceberg.io.FileAppender<Record> writer = null; + try { + writer = writeBuilder.build(); + for (DataFile file : group) { + try (CloseableIterable<Record> records = readDataFile(file)) { + for (Record record : records) { + writer.add(record); + recordCount++; + } + } + } + } finally { + if (writer != null) { + writer.close(); + } + } + metrics = writer.metrics(); + + // Assumes all files in group share the same partition + PartitionSpec spec = table.spec(); + DataFile sample = group.get(0); + + String location = outputFile.location(); + InputFile inputFile = table.io().newInputFile(location); + long fileSizeInBytes = inputFile.getLength(); + + DataFile newFile = + DataFiles.builder(spec) + .withPath(location) + .withFileSizeInBytes(fileSizeInBytes) + .withFormat(FileFormat.PARQUET) + .withRecordCount(recordCount) + .withMetrics(metrics) + .withPartition(sample.partition()) // no-op for unpartitioned specs + .build(); + + return newFile; + + } catch (Exception e) { + throw new RuntimeException("Failed to rewrite file group", e); + } + } + + private CloseableIterable<Record> readDataFile(DataFile file) throws IOException { + return Parquet.read(table.io().newInputFile(file.path().toString())) Review Comment: This can only handle Parquet files, but some tables may use the ORC format. You can use `org.apache.iceberg.data.GenericReader` to read the records. But `GenericReader` is package-private, so we can't access this class from an outer package. So we need to create the same package in the Fluss project and create an `IcebergGenericReader` that extends `GenericReader`. ``` package org.apache.iceberg.data; import org.apache.iceberg.TableScan; public class IcebergGenericReader extends GenericReader { public IcebergGenericReader(TableScan scan, boolean reuseContainers) { super(scan, reuseContainers); } } ``` Then you can call `IcebergGenericReader#open(FileScanTask task)` to read the records.
########## fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java: ########## @@ -107,6 +161,15 @@ public IcebergWriteResult complete() throws IOException { @Override public void close() throws IOException { try { + if (compactionFuture != null && !compactionFuture.isDone()) { + compactionFuture.cancel(true); + } + + if (compactionExecutor != null) { + compactionExecutor.shutdownNow(); + compactionExecutor.awaitTermination(5, TimeUnit.SECONDS); Review Comment: nit: ``` if (!compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) { LOG.warn("Failed to close compactionExecutor."); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
