This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 8dfd6956b [server] Support alter "table.datalake.enabled" option (#1753)
8dfd6956b is described below
commit 8dfd6956b9ca53f2a0ffc070c5a830c1ed6c3909
Author: Yang Guo <[email protected]>
AuthorDate: Sun Sep 28 22:06:37 2025 +0800
[server] Support alter "table.datalake.enabled" option (#1753)
---
.../org/apache/fluss/config/FlussConfigUtils.java | 9 +-
.../lake/paimon/LakeEnabledTableCreateITCase.java | 67 +++++++++
.../testutils/FlinkPaimonTieringTestBase.java | 12 +-
.../lake/paimon/tiering/PaimonTieringITCase.java | 161 ++++++++++++++++++++-
.../server/coordinator/CoordinatorService.java | 11 +-
.../fluss/server/coordinator/MetadataManager.java | 96 +++++++++++-
.../server/utils/TableDescriptorValidation.java | 3 +-
7 files changed, 346 insertions(+), 13 deletions(-)
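For context, the call pattern this change enables on the client side, as exercised by the ITCases below; a minimal sketch in Java (the admin handle and the database/table names are assumptions, not part of this commit):

    import java.util.Collections;
    import java.util.List;
    import org.apache.fluss.config.ConfigOptions;
    import org.apache.fluss.metadata.TableChange;
    import org.apache.fluss.metadata.TablePath;

    // assumes an existing Admin client `admin` and an existing table; names are hypothetical
    TablePath tablePath = TablePath.of("my_db", "my_table");
    List<TableChange> changes =
            Collections.singletonList(
                    TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"));
    admin.alterTable(tablePath, changes, false).get();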
diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index 74bb3012f..fa9c4274c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -36,18 +36,23 @@ public class FlussConfigUtils {
public static final String CLIENT_PREFIX = "client.";
public static final String CLIENT_SECURITY_PREFIX = "client.security.";
- public static final List<String> ALTERABLE_TABLE_CONFIG;
+ public static final List<String> ALTERABLE_TABLE_OPTIONS;
static {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
- ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+ ALTERABLE_TABLE_OPTIONS =
+ Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
}
public static boolean isTableStorageConfig(String key) {
return key.startsWith(TABLE_PREFIX);
}
+ public static boolean isAlterableTableOption(String key) {
+ return ALTERABLE_TABLE_OPTIONS.contains(key);
+ }
+
@VisibleForTesting
static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
Map<String, ConfigOption<?>> options = new HashMap<>();
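Since the new helper above simply checks membership in the singleton list, its behavior follows directly; a quick sketch (the second key is a hypothetical non-alterable example):

    FlussConfigUtils.isAlterableTableOption("table.datalake.enabled");  // true
    FlussConfigUtils.isAlterableTableOption("table.some.other.option"); // false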
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 1be24772d..5e97a635a 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -48,7 +49,9 @@ import javax.annotation.Nullable;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -364,6 +367,70 @@ class LakeEnabledTableCreateITCase {
BUCKET_NUM);
}
+ @Test
+ void testAlterLakeEnabledLogTable() throws Exception {
+ Map<String, String> customProperties = new HashMap<>();
+ customProperties.put("k1", "v1");
+ customProperties.put("paimon.file.format", "parquet");
+
+ // log table with lake disabled
+ TableDescriptor logTable =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("log_c1", DataTypes.INT())
+ .column("log_c2", DataTypes.STRING())
+ .build())
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+ .customProperties(customProperties)
+ .distributedBy(BUCKET_NUM, "log_c1", "log_c2")
+ .build();
+ TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
+ admin.createTable(logTablePath, logTable, false).get();
+
+ assertThatThrownBy(
+ () ->
+ paimonCatalog.getTable(
+ Identifier.create(DATABASE, logTablePath.getTableName())))
+ .isInstanceOf(Catalog.TableNotExistException.class);
+
+ // enable lake
+ TableChange.SetOption enableLake =
+ TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+ List<TableChange> changes = Collections.singletonList(enableLake);
+
+ admin.alterTable(logTablePath, changes, false).get();
+
+ Table enabledPaimonLogTable =
+ paimonCatalog.getTable(Identifier.create(DATABASE, logTablePath.getTableName()));
+
+ Map<String, String> updatedProperties = new HashMap<>();
+ updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+ TableDescriptor updatedLogTable = logTable.withProperties(updatedProperties);
+ // verify the log table fetched after enabling the datalake option
+ verifyPaimonTable(
+ enabledPaimonLogTable,
+ updatedLogTable,
+ RowType.of(
+ new DataType[] {
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.STRING(),
+ // for __bucket, __offset, __timestamp
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.BIGINT(),
+ org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+ },
+ new String[] {
+ "log_c1",
+ "log_c2",
+ BUCKET_COLUMN_NAME,
+ OFFSET_COLUMN_NAME,
+ TIMESTAMP_COLUMN_NAME
+ }),
+ "log_c1,log_c2",
+ BUCKET_NUM);
+ }
+
@Test
void testThrowExceptionWhenConflictWithSystemColumn() {
for (String systemColumn :
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 995b2db46..2580370d8 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -256,10 +256,16 @@ public abstract class FlinkPaimonTieringTestBase {
}
protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
- return createLogTable(tablePath, bucketNum, false);
+ return createLogTable(
+ tablePath, bucketNum, false, Collections.emptyMap(), Collections.emptyMap());
}
- protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
+ protected long createLogTable(
+ TablePath tablePath,
+ int bucketNum,
+ boolean isPartitioned,
+ Map<String, String> properties,
+ Map<String, String> customProperties)
throws Exception {
Schema.Builder schemaBuilder =
Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
@@ -277,6 +283,8 @@ public abstract class FlinkPaimonTieringTestBase {
tableBuilder.property(
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
}
+ tableBuilder.properties(properties);
+ tableBuilder.customProperties(customProperties);
tableBuilder.schema(schemaBuilder.build());
return createTable(tablePath, tableBuilder.build());
}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 442962f31..b4990f198 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
@@ -175,10 +176,7 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
{
put(
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
- "["
- +
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
- +
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
- + "]");
+ getPartitionOffsetStr(partitionNameByIds));
}
};
checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
@@ -187,6 +185,150 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
}
}
+ @Test
+ void testTieringForAlterTable() throws Exception {
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
+
+ long t1Id = createPkTable(t1, 1, tableProperties, Collections.emptyMap());
+
+ TableChange.SetOption setOption =
+ TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+ List<TableChange> changes = Collections.singletonList(setOption);
+ admin.alterTable(t1, changes, false).get();
+
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+ // write records
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+ writeRows(t1, rows, false);
+ waitUntilSnapshot(t1Id, 1, 0);
+
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ try {
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 3);
+ // check data in paimon
+ checkDataInPaimonPrimaryKeyTable(t1, rows);
+ // check snapshot property in paimon
+ Map<String, String> properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "[{\"bucket_id\":0,\"log_offset\":3}]");
+ }
+ };
+ checkSnapshotPropertyInPaimon(t1, properties);
+
+ // then, create another log table
+ TablePath t2 = TablePath.of(DEFAULT_DB, "logTableAlter");
+
+ Map<String, String> logTableProperties = new HashMap<>();
+ logTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
+ long t2Id = createLogTable(t2, 1, false, logTableProperties, Collections.emptyMap());
+ // enable lake
+ admin.alterTable(t2, changes, false).get();
+
+ TableBucket t2Bucket = new TableBucket(t2Id, 0);
+ List<InternalRow> flussRows = new ArrayList<>();
+ // write records
+ for (int i = 0; i < 10; i++) {
+ rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+ flussRows.addAll(rows);
+ // write records
+ writeRows(t2, rows, true);
+ }
+ // check the status of replica after synced;
+ // note: we can't update the log start offset for log tables in unaware-bucket mode
+ assertReplicaStatus(t2Bucket, 30);
+
+ // check data in paimon
+ checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+
+ // then write data to the pk tables
+ // write records
+ rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+ // write records
+ writeRows(t1, rows, false);
+
+ // check the status of the replica of t1 after synced;
+ // don't check the start offset since we won't
+ // update the start log offset for primary key tables
+ assertReplicaStatus(t1Bucket, 9);
+
+ checkDataInPaimonPrimaryKeyTable(t1, rows);
+
+ // then create a partitioned table and wait until partitions are ready
+ TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTableAlter");
+ Map<String, String> partitionTableProperties = new HashMap<>();
+ partitionTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
+
+ Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+ createPartitionedTable(
+ partitionedTablePath, partitionTableProperties, Collections.emptyMap());
+
+ admin.alterTable(partitionedTablePath, changes, false).get();
+
+ Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);
+
+ // now, write rows into partitioned table
+ TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
+ Map<String, List<InternalRow>> writtenRowsByPartition =
+ writeRowsIntoPartitionedTable(
+ partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
+ long tableId = tableIdAndDescriptor.f0;
+
+ // wait until synced to paimon
+ for (Long partitionId : partitionNameByIds.keySet()) {
+ TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
+ assertReplicaStatus(tableBucket, 3);
+ }
+
+ // now, let's check data in paimon per partition
+ // check data in paimon
+ String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
+ for (String partitionName : partitionNameByIds.values()) {
+ checkDataInPaimonAppendOnlyPartitionedTable(
+ partitionedTablePath,
+ Collections.singletonMap(partitionCol, partitionName),
+ writtenRowsByPartition.get(partitionName),
+ 0);
+ }
+
+ properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ getPartitionOffsetStr(partitionNameByIds));
+ }
+ };
+ checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
+ private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
+ String raw =
+ "{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
+ List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
+ Collections.sort(partitionIds);
+ List<String> partitionOffsetStrs = new ArrayList<>();
+
+ for (Long partitionId : partitionIds) {
+ String partitionName = partitionNameByIds.get(partitionId);
+ String partitionOffsetStr = String.format(raw, partitionId, partitionName);
+ partitionOffsetStrs.add(partitionOffsetStr);
+ }
+
+ return "[" + String.join(",", partitionOffsetStrs) + "]";
+ }
+
@Test
void testTieringToDvEnabledTable() throws Exception {
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
@@ -214,6 +356,15 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
throws Exception {
+ return createPartitionedTable(
+ partitionedTablePath, Collections.emptyMap(), Collections.emptyMap());
+ }
+
+ private Tuple2<Long, TableDescriptor> createPartitionedTable(
+ TablePath partitionedTablePath,
+ Map<String, String> properties,
+ Map<String, String> customProperties)
+ throws Exception {
TableDescriptor partitionedTableDescriptor =
TableDescriptor.builder()
.schema(
@@ -229,6 +380,8 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase {
AutoPartitionTimeUnit.YEAR)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
+ .properties(properties)
+ .customProperties(customProperties)
.build();
return Tuple2.of(
createTable(partitionedTablePath, partitionedTableDescriptor),
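For reference, getPartitionOffsetStr above rebuilds exactly the JSON the old test hardcoded; with partitions {0 -> 2025, 1 -> 2026} it yields (wrapped here for readability):

    [{"partition_id":0,"bucket_id":0,"partition_name":"date=2025","log_offset":3},
     {"partition_id":1,"bucket_id":0,"partition_name":"date=2026","log_offset":3}]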
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 8b0ffba40..5e0c5dc2a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -90,6 +90,7 @@ import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
import org.apache.fluss.server.coordinator.event.EventManager;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
+import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
@@ -297,10 +298,16 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
authorizer.authorize(currentSession(), OperationType.ALTER, Resource.table(tablePath));
}
+ TablePropertyChanges tablePropertyChanges =
+ toTablePropertyChanges(request.getConfigChangesList());
+
metadataManager.alterTableProperties(
tablePath,
- toTablePropertyChanges(request.getConfigChangesList()),
- request.isIgnoreIfNotExists());
+ tablePropertyChanges,
+ request.isIgnoreIfNotExists(),
+ lakeCatalog,
+ dataLakeFormat,
+ lakeTableTieringManager);
return CompletableFuture.completedFuture(new AlterTablePropertiesResponse());
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index f65814c65..2056068a5 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -23,7 +23,9 @@ import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidPartitionException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.SchemaNotExistException;
@@ -32,6 +34,8 @@ import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.exception.TooManyPartitionsException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -307,7 +311,10 @@ public class MetadataManager {
public void alterTableProperties(
TablePath tablePath,
TablePropertyChanges tablePropertyChanges,
- boolean ignoreIfNotExists) {
+ boolean ignoreIfNotExists,
+ @Nullable LakeCatalog lakeCatalog,
+ @Nullable DataLakeFormat dataLakeFormat,
+ LakeTableTieringManager lakeTableTieringManager) {
try {
// it throws TableNotExistException if the table or database not exists
TableRegistration tableReg = getTableRegistration(tablePath);
@@ -328,12 +335,26 @@ public class MetadataManager {
if (newDescriptor != null) {
// reuse the same validate logic with the createTable() method
- validateTableDescriptor(tableDescriptor, maxBucketNum);
+ validateTableDescriptor(newDescriptor, maxBucketNum);
+
+ // pre alter table properties, e.g. create lake table in lake storage if it's to
+ // enable datalake for the table
+ preAlterTableProperties(
+ tablePath, tableDescriptor, newDescriptor, lakeCatalog, dataLakeFormat);
// update the table to zk
TableRegistration updatedTableRegistration =
tableReg.newProperties(
newDescriptor.getProperties(),
newDescriptor.getCustomProperties());
zookeeperClient.updateTable(tablePath, updatedTableRegistration);
+
+ // post alter table properties, e.g. add the table to lake table tiering manager if
+ // it's to enable datalake for the table
+ postAlterTableProperties(
+ tablePath,
+ schemaInfo,
+ tableDescriptor,
+ updatedTableRegistration,
+ lakeTableTieringManager);
} else {
LOG.info(
"No properties changed when alter table {}, skip
update table.", tablePath);
@@ -352,6 +373,66 @@ public class MetadataManager {
}
}
+ private void preAlterTableProperties(
+ TablePath tablePath,
+ TableDescriptor tableDescriptor,
+ TableDescriptor newDescriptor,
+ LakeCatalog lakeCatalog,
+ DataLakeFormat dataLakeFormat) {
+
+ boolean toEnableDataLake =
+ !isDataLakeEnabled(tableDescriptor) && isDataLakeEnabled(newDescriptor);
+
+ // enable lake table
+ if (toEnableDataLake) {
+ // TODO: should tolerate the case where the lake table exists but matches our schema. This
+ // ensures eventual consistency by idempotently creating the table multiple times. See #846
+ // before creating the table in Fluss, we may create it in the lake
+ if (lakeCatalog == null) {
+ throw new InvalidAlterTableException(
+ "Cannot alter table "
+ + tablePath
+ + " to enable data lake, because the Fluss
cluster doesn't enable datalake tables.");
+ } else {
+ try {
+ lakeCatalog.createTable(tablePath, newDescriptor);
+ } catch (TableAlreadyExistException e) {
+ throw new LakeTableAlreadyExistException(
+ String.format(
+ "The table %s already exists in %s
catalog, please "
+ + "first drop the table in %s
catalog or use a new table name.",
+ tablePath, dataLakeFormat,
dataLakeFormat));
+ }
+ }
+ }
+ // more pre-alter actions can be added here
+ }
+
+ private void postAlterTableProperties(
+ TablePath tablePath,
+ SchemaInfo schemaInfo,
+ TableDescriptor oldTableDescriptor,
+ TableRegistration newTableRegistration,
+ LakeTableTieringManager lakeTableTieringManager) {
+
+ boolean toEnableDataLake =
+ !isDataLakeEnabled(oldTableDescriptor)
+ && isDataLakeEnabled(newTableRegistration.properties);
+ boolean toDisableDataLake =
+ isDataLakeEnabled(oldTableDescriptor)
+ && !isDataLakeEnabled(newTableRegistration.properties);
+
+ if (toEnableDataLake) {
+ TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo);
+ // if the table is a lake table, we need to add it to the lake table tiering manager
+ lakeTableTieringManager.addNewLakeTable(newTableInfo);
+ } else if (toDisableDataLake) {
+ lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
+ }
+ // more post-alter actions can be added here
+ }
+
/**
* Get a new TableDescriptor with updated properties.
*
@@ -387,6 +468,17 @@ public class MetadataManager {
}
}
+ private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) {
+ String dataLakeEnabledValue =
+ tableDescriptor.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+ return Boolean.parseBoolean(dataLakeEnabledValue);
+ }
+
+ private boolean isDataLakeEnabled(Map<String, String> properties) {
+ String dataLakeEnabledValue = properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+ return Boolean.parseBoolean(dataLakeEnabledValue);
+ }
+
public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
Optional<TableRegistration> optionalTable;
try {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 8b56166d6..76e8adc82 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -47,6 +47,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
+import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption;
import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -108,7 +109,7 @@ public class TableDescriptorValidation {
TableInfo currentTable, Set<String> tableKeysToChange, Set<String> customKeysToChange) {
tableKeysToChange.forEach(
k -> {
- if (isTableStorageConfig(k)) {
+ if (isTableStorageConfig(k) && !isAlterableTableOption(k)) {
throw new InvalidAlterTableException(
"The option '" + k + "' is not supported to
alter yet.");
}