This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
     new 1965b06dd  [lake] Introduce table.datalake.auto-compaction options (#1612)
1965b06dd is described below

commit 1965b06ddb35379fb8e5d8db8948cb153461f97d
Author: xx789 <xx7896...@163.com>
AuthorDate: Fri Aug 29 19:00:43 2025 +0800

    [lake] Introduce table.datalake.auto-compaction options (#1612)
---
 .../org/apache/fluss/config/ConfigOptions.java     |  7 +++
 .../java/org/apache/fluss/config/TableConfig.java  |  5 ++
 .../fluss/lake/writer/WriterInitContext.java       | 20 ++-----
 .../flink/tiering/source/TieringSplitReader.java   |  3 +-
 .../tiering/source/TieringWriterInitContext.java   | 22 ++-----
 .../tiering/writer/AppendOnlyTaskWriter.java       |  2 +-
 .../iceberg/tiering/writer/DeltaTaskWriter.java    |  2 +-
 .../lake/iceberg/tiering/IcebergTieringTest.java   | 37 +++++++-----
 .../fluss/lake/lance/tiering/LanceLakeWriter.java  |  4 +-
 .../fluss/lake/lance/tiering/LanceTieringTest.java | 29 ++++-----
 .../lake/paimon/tiering/PaimonLakeWriter.java      | 18 +++++-
 .../lake/paimon/tiering/PaimonTieringTest.java     | 68 +++++++++++++++++-----
 website/docs/engine-flink/options.md               |  1 +
 website/docs/maintenance/operations/upgrading.md   | 23 +++++++-
 14 files changed, 161 insertions(+), 80 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index d739d1ab3..99e99405a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1290,6 +1290,13 @@ public class ConfigOptions {
                                     + "Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. "
                                     + "If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs.");

+    public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_COMPACTION =
+            key("table.datalake.auto-compaction")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, compaction will be triggered automatically when the tiering service writes to the datalake. It is disabled by default.");
+
     public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
             key("table.merge-engine")
                     .enumType(MergeEngineType.class)
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index 910a3d8b1..a1f8ac9cc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -94,6 +94,11 @@ public class TableConfig {
         return config.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
     }

+    /** Whether auto compaction is enabled. */
+    public boolean isDataLakeAutoCompaction() {
+        return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
+    }
+
     /** Gets the optional merge engine type of the table. */
     public Optional<MergeEngineType> getMergeEngineType() {
         return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
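For context, a minimal sketch of how the new option flows from a table descriptor to the accessor the tiering writers consult below. Only builder and accessor calls that appear in this commit are used; the table path and single-column schema are illustrative, not part of the change.

```java
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.DataTypes;

class AutoCompactionOptionSketch {
    static boolean resolveAutoCompaction() {
        // Declare a datalake-enabled table that opts in to auto-compaction.
        TableDescriptor descriptor =
                TableDescriptor.builder()
                        .schema(Schema.newBuilder().column("c1", DataTypes.INT()).build())
                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                        .property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION, true)
                        .build();
        // TableInfo.of(...) is called the same way in the tests in this commit;
        // "db" and "t" are hypothetical names.
        TableInfo tableInfo = TableInfo.of(TablePath.of("db", "t"), 0, 1, descriptor, 1L, 1L);
        // This is the accessor PaimonLakeWriter reads further down in this diff.
        return tableInfo.getTableConfig().isDataLakeAutoCompaction();
    }
}
```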
diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java
index 4e9a3d290..6c9c2f408 100644
--- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java
@@ -18,17 +18,16 @@
 package org.apache.fluss.lake.writer;

 import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;

 import javax.annotation.Nullable;

-import java.util.Map;
-
 /**
  * The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
- * methods to obtain the table path, table bucket, and an optional partition.
+ * methods to obtain the table path, table bucket, an optional partition, and table metadata
+ * information.
  *
  * @since 0.7
 */
@@ -58,16 +57,9 @@ public interface WriterInitContext {
     String partition();

     /**
-     * Returns the table schema.
-     *
-     * @return the table schema
-     */
-    Schema schema();
-
-    /**
-     * Returns the table custom properties.
+     * Returns the Fluss table info.
      *
-     * @return the table custom properties
+     * @return the Fluss table info
      */
-    Map<String, String> customProperties();
+    TableInfo tableInfo();
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index a2e4061b0..80e2e4c6f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -309,8 +309,7 @@ public class TieringSplitReader<WriteResult>
                                     currentTablePath,
                                     bucket,
                                     partitionName,
-                                    currentTable.getTableInfo().getSchema(),
-                                    currentTable.getTableInfo().getCustomProperties().toMap()));
+                                    currentTable.getTableInfo()));
             lakeWriters.put(bucket, lakeWriter);
         }
         return lakeWriter;
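For out-of-tree `LakeWriter` implementations, the migration is mechanical: everything that used to come from `schema()` and `customProperties()` is now reached through `tableInfo()`. A sketch of the before/after access patterns, mirroring the call sites updated in this commit; the wrapper class is hypothetical, and `org.apache.fluss.types.RowType` is assumed as the row-type class.

```java
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.types.RowType;

import java.util.Map;

class WriterInitContextMigrationSketch {
    static void readContext(WriterInitContext ctx) {
        // Row type, previously ctx.schema().getRowType():
        RowType rowType = ctx.tableInfo().getRowType();
        // Custom properties, previously ctx.customProperties():
        Map<String, String> customProperties = ctx.tableInfo().getCustomProperties().toMap();
        // Table-level config is now reachable from the same context,
        // e.g. the new auto-compaction flag used by the Paimon writer:
        boolean autoCompaction = ctx.tableInfo().getTableConfig().isDataLakeAutoCompaction();
    }
}
```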
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java
index b6598e46d..121c4fb9c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java
@@ -18,34 +18,29 @@
 package org.apache.fluss.flink.tiering.source;

 import org.apache.fluss.lake.writer.WriterInitContext;
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;

 import javax.annotation.Nullable;

-import java.util.Map;
-
 /** The implementation of {@link WriterInitContext}. */
 public class TieringWriterInitContext implements WriterInitContext {

     private final TablePath tablePath;
     private final TableBucket tableBucket;
-    private final Schema schema;
     @Nullable private final String partition;
-    private final Map<String, String> customProperties;
+    private final TableInfo tableInfo;

     public TieringWriterInitContext(
             TablePath tablePath,
             TableBucket tableBucket,
             @Nullable String partition,
-            Schema schema,
-            Map<String, String> customProperties) {
+            TableInfo tableInfo) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partition = partition;
-        this.schema = schema;
-        this.customProperties = customProperties;
+        this.tableInfo = tableInfo;
     }

     @Override
@@ -65,12 +60,7 @@ public class TieringWriterInitContext implements WriterInitContext {
     }

     @Override
-    public Schema schema() {
-        return schema;
-    }
-
-    @Override
-    public Map<String, String> customProperties() {
-        return customProperties;
+    public TableInfo tableInfo() {
+        return tableInfo;
     }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
index 53ccb3a80..57db9a5db 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
@@ -35,7 +35,7 @@ public class AppendOnlyTaskWriter extends RecordWriter {
         super(
                 taskWriter,
                 icebergTable.schema(),
-                writerInitContext.schema().getRowType(),
+                writerInitContext.tableInfo().getRowType(),
                 writerInitContext.tableBucket());
     }
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
index cbab8fdf9..cea23d891 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -35,7 +35,7 @@ public class DeltaTaskWriter extends RecordWriter {
         super(
                 taskWriter,
                 icebergTable.schema(),
-                writerInitContext.schema().getRowType(),
+                writerInitContext.tableInfo().getRowType(),
                 writerInitContext.tableBucket());
     }
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index 0e3600fab..c81c331f1 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -17,12 +17,15 @@
 package org.apache.fluss.lake.iceberg.tiering;

+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -119,6 +122,19 @@ class IcebergTieringTest {
                                 isPartitionedTable ? "partitioned" : "unpartitioned"));
         createTable(tablePath, isPrimaryKeyTable, isPartitionedTable);

+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .column("c3", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(BUCKET_NUM)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
         Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));

         Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
@@ -144,7 +160,7 @@ class IcebergTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<IcebergWriteResult> writer =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                     Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                     Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                             isPrimaryKeyTable
@@ -198,7 +214,11 @@ class IcebergTieringTest {
     }

     private LakeWriter<IcebergWriteResult> createLakeWriter(
-            TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId)
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            @Nullable Long partitionId,
+            TableInfo tableInfo)
             throws IOException {
         return icebergLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -219,17 +239,8 @@ class IcebergTieringTest {
                     }

                     @Override
-                    public Map<String, String> customProperties() {
-                        return Collections.emptyMap();
-                    }
-
-                    @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        return org.apache.fluss.metadata.Schema.newBuilder()
-                                .column("c1", DataTypes.INT())
-                                .column("c2", DataTypes.STRING())
-                                .column("c3", DataTypes.STRING())
-                                .build();
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
index e3f3e9b40..699a528cc 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
@@ -45,7 +45,7 @@ public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
         LanceConfig config =
                 LanceConfig.from(
                         options.toMap(),
-                        writerInitContext.customProperties(),
+                        writerInitContext.tableInfo().getCustomProperties().toMap(),
                         writerInitContext.tablePath().getDatabaseName(),
                         writerInitContext.tablePath().getTableName());
         int batchSize = LanceConfig.getBatchSize(config);
@@ -56,7 +56,7 @@ public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {

         this.arrowWriter =
                 LanceDatasetAdapter.getArrowWriter(
-                        schema.get(), batchSize, writerInitContext.schema().getRowType());
+                        schema.get(), batchSize, writerInitContext.tableInfo().getRowType());

         WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
         Callable<List<FragmentMetadata>> fragmentCreator =
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
index 3017d38c2..9571f651a 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -17,6 +17,7 @@
 package org.apache.fluss.lake.lance.tiering;

+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
@@ -28,6 +29,8 @@
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -96,6 +99,15 @@ class LanceTieringTest {
                         tablePath.getTableName());
         Schema schema = createTable(config);

+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
         List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
         SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
                 lanceLakeTieringFactory.getWriteResultSerializer();
@@ -126,7 +138,7 @@ class LanceTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<LanceWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
+                        createLakeWriter(tablePath, bucket, partition, tableInfo)) {
                     Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                     Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                             genLogTableRecords(partition, bucket, 10);
@@ -239,11 +251,7 @@ class LanceTieringTest {
     }

     private LakeWriter<LanceWriteResult> createLakeWriter(
-            TablePath tablePath,
-            int bucket,
-            @Nullable String partition,
-            Schema schema,
-            Map<String, String> customProperties)
+            TablePath tablePath, int bucket, @Nullable String partition, TableInfo tableInfo)
             throws IOException {
         return lanceLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -265,13 +273,8 @@ class LanceTieringTest {
                     }

                     @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        return schema;
-                    }
-
-                    @Override
-                    public Map<String, String> customProperties() {
-                        return customProperties;
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
index 383177535..8472b825b 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
@@ -24,12 +24,15 @@
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogRecord;

+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;

 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;

 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
@@ -43,7 +46,10 @@ public class PaimonLakeWriter implements LakeWriter<PaimonWriteResult> {
             PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext)
             throws IOException {
         this.paimonCatalog = paimonCatalogProvider.get();
-        FileStoreTable fileStoreTable = getTable(writerInitContext.tablePath());
+        FileStoreTable fileStoreTable =
+                getTable(
+                        writerInitContext.tablePath(),
+                        writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction());

         List<String> partitionKeys = fileStoreTable.partitionKeys();
@@ -95,9 +101,15 @@ public class PaimonLakeWriter implements LakeWriter<PaimonWriteResult> {
         }
     }

-    private FileStoreTable getTable(TablePath tablePath) throws IOException {
+    private FileStoreTable getTable(TablePath tablePath, boolean isAutoCompaction)
+            throws IOException {
         try {
-            return (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+            FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+            Map<String, String> compactionOptions =
+                    Collections.singletonMap(
+                            CoreOptions.WRITE_ONLY.key(),
+                            isAutoCompaction ? Boolean.FALSE.toString() : Boolean.TRUE.toString());
+            return table.copy(compactionOptions);
         } catch (Exception e) {
             throw new IOException("Failed to get table " + tablePath + " in Paimon.", e);
         }
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index fdbd1eb31..87bdefa69 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -17,6 +17,7 @@
 package org.apache.fluss.lake.paimon.tiering;

+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
@@ -24,6 +25,8 @@
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -118,6 +121,18 @@ class PaimonTieringTest {
                                 isPartitioned ? "partitioned" : "non_partitioned"));
         createTable(
                 tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable ? bucketNum : null);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("c3", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);

         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
         SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
@@ -148,7 +163,7 @@ class PaimonTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                     Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                     Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                             isPrimaryKeyTable
@@ -233,6 +248,20 @@ class PaimonTieringTest {
         // Test multiple partitions: region + year
         TablePath tablePath = TablePath.of("paimon", "test_multi_partition");
         createMultiPartitionTable(tablePath);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("region", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("year", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .partitionedBy("region", "year")
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);

         Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
@@ -253,7 +282,7 @@ class PaimonTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                     List<LogRecord> logRecords =
                             genLogTableRecordsForMultiPartition(partition, bucket, 3);
                     recordsByPartition.put(partition, logRecords);
@@ -295,7 +324,21 @@ class PaimonTieringTest {
         // Test three partitions: region + year + month
         TablePath tablePath = TablePath.of("paimon", "test_three_partition");
         createThreePartitionTable(tablePath);
-
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("region", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("year", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("month", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .partitionedBy("region", "year", "month")
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
         Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
         Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
@@ -313,7 +356,7 @@ class PaimonTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                     List<LogRecord> logRecords =
                             genLogTableRecordsForMultiPartition(
                                     partition, bucket, 2); // Use same method
@@ -639,7 +682,11 @@ class PaimonTieringTest {
     }

     private LakeWriter<PaimonWriteResult> createLakeWriter(
-            TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId)
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            @Nullable Long partitionId,
+            TableInfo tableInfo)
             throws IOException {
         return paimonLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -661,15 +708,8 @@ class PaimonTieringTest {
                     }

                     @Override
-                    public Map<String, String> customProperties() {
-                        // don't care about table custom properties for Paimon lake writer
-                        return new HashMap<>();
-                    }
-
-                    @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        throw new UnsupportedOperationException(
-                                "The lake writer in Paimon currently uses paimonCatalog to determine the schema.");
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }
diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md
index fce9bb7a0..159352c6c 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -80,6 +80,7 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');
 | table.datalake.enabled | Boolean | false | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. [...]
 | table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format, such as Paimon, Iceberg, DeltaLake, or Hudi. Currently, only `paimon` is supported. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Unio [...]
 | table.datalake.freshness | Duration | 3min | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer targe [...]
+| table.datalake.auto-compaction | Boolean | false | If true, compaction will be triggered automatically when the tiering service writes to the datalake. It is disabled by default. [...]
 | table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, the primary key table uses the [default merge engine (last_row)](table-design/table-types/pk-table/merge-engines/default.md). It also supports two other merge engines: `first_row` and `versioned`. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [v [...]
 | table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. [...]
diff --git a/website/docs/maintenance/operations/upgrading.md b/website/docs/maintenance/operations/upgrading.md
index 71b009916..0b635ec27 100644
--- a/website/docs/maintenance/operations/upgrading.md
+++ b/website/docs/maintenance/operations/upgrading.md
@@ -83,4 +83,25 @@ The compatibility between the Fluss client and the Fluss server is described in
 |            | Server 0.6 | Server 0.7 | Limitations |
 |------------|------------|------------|-------------|
 | Client 0.6 | ✔️         | ✔️         |             |
-| Client 0.7 | ✔️         | ✔️         |             |
\ No newline at end of file
+| Client 0.7 | ✔️         | ✔️         |             |
+
+## Upgrading the Fluss datalake tiering service
+
+### Behavior Change
+Since Fluss 0.8, auto-compaction during datalake tiering is **disabled** by default. Compaction is no longer triggered automatically as part of the tiering process.
+
+| Version                                  | Behavior                                                                                                                             |
+|------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| **Previous versions (v0.7 and earlier)** | Auto-compaction enabled during tiering                                                                                               |
+| **Fluss 0.8+**                           | **Auto-compaction disabled by default**. The tiering service focuses solely on data movement; compaction must be explicitly enabled. |
+
+### How to Enable Compaction
+To keep the previous behavior and enable automatic compaction, you must configure `table.datalake.auto-compaction = true` in the table options of each table that should be compacted (see the sketch after this diff).
+
+**Important Note**: This is a per-table setting. This design provides granular control, allowing you to manage compaction based on the specific performance and storage needs of each individual table.
+
+### Reason for Change & Benefits
+This change was made to improve the core stability and performance of the datalake tiering service.
+- **Enhanced Stability & Performance**: Compaction is a resource-intensive (CPU/IO) operation that can put significant pressure on the tiering service. Disabling it by default lets the service dedicate all resources to its primary function: reliably and efficiently moving data. The result is a more stable, predictable, and smoother tiering experience for all users.
+- **Granular Control & Flexibility**: This change empowers users to make a conscious choice based on their specific needs. You can now decide the optimal balance between storage efficiency (achieved through compaction) and computational resource allocation on a per-table basis.
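The upgrading guide above asks for a per-table option. A minimal sketch of applying it from a Flink `TableEnvironment`, following the `ALTER TABLE ... SET` pattern already shown in options.md; the catalog name `fluss_catalog` and table name `log_table` are hypothetical, and in the SQL client the two statements alone suffice.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableAutoCompaction {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Assumes a Fluss catalog named `fluss_catalog` is already registered.
        tEnv.executeSql("USE CATALOG fluss_catalog");
        // Restores the pre-0.8 behavior for this one table.
        tEnv.executeSql(
                "ALTER TABLE log_table SET ('table.datalake.auto-compaction' = 'true')");
    }
}
```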