This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 58957fdec [lake] Support auto snapshot expiration for paimon lake
table (#2184)
58957fdec is described below
commit 58957fdec456f9f4433046b17f29eb3f4e156bd8
Author: Liebing <[email protected]>
AuthorDate: Mon Dec 22 09:47:47 2025 +0800
[lake] Support auto snapshot expiration for paimon lake table (#2184)
---
.../org/apache/fluss/config/ConfigOptions.java | 21 ++
.../java/org/apache/fluss/config/TableConfig.java | 5 +
.../fluss/lake/committer/CommitterInitContext.java | 18 +-
.../fluss/flink/tiering/LakeTieringJobBuilder.java | 10 +-
.../tiering/committer/TieringCommitOperator.java | 8 +-
.../committer/TieringCommitOperatorFactory.java | 6 +-
.../committer/TieringCommitterInitContext.java | 19 +-
.../committer/TieringCommitOperatorTest.java | 3 +
.../flink/tiering/FlussLakeTieringEntrypoint.java | 7 +
.../testutils/FlinkIcebergTieringTestBase.java | 1 +
.../lake/iceberg/tiering/IcebergTieringTest.java | 23 +-
.../lance/testutils/FlinkLanceTieringTestBase.java | 1 +
.../fluss/lake/lance/tiering/LanceTieringTest.java | 27 ++-
.../lake/paimon/tiering/PaimonLakeCommitter.java | 69 +++---
.../paimon/tiering/PaimonLakeTieringFactory.java | 2 +-
.../testutils/FlinkPaimonTieringTestBase.java | 1 +
.../lake/paimon/tiering/PaimonTieringTest.java | 240 ++++++++++++++++-----
fluss-test-coverage/pom.xml | 3 +
website/docs/engine-flink/options.md | 51 ++---
.../tiered-storage/lakehouse-storage.md | 10 +-
20 files changed, 399 insertions(+), 126 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 7eb203999..43c244554 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -1405,6 +1405,13 @@ public class ConfigOptions {
.withDescription(
"If true, compaction will be triggered
automatically when tiering service writes to the datalake. It is disabled by
default.");
+ public static final ConfigOption<Boolean>
TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT =
+ key("table.datalake.auto-expire-snapshot")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, snapshot expiration will be triggered
automatically when tiering service commits to the datalake. It is disabled by
default.");
+
public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
key("table.merge-engine")
.enumType(MergeEngineType.class)
@@ -1777,6 +1784,20 @@ public class ConfigOptions {
"The datalake format used by of Fluss to be as
lakehouse storage. Currently, supported formats are Paimon, Iceberg, and Lance.
"
+ "In the future, more kinds of data lake
format will be supported, such as DeltaLake or Hudi.");
+ // ------------------------------------------------------------------------
+ // ConfigOptions for tiering service
+ // ------------------------------------------------------------------------
+
+ public static final ConfigOption<Boolean>
LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT =
+ key("lake.tiering.auto-expire-snapshot")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, snapshot expiration will be triggered
automatically when tiering service commits to the datalake, "
+ + "even if "
+ +
ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
+ + " is false.");
+
// ------------------------------------------------------------------------
// ConfigOptions for fluss kafka
// ------------------------------------------------------------------------
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
index fc7966ab0..80d2ee8f7 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java
@@ -101,6 +101,11 @@ public class TableConfig {
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
}
+ /** Whether auto expire snapshot is enabled. */
+ public boolean isDataLakeAutoExpireSnapshot() {
+ return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
+ }
+
/** Gets the optional merge engine type of the table. */
public Optional<MergeEngineType> getMergeEngineType() {
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
index 263d3a2fd..dab6b1485 100644
---
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/CommitterInitContext.java
@@ -18,11 +18,13 @@
package org.apache.fluss.lake.committer;
import org.apache.fluss.annotation.PublicEvolving;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
/**
* The CommitterInitContext interface provides the context needed to create a
LakeCommitter. It
- * includes methods to obtain the table path.
+ * includes methods to obtain the table path, table info and lake tiering
configs.
*
* @since 0.7
*/
@@ -35,4 +37,18 @@ public interface CommitterInitContext {
* @return the table path
*/
TablePath tablePath();
+
+ /**
+ * Returns the table info.
+ *
+ * @return the table info
+ */
+ TableInfo tableInfo();
+
+ /**
+ * Returns the lake tiering config.
+ *
+ * @return the lake tiering config
+ */
+ Configuration lakeTieringConfig();
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
index 85eed128c..74cf91e0e 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
@@ -46,16 +46,19 @@ public class LakeTieringJobBuilder {
private final StreamExecutionEnvironment env;
private final Configuration flussConfig;
private final Configuration dataLakeConfig;
+ private final Configuration lakeTieringConfig;
private final String dataLakeFormat;
private LakeTieringJobBuilder(
StreamExecutionEnvironment env,
Configuration flussConfig,
Configuration dataLakeConfig,
+ Configuration lakeTieringConfig,
String dataLakeFormat) {
this.env = checkNotNull(env);
this.flussConfig = checkNotNull(flussConfig);
this.dataLakeConfig = checkNotNull(dataLakeConfig);
+ this.lakeTieringConfig = checkNotNull(lakeTieringConfig);
this.dataLakeFormat = checkNotNull(dataLakeFormat);
}
@@ -63,8 +66,10 @@ public class LakeTieringJobBuilder {
StreamExecutionEnvironment env,
Configuration flussConfig,
Configuration dataLakeConfig,
+ Configuration lakeTieringConfig,
String dataLakeFormat) {
- return new LakeTieringJobBuilder(env, flussConfig, dataLakeConfig,
dataLakeFormat);
+ return new LakeTieringJobBuilder(
+ env, flussConfig, dataLakeConfig, lakeTieringConfig,
dataLakeFormat);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -99,7 +104,8 @@ public class LakeTieringJobBuilder {
"TieringCommitter",
CommittableMessageTypeInfo.of(
() ->
lakeTieringFactory.getCommittableSerializer()),
- new TieringCommitOperatorFactory(flussConfig,
lakeTieringFactory))
+ new TieringCommitOperatorFactory(
+ flussConfig, lakeTieringConfig,
lakeTieringFactory))
.setParallelism(1)
.setMaxParallelism(1)
.sinkTo(new DiscardingSink())
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index ebe4a1ae6..a678a4b23 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -89,6 +89,7 @@ public class TieringCommitOperator<WriteResult, Committable>
private static final long serialVersionUID = 1L;
private final Configuration flussConfig;
+ private final Configuration lakeTieringConfig;
private final LakeTieringFactory<WriteResult, Committable>
lakeTieringFactory;
private final FlussTableLakeSnapshotCommitter
flussTableLakeSnapshotCommitter;
private Connection connection;
@@ -105,11 +106,13 @@ public class TieringCommitOperator<WriteResult,
Committable>
public TieringCommitOperator(
StreamOperatorParameters<CommittableMessage<Committable>>
parameters,
Configuration flussConf,
+ Configuration lakeTieringConfig,
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
this.lakeTieringFactory = lakeTieringFactory;
this.flussTableLakeSnapshotCommitter = new
FlussTableLakeSnapshotCommitter(flussConf);
this.collectedTableBucketWriteResults = new HashMap<>();
this.flussConfig = flussConf;
+ this.lakeTieringConfig = lakeTieringConfig;
this.operatorEventGateway =
parameters
.getOperatorEventDispatcher()
@@ -204,7 +207,10 @@ public class TieringCommitOperator<WriteResult,
Committable>
}
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
lakeTieringFactory.createLakeCommitter(
- new TieringCommitterInitContext(tablePath))) {
+ new TieringCommitterInitContext(
+ tablePath,
+ admin.getTableInfo(tablePath).get(),
+ lakeTieringConfig))) {
List<WriteResult> writeResults =
committableWriteResults.stream()
.map(TableBucketWriteResult::writeResult)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
index 65d6f377e..efced7aea 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java
@@ -33,12 +33,15 @@ public class TieringCommitOperatorFactory<WriteResult,
Committable>
TableBucketWriteResult<WriteResult>,
CommittableMessage<Committable>> {
private final Configuration flussConfig;
+ private final Configuration lakeTieringConfig;
private final LakeTieringFactory<WriteResult, Committable>
lakeTieringFactory;
public TieringCommitOperatorFactory(
Configuration flussConfig,
+ Configuration lakeTieringConfig,
LakeTieringFactory<WriteResult, Committable> lakeTieringFactory) {
this.flussConfig = flussConfig;
+ this.lakeTieringConfig = lakeTieringConfig;
this.lakeTieringFactory = lakeTieringFactory;
}
@@ -47,7 +50,8 @@ public class TieringCommitOperatorFactory<WriteResult,
Committable>
StreamOperatorParameters<CommittableMessage<Committable>>
parameters) {
TieringCommitOperator<WriteResult, Committable> commitOperator =
- new TieringCommitOperator<>(parameters, flussConfig,
lakeTieringFactory);
+ new TieringCommitOperator<>(
+ parameters, flussConfig, lakeTieringConfig,
lakeTieringFactory);
@SuppressWarnings("unchecked")
final T castedOperator = (T) commitOperator;
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
index 69c5aff61..3cae145f0 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitterInitContext.java
@@ -17,21 +17,38 @@
package org.apache.fluss.flink.tiering.committer;
+import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
/** The {@link CommitterInitContext} implementation for {@link LakeCommitter}.
*/
public class TieringCommitterInitContext implements CommitterInitContext {
private final TablePath tablePath;
+ private final TableInfo tableInfo;
+ private final Configuration lakeTieringConfig;
- public TieringCommitterInitContext(TablePath tablePath) {
+ public TieringCommitterInitContext(
+ TablePath tablePath, TableInfo tableInfo, Configuration
lakeTieringConfig) {
this.tablePath = tablePath;
+ this.tableInfo = tableInfo;
+ this.lakeTieringConfig = lakeTieringConfig;
}
@Override
public TablePath tablePath() {
return tablePath;
}
+
+ @Override
+ public TableInfo tableInfo() {
+ return tableInfo;
+ }
+
+ @Override
+ public Configuration lakeTieringConfig() {
+ return lakeTieringConfig;
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 593dcadd1..230460395 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -85,6 +85,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
new TieringCommitOperator<>(
parameters,
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+ new org.apache.fluss.config.Configuration(),
new TestingLakeTieringFactory());
committerOperator.open();
}
@@ -261,6 +262,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
new TieringCommitOperator<>(
parameters,
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+ new org.apache.fluss.config.Configuration(),
new TestingLakeTieringFactory(testingLakeCommitter));
committerOperator.open();
@@ -321,6 +323,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
new TieringCommitOperator<>(
parameters,
FLUSS_CLUSTER_EXTENSION.getClientConfig(),
+ new org.apache.fluss.config.Configuration(),
new TestingLakeTieringFactory(testingLakeCommitter));
committerOperator.open();
diff --git
a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
index 74236c91a..a2ce6ce1a 100644
---
a/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
+++
b/fluss-flink/fluss-flink-tiering/src/main/java/org/apache/fluss/flink/tiering/FlussLakeTieringEntrypoint.java
@@ -30,11 +30,13 @@ import java.util.Map;
import static
org.apache.flink.runtime.executiongraph.failover.FailoverStrategyFactoryLoader.FULL_RESTART_STRATEGY_NAME;
import static
org.apache.fluss.flink.tiering.source.TieringSourceOptions.DATA_LAKE_CONFIG_PREFIX;
import static org.apache.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
+import static org.apache.fluss.utils.PropertiesUtils.extractPrefix;
/** The entrypoint for Flink to tier fluss data to lake format like paimon. */
public class FlussLakeTieringEntrypoint {
private static final String FLUSS_CONF_PREFIX = "fluss.";
+ private static final String LAKE_TIERING_CONFIG_PREFIX = "lake.tiering.";
public static void main(String[] args) throws Exception {
@@ -65,6 +67,10 @@ public class FlussLakeTieringEntrypoint {
extractAndRemovePrefix(
paramsMap, String.format("%s%s.",
DATA_LAKE_CONFIG_PREFIX, dataLake));
+ // extract tiering service config
+ Map<String, String> lakeTieringConfigMap =
+ extractPrefix(paramsMap, LAKE_TIERING_CONFIG_PREFIX);
+
// now, we must use full restart strategy if any task is failed,
// since committer is stateless, if tiering committer is failover,
committer
// will lost the collected committable, and will never collect all
committable to do commit
@@ -83,6 +89,7 @@ public class FlussLakeTieringEntrypoint {
execEnv,
Configuration.fromMap(flussConfigMap),
Configuration.fromMap(lakeConfigMap),
+ Configuration.fromMap(lakeTieringConfigMap),
dataLake)
.build();
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index a2997f98c..a7c922ba5 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -161,6 +161,7 @@ public class FlinkIcebergTieringTestBase {
execEnv,
flussConfig,
Configuration.fromMap(getIcebergCatalogConf()),
+ new Configuration(),
DataLakeFormat.ICEBERG.toString())
.build();
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index c81c331f1..ba59b1b74 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.iceberg.tiering;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.writer.LakeWriter;
@@ -185,7 +186,7 @@ class IcebergTieringTest {
// second, commit data
try (LakeCommitter<IcebergWriteResult, IcebergCommittable>
lakeCommitter =
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo)) {
// serialize/deserialize committable
IcebergCommittable icebergCommittable =
lakeCommitter.toCommittable(icebergWriteResults);
@@ -246,8 +247,24 @@ class IcebergTieringTest {
}
private LakeCommitter<IcebergWriteResult, IcebergCommittable>
createLakeCommitter(
- TablePath tablePath) throws IOException {
- return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
+ TablePath tablePath, TableInfo tableInfo) throws IOException {
+ return icebergLakeTieringFactory.createLakeCommitter(
+ new CommitterInitContext() {
+ @Override
+ public TablePath tablePath() {
+ return tablePath;
+ }
+
+ @Override
+ public TableInfo tableInfo() {
+ return tableInfo;
+ }
+
+ @Override
+ public Configuration lakeTieringConfig() {
+ return new Configuration();
+ }
+ });
}
private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
index a548e3558..0b94cedb5 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java
@@ -206,6 +206,7 @@ public class FlinkLanceTieringTestBase {
execEnv,
flussConfig,
Configuration.fromMap(getLanceCatalogConf()),
+ new Configuration(),
DataLakeFormat.LANCE.toString())
.build();
}
diff --git
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
index 642bd7afd..1157daf76 100644
---
a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
+++
b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.lance.tiering;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.lance.LanceConfig;
import org.apache.fluss.lake.lance.utils.LanceArrowUtils;
@@ -114,7 +115,7 @@ class LanceTieringTest {
lanceLakeTieringFactory.getCommittableSerializer();
try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo)) {
// should no any missing snapshot
assertThat(lakeCommitter.getMissingLakeSnapshot(2L)).isNull();
}
@@ -159,7 +160,7 @@ class LanceTieringTest {
// second, commit data
try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo)) {
// serialize/deserialize committable
LanceCommittable lanceCommittable =
lakeCommitter.toCommittable(lanceWriteResults);
byte[] serialized =
committableSerializer.serialize(lanceCommittable);
@@ -196,7 +197,7 @@ class LanceTieringTest {
// then, let's verify getMissingLakeSnapshot works
try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo)) {
// use snapshot id 1 as the known snapshot id
CommittedLakeSnapshot committedLakeSnapshot =
lakeCommitter.getMissingLakeSnapshot(1L);
assertThat(committedLakeSnapshot).isNotNull();
@@ -242,8 +243,24 @@ class LanceTieringTest {
}
private LakeCommitter<LanceWriteResult, LanceCommittable>
createLakeCommitter(
- TablePath tablePath) throws IOException {
- return lanceLakeTieringFactory.createLakeCommitter(() -> tablePath);
+ TablePath tablePath, TableInfo tableInfo) throws IOException {
+ return lanceLakeTieringFactory.createLakeCommitter(
+ new CommitterInitContext() {
+ @Override
+ public TablePath tablePath() {
+ return tablePath;
+ }
+
+ @Override
+ public TableInfo tableInfo() {
+ return tableInfo;
+ }
+
+ @Override
+ public Configuration lakeTieringConfig() {
+ return new Configuration();
+ }
+ });
}
private LakeWriter<LanceWriteResult> createLakeWriter(
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 8351a377f..ee0419312 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -17,8 +17,10 @@
package org.apache.fluss.lake.paimon.tiering;
+import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.committer.BucketOffset;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.metadata.TablePath;
import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -32,15 +34,15 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -55,14 +57,25 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
private final Catalog paimonCatalog;
private final FileStoreTable fileStoreTable;
- private FileStoreCommit fileStoreCommit;
+ private TableCommitImpl tableCommit;
+
private static final ThreadLocal<Long> currentCommitSnapshotId = new
ThreadLocal<>();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider,
TablePath tablePath)
+ public PaimonLakeCommitter(
+ PaimonCatalogProvider paimonCatalogProvider, CommitterInitContext
committerInitContext)
throws IOException {
this.paimonCatalog = paimonCatalogProvider.get();
- this.fileStoreTable = getTable(tablePath);
+ this.fileStoreTable =
+ getTable(
+ committerInitContext.tablePath(),
+ committerInitContext
+ .tableInfo()
+ .getTableConfig()
+ .isDataLakeAutoExpireSnapshot()
+ || committerInitContext
+ .lakeTieringConfig()
+
.get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT));
}
@Override
@@ -82,19 +95,17 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
snapshotProperties.forEach(manifestCommittable::addProperty);
try {
- fileStoreCommit =
- fileStoreTable
- .store()
- .newCommit(FLUSS_LAKE_TIERING_COMMIT_USER,
fileStoreTable);
- fileStoreCommit.commit(manifestCommittable, false);
+ tableCommit =
fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+ tableCommit.commit(manifestCommittable);
+
Long commitSnapshotId = currentCommitSnapshotId.get();
currentCommitSnapshotId.remove();
return checkNotNull(commitSnapshotId, "Paimon committed snapshot
id must be non-null.");
} catch (Throwable t) {
- if (fileStoreCommit != null) {
+ if (tableCommit != null) {
// if any error happen while commit, abort the commit to clean
committable
- fileStoreCommit.abort(manifestCommittable.fileCommittables());
+ tableCommit.abort(manifestCommittable.fileCommittables());
}
throw new IOException(t);
}
@@ -102,9 +113,8 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
@Override
public void abort(PaimonCommittable committable) throws IOException {
- fileStoreCommit =
-
fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER,
fileStoreTable);
-
fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
+ tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
+
tableCommit.abort(committable.manifestCommittable().fileCommittables());
}
@Nullable
@@ -190,8 +200,8 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
@Override
public void close() throws Exception {
try {
- if (fileStoreCommit != null) {
- fileStoreCommit.close();
+ if (tableCommit != null) {
+ tableCommit.close();
}
if (paimonCatalog != null) {
paimonCatalog.close();
@@ -201,19 +211,20 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
}
}
- private FileStoreTable getTable(TablePath tablePath) throws IOException {
+ private FileStoreTable getTable(TablePath tablePath, boolean
isAutoSnapshotExpiration)
+ throws IOException {
try {
- FileStoreTable table =
- (FileStoreTable)
- paimonCatalog
- .getTable(toPaimon(tablePath))
- .copy(
- Collections.singletonMap(
-
CoreOptions.COMMIT_CALLBACKS.key(),
-
PaimonLakeCommitter.PaimonCommitCallback.class
- .getName()));
-
- return table;
+ FileStoreTable table = (FileStoreTable)
paimonCatalog.getTable(toPaimon(tablePath));
+
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(
+ CoreOptions.COMMIT_CALLBACKS.key(),
+ PaimonLakeCommitter.PaimonCommitCallback.class.getName());
+ dynamicOptions.put(
+ CoreOptions.WRITE_ONLY.key(),
+ isAutoSnapshotExpiration ? Boolean.FALSE.toString() :
Boolean.TRUE.toString());
+
+ return table.copy(dynamicOptions);
} catch (Exception e) {
throw new IOException("Failed to get table " + tablePath + " in
Paimon.", e);
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
index c69a99790..432620efb 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
@@ -53,7 +53,7 @@ public class PaimonLakeTieringFactory
@Override
public LakeCommitter<PaimonWriteResult, PaimonCommittable>
createLakeCommitter(
CommitterInitContext committerInitContext) throws IOException {
- return new PaimonLakeCommitter(paimonCatalogProvider,
committerInitContext.tablePath());
+ return new PaimonLakeCommitter(paimonCatalogProvider,
committerInitContext);
}
@Override
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 5b86e6a8f..74cd75b7d 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -128,6 +128,7 @@ public abstract class FlinkPaimonTieringTestBase {
execEnv,
flussConfig,
Configuration.fromMap(getPaimonCatalogConf()),
+ new Configuration(),
DataLakeFormat.PAIMON.toString())
.build();
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
index afc43b86c..49f80d999 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.paimon.tiering;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitter;
import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
import org.apache.fluss.lake.writer.LakeWriter;
@@ -49,6 +50,7 @@ import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -106,6 +108,18 @@ class PaimonTieringTest {
Arguments.of(false, false));
}
+ private static Stream<Arguments> snapshotExpireArgs() {
+ return Stream.of(
+ Arguments.of(true, true, true),
+ Arguments.of(true, true, false),
+ Arguments.of(true, false, true),
+ Arguments.of(true, false, false),
+ Arguments.of(false, true, true),
+ Arguments.of(false, true, false),
+ Arguments.of(false, false, true),
+ Arguments.of(false, false, false));
+ }
+
@ParameterizedTest
@MethodSource("tieringWriteArgs")
void testTieringWriteTable(boolean isPrimaryKeyTable, boolean
isPartitioned) throws Exception {
@@ -118,7 +132,11 @@ class PaimonTieringTest {
isPrimaryKeyTable ? "primary_key" : "log",
isPartitioned ? "partitioned" :
"non_partitioned"));
createTable(
- tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable
? bucketNum : null);
+ tablePath,
+ isPrimaryKeyTable,
+ isPartitioned,
+ isPrimaryKeyTable ? bucketNum : null,
+ Collections.emptyMap());
TableDescriptor descriptor =
TableDescriptor.builder()
.schema(
@@ -132,14 +150,8 @@ class PaimonTieringTest {
.build();
TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L,
1L);
- List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
- SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
- paimonLakeTieringFactory.getWriteResultSerializer();
- SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
- paimonLakeTieringFactory.getCommittableSerializer();
-
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo, new
Configuration())) {
// should no any missing snapshot
assertThat(lakeCommitter.getMissingLakeSnapshot(1L)).isNull();
}
@@ -155,49 +167,9 @@ class PaimonTieringTest {
}
}
: Collections.singletonMap(null, null);
- Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
- // first, write data
- for (int bucket = 0; bucket < bucketNum; bucket++) {
- for (Map.Entry<Long, String> entry :
partitionIdAndName.entrySet()) {
- String partition = entry.getValue();
- try (LakeWriter<PaimonWriteResult> lakeWriter =
- createLakeWriter(tablePath, bucket, partition,
entry.getKey(), tableInfo)) {
- Tuple2<String, Integer> partitionBucket =
Tuple2.of(partition, bucket);
- Tuple2<List<LogRecord>, List<LogRecord>>
writeAndExpectRecords =
- isPrimaryKeyTable
- ? genPrimaryKeyTableRecords(partition,
bucket)
- : genLogTableRecords(partition, bucket,
10);
- List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
- List<LogRecord> expectRecords = writeAndExpectRecords.f1;
- recordsByBucket.put(partitionBucket, expectRecords);
- tableBucketOffsets.put(new TableBucket(0, entry.getKey(),
bucket), 10L);
- for (LogRecord logRecord : writtenRecords) {
- lakeWriter.write(logRecord);
- }
- // serialize/deserialize writeResult
- PaimonWriteResult paimonWriteResult =
lakeWriter.complete();
- byte[] serialized =
writeResultSerializer.serialize(paimonWriteResult);
- paimonWriteResults.add(
- writeResultSerializer.deserialize(
- writeResultSerializer.getVersion(),
serialized));
- }
- }
- }
- // second, commit data
- try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
- createLakeCommitter(tablePath)) {
- // serialize/deserialize committable
- PaimonCommittable paimonCommittable =
lakeCommitter.toCommittable(paimonWriteResults);
- byte[] serialized =
committableSerializer.serialize(paimonCommittable);
- paimonCommittable =
- committableSerializer.deserialize(
- committableSerializer.getVersion(), serialized);
- long snapshot =
- lakeCommitter.commit(
- paimonCommittable,
toBucketOffsetsProperty(tableBucketOffsets));
- assertThat(snapshot).isEqualTo(1);
- }
+        // first, write some data
+ writeData(tableInfo, recordsByBucket, partitionIdAndName);
// then, check data
for (int bucket = 0; bucket < 3; bucket++) {
@@ -212,7 +184,7 @@ class PaimonTieringTest {
// then, let's verify getMissingLakeSnapshot works
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo, new
Configuration())) {
// use snapshot id 0 as the known snapshot id
CommittedLakeSnapshot committedLakeSnapshot =
lakeCommitter.getMissingLakeSnapshot(0L);
assertThat(committedLakeSnapshot).isNotNull();
@@ -292,7 +264,7 @@ class PaimonTieringTest {
// Commit all data
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo, new
Configuration())) {
PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
long snapshot =
lakeCommitter.commit(committable,
toBucketOffsetsProperty(tableBucketOffsets));
@@ -364,7 +336,7 @@ class PaimonTieringTest {
// Commit all data
long snapshot;
try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
- createLakeCommitter(tablePath)) {
+ createLakeCommitter(tablePath, tableInfo, new
Configuration())) {
PaimonCommittable committable =
lakeCommitter.toCommittable(paimonWriteResults);
snapshot =
lakeCommitter.commit(committable,
toBucketOffsetsProperty(tableBucketOffsets));
@@ -387,6 +359,77 @@ class PaimonTieringTest {
}
}
+ @ParameterizedTest
+ @MethodSource("snapshotExpireArgs")
+ void testSnapshotExpiration(
+ boolean isPartitioned,
+ boolean isTableAutoExpireSnapshot,
+ boolean isLakeTieringExpireSnapshot)
+ throws Exception {
+ int bucketNum = 3;
+ TablePath tablePath =
+ TablePath.of(
+ "paimon",
+ String.format(
+ "test_tiering_snapshot_expiration_table_%s",
+ isPartitioned ? "partitioned" :
"non_partitioned"));
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2");
+ createTable(tablePath, false, isPartitioned, null, options);
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(
+ org.apache.fluss.metadata.Schema.newBuilder()
+ .column("c1",
org.apache.fluss.types.DataTypes.INT())
+ .column("c2",
org.apache.fluss.types.DataTypes.STRING())
+ .column("c3",
org.apache.fluss.types.DataTypes.STRING())
+ .build())
+ .distributedBy(bucketNum)
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .property(
+
ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT,
+ isTableAutoExpireSnapshot)
+ .build();
+ TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L,
1L);
+ // Get the FileStoreTable to verify snapshots
+ FileStoreTable fileStoreTable =
+ (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+ SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+
+ Map<Long, String> partitionIdAndName =
+ isPartitioned
+ ? new HashMap<Long, String>() {
+ {
+ put(1L, "p1");
+ put(2L, "p2");
+ put(3L, "p3");
+ }
+ }
+ : Collections.singletonMap(null, null);
+
+ Configuration lakeTieringConfig = new Configuration();
+ lakeTieringConfig.set(
+ ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT,
isLakeTieringExpireSnapshot);
+
+ // Write some data to generate 2 snapshots
+ writeData(tableInfo, lakeTieringConfig, new HashMap<>(),
partitionIdAndName);
+ writeData(tableInfo, lakeTieringConfig, new HashMap<>(),
partitionIdAndName);
+ assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+
+ // write more data
+ for (int i = 0; i < 5; i++) {
+ writeData(tableInfo, lakeTieringConfig, new HashMap<>(),
partitionIdAndName);
+ if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) {
+ // if auto snapshot expiration is enabled, snapshot should be
expired
+ assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+ } else {
+ // if auto snapshot expiration is disabled, snapshot should
never be expired
+ assertThat(snapshotManager.snapshotCount()).isGreaterThan(2);
+ }
+ }
+ }
+
private void verifyLogTableRecordsMultiPartition(
CloseableIterator<InternalRow> actualRecords,
List<LogRecord> expectRecords,
@@ -704,15 +747,33 @@ class PaimonTieringTest {
}
private LakeCommitter<PaimonWriteResult, PaimonCommittable>
createLakeCommitter(
- TablePath tablePath) throws IOException {
- return paimonLakeTieringFactory.createLakeCommitter(() -> tablePath);
+ TablePath tablePath, TableInfo tableInfo, Configuration
lakeTieringConfig)
+ throws IOException {
+ return paimonLakeTieringFactory.createLakeCommitter(
+ new CommitterInitContext() {
+ @Override
+ public TablePath tablePath() {
+ return tablePath;
+ }
+
+ @Override
+ public TableInfo tableInfo() {
+ return tableInfo;
+ }
+
+ @Override
+ public Configuration lakeTieringConfig() {
+ return lakeTieringConfig;
+ }
+ });
}
private void createTable(
TablePath tablePath,
boolean isPrimaryTable,
boolean isPartitioned,
- @Nullable Integer numBuckets)
+ @Nullable Integer numBuckets,
+ Map<String, String> options)
throws Exception {
Schema.Builder builder =
Schema.newBuilder()
@@ -735,6 +796,7 @@ class PaimonTieringTest {
if (numBuckets != null) {
builder.option(CoreOptions.BUCKET.key(),
String.valueOf(numBuckets));
}
+ builder.options(options);
doCreatePaimonTable(tablePath, builder);
}
@@ -784,4 +846,70 @@ class PaimonTieringTest {
.properties()
.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
}
+
+ private void writeData(
+ TableInfo tableInfo,
+ Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+ Map<Long, String> partitionIdAndName)
+ throws Exception {
+ writeData(tableInfo, new Configuration(), recordsByBucket,
partitionIdAndName);
+ }
+
+ private void writeData(
+ TableInfo tableInfo,
+ Configuration lakeTieringConfig,
+ Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+ Map<Long, String> partitionIdAndName)
+ throws Exception {
+ TablePath tablePath = tableInfo.getTablePath();
+ int bucketNum = tableInfo.getNumBuckets();
+ boolean isPrimaryKeyTable = tableInfo.hasPrimaryKey();
+
+ List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+ SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
+ paimonLakeTieringFactory.getWriteResultSerializer();
+ SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
+ paimonLakeTieringFactory.getCommittableSerializer();
+
+ Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+ // first, write data
+ for (int bucket = 0; bucket < bucketNum; bucket++) {
+ for (Map.Entry<Long, String> entry :
partitionIdAndName.entrySet()) {
+ String partition = entry.getValue();
+ try (LakeWriter<PaimonWriteResult> lakeWriter =
+ createLakeWriter(tablePath, bucket, partition,
entry.getKey(), tableInfo)) {
+ Tuple2<String, Integer> partitionBucket =
Tuple2.of(partition, bucket);
+ Tuple2<List<LogRecord>, List<LogRecord>>
writeAndExpectRecords =
+ isPrimaryKeyTable
+ ? genPrimaryKeyTableRecords(partition,
bucket)
+ : genLogTableRecords(partition, bucket,
10);
+ List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+ List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+ recordsByBucket.put(partitionBucket, expectRecords);
+ tableBucketOffsets.put(new TableBucket(0, entry.getKey(),
bucket), 10L);
+ for (LogRecord logRecord : writtenRecords) {
+ lakeWriter.write(logRecord);
+ }
+ // serialize/deserialize writeResult
+ PaimonWriteResult paimonWriteResult =
lakeWriter.complete();
+ byte[] serialized =
writeResultSerializer.serialize(paimonWriteResult);
+ paimonWriteResults.add(
+ writeResultSerializer.deserialize(
+ writeResultSerializer.getVersion(),
serialized));
+ }
+ }
+ }
+
+ // second, commit data
+ try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter
=
+ createLakeCommitter(tablePath, tableInfo, lakeTieringConfig)) {
+ // serialize/deserialize committable
+ PaimonCommittable paimonCommittable =
lakeCommitter.toCommittable(paimonWriteResults);
+ byte[] serialized =
committableSerializer.serialize(paimonCommittable);
+ paimonCommittable =
+ committableSerializer.deserialize(
+ committableSerializer.getVersion(), serialized);
+ lakeCommitter.commit(paimonCommittable,
toBucketOffsetsProperty(tableBucketOffsets));
+ }
+ }
}
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index 0c6f54cd5..46c34da57 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -414,6 +414,9 @@
</exclude>
<exclude>org.apache.fluss.flink.tiering.committer.CommittableMessageTypeInfo*
</exclude>
+ <exclude>
+
org.apache.fluss.flink.tiering.committer.TieringCommitterInitContext
+ </exclude>
<exclude>
org.apache.fluss.flink.tiering.LakeTieringJobBuilder
</exclude>
diff --git a/website/docs/engine-flink/options.md
b/website/docs/engine-flink/options.md
index 6c637240e..d6462fbfa 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -61,31 +61,32 @@ See more details about [ALTER TABLE ...
SET](engine-flink/ddl.md#set-properties)
## Storage Options
-| Option | Type | Default
| Description
[...]
-|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| bucket.num | int | The bucket number of
Fluss cluster. | The number of buckets of a Fluss table.
[...]
-| bucket.key | String | (None)
| Specific the distribution policy of the Fluss table. Data will be
distributed to each bucket according to the hash value of bucket-key (It must
be a subset of the primary keys excluding partition keys of the primary key
table). If you specify multiple fields, delimiter is `,`. If the table has a
primary key and a bucket key is not specified, the bucket key will be used as
primary key(excluding th [...]
-| table.log.ttl | Duration | 7 days
| The time to live for log segments. The configuration controls the
maximum time we will retain a log before we will delete old segments to free up
space. If set to -1, the log will not be deleted.
[...]
-| table.auto-partition.enabled | Boolean | false
| Whether enable auto partition for the table. Disable by default.
When auto partition is enabled, the partitions of the table will be created
automatically.
[...]
-| table.auto-partition.key | String | (None)
| This configuration defines the time-based partition key to be
used for auto-partitioning when a table is partitioned with multiple keys.
Auto-partitioning utilizes a time-based partition key to handle partitions
automatically, including creating new ones and removing outdated ones, by
comparing the time value of the partition with the current system time. In the
case of a table using multiple par [...]
-| table.auto-partition.time-unit | ENUM | DAY
| The time granularity for auto created partitions. The default
value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If
the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If
the value is `DAY`, the partition format for auto created is yyyyMMdd. If the
value is `MONTH`, the partition format for auto created is yyyyMM. If the value
is `QUARTER`, the parti [...]
-| table.auto-partition.num-precreate | Integer | 2
| The number of partitions to pre-create for auto created
partitions in each check for auto partition. For example, if the current check
time is 2024-11-11 and the value is configured as 3, then partitions 20241111,
20241112, 20241113 will be pre-created. If any one partition exists, it'll skip
creating the partition. The default value is 2, which means 2 partitions will
be pre-created. If the `tab [...]
-| table.auto-partition.num-retention | Integer | 7
| The number of history partitions to retain for auto created
partitions in each check for auto partition. For example, if the current check
time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then
the history partitions 20241108, 20241109, 20241110 will be retained. The
partitions earlier than 20241108 will be deleted. The default value is 7, which
means that 7 partitions will [...]
-| table.auto-partition.time-zone | String | the system time zone
| The time zone for auto partitions, which is by default the same
as the system time zone.
[...]
-| table.replication.factor | Integer | (None)
| The replication factor for the log of the new table. When it's
not set, Fluss will use the cluster's default replication factor configured by
default.replication.factor. It should be a positive number and not larger than
the number of tablet servers in the Fluss cluster. A value larger than the
number of tablet servers in Fluss cluster will result in an error when the new
table is created. [...]
-| table.log.format | Enum | ARROW
| The format of the log records in log store. The default value is
`ARROW`. The supported formats are `ARROW` and `INDEXED`.
[...]
-| table.log.arrow.compression.type | Enum | ZSTD
| The compression type of the log records if the log format is set
to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The
default value is `ZSTD`.
[...]
-| table.log.arrow.compression.zstd.level | Integer | 3
| The compression level of the log records if the log format is set
to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to
22. The default value is 3.
[...]
-| table.kv.format | Enum | COMPACTED
| The format of the kv records in kv store. The default value is
`COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`.
[...]
-| table.log.tiered.local-segments | Integer | 2
| The number of log segments to retain in local for each table when
log tiered storage is enabled. It must be greater that 0. The default is 2.
[...]
-| table.datalake.enabled | Boolean | false
| Whether enable lakehouse storage for the table. Disabled by
default. When this option is set to ture and the datalake tiering service is
up, the table will be tiered and compacted into datalake format stored on
lakehouse storage.
[...]
-| table.datalake.format | Enum | (None)
| The data lake format of the table specifies the tiered Lakehouse
storage format. Currently, supported formats are `paimon`, `iceberg`, and
`lance`. In the future, more kinds of data lake format will be supported, such
as DeltaLake or Hudi. Once the `table.datalake.format` property is configured,
Fluss adopts the key encoding and bucketing strategy used by the corresponding
data lake format. This [...]
-| table.datalake.freshness | Duration | 3min
| It defines the maximum amount of time that the datalake table's
content should lag behind updates to the Fluss table. Based on this target
freshness, the Fluss service automatically moves data from the Fluss table and
updates to the datalake table, so that the data in the datalake table is kept
up to date within this target. If the data does not need to be as fresh, you
can specify a longer targe [...]
-| table.datalake.auto-compaction | Boolean | false
| If true, compaction will be triggered automatically when tiering
service writes to the datalake. It is disabled by default.
[...]
-| table.merge-engine | Enum | (None)
| Defines the merge engine for the primary key table. By default,
primary key table uses the [default merge
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md).
It also supports two merge engines are `first_row` and `versioned`. The
[first_row merge
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep
the first row of the same primary key. The [v [...]
-| table.merge-engine.versioned.ver-column | String | (None)
| The column name of the version column for the `versioned` merge
engine. If the merge engine is set to `versioned`, the version column must be
set.
[...]
-| table.delete.behavior | Enum | ALLOW
| Controls the behavior of delete operations on primary key tables.
Three modes are supported: `ALLOW` (default) - allows normal delete operations;
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects
delete requests and throws explicit errors. This configuration provides
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join)
that must not receive [...]
-| table.changelog.image | Enum | FULL
| Defines the changelog image mode for primary key tables. This
configuration is inspired by similar settings in database systems like MySQL's
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER
records for update operations, capturing complete information about updates and
allowing tracking of previous val [...]
+| Option | Type | Default
| Description
[...]
+|-----------------------------------------|----------|-------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| bucket.num | int | The bucket number of
Fluss cluster. | The number of buckets of a Fluss table.
[...]
+| bucket.key | String | (None)
| Specific the distribution policy of the Fluss table. Data will be
distributed to each bucket according to the hash value of bucket-key (It must
be a subset of the primary keys excluding partition keys of the primary key
table). If you specify multiple fields, delimiter is `,`. If the table has a
primary key and a bucket key is not specified, the bucket key will be used as
primary key(excluding th [...]
+| table.log.ttl | Duration | 7 days
| The time to live for log segments. The configuration controls the
maximum time we will retain a log before we will delete old segments to free up
space. If set to -1, the log will not be deleted.
[...]
+| table.auto-partition.enabled | Boolean | false
| Whether to enable auto partition for the table. Disabled by default.
When auto partition is enabled, the partitions of the table will be created
automatically.
[...]
+| table.auto-partition.key | String | (None)
| This configuration defines the time-based partition key to be
used for auto-partitioning when a table is partitioned with multiple keys.
Auto-partitioning utilizes a time-based partition key to handle partitions
automatically, including creating new ones and removing outdated ones, by
comparing the time value of the partition with the current system time. In the
case of a table using multiple par [...]
+| table.auto-partition.time-unit | ENUM | DAY
| The time granularity for auto created partitions. The default
value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If
the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If
the value is `DAY`, the partition format for auto created is yyyyMMdd. If the
value is `MONTH`, the partition format for auto created is yyyyMM. If the value
is `QUARTER`, the parti [...]
+| table.auto-partition.num-precreate | Integer | 2
| The number of partitions to pre-create for auto created
partitions in each check for auto partition. For example, if the current check
time is 2024-11-11 and the value is configured as 3, then partitions 20241111,
20241112, 20241113 will be pre-created. If any one partition exists, it'll skip
creating the partition. The default value is 2, which means 2 partitions will
be pre-created. If the `tab [...]
+| table.auto-partition.num-retention | Integer | 7
| The number of history partitions to retain for auto created
partitions in each check for auto partition. For example, if the current check
time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then
the history partitions 20241108, 20241109, 20241110 will be retained. The
partitions earlier than 20241108 will be deleted. The default value is 7, which
means that 7 partitions will [...]
+| table.auto-partition.time-zone | String | the system time zone
| The time zone for auto partitions, which is by default the same
as the system time zone.
[...]
+| table.replication.factor | Integer | (None)
| The replication factor for the log of the new table. When it's
not set, Fluss will use the cluster's default replication factor configured by
default.replication.factor. It should be a positive number and not larger than
the number of tablet servers in the Fluss cluster. A value larger than the
number of tablet servers in Fluss cluster will result in an error when the new
table is created. [...]
+| table.log.format | Enum | ARROW
| The format of the log records in log store. The default value is
`ARROW`. The supported formats are `ARROW` and `INDEXED`.
[...]
+| table.log.arrow.compression.type | Enum | ZSTD
| The compression type of the log records if the log format is set
to `ARROW`. The candidate compression type is `NONE`, `LZ4_FRAME`, `ZSTD`. The
default value is `ZSTD`.
[...]
+| table.log.arrow.compression.zstd.level | Integer | 3
| The compression level of the log records if the log format is set
to `ARROW` and the compression type is set to `ZSTD`. The valid range is 1 to
22. The default value is 3.
[...]
+| table.kv.format | Enum | COMPACTED
| The format of the kv records in kv store. The default value is
`COMPACTED`. The supported formats are `COMPACTED` and `INDEXED`.
[...]
+| table.log.tiered.local-segments | Integer | 2
| The number of log segments to retain in local for each table when
log tiered storage is enabled. It must be greater than 0. The default is 2.
[...]
+| table.datalake.enabled | Boolean | false
| Whether enable lakehouse storage for the table. Disabled by
default. When this option is set to true and the datalake tiering service is
up, the table will be tiered and compacted into datalake format stored on
lakehouse storage.
[...]
+| table.datalake.format | Enum | (None)
| The data lake format of the table specifies the tiered Lakehouse
storage format. Currently, supported formats are `paimon`, `iceberg`, and
`lance`. In the future, more kinds of data lake format will be supported, such
as DeltaLake or Hudi. Once the `table.datalake.format` property is configured,
Fluss adopts the key encoding and bucketing strategy used by the corresponding
data lake format. This [...]
+| table.datalake.freshness | Duration | 3min
| It defines the maximum amount of time that the datalake table's
content should lag behind updates to the Fluss table. Based on this target
freshness, the Fluss service automatically moves data from the Fluss table and
updates to the datalake table, so that the data in the datalake table is kept
up to date within this target. If the data does not need to be as fresh, you
can specify a longer targe [...]
+| table.datalake.auto-compaction | Boolean | false
| If true, compaction will be triggered automatically when tiering
service writes to the datalake. It is disabled by default.
[...]
+| table.datalake.auto-expire-snapshot | Boolean | false
| If true, snapshot expiration will be triggered automatically when
tiering service commits to the datalake. It is disabled by default.
[...]
+| table.merge-engine | Enum | (None)
| Defines the merge engine for the primary key table. By default,
primary key table uses the [default merge
engine(last_row)](table-design/table-types/pk-table/merge-engines/default.md).
It also supports two other merge engines: `first_row` and `versioned`. The
[first_row merge
engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep
the first row of the same primary key. The [v [...]
+| table.merge-engine.versioned.ver-column | String | (None)
| The column name of the version column for the `versioned` merge
engine. If the merge engine is set to `versioned`, the version column must be
set.
[...]
+| table.delete.behavior | Enum | ALLOW
| Controls the behavior of delete operations on primary key tables.
Three modes are supported: `ALLOW` (default) - allows normal delete operations;
`IGNORE` - silently ignores delete requests without errors; `DISABLE` - rejects
delete requests and throws explicit errors. This configuration provides
system-level guarantees for some downstream pipelines (e.g., Flink Delta Join)
that must not receive [...]
+| table.changelog.image | Enum | FULL
| Defines the changelog image mode for primary key tables. This
configuration is inspired by similar settings in database systems like MySQL's
`binlog_row_image` and PostgreSQL's `replica identity`. Two modes are
supported: `FULL` (default) - produces both UPDATE_BEFORE and UPDATE_AFTER
records for update operations, capturing complete information about updates and
allowing tracking of previous val [...]
## Read Options
diff --git a/website/docs/maintenance/tiered-storage/lakehouse-storage.md
b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
index bd7b646d0..35b2394d7 100644
--- a/website/docs/maintenance/tiered-storage/lakehouse-storage.md
+++ b/website/docs/maintenance/tiered-storage/lakehouse-storage.md
@@ -102,4 +102,12 @@ To enable lakehouse storage for a table, the table must be
created with the opti
Another option `table.datalake.freshness`, allows per-table configuration of
data freshness in the datalake.
It defines the maximum amount of time that the datalake table's content should
lag behind updates to the Fluss table.
Based on this target freshness, the Fluss tiering service automatically moves
data from the Fluss table and updates to the datalake table, so that the data
in the datalake table is kept up to date within this target.
-The default is `3min`, if the data does not need to be as fresh, you can
specify a longer target freshness time to reduce costs.
\ No newline at end of file
+The default is `3min`; if the data does not need to be as fresh, you can
specify a longer target freshness time to reduce costs.
+
+## Datalake Tiering Service Options
+
+The following table lists the options that can be used to configure the
datalake tiering service.
+
+| Option | Type | Default | Description
|
+|-----------------------------------------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| lake.tiering.auto-expire-snapshot | Boolean | false | If true,
snapshot expiration will be triggered automatically when tiering service
commits to the datalake, even if `table.datalake.auto-expire-snapshot` is
false. |
\ No newline at end of file