This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 58957fdec [lake] Support auto snapshot expiration for paimon lake table (#2184)
58957fdec is described below

commit 58957fdec456f9f4433046b17f29eb3f4e156bd8
Author: Liebing <[email protected]>
AuthorDate: Mon Dec 22 09:47:47 2025 +0800

    [lake] Support auto snapshot expiration for paimon lake table (#2184)
---
 .../org/apache/fluss/config/ConfigOptions.java     |  21 ++
 .../java/org/apache/fluss/config/TableConfig.java  |   5 +
 .../fluss/lake/committer/CommitterInitContext.java |  18 +-
 .../fluss/flink/tiering/LakeTieringJobBuilder.java |  10 +-
 .../tiering/committer/TieringCommitOperator.java   |   8 +-
 .../committer/TieringCommitOperatorFactory.java    |   6 +-
 .../committer/TieringCommitterInitContext.java     |  19 +-
 .../committer/TieringCommitOperatorTest.java       |   3 +
 .../flink/tiering/FlussLakeTieringEntrypoint.java  |   7 +
 .../testutils/FlinkIcebergTieringTestBase.java     |   1 +
 .../lake/iceberg/tiering/IcebergTieringTest.java   |  23 +-
 .../lance/testutils/FlinkLanceTieringTestBase.java |   1 +
 .../fluss/lake/lance/tiering/LanceTieringTest.java |  27 ++-
 .../lake/paimon/tiering/PaimonLakeCommitter.java   |  69 +++---
 .../paimon/tiering/PaimonLakeTieringFactory.java   |   2 +-
 .../testutils/FlinkPaimonTieringTestBase.java      |   1 +
 .../lake/paimon/tiering/PaimonTieringTest.java     | 240 ++++++++++++++++-----
 fluss-test-coverage/pom.xml                        |   3 +
 website/docs/engine-flink/options.md               |  51 ++---
 .../tiered-storage/lakehouse-storage.md            |  10 +-
 20 files changed, 399 insertions(+), 126 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 7eb203999..43c244554 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1405,6 +1405,13 @@ public class ConfigOptions {
                     .withDescription(
                             "If true, compaction will be triggered 
automatically when tiering service writes to the datalake. It is disabled by 
default.");
 
+    public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT =
+            key("table.datalake.auto-expire-snapshot")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default.");
+
     public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
             key("table.merge-engine")
                     .enumType(MergeEngineType.class)
@@ -1777,6 +1784,20 @@ public class ConfigOptions {
                             "The datalake format used by of Fluss to be as 
lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance. 
"
                                     + "In the future, more kinds of data lake 
format will be supported, such as DeltaLake or Hudi.");
 
+    // ------------------------------------------------------------------------
+    //  ConfigOptions for tiering service
+    // ------------------------------------------------------------------------
+
+    public static final ConfigOption<Boolean> LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT =
+            key("lake.tiering.auto-expire-snapshot")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake, "
+                                    + "even if "
+                                    + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
+                                    + " is false.");
+
     // ------------------------------------------------------------------------
     //  ConfigOptions for fluss kafka
     // ------------------------------------------------------------------------
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index fc7966ab0..80d2ee8f7 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -101,6 +101,11 @@ public class TableConfig {
         return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
     }
 
+    /** Whether auto expire snapshot is enabled. */
+    public boolean isDataLakeAutoExpireSnapshot() {
+        return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
+    }
+
     /** Gets the optional merge engine type of the table. */
     public Optional<MergeEngineType> getMergeEngineType() {
         return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
index 263d3a2fd..dab6b1485 100644
--- a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
@@ -18,11 +18,13 @@
 package org.apache.fluss.lake.committer;
 
 import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 /**
  * The CommitterInitContext interface provides the context needed to create a 
LakeCommitter. It
- * includes methods to obtain the table path.
+ * includes methods to obtain the table path, table info and lake tiering 
configs.
  *
  * @since 0.7
  */
@@ -35,4 +37,18 @@ public interface CommitterInitContext {
      * @return the table path
      */
     TablePath tablePath();
+
+    /**
+     * Returns the table info.
+     *
+     * @return the table info
+     */
+    TableInfo tableInfo();
+
+    /**
+     * Returns the lake tiering config.
+     *
+     * @return the lake tiering config
+     */
+    Configuration lakeTieringConfig();
 }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
index 85eed128c..74cf91e0e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
@@ -46,16 +46,19 @@ public class LakeTieringJobBuilder {
     private final StreamExecutionEnvironment env;
     private final Configuration flussConfig;
     private final Configuration dataLakeConfig;
+    private final Configuration lakeTieringConfig;
     private final String dataLakeFormat;
 
     private LakeTieringJobBuilder(
             StreamExecutionEnvironment env,
             Configuration flussConfig,
             Configuration dataLakeConfig,
+            Configuration lakeTieringConfig,
             String dataLakeFormat) {
         this.env = checkNotNull(env);
         this.flussConfig = checkNotNull(flussConfig);
         this.dataLakeConfig = checkNotNull(dataLakeConfig);
+        this.lakeTieringConfig = checkNotNull(lakeTieringConfig);
         this.dataLakeFormat = checkNotNull(dataLakeFormat);
     }
 
@@ -63,8 +66,10 @@ public class LakeTieringJobBuilder {
             StreamExecutionEnvironment env,
             Configuration flussConfig,
             Configuration dataLakeConfig,
+            Configuration lakeTieringConfig,
             String dataLakeFormat) {
-        return new LakeTieringJobBuilder(env, flussConfig, dataLakeConfig, 
dataLakeFormat);
+        return new LakeTieringJobBuilder(
+                env, flussConfig, dataLakeConfig, lakeTieringConfig, 
dataLakeFormat);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,7 +104,8 @@ public class LakeTieringJobBuilder {
                         "TieringCommitter",
                         CommittableMessageTypeInfo.of(
                                 () -> 
lakeTieringFactory.getCommittableSerializer()),
-                        new TieringCommitOperatorFactory(flussConfig, 
lakeTieringFactory))
+                        new TieringCommitOperatorFactory(
+                                flussConfig, lakeTieringConfig, 
lakeTieringFactory))
                 .setParallelism(1)
                 .setMaxParallelism(1)
                 .sinkTo(new DiscardingSink())
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index ebe4a1ae6..a678a4b23 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -89,6 +89,7 @@ public class TieringCommitOperator<WriteResult, Committable>
     private static final long serialVersionUID = 1L;
 
     private final Configuration flussConfig;
+    private final Configuration lakeTieringConfig;
     private final LakeTieringFactory<WriteResult, Committable> 
lakeTieringFactory;
     private final FlussTableLakeSnapshotCommitter 
flussTableLakeSnapshotCommitter;
     private Connection connection;
@@ -105,11 +106,13 @@ public class TieringCommitOperator<WriteResult, 
Committable>
     public TieringCommitOperator(
             StreamOperatorParameters<CommittableMessage<Committable>> 
parameters,
             Configuration flussConf,
+            Configuration lakeTieringConfig,
             LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
         this.lakeTieringFactory = lakeTieringFactory;
         this.flussTableLakeSnapshotCommitter = new 
FlussTableLakeSnapshotCommitter(flussConf);
         this.collectedTableBucketWriteResults = new HashMap<>();
         this.flussConfig = flussConf;
+        this.lakeTieringConfig = lakeTieringConfig;
         this.operatorEventGateway =
                 parameters
                         .getOperatorEventDispatcher()
@@ -204,7 +207,10 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         }
         try (LakeCommitter<WriteResult, Committable> lakeCommitter =
                 lakeTieringFactory.createLakeCommitter(
-                        new TieringCommitterInitContext(tablePath))) {
+                        new TieringCommitterInitContext(
+                                tablePath,
+                                admin.getTableInfo(tablePath).get(),
+                                lakeTieringConfig))) {
             List<WriteResult> writeResults =
                     committableWriteResults.stream()
                             .map(TableBucketWriteResult::writeResult)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
index 65d6f377e..efced7aea 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
@@ -33,12 +33,15 @@ public class TieringCommitOperatorFactory<WriteResult, 
Committable>
                 TableBucketWriteResult<WriteResult>, 
CommittableMessage<Committable>> {
 
     private final Configuration flussConfig;
+    private final Configuration lakeTieringConfig;
     private final LakeTieringFactory<WriteResult, Committable> 
lakeTieringFactory;
 
     public TieringCommitOperatorFactory(
             Configuration flussConfig,
+            Configuration lakeTieringConfig,
             LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
         this.flussConfig = flussConfig;
+        this.lakeTieringConfig = lakeTieringConfig;
         this.lakeTieringFactory = lakeTieringFactory;
     }
 
@@ -47,7 +50,8 @@ public class TieringCommitOperatorFactory<WriteResult, 
Committable>
             StreamOperatorParameters<CommittableMessage<Committable>> 
parameters) {
 
         TieringCommitOperator<WriteResult, Committable> commitOperator =
-                new TieringCommitOperator<>(parameters, flussConfig, 
lakeTieringFactory);
+                new TieringCommitOperator<>(
+                        parameters, flussConfig, lakeTieringConfig, 
lakeTieringFactory);
 
         @SuppressWarnings("unchecked")
         final T castedOperator = (T) commitOperator;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
index 69c5aff61..3cae145f0 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
@@ -17,21 +17,38 @@
 
 package org.apache.fluss.flink.tiering.committer;
 
+import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 /** The {@link CommitterInitContext} implementation for {@link LakeCommitter}. 
*/
 public class TieringCommitterInitContext implements CommitterInitContext {
 
     private final TablePath tablePath;
+    private final TableInfo tableInfo;
+    private final Configuration lakeTieringConfig;
 
-    public TieringCommitterInitContext(TablePath tablePath) {
+    public TieringCommitterInitContext(
+            TablePath tablePath, TableInfo tableInfo, Configuration 
lakeTieringConfig) {
         this.tablePath = tablePath;
+        this.tableInfo = tableInfo;
+        this.lakeTieringConfig = lakeTieringConfig;
     }
 
     @Override
     public TablePath tablePath() {
         return tablePath;
     }
+
+    @Override
+    public TableInfo tableInfo() {
+        return tableInfo;
+    }
+
+    @Override
+    public Configuration lakeTieringConfig() {
+        return lakeTieringConfig;
+    }
 }
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 593dcadd1..230460395 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -85,6 +85,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 new TieringCommitOperator<>(
                         parameters,
                         FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+                        new org.apache.fluss.config.Configuration(),
                         new TestingLakeTieringFactory());
         committerOperator.open();
     }
@@ -261,6 +262,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 new TieringCommitOperator<>(
                         parameters,
                         FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+                        new org.apache.fluss.config.Configuration(),
                         new TestingLakeTieringFactory(testingLakeCommitter));
         committerOperator.open();
 
@@ -321,6 +323,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 new TieringCommitOperator<>(
                         parameters,
                         FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+                        new org.apache.fluss.config.Configuration(),
                         new TestingLakeTieringFactory(testingLakeCommitter));
         committerOperator.open();
 
diff --git a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
index 74236c91a..a2ce6ce1a 100644
--- a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
+++ b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
@@ -30,11 +30,13 @@ import java.util.Map;
 import static 
org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
 import static 
org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
 import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
+import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
 
 /** The entrypoint for Flink to tier fluss data to lake format like paimon. */
 public class FlussLakeTieringEntrypoint {
 
     private static final String FLUSS_CONF_PREFIX = "fluss.";
+    private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
 
     public static void main(String[] args) throws Exception {
 
@@ -65,6 +67,10 @@ public class FlussLakeTieringEntrypoint {
                 extractAndRemovePrefix(
                         paramsMap, String.format("%s%s.", 
DATA_LAKE_CONFIG_PREFIX, dataLake));
 
+        // extract tiering service config
+        Map<String, String> lakeTieringConfigMap =
+                extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
+
         // now, we must use full restart strategy if any task is failed,
         // since committer is stateless, if tiering committer is failover, 
committer
         // will lost the collected committable, and will never collect all 
committable to do commit
@@ -83,6 +89,7 @@ public class FlussLakeTieringEntrypoint {
                                 execEnv,
                                 Configuration.fromMap(flussConfigMap),
                                 Configuration.fromMap(lakeConfigMap),
+                                Configuration.fromMap(lakeTieringConfigMap),
                                 dataLake)
                         .build();
 
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index a2997f98c..a7c922ba5 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -161,6 +161,7 @@ public class FlinkIcebergTieringTestBase {
                         execEnv,
                         flussConfig,
                         Configuration.fromMap(getIcebergCatalogConf()),
+                        new Configuration(),
                         DataLakeFormat.ICEBERG.toString())
                 .build();
     }
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index c81c331f1..ba59b1b74 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.iceberg.tiering;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
@@ -185,7 +186,7 @@ class IcebergTieringTest {
 
         // second, commit data
         try (LakeCommitter<IcebergWriteResult, IcebergCommittable> 
lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // serialize/deserialize committable
             IcebergCommittable icebergCommittable =
                     lakeCommitter.toCommittable(icebergWriteResults);
@@ -246,8 +247,24 @@ class IcebergTieringTest {
     }
 
     private LakeCommitter<IcebergWriteResult, IcebergCommittable> 
createLakeCommitter(
-            TablePath tablePath) throws IOException {
-        return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
+            TablePath tablePath, TableInfo tableInfo) throws IOException {
+        return icebergLakeTieringFactory.createLakeCommitter(
+                new CommitterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableInfo tableInfo() {
+                        return tableInfo;
+                    }
+
+                    @Override
+                    public Configuration lakeTieringConfig() {
+                        return new Configuration();
+                    }
+                });
     }
 
     private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index a548e3558..0b94cedb5 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -206,6 +206,7 @@ public class FlinkLanceTieringTestBase {
                         execEnv,
                         flussConfig,
                         Configuration.fromMap(getLanceCatalogConf()),
+                        new Configuration(),
                         DataLakeFormat.LANCE.toString())
                 .build();
     }
diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index 642bd7afd..1157daf76 100644
--- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.lance.tiering;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.lance.LanceConfig;
 import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
@@ -114,7 +115,7 @@ class LanceTieringTest {
                 lanceLakeTieringFactory.getCommittableSerializer();
 
         try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // should no any missing snapshot
             assertThat(lakeCommitter.getMissingLakeSnapshot(2L)).isNull();
         }
@@ -159,7 +160,7 @@ class LanceTieringTest {
 
         // second, commit data
         try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // serialize/deserialize committable
             LanceCommittable lanceCommittable = 
lakeCommitter.toCommittable(lanceWriteResults);
             byte[] serialized = 
committableSerializer.serialize(lanceCommittable);
@@ -196,7 +197,7 @@ class LanceTieringTest {
 
         // then, let's verify getMissingLakeSnapshot works
         try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo)) {
             // use snapshot id 1 as the known snapshot id
             CommittedLakeSnapshot committedLakeSnapshot = 
lakeCommitter.getMissingLakeSnapshot(1L);
             assertThat(committedLakeSnapshot).isNotNull();
@@ -242,8 +243,24 @@ class LanceTieringTest {
     }
 
     private LakeCommitter<LanceWriteResult, LanceCommittable> 
createLakeCommitter(
-            TablePath tablePath) throws IOException {
-        return lanceLakeTieringFactory.createLakeCommitter(() -> tablePath);
+            TablePath tablePath, TableInfo tableInfo) throws IOException {
+        return lanceLakeTieringFactory.createLakeCommitter(
+                new CommitterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableInfo tableInfo() {
+                        return tableInfo;
+                    }
+
+                    @Override
+                    public Configuration lakeTieringConfig() {
+                        return new Configuration();
+                    }
+                });
     }
 
     private LakeWriter<LanceWriteResult> createLakeWriter(
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 8351a377f..ee0419312 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -17,8 +17,10 @@
 
 package org.apache.fluss.lake.paimon.tiering;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.lake.committer.BucketOffset;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.metadata.TablePath;
 import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -32,15 +34,15 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -55,14 +57,25 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
 
     private final Catalog paimonCatalog;
     private final FileStoreTable fileStoreTable;
-    private FileStoreCommit fileStoreCommit;
+    private TableCommitImpl tableCommit;
+
     private static final ThreadLocal<Long> currentCommitSnapshotId = new 
ThreadLocal<>();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
-    public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider, 
TablePath tablePath)
+    public PaimonLakeCommitter(
+            PaimonCatalogProvider paimonCatalogProvider, CommitterInitContext 
committerInitContext)
             throws IOException {
         this.paimonCatalog = paimonCatalogProvider.get();
-        this.fileStoreTable = getTable(tablePath);
+        this.fileStoreTable =
+                getTable(
+                        committerInitContext.tablePath(),
+                        committerInitContext
+                                        .tableInfo()
+                                        .getTableConfig()
+                                        .isDataLakeAutoExpireSnapshot()
+                                || committerInitContext
+                                        .lakeTieringConfig()
+                                        
.get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT));
     }
 
     @Override
@@ -82,19 +95,17 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
         snapshotProperties.forEach(manifestCommittable::addProperty);
 
         try {
-            fileStoreCommit =
-                    fileStoreTable
-                            .store()
-                            .newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, 
fileStoreTable);
-            fileStoreCommit.commit(manifestCommittable, false);
+            tableCommit = 
fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+            tableCommit.commit(manifestCommittable);
+
             Long commitSnapshotId = currentCommitSnapshotId.get();
             currentCommitSnapshotId.remove();
 
             return checkNotNull(commitSnapshotId, "Paimon committed snapshot 
id must be non-null.");
         } catch (Throwable t) {
-            if (fileStoreCommit != null) {
+            if (tableCommit != null) {
                 // if any error happen while commit, abort the commit to clean 
committable
-                fileStoreCommit.abort(manifestCommittable.fileCommittables());
+                tableCommit.abort(manifestCommittable.fileCommittables());
             }
             throw new IOException(t);
         }
@@ -102,9 +113,8 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
 
     @Override
     public void abort(PaimonCommittable committable) throws IOException {
-        fileStoreCommit =
-                
fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, 
fileStoreTable);
-        
fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
+        tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+        
tableCommit.abort(committable.manifestCommittable().fileCommittables());
     }
 
     @Nullable
@@ -190,8 +200,8 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
     @Override
     public void close() throws Exception {
         try {
-            if (fileStoreCommit != null) {
-                fileStoreCommit.close();
+            if (tableCommit != null) {
+                tableCommit.close();
             }
             if (paimonCatalog != null) {
                 paimonCatalog.close();
@@ -201,19 +211,20 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
         }
     }
 
-    private FileStoreTable getTable(TablePath tablePath) throws IOException {
+    private FileStoreTable getTable(TablePath tablePath, boolean 
isAutoSnapshotExpiration)
+            throws IOException {
         try {
-            FileStoreTable table =
-                    (FileStoreTable)
-                            paimonCatalog
-                                    .getTable(toPaimon(tablePath))
-                                    .copy(
-                                            Collections.singletonMap(
-                                                    
CoreOptions.COMMIT_CALLBACKS.key(),
-                                                    
PaimonLakeCommitter.PaimonCommitCallback.class
-                                                            .getName()));
-
-            return table;
+            FileStoreTable table = (FileStoreTable) 
paimonCatalog.getTable(toPaimon(tablePath));
+
+            Map<String, String> dynamicOptions = new HashMap<>();
+            dynamicOptions.put(
+                    CoreOptions.COMMIT_CALLBACKS.key(),
+                    PaimonLakeCommitter.PaimonCommitCallback.class.getName());
+            dynamicOptions.put(
+                    CoreOptions.WRITE_ONLY.key(),
+                    isAutoSnapshotExpiration ? Boolean.FALSE.toString() : 
Boolean.TRUE.toString());
+
+            return table.copy(dynamicOptions);
         } catch (Exception e) {
             throw new IOException("Failed to get table " + tablePath + " in 
Paimon.", e);
         }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
index c69a99790..432620efb 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
@@ -53,7 +53,7 @@ public class PaimonLakeTieringFactory
     @Override
     public LakeCommitter<PaimonWriteResult, PaimonCommittable> 
createLakeCommitter(
             CommitterInitContext committerInitContext) throws IOException {
-        return new PaimonLakeCommitter(paimonCatalogProvider, 
committerInitContext.tablePath());
+        return new PaimonLakeCommitter(paimonCatalogProvider, 
committerInitContext);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 5b86e6a8f..74cd75b7d 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -128,6 +128,7 @@ public abstract class FlinkPaimonTieringTestBase {
                         execEnv,
                         flussConfig,
                         Configuration.fromMap(getPaimonCatalogConf()),
+                        new Configuration(),
                         DataLakeFormat.PAIMON.toString())
                 .build();
     }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index afc43b86c..49f80d999 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.paimon.tiering;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
@@ -49,6 +50,7 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SnapshotManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -106,6 +108,18 @@ class PaimonTieringTest {
                 Arguments.of(false, false));
     }
 
+    private static Stream<Arguments> snapshotExpireArgs() {
+        return Stream.of(
+                Arguments.of(true, true, true),
+                Arguments.of(true, true, false),
+                Arguments.of(true, false, true),
+                Arguments.of(true, false, false),
+                Arguments.of(false, true, true),
+                Arguments.of(false, true, false),
+                Arguments.of(false, false, true),
+                Arguments.of(false, false, false));
+    }
+
     @ParameterizedTest
     @MethodSource("tieringWriteArgs")
     void testTieringWriteTable(boolean isPrimaryKeyTable, boolean 
isPartitioned) throws Exception {
@@ -118,7 +132,11 @@ class PaimonTieringTest {
                                 isPrimaryKeyTable ? "primary_key" : "log",
                                 isPartitioned ? "partitioned" : 
"non_partitioned"));
         createTable(
-                tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable 
? bucketNum : null);
+                tablePath,
+                isPrimaryKeyTable,
+                isPartitioned,
+                isPrimaryKeyTable ? bucketNum : null,
+                Collections.emptyMap());
         TableDescriptor descriptor =
                 TableDescriptor.builder()
                         .schema(
@@ -132,14 +150,8 @@ class PaimonTieringTest {
                         .build();
         TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 
1L);
 
-        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
-        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
-                paimonLakeTieringFactory.getWriteResultSerializer();
-        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
-                paimonLakeTieringFactory.getCommittableSerializer();
-
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new 
Configuration())) {
             // should no any missing snapshot
             assertThat(lakeCommitter.getMissingLakeSnapshot(1L)).isNull();
         }
@@ -155,49 +167,9 @@ class PaimonTieringTest {
                             }
                         }
                         : Collections.singletonMap(null, null);
-        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
-        // first, write data
-        for (int bucket = 0; bucket < bucketNum; bucket++) {
-            for (Map.Entry<Long, String> entry : 
partitionIdAndName.entrySet()) {
-                String partition = entry.getValue();
-                try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, 
entry.getKey(), tableInfo)) {
-                    Tuple2<String, Integer> partitionBucket = 
Tuple2.of(partition, bucket);
-                    Tuple2<List<LogRecord>, List<LogRecord>> 
writeAndExpectRecords =
-                            isPrimaryKeyTable
-                                    ? genPrimaryKeyTableRecords(partition, 
bucket)
-                                    : genLogTableRecords(partition, bucket, 
10);
-                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
-                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
-                    recordsByBucket.put(partitionBucket, expectRecords);
-                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), 
bucket), 10L);
-                    for (LogRecord logRecord : writtenRecords) {
-                        lakeWriter.write(logRecord);
-                    }
-                    // serialize/deserialize writeResult
-                    PaimonWriteResult paimonWriteResult = 
lakeWriter.complete();
-                    byte[] serialized = 
writeResultSerializer.serialize(paimonWriteResult);
-                    paimonWriteResults.add(
-                            writeResultSerializer.deserialize(
-                                    writeResultSerializer.getVersion(), 
serialized));
-                }
-            }
-        }
 
-        // second, commit data
-        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
-                createLakeCommitter(tablePath)) {
-            // serialize/deserialize committable
-            PaimonCommittable paimonCommittable = 
lakeCommitter.toCommittable(paimonWriteResults);
-            byte[] serialized = 
committableSerializer.serialize(paimonCommittable);
-            paimonCommittable =
-                    committableSerializer.deserialize(
-                            committableSerializer.getVersion(), serialized);
-            long snapshot =
-                    lakeCommitter.commit(
-                            paimonCommittable, 
toBucketOffsetsProperty(tableBucketOffsets));
-            assertThat(snapshot).isEqualTo(1);
-        }
+        // firstly, write some data
+        writeData(tableInfo, recordsByBucket, partitionIdAndName);
 
         // then, check data
         for (int bucket = 0; bucket < 3; bucket++) {
@@ -212,7 +184,7 @@ class PaimonTieringTest {
 
         // then, let's verify getMissingLakeSnapshot works
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new 
Configuration())) {
             // use snapshot id 0 as the known snapshot id
             CommittedLakeSnapshot committedLakeSnapshot = 
lakeCommitter.getMissingLakeSnapshot(0L);
             assertThat(committedLakeSnapshot).isNotNull();
@@ -292,7 +264,7 @@ class PaimonTieringTest {
 
         // Commit all data
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new 
Configuration())) {
             PaimonCommittable committable = 
lakeCommitter.toCommittable(paimonWriteResults);
             long snapshot =
                     lakeCommitter.commit(committable, 
toBucketOffsetsProperty(tableBucketOffsets));
@@ -364,7 +336,7 @@ class PaimonTieringTest {
         // Commit all data
         long snapshot;
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
-                createLakeCommitter(tablePath)) {
+                createLakeCommitter(tablePath, tableInfo, new 
Configuration())) {
             PaimonCommittable committable = 
lakeCommitter.toCommittable(paimonWriteResults);
             snapshot =
                     lakeCommitter.commit(committable, 
toBucketOffsetsProperty(tableBucketOffsets));
@@ -387,6 +359,77 @@ class PaimonTieringTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("snapshotExpireArgs")
+    void testSnapshotExpiration(
+            boolean isPartitioned,
+            boolean isTableAutoExpireSnapshot,
+            boolean isLakeTieringExpireSnapshot)
+            throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath =
+                TablePath.of(
+                        "paimon",
+                        String.format(
+                                "test_tiering_snapshot_expiration_table_%s",
+                                isPartitioned ? "partitioned" : 
"non_partitioned"));
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2");
+        createTable(tablePath, false, isPartitioned, null, options);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", 
org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", 
org.apache.fluss.types.DataTypes.STRING())
+                                        .column("c3", 
org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .property(
+                                
ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT,
+                                isTableAutoExpireSnapshot)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 
1L);
+        // Get the FileStoreTable to verify snapshots
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        Configuration lakeTieringConfig = new Configuration();
+        lakeTieringConfig.set(
+                ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT, 
isLakeTieringExpireSnapshot);
+
+        // Write some data to generate 2 snapshots
+        writeData(tableInfo, lakeTieringConfig, new HashMap<>(), 
partitionIdAndName);
+        writeData(tableInfo, lakeTieringConfig, new HashMap<>(), 
partitionIdAndName);
+        assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+
+        // write more data
+        for (int i = 0; i < 5; i++) {
+            writeData(tableInfo, lakeTieringConfig, new HashMap<>(), 
partitionIdAndName);
+            if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) {
+                // if auto snapshot expiration is enabled, snapshot should be 
expired
+                assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+            } else {
+                // if auto snapshot expiration is disabled, snapshot should 
never be expired
+                assertThat(snapshotManager.snapshotCount()).isGreaterThan(2);
+            }
+        }
+    }
+
     private void verifyLogTableRecordsMultiPartition(
             CloseableIterator<InternalRow> actualRecords,
             List<LogRecord> expectRecords,
@@ -704,15 +747,33 @@ class PaimonTieringTest {
     }
 
     private LakeCommitter<PaimonWriteResult, PaimonCommittable> 
createLakeCommitter(
-            TablePath tablePath) throws IOException {
-        return paimonLakeTieringFactory.createLakeCommitter(() -> tablePath);
+            TablePath tablePath, TableInfo tableInfo, Configuration 
lakeTieringConfig)
+            throws IOException {
+        return paimonLakeTieringFactory.createLakeCommitter(
+                new CommitterInitContext() {
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableInfo tableInfo() {
+                        return tableInfo;
+                    }
+
+                    @Override
+                    public Configuration lakeTieringConfig() {
+                        return lakeTieringConfig;
+                    }
+                });
     }
 
     private void createTable(
             TablePath tablePath,
             boolean isPrimaryTable,
             boolean isPartitioned,
-            @Nullable Integer numBuckets)
+            @Nullable Integer numBuckets,
+            Map<String, String> options)
             throws Exception {
         Schema.Builder builder =
                 Schema.newBuilder()
@@ -735,6 +796,7 @@ class PaimonTieringTest {
         if (numBuckets != null) {
             builder.option(CoreOptions.BUCKET.key(), 
String.valueOf(numBuckets));
         }
+        builder.options(options);
         doCreatePaimonTable(tablePath, builder);
     }
 
@@ -784,4 +846,70 @@ class PaimonTieringTest {
                 .properties()
                 .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
     }
+
+    private void writeData(
+            TableInfo tableInfo,
+            Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+            Map<Long, String> partitionIdAndName)
+            throws Exception {
+        writeData(tableInfo, new Configuration(), recordsByBucket, 
partitionIdAndName);
+    }
+
+    private void writeData(
+            TableInfo tableInfo,
+            Configuration lakeTieringConfig,
+            Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+            Map<Long, String> partitionIdAndName)
+            throws Exception {
+        TablePath tablePath = tableInfo.getTablePath();
+        int bucketNum = tableInfo.getNumBuckets();
+        boolean isPrimaryKeyTable = tableInfo.hasPrimaryKey();
+
+        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
+                paimonLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
+                paimonLakeTieringFactory.getCommittableSerializer();
+
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : 
partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<PaimonWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, 
entry.getKey(), tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = 
Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> 
writeAndExpectRecords =
+                            isPrimaryKeyTable
+                                    ? genPrimaryKeyTableRecords(partition, 
bucket)
+                                    : genLogTableRecords(partition, bucket, 
10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), 
bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    PaimonWriteResult paimonWriteResult = 
lakeWriter.complete();
+                    byte[] serialized = 
writeResultSerializer.serialize(paimonWriteResult);
+                    paimonWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), 
serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
+                createLakeCommitter(tablePath, tableInfo, lakeTieringConfig)) {
+            // serialize/deserialize committable
+            PaimonCommittable paimonCommittable = 
lakeCommitter.toCommittable(paimonWriteResults);
+            byte[] serialized = 
committableSerializer.serialize(paimonCommittable);
+            paimonCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            lakeCommitter.commit(paimonCommittable, 
toBucketOffsetsProperty(tableBucketOffsets));
+        }
+    }
 }
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 0c6f54cd5..46c34da57 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -414,6 +414,9 @@
                                         </exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.committer.CommittableMessageTypeInfo*
                                         </exclude>
+                                        <exclude>
+                                            
org.apache.fluss.flink.tiering.committer.TieringCommitterInitContext
+                                        </exclude>
                                         <exclude>
                                             
org.apache.fluss.flink.tiering.LakeTieringJobBuilder
                                         </exclude>
diff --git a/website/docs/engine-flink/options.md 
b/website/docs/engine-flink/options.md
index 6c637240e..d6462fbfa 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -61,31 +61,32 @@ See more details about [ALTER TABLE ... 
SET](engine-flink/ddl.md#set-properties)
 
 ## Storage Options
 
-| Option                                  | Type     | Default                 
            | Description                                                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| bucket.num                              | int      | The bucket number of 
Fluss cluster. | The number of buckets of a Fluss table.                        
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
-| bucket.key                              | String   | (None)                  
            | Specific the distribution policy of the Fluss table. Data will be 
distributed to each bucket according to the hash value of bucket-key (It must 
be a subset of the primary keys excluding partition keys of the primary key 
table). If you specify multiple fields, delimiter is `,`. If the table has a 
primary key and a bucket key is not specified, the bucket key will be used as 
primary key(excluding th [...]
-| table.log.ttl                           | Duration | 7 days                  
            | The time to live for log segments. The configuration controls the 
maximum time we will retain a log before we will delete old segments to free up 
space. If set to -1, the log will not be deleted.                               
                                                                                
                                                                                
              [...]
-| table.auto-partition.enabled            | Boolean  | false                   
            | Whether enable auto partition for the table. Disable by default. 
When auto partition is enabled, the partitions of the table will be created 
automatically.                                                                  
                                                                                
                                                                                
                   [...]
-| table.auto-partition.key                | String   | (None)                  
            | This configuration defines the time-based partition key to be 
used for auto-partitioning when a table is partitioned with multiple keys. 
Auto-partitioning utilizes a time-based partition key to handle partitions 
automatically, including creating new ones and removing outdated ones, by 
comparing the time value of the partition with the current system time. In the 
case of a table using multiple par [...]
-| table.auto-partition.time-unit          | ENUM     | DAY                     
            | The time granularity for auto created partitions. The default 
value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If 
the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If 
the value is `DAY`, the partition format for auto created is yyyyMMdd. If the 
value is `MONTH`, the partition format for auto created is yyyyMM. If the value 
is `QUARTER`, the parti [...]
-| table.auto-partition.num-precreate      | Integer  | 2                       
            | The number of partitions to pre-create for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 
20241112, 20241113 will be pre-created. If any one partition exists, it'll skip 
creating the partition. The default value is 2, which means 2 partitions will 
be pre-created. If the `tab [...]
-| table.auto-partition.num-retention      | Integer  | 7                       
            | The number of history partitions to retain for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then 
the history partitions 20241108, 20241109, 20241110 will be retained. The 
partitions earlier than 20241108 will be deleted. The default value is 7, which 
means that 7 partitions will  [...]
-| table.auto-partition.time-zone          | String   | the system time zone    
            | The time zone for auto partitions, which is by default the same 
as the system time zone.                                                        
                                                                                
                                                                                
                                                                                
                [...]
-| table.replication.factor                | Integer  | (None)                  
            | The replication factor for the log of the new table. When it's 
not set, Fluss will use the cluster's default replication factor configured by 
default.replication.factor. It should be a positive number and not larger than 
the number of tablet servers in the Fluss cluster. A value larger than the 
number of tablet servers in Fluss cluster will result in an error when the new 
table is created.        [...]
-| table.log.format                        | Enum     | ARROW                   
            | The format of the log records in log store. The default value is 
`ARROW`. The supported formats are `ARROW` and `INDEXED`.                       
                                                                                
                                                                                
                                                                                
               [...]
-| table.log.arrow.compression.type        | Enum     | ZSTD                    
            | The compression type of the log records if the log format is set 
to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The 
default value is `ZSTD`.                                                        
                                                                                
                                                                                
                [...]
-| table.log.arrow.compression.zstd.level  | Integer  | 3                       
            | The compression level of the log records if the log format is set 
to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 
22. The default value is 3.                                                     
                                                                                
                                                                                
                [...]
-| table.kv.format                         | Enum     | COMPACTED               
            | The format of the kv records in kv store. The default value is 
`COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`.               
                                                                                
                                                                                
                                                                                
                 [...]
-| table.log.tiered.local-segments         | Integer  | 2                       
            | The number of log segments to retain in local for each table when 
log tiered storage is enabled. It must be greater that 0. The default is 2.     
                                                                                
                                                                                
                                                                                
              [...]
-| table.datalake.enabled                  | Boolean  | false                   
            | Whether enable lakehouse storage for the table. Disabled by 
default. When this option is set to ture and the datalake tiering service is 
up, the table will be tiered and compacted into datalake format stored on 
lakehouse storage.                                                              
                                                                                
                             [...]
-| table.datalake.format                   | Enum     | (None)                  
            | The data lake format of the table specifies the tiered Lakehouse 
storage format. Currently, supported formats are `paimon`, `iceberg`, and 
`lance`. In the future, more kinds of data lake format will be supported, such 
as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, 
Fluss adopts the key encoding and bucketing strategy used by the corresponding 
data lake format. This  [...]
-| table.datalake.freshness                | Duration | 3min                    
            | It defines the maximum amount of time that the datalake table's 
content should lag behind updates to the Fluss table. Based on this target 
freshness, the Fluss service automatically moves data from the Fluss table and 
updates to the datalake table, so that the data in the datalake table is kept 
up to date within this target. If the data does not need to be as fresh, you 
can specify a longer targe [...]
-| table.datalake.auto-compaction          | Boolean | false                    
            | If true, compaction will be triggered automatically when tiering 
service writes to the datalake. It is disabled by default.                      
                                                                                
                                                                                
                                                                                
               [...]
-| table.merge-engine                      | Enum     | (None)                  
            | Defines the merge engine for the primary key table. By default, 
primary key table uses the [default merge 
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). 
It also supports two merge engines are `first_row` and `versioned`. The 
[first_row merge 
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep 
the first row of the same primary key. The [v [...]
-| table.merge-engine.versioned.ver-column | String   | (None)                  
            | The column name of the version column for the `versioned` merge 
engine. If the merge engine is set to `versioned`, the version column must be 
set.                                                                            
                                                                                
                                                                                
                  [...]
-| table.delete.behavior                   | Enum     | ALLOW                   
            | Controls the behavior of delete operations on primary key tables. 
Three modes are supported: `ALLOW` (default) - allows normal delete operations; 
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects 
delete requests and throws explicit errors. This configuration provides 
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) 
that must not receive  [...]
-| table.changelog.image                   | Enum     | FULL                    
            | Defines the changelog image mode for primary key tables. This 
configuration is inspired by similar settings in database systems like MySQL's 
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are 
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER 
records for update operations, capturing complete information about updates and 
allowing tracking of previous val [...]
+| Option                                  | Type     | Default                 
            | Description                                                       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| bucket.num                              | int      | The bucket number of 
Fluss cluster. | The number of buckets of a Fluss table.                        
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| bucket.key                              | String   | (None)                  
            | Specifies the distribution policy of the Fluss table. Data will be 
distributed to each bucket according to the hash value of bucket-key (It must 
be a subset of the primary keys excluding partition keys of the primary key 
table). If you specify multiple fields, delimiter is `,`. If the table has a 
primary key and a bucket key is not specified, the bucket key will be used as 
primary key(excluding th [...]
+| table.log.ttl                           | Duration | 7 days                  
            | The time to live for log segments. The configuration controls the 
maximum time we will retain a log before we will delete old segments to free up 
space. If set to -1, the log will not be deleted.                               
                                                                                
                                                                                
              [...]
+| table.auto-partition.enabled            | Boolean  | false                   
            | Whether to enable auto partition for the table. Disabled by default. 
When auto partition is enabled, the partitions of the table will be created 
automatically.                                                                  
                                                                                
                                                                                
                   [...]
+| table.auto-partition.key                | String   | (None)                  
            | This configuration defines the time-based partition key to be 
used for auto-partitioning when a table is partitioned with multiple keys. 
Auto-partitioning utilizes a time-based partition key to handle partitions 
automatically, including creating new ones and removing outdated ones, by 
comparing the time value of the partition with the current system time. In the 
case of a table using multiple par [...]
+| table.auto-partition.time-unit          | ENUM     | DAY                     
            | The time granularity for auto created partitions. The default 
value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If 
the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If 
the value is `DAY`, the partition format for auto created is yyyyMMdd. If the 
value is `MONTH`, the partition format for auto created is yyyyMM. If the value 
is `QUARTER`, the parti [...]
+| table.auto-partition.num-precreate      | Integer  | 2                       
            | The number of partitions to pre-create for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 
20241112, 20241113 will be pre-created. If any one partition exists, it'll skip 
creating the partition. The default value is 2, which means 2 partitions will 
be pre-created. If the `tab [...]
+| table.auto-partition.num-retention      | Integer  | 7                       
            | The number of history partitions to retain for auto created 
partitions in each check for auto partition. For example, if the current check 
time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then 
the history partitions 20241108, 20241109, 20241110 will be retained. The 
partitions earlier than 20241108 will be deleted. The default value is 7, which 
means that 7 partitions will  [...]
+| table.auto-partition.time-zone          | String   | the system time zone    
            | The time zone for auto partitions, which is by default the same 
as the system time zone.                                                        
                                                                                
                                                                                
                                                                                
                [...]
+| table.replication.factor                | Integer  | (None)                  
            | The replication factor for the log of the new table. When it's 
not set, Fluss will use the cluster's default replication factor configured by 
default.replication.factor. It should be a positive number and not larger than 
the number of tablet servers in the Fluss cluster. A value larger than the 
number of tablet servers in Fluss cluster will result in an error when the new 
table is created.        [...]
+| table.log.format                        | Enum     | ARROW                   
            | The format of the log records in log store. The default value is 
`ARROW`. The supported formats are `ARROW` and `INDEXED`.                       
                                                                                
                                                                                
                                                                                
               [...]
+| table.log.arrow.compression.type        | Enum     | ZSTD                    
            | The compression type of the log records if the log format is set 
to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The 
default value is `ZSTD`.                                                        
                                                                                
                                                                                
                [...]
+| table.log.arrow.compression.zstd.level  | Integer  | 3                       
            | The compression level of the log records if the log format is set 
to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to 
22. The default value is 3.                                                     
                                                                                
                                                                                
                [...]
+| table.kv.format                         | Enum     | COMPACTED               
            | The format of the kv records in kv store. The default value is 
`COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`.               
                                                                                
                                                                                
                                                                                
                 [...]
+| table.log.tiered.local-segments         | Integer  | 2                       
            | The number of log segments to retain in local for each table when 
log tiered storage is enabled. It must be greater than 0. The default is 2.     
                                                                                
                                                                                
                                                                                
              [...]
+| table.datalake.enabled                  | Boolean  | false                   
            | Whether to enable lakehouse storage for the table. Disabled by 
default. When this option is set to true and the datalake tiering service is 
up, the table will be tiered and compacted into datalake format stored on 
lakehouse storage.                                                              
                                                                                
                             [...]
+| table.datalake.format                   | Enum     | (None)                  
            | The data lake format of the table specifies the tiered Lakehouse 
storage format. Currently, supported formats are `paimon`, `iceberg`, and 
`lance`. In the future, more kinds of data lake format will be supported, such 
as DeltaLake or Hudi. Once the `table.datalake.format` property is configured, 
Fluss adopts the key encoding and bucketing strategy used by the corresponding 
data lake format. This  [...]
+| table.datalake.freshness                | Duration | 3min                    
            | It defines the maximum amount of time that the datalake table's 
content should lag behind updates to the Fluss table. Based on this target 
freshness, the Fluss service automatically moves data from the Fluss table and 
updates to the datalake table, so that the data in the datalake table is kept 
up to date within this target. If the data does not need to be as fresh, you 
can specify a longer targe [...]
+| table.datalake.auto-compaction          | Boolean  | false                   
            | If true, compaction will be triggered automatically when tiering 
service writes to the datalake. It is disabled by default.                      
                                                                                
                                                                                
                                                                                
               [...]
+| table.datalake.auto-expire-snapshot     | Boolean  | false                   
            | If true, snapshot expiration will be triggered automatically when 
tiering service commits to the datalake. It is disabled by default.             
                                                                                
                                                                                
                                                                                
              [...]
+| table.merge-engine                      | Enum     | (None)                  
            | Defines the merge engine for the primary key table. By default, 
primary key table uses the [default merge 
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md). 
It also supports two other merge engines: `first_row` and `versioned`. The 
[first_row merge 
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep 
the first row of the same primary key. The [v [...]
+| table.merge-engine.versioned.ver-column | String   | (None)                  
            | The column name of the version column for the `versioned` merge 
engine. If the merge engine is set to `versioned`, the version column must be 
set.                                                                            
                                                                                
                                                                                
                  [...]
+| table.delete.behavior                   | Enum     | ALLOW                   
            | Controls the behavior of delete operations on primary key tables. 
Three modes are supported: `ALLOW` (default) - allows normal delete operations; 
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects 
delete requests and throws explicit errors. This configuration provides 
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join) 
that must not receive  [...]
+| table.changelog.image                   | Enum     | FULL                    
            | Defines the changelog image mode for primary key tables. This 
configuration is inspired by similar settings in database systems like MySQL's 
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are 
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER 
records for update operations, capturing complete information about updates and 
allowing tracking of previous val [...]
 
 
 ## Read Options
diff --git a/website/docs/maintenance/tiered-storage/lakehouse-storage.md 
b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
index bd7b646d0..35b2394d7 100644
--- a/website/docs/maintenance/tiered-storage/lakehouse-storage.md
+++ b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
@@ -102,4 +102,12 @@ To enable lakehouse storage for a table, the table must be 
created with the opti
 Another option `table.datalake.freshness`, allows per-table configuration of 
data freshness in the datalake.
 It defines the maximum amount of time that the datalake table's content should 
lag behind updates to the Fluss table. 
 Based on this target freshness, the Fluss tiering service automatically moves 
data from the Fluss table and updates to the datalake table, so that the data 
in the datalake table is kept up to date within this target.
-The default is `3min`, if the data does not need to be as fresh, you can 
specify a longer target freshness time to reduce costs.
\ No newline at end of file
+The default is `3min`, if the data does not need to be as fresh, you can 
specify a longer target freshness time to reduce costs.
+
+# Datalake Tiering Service Options
+
+The following table lists the options that can be used to configure the 
datalake tiering service.
+
+| Option                                  | Type     | Default | Description   
                                                                                
                                                                     |
+|-----------------------------------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| lake.tiering.auto-expire-snapshot       | Boolean  | false   | If true, 
snapshot expiration will be triggered automatically when tiering service 
commits to the datalake, even if `table.datalake.auto-expire-snapshot` is 
false. |
\ No newline at end of file

Reply via email to