This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1965b06dd [lake] Introduce table.datalake.auto-compaction options (#1612)
1965b06dd is described below

commit 1965b06ddb35379fb8e5d8db8948cb153461f97d
Author: xx789 <xx7896...@163.com>
AuthorDate: Fri Aug 29 19:00:43 2025 +0800

    [lake] Introduce table.datalake.auto-compaction options (#1612)
---
 .../org/apache/fluss/config/ConfigOptions.java     |  7 +++
 .../java/org/apache/fluss/config/TableConfig.java  |  5 ++
 .../fluss/lake/writer/WriterInitContext.java       | 20 ++-----
 .../flink/tiering/source/TieringSplitReader.java   |  3 +-
 .../tiering/source/TieringWriterInitContext.java   | 22 ++-----
 .../tiering/writer/AppendOnlyTaskWriter.java       |  2 +-
 .../iceberg/tiering/writer/DeltaTaskWriter.java    |  2 +-
 .../lake/iceberg/tiering/IcebergTieringTest.java   | 37 +++++++-----
 .../fluss/lake/lance/tiering/LanceLakeWriter.java  |  4 +-
 .../fluss/lake/lance/tiering/LanceTieringTest.java | 29 ++++-----
 .../lake/paimon/tiering/PaimonLakeWriter.java      | 18 +++++-
 .../lake/paimon/tiering/PaimonTieringTest.java     | 68 +++++++++++++++++-----
 website/docs/engine-flink/options.md               |  1 +
 website/docs/maintenance/operations/upgrading.md   | 23 +++++++-
 14 files changed, 161 insertions(+), 80 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index d739d1ab3..99e99405a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1290,6 +1290,13 @@ public class ConfigOptions {
                                     + "Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. "
                                     + "If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs.");
 
+    public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_COMPACTION =
+            key("table.datalake.auto-compaction")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, compaction will be triggered automatically when the tiering service writes to the datalake. It is disabled by default.");
+
     public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
             key("table.merge-engine")
                     .enumType(MergeEngineType.class)
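For context, a minimal sketch of opting a table into the new option, mirroring the `TableDescriptor.builder().property(...)` pattern used by the tests later in this commit; the schema and layout below are illustrative, not part of the change:

```java
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.types.DataTypes;

class AutoCompactionOptionSketch {
    // Builds an illustrative datalake-enabled table that opts into auto-compaction.
    static TableDescriptor datalakeTableWithAutoCompaction() {
        return TableDescriptor.builder()
                .schema(
                        Schema.newBuilder()
                                .column("c1", DataTypes.INT())
                                .column("c2", DataTypes.STRING())
                                .build())
                .distributedBy(1)
                .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                // The new option introduced by this commit; defaults to false.
                .property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION, true)
                .build();
    }
}
```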
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index 910a3d8b1..a1f8ac9cc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -94,6 +94,11 @@ public class TableConfig {
         return config.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
     }
 
+    /** Whether auto compaction is enabled. */
+    public boolean isDataLakeAutoCompaction() {
+        return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
+    }
+
     /** Gets the optional merge engine type of the table. */
     public Optional<MergeEngineType> getMergeEngineType() {
         return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java
index 4e9a3d290..6c9c2f408 100644
--- a/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java
@@ -18,17 +18,16 @@
 package org.apache.fluss.lake.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
 
-import java.util.Map;
-
 /**
  * The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
- * methods to obtain the table path, table bucket, and an optional partition.
+ * methods to obtain the table path, table bucket, an optional partition, and table metadata
+ * information.
  *
  * @since 0.7
  */
@@ -58,16 +57,9 @@ public interface WriterInitContext {
     String partition();
 
     /**
-     * Returns the table schema.
-     *
-     * @return the table schema
-     */
-    Schema schema();
-
-    /**
-     * Returns the table custom properties.
+     * Returns the Fluss table info.
      *
-     * @return the table custom properties
+     * @return the Fluss table info
      */
-    Map<String, String> customProperties();
+    TableInfo tableInfo();
 }
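For LakeWriter implementors the migration is mechanical; both replacement call chains in this sketch are taken from this commit (the wrapper class is illustrative, and the import assumes the row type class lives in `org.apache.fluss.types`):

```java
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.types.RowType;

import java.util.Map;

class WriterInitContextMigrationSketch {
    void migrate(WriterInitContext ctx) {
        // Previously: ctx.schema().getRowType()
        RowType rowType = ctx.tableInfo().getRowType();
        // Previously: ctx.customProperties()
        Map<String, String> customProps = ctx.tableInfo().getCustomProperties().toMap();
    }
}
```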
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
index a2e4061b0..80e2e4c6f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
@@ -309,8 +309,7 @@ public class TieringSplitReader<WriteResult>
                                     currentTablePath,
                                     bucket,
                                     partitionName,
-                                    currentTable.getTableInfo().getSchema(),
-                                    currentTable.getTableInfo().getCustomProperties().toMap()));
+                                    currentTable.getTableInfo()));
             lakeWriters.put(bucket, lakeWriter);
         }
         return lakeWriter;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java
index b6598e46d..121c4fb9c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java
@@ -18,34 +18,29 @@
 package org.apache.fluss.flink.tiering.source;
 
 import org.apache.fluss.lake.writer.WriterInitContext;
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
 
-import java.util.Map;
-
 /** The implementation of {@link WriterInitContext}. */
 public class TieringWriterInitContext implements WriterInitContext {
 
     private final TablePath tablePath;
     private final TableBucket tableBucket;
-    private final Schema schema;
     @Nullable private final String partition;
-    private final Map<String, String> customProperties;
+    private final TableInfo tableInfo;
 
     public TieringWriterInitContext(
             TablePath tablePath,
             TableBucket tableBucket,
             @Nullable String partition,
-            Schema schema,
-            Map<String, String> customProperties) {
+            TableInfo tableInfo) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partition = partition;
-        this.schema = schema;
-        this.customProperties = customProperties;
+        this.tableInfo = tableInfo;
     }
 
     @Override
@@ -65,12 +60,7 @@ public class TieringWriterInitContext implements WriterInitContext {
     }
 
     @Override
-    public Schema schema() {
-        return schema;
-    }
-
-    @Override
-    public Map<String, String> customProperties() {
-        return customProperties;
+    public TableInfo tableInfo() {
+        return tableInfo;
     }
 }
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
index 53ccb3a80..57db9a5db 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
@@ -35,7 +35,7 @@ public class AppendOnlyTaskWriter extends RecordWriter {
         super(
                 taskWriter,
                 icebergTable.schema(),
-                writerInitContext.schema().getRowType(),
+                writerInitContext.tableInfo().getRowType(),
                 writerInitContext.tableBucket());
     }
 
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
index cbab8fdf9..cea23d891 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -35,7 +35,7 @@ public class DeltaTaskWriter extends RecordWriter {
         super(
                 taskWriter,
                 icebergTable.schema(),
-                writerInitContext.schema().getRowType(),
+                writerInitContext.tableInfo().getRowType(),
                 writerInitContext.tableBucket());
     }
 
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index 0e3600fab..c81c331f1 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -119,6 +122,19 @@ class IcebergTieringTest {
                                isPartitionedTable ? "partitioned" : "unpartitioned"));
         createTable(tablePath, isPrimaryKeyTable, isPartitionedTable);
 
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .column("c3", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(BUCKET_NUM)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
         Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
 
         Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
@@ -144,7 +160,7 @@ class IcebergTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<IcebergWriteResult> writer =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                     Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                     Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                             isPrimaryKeyTable
@@ -198,7 +214,11 @@ class IcebergTieringTest {
     }
 
     private LakeWriter<IcebergWriteResult> createLakeWriter(
-            TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId)
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            @Nullable Long partitionId,
+            TableInfo tableInfo)
             throws IOException {
         return icebergLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -219,17 +239,8 @@ class IcebergTieringTest {
                     }
 
                     @Override
-                    public Map<String, String> customProperties() {
-                        return Collections.emptyMap();
-                    }
-
-                    @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        return org.apache.fluss.metadata.Schema.newBuilder()
-                                .column("c1", DataTypes.INT())
-                                .column("c2", DataTypes.STRING())
-                                .column("c3", DataTypes.STRING())
-                                .build();
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
index e3f3e9b40..699a528cc 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java
@@ -45,7 +45,7 @@ public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
         LanceConfig config =
                 LanceConfig.from(
                         options.toMap(),
-                        writerInitContext.customProperties(),
+                        writerInitContext.tableInfo().getCustomProperties().toMap(),
                         writerInitContext.tablePath().getDatabaseName(),
                         writerInitContext.tablePath().getTableName());
         int batchSize = LanceConfig.getBatchSize(config);
@@ -56,7 +56,7 @@ public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
 
         this.arrowWriter =
                 LanceDatasetAdapter.getArrowWriter(
-                        schema.get(), batchSize, writerInitContext.schema().getRowType());
+                        schema.get(), batchSize, writerInitContext.tableInfo().getRowType());
 
         WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
         Callable<List<FragmentMetadata>> fragmentCreator =
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
index 3017d38c2..9571f651a 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.lake.lance.tiering;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
@@ -28,6 +29,8 @@ import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -96,6 +99,15 @@ class LanceTieringTest {
                         tablePath.getTableName());
         Schema schema = createTable(config);
 
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
         List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
         SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
                 lanceLakeTieringFactory.getWriteResultSerializer();
@@ -126,7 +138,7 @@ class LanceTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<LanceWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
+                        createLakeWriter(tablePath, bucket, partition, tableInfo)) {
                     Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                     Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                             genLogTableRecords(partition, bucket, 10);
@@ -239,11 +251,7 @@ class LanceTieringTest {
     }
 
     private LakeWriter<LanceWriteResult> createLakeWriter(
-            TablePath tablePath,
-            int bucket,
-            @Nullable String partition,
-            Schema schema,
-            Map<String, String> customProperties)
+            TablePath tablePath, int bucket, @Nullable String partition, TableInfo tableInfo)
             throws IOException {
         return lanceLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -265,13 +273,8 @@ class LanceTieringTest {
                     }
 
                     @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        return schema;
-                    }
-
-                    @Override
-                    public Map<String, String> customProperties() {
-                        return customProperties;
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
index 383177535..8472b825b 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java
@@ -24,12 +24,15 @@ import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogRecord;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 
@@ -43,7 +46,10 @@ public class PaimonLakeWriter implements LakeWriter<PaimonWriteResult> {
             PaimonCatalogProvider paimonCatalogProvider, WriterInitContext writerInitContext)
             throws IOException {
         this.paimonCatalog = paimonCatalogProvider.get();
-        FileStoreTable fileStoreTable = getTable(writerInitContext.tablePath());
+        FileStoreTable fileStoreTable =
+                getTable(
+                        writerInitContext.tablePath(),
+                        writerInitContext.tableInfo().getTableConfig().isDataLakeAutoCompaction());
 
         List<String> partitionKeys = fileStoreTable.partitionKeys();
 
@@ -95,9 +101,15 @@ public class PaimonLakeWriter implements LakeWriter<PaimonWriteResult> {
         }
     }
 
-    private FileStoreTable getTable(TablePath tablePath) throws IOException {
+    private FileStoreTable getTable(TablePath tablePath, boolean isAutoCompaction)
+            throws IOException {
         try {
-            return (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+            FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+            Map<String, String> compactionOptions =
+                    Collections.singletonMap(
+                            CoreOptions.WRITE_ONLY.key(),
+                            isAutoCompaction ? Boolean.FALSE.toString() : Boolean.TRUE.toString());
+            return table.copy(compactionOptions);
         } catch (Exception e) {
             throw new IOException("Failed to get table " + tablePath + " in Paimon.", e);
         }
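Note the inversion in the mapping above: the Fluss flag is translated into Paimon's `write-only` core option, which in Paimon skips compaction when set to true. A sketch of the mapping, with an illustrative helper name:

```java
final class WriteOnlyMappingSketch {
    // auto-compaction = true  -> write-only = false (Paimon compacts during writes)
    // auto-compaction = false -> write-only = true  (tiering only moves data)
    static String writeOnlyValue(boolean isAutoCompaction) {
        return isAutoCompaction ? Boolean.FALSE.toString() : Boolean.TRUE.toString();
    }
}
```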
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index fdbd1eb31..87bdefa69 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.lake.paimon.tiering;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
@@ -24,6 +25,8 @@ import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -118,6 +121,18 @@ class PaimonTieringTest {
                                isPartitioned ? "partitioned" : "non_partitioned"));
         createTable(
                 tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable ? bucketNum : null);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("c3", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
 
         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
         SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
@@ -148,7 +163,7 @@ class PaimonTieringTest {
             for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                 String partition = entry.getValue();
                 try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                     Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                     Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                             isPrimaryKeyTable
@@ -233,6 +248,20 @@ class PaimonTieringTest {
         // Test multiple partitions: region + year
         TablePath tablePath = TablePath.of("paimon", "test_multi_partition");
         createMultiPartitionTable(tablePath);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("region", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("year", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .partitionedBy("region", "year")
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
 
         Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
@@ -253,7 +282,7 @@ class PaimonTieringTest {
         for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
             String partition = entry.getValue();
             try (LakeWriter<PaimonWriteResult> lakeWriter =
-                    createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                    createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                 List<LogRecord> logRecords =
                         genLogTableRecordsForMultiPartition(partition, bucket, 3);
                 recordsByPartition.put(partition, logRecords);
@@ -295,7 +324,21 @@ class PaimonTieringTest {
         // Test three partitions: region + year + month
         TablePath tablePath = TablePath.of("paimon", "test_three_partition");
         createThreePartitionTable(tablePath);
-
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("region", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("year", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("month", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .partitionedBy("region", "year", "month")
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
         Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
         Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
@@ -313,7 +356,7 @@ class PaimonTieringTest {
         for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
             String partition = entry.getValue();
             try (LakeWriter<PaimonWriteResult> lakeWriter =
-                    createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                    createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                 List<LogRecord> logRecords =
                         genLogTableRecordsForMultiPartition(
                                 partition, bucket, 2); // Use same method
@@ -639,7 +682,11 @@ class PaimonTieringTest {
     }
 
     private LakeWriter<PaimonWriteResult> createLakeWriter(
-            TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId)
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            @Nullable Long partitionId,
+            TableInfo tableInfo)
             throws IOException {
         return paimonLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -661,15 +708,8 @@ class PaimonTieringTest {
                     }
 
                     @Override
-                    public Map<String, String> customProperties() {
-                        // don't care about table custom properties for Paimon lake writer
-                        return new HashMap<>();
-                    }
-
-                    @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        throw new UnsupportedOperationException(
-                                "The lake writer in Paimon currently uses paimonCatalog to determine the schema.");
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }
diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md
index fce9bb7a0..159352c6c 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -80,6 +80,7 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d');
 | table.datalake.enabled                  | Boolean  | false                                | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. [...]
 | table.datalake.format                   | Enum     | (None)                               | The data lake format of the table specifies the tiered Lakehouse storage format, such as Paimon, Iceberg, DeltaLake, or Hudi. Currently, only `paimon` is supported. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Unio [...]
 | table.datalake.freshness                | Duration | 3min                                 | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer targe [...]
+| table.datalake.auto-compaction          | Boolean  | false                                | If true, compaction will be triggered automatically when the tiering service writes to the datalake. It is disabled by default. [...]
 | table.merge-engine                      | Enum     | (None)                               | Defines the merge engine for the primary key table. By default, the primary key table uses the [default merge engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). It also supports two other merge engines, `first_row` and `versioned`. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [v [...]
 | table.merge-engine.versioned.ver-column | String   | (None)                               | The column name of the version column for the `versioned` merge engine. If the merge engine is set to `versioned`, the version column must be set. [...]
 
diff --git a/website/docs/maintenance/operations/upgrading.md b/website/docs/maintenance/operations/upgrading.md
index 71b009916..0b635ec27 100644
--- a/website/docs/maintenance/operations/upgrading.md
+++ b/website/docs/maintenance/operations/upgrading.md
@@ -83,4 +83,25 @@ The compatibility between the Fluss client and the Fluss server is described in
 |            | Server 0.6 | Server 0.7 | Limitations |
 |------------|------------|------------|-------------|
 | Client 0.6 | ✔️         | ✔️         |             |
-| Client 0.7 | ✔️         | ✔️         |             |
\ No newline at end of file
+| Client 0.7 | ✔️         | ✔️         |             |
+
+## Upgrading Fluss datalake tiering service
+
+### Behavior Change
+Since Fluss 0.8, the auto-compaction feature during datalake tiering is **disabled** by default. Compaction will no longer be triggered automatically as part of the tiering process.
+
+| Version                                  | Behavior                                                                                                                             |
+|------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
+| **Previous versions (v0.7 and earlier)** | Auto-compaction enabled during tiering                                                                                               |
+| **Fluss 0.8+**                           | **Auto-compaction disabled by default**. The tiering service focuses solely on data movement; compaction must be explicitly enabled. |
+
+### How to Enable Compaction
+To keep the previous behavior and enable automatic compaction, you must explicitly set `table.datalake.auto-compaction = true` in the table options of each table, e.g. `ALTER TABLE my_table SET ('table.datalake.auto-compaction' = 'true');`.
+
+**Important Note**: This is a per-table setting. This design provides granular control, allowing you to manage compaction based on the specific performance and storage needs of each individual table.
+
+### Reason for Change & Benefits
+This change was implemented to significantly improve the core stability and performance of the datalake tiering service.
+- **Enhanced Stability & Performance**: Compaction is a resource-intensive operation (CPU/IO) that can impose significant pressure on the tiering service. With compaction disabled by default, the service can dedicate all of its resources to its primary function: reliably and efficiently moving data. This results in a more stable, predictable, and smoother tiering experience for all users.
+- **Granular Control & Flexibility**: This change empowers users to make a conscious choice based on their specific needs. You can now decide the optimal balance between storage efficiency (achieved through compaction) and computational resource allocation on a per-table basis.
+

