This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 86a283ad7 [server] Update tablet server metadata cache when TableRegistration changed (#2378)
86a283ad7 is described below
commit 86a283ad7feeafb8877108441beaa18e062b0b82
Author: Liebing <[email protected]>
AuthorDate: Sat Jan 24 19:52:06 2026 +0800
[server] Update tablet server metadata cache when TableRegistration changed (#2378)
---
.../java/org/apache/fluss/metadata/TableInfo.java | 17 --
.../lake/paimon/LakeEnabledTableCreateITCase.java | 36 ++++
.../coordinator/CoordinatorEventProcessor.java | 62 +++++++
.../coordinator/CoordinatorRequestBatch.java | 3 +-
.../server/coordinator/CoordinatorService.java | 5 -
.../fluss/server/coordinator/MetadataManager.java | 65 +-------
.../event/TableRegistrationChangeEvent.java | 68 ++++++++
.../event/watcher/TableChangeWatcher.java | 17 +-
.../org/apache/fluss/server/log/LogTablet.java | 2 -
.../org/apache/fluss/server/replica/Replica.java | 26 +++
.../fluss/server/replica/ReplicaManager.java | 29 ++++
.../coordinator/CoordinatorEventProcessorTest.java | 67 +++++++-
.../server/coordinator/LakeTableManagerITCase.java | 12 +-
.../server/coordinator/TableManagerITCase.java | 33 +---
.../event/watcher/TableChangeWatcherTest.java | 64 ++++++-
.../fluss/server/log/remote/RemoteLogITCase.java | 183 ++++++++++++++++++++-
.../apache/fluss/server/replica/ReplicaTest.java | 22 +++
.../server/testutils/RpcMessageTestUtils.java | 20 ++-
18 files changed, 612 insertions(+), 119 deletions(-)
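For orientation, the flow this patch wires up is: TableChangeWatcher detects a data change on an existing table's ZooKeeper node and emits a TableRegistrationChangeEvent; CoordinatorEventProcessor refreshes its cached TableInfo, runs the lake-tiering post-alter hooks, and pushes updated metadata to all live tablet servers; ReplicaManager then propagates the new datalake flag to each hosted replica via updateIsDataLakeEnabled. A condensed, self-contained Java sketch of that flow follows (only TableRegistrationChangeEvent and updateIsDataLakeEnabled are names taken from the patch; the stub classes and the simplified direct fan-out are illustrative assumptions, not the committed code):

    import java.util.HashMap;
    import java.util.Map;

    public class TableRegistrationChangeFlowSketch {

        /** Watcher side: emitted when an existing table's ZK registration payload changes. */
        static final class TableRegistrationChangeEvent {
            final String tablePath;
            final boolean dataLakeEnabled; // stands in for the full TableRegistration payload
            TableRegistrationChangeEvent(String tablePath, boolean dataLakeEnabled) {
                this.tablePath = tablePath;
                this.dataLakeEnabled = dataLakeEnabled;
            }
        }

        /** Tablet-server side: each replica keeps the datalake flag on its log tablet. */
        static final class ReplicaStub {
            final String tablePath;
            boolean dataLakeEnabled;
            ReplicaStub(String tablePath) {
                this.tablePath = tablePath;
            }
            void updateIsDataLakeEnabled(boolean enabled) {
                if (dataLakeEnabled != enabled) {
                    dataLakeEnabled = enabled;
                    System.out.println(tablePath + " datalake enabled -> " + enabled);
                }
            }
        }

        /** Coordinator side (simplified): refresh cached info, then fan the change out to replicas. */
        static void processTableRegistrationChange(
                TableRegistrationChangeEvent event, Map<String, ReplicaStub> replicasByTablePath) {
            ReplicaStub replica = replicasByTablePath.get(event.tablePath);
            if (replica != null) {
                replica.updateIsDataLakeEnabled(event.dataLakeEnabled);
            }
        }

        public static void main(String[] args) {
            Map<String, ReplicaStub> replicas = new HashMap<>();
            replicas.put("fluss.log_table_alter", new ReplicaStub("fluss.log_table_alter"));
            // An ALTER TABLE toggling ConfigOptions.TABLE_DATALAKE_ENABLED eventually lands here:
            processTableRegistrationChange(
                    new TableRegistrationChangeEvent("fluss.log_table_alter", true), replicas);
            processTableRegistrationChange(
                    new TableRegistrationChangeEvent("fluss.log_table_alter", false), replicas);
        }
    }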
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
index eba41e078..be894ee79 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableInfo.java
@@ -340,23 +340,6 @@ public final class TableInfo {
modifiedTime);
}
- /** Replace a TableInfo with a new SchemaInfo. */
- public TableInfo withNewSchema(SchemaInfo schemaInfo) {
- return new TableInfo(
- tablePath,
- tableId,
- schemaInfo.getSchemaId(),
- schemaInfo.getSchema(),
- bucketKeys,
- partitionKeys,
- numBuckets,
- properties,
- customProperties,
- comment,
- createdTime,
- modifiedTime);
- }
-
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 6fb700322..cf6def30f 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -27,9 +27,11 @@ import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.types.DataTypes;
@@ -59,6 +61,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import javax.annotation.Nullable;
import java.nio.file.Files;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +75,7 @@ import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -584,6 +588,7 @@ class LakeEnabledTableCreateITCase {
.build();
TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
admin.createTable(logTablePath, logTable, false).get();
+ long tableId = admin.getTableInfo(logTablePath).get().getTableId();
assertThatThrownBy(
() ->
@@ -591,6 +596,9 @@ class LakeEnabledTableCreateITCase {
Identifier.create(DATABASE,
logTablePath.getTableName())))
.isInstanceOf(Catalog.TableNotExistException.class);
+ // verify LogTablet datalake status is initially disabled
+ verifyLogTabletDataLakeEnabled(tableId, false);
+
// enable lake
TableChange.SetOption enableLake =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true");
@@ -598,6 +606,9 @@ class LakeEnabledTableCreateITCase {
admin.alterTable(logTablePath, changes, false).get();
+ // verify LogTablet datalake status is enabled
+ verifyLogTabletDataLakeEnabled(tableId, true);
+
Identifier paimonTablePath = Identifier.create(DATABASE,
logTablePath.getTableName());
Table enabledPaimonLogTable = paimonCatalog.getTable(paimonTablePath);
@@ -635,11 +646,17 @@ class LakeEnabledTableCreateITCase {
// paimon table should still exist although lake is disabled
paimonCatalog.getTable(paimonTablePath);
+ // verify LogTablet datalake status is disabled
+ verifyLogTabletDataLakeEnabled(tableId, false);
+
// try to enable lake table again
enableLake =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
changes = Collections.singletonList(enableLake);
admin.alterTable(logTablePath, changes, false).get();
+ // verify LogTablet datalake status is enabled again
+ verifyLogTabletDataLakeEnabled(tableId, true);
+
// write some data to the lake table
writeData(paimonCatalog.getTable(paimonTablePath));
Optional<Snapshot> snapshot =
paimonCatalog.getTable(paimonTablePath).latestSnapshot();
@@ -649,10 +666,16 @@ class LakeEnabledTableCreateITCase {
changes = Collections.singletonList(disableLake);
admin.alterTable(logTablePath, changes, false).get();
+ // verify LogTablet datalake status is disabled again
+ verifyLogTabletDataLakeEnabled(tableId, false);
+
// try to enable lake table again, the snapshot should not change
changes = Collections.singletonList(enableLake);
admin.alterTable(logTablePath, changes, false).get();
assertThat(paimonCatalog.getTable(paimonTablePath).latestSnapshot()).isEqualTo(snapshot);
+
+ // verify LogTablet datalake status is enabled
+ verifyLogTabletDataLakeEnabled(tableId, true);
}
@Test
@@ -1021,6 +1044,19 @@ class LakeEnabledTableCreateITCase {
assertThat(paimonTable.comment()).isEqualTo(flussTable.getComment());
}
+ private void verifyLogTabletDataLakeEnabled(long tableId, boolean isDataLakeEnabled) {
+ for (int bucket = 0; bucket < BUCKET_NUM; bucket++) {
+ TableBucket tb = new TableBucket(tableId, bucket);
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ Replica replica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+ assertThat(replica.getLogTablet().isDataLakeEnabled())
+ .isEqualTo(isDataLakeEnabled);
+ });
+ }
+ }
+
private TableDescriptor createTableDescriptor(
int columnNum,
int bucketNum,
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 55b59753c..aa0afbbcd 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -85,6 +85,7 @@ import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseRecei
import org.apache.fluss.server.coordinator.event.RebalanceEvent;
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
+import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
import org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
@@ -125,6 +126,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -560,6 +562,8 @@ public class CoordinatorEventProcessor implements EventProcessor {
} else if (event instanceof SchemaChangeEvent) {
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
processSchemaChange(schemaChangeEvent);
+ } else if (event instanceof TableRegistrationChangeEvent) {
+ processTableRegistrationChange((TableRegistrationChangeEvent) event);
} else if (event instanceof NotifyLeaderAndIsrResponseReceivedEvent) {
processNotifyLeaderAndIsrResponseReceivedEvent(
(NotifyLeaderAndIsrResponseReceivedEvent) event);
@@ -720,6 +724,64 @@ public class CoordinatorEventProcessor implements EventProcessor {
null);
}
+ private void processTableRegistrationChange(TableRegistrationChangeEvent event) {
+ TablePath tablePath = event.getTablePath();
+ Long tableId = coordinatorContext.getTableIdByPath(tablePath);
+
+ // Skip if the table is not yet registered in coordinator context.
+ // Should not happen in normal cases.
+ if (tableId == null) {
+ LOG.warn(
+ "Table {} is not registered in coordinator context, "
+ + "skip processing table registration change.",
+ tablePath);
+ return;
+ }
+
+ TableInfo oldTableInfo = coordinatorContext.getTableInfoById(tableId);
+
+ TableInfo newTableInfo =
+ event.getNewTableRegistration()
+ .toTableInfo(tablePath, oldTableInfo.getSchemaInfo());
+ coordinatorContext.putTableInfo(newTableInfo);
+ postAlterTableProperties(oldTableInfo, newTableInfo);
+
+ // Notify tablet servers about the metadata change
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ tableId,
+ null,
+ null);
+ }
+
+ private void postAlterTableProperties(TableInfo oldTableInfo, TableInfo newTableInfo) {
+ boolean dataLakeEnabled = newTableInfo.getTableConfig().isDataLakeEnabled();
+ boolean toEnableDataLake =
+ !oldTableInfo.getTableConfig().isDataLakeEnabled()
+ && newTableInfo.getTableConfig().isDataLakeEnabled();
+ boolean toDisableDataLake =
+ oldTableInfo.getTableConfig().isDataLakeEnabled()
+ && !newTableInfo.getTableConfig().isDataLakeEnabled();
+
+ if (toEnableDataLake) {
+ // if the table is lake table, we need to add it to lake table tiering manager
+ lakeTableTieringManager.addNewLakeTable(newTableInfo);
+ } else if (toDisableDataLake) {
+ lakeTableTieringManager.removeLakeTable(newTableInfo.getTableId());
+ } else if (dataLakeEnabled) {
+ // The table is still a lake table, check if freshness has changed
+ Duration oldFreshness = oldTableInfo.getTableConfig().getDataLakeFreshness();
+ Duration newFreshness = newTableInfo.getTableConfig().getDataLakeFreshness();
+
+ // Check if freshness has changed
+ if (!Objects.equals(oldFreshness, newFreshness)) {
+ lakeTableTieringManager.updateTableLakeFreshness(
+ newTableInfo.getTableId(), newFreshness.toMillis());
+ }
+ }
+ // more post-alter actions can be added here
+ }
+
private void processCreatePartition(CreatePartitionEvent
createPartitionEvent) {
long partitionId = createPartitionEvent.getPartitionId();
// skip the partition if it already exists
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
index 35527805c..436cc5203 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -280,6 +280,7 @@ public class CoordinatorRequestBatch {
* <li>case8: One newly tabletServer added into cluster
* <li>case9: One tabletServer is removed from cluster
* <li>case10: schemaId is changed after table is created.
+ * <li>case11: TableRegistration changed after table is created.
* </ol>
*/
// todo: improve this with different phase enum.
@@ -300,7 +301,7 @@ public class CoordinatorRequestBatch {
.computeIfAbsent(tableId, k -> new HashMap<>())
.put(partitionId, Collections.emptyList());
} else {
- // case3, case4, case10
+ // case3, case4, case10, case11
updateMetadataRequestBucketMap.put(tableId,
Collections.emptyList());
}
} else {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 94d56dd33..a9cd5b3c5 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -378,8 +378,6 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
+ "table properties or table schema.");
}
- LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
- lakeCatalogDynamicLoader.getLakeCatalogContainer();
LakeCatalog.Context lakeCatalogContext =
new DefaultLakeCatalogContext(false,
currentSession().getPrincipal());
@@ -388,7 +386,6 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
tablePath,
alterSchemaChanges,
request.isIgnoreIfNotExists(),
- lakeCatalogContainer.getLakeCatalog(),
lakeCatalogContext);
}
@@ -398,8 +395,6 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
alterTableConfigChanges,
tablePropertyChanges,
request.isIgnoreIfNotExists(),
- lakeCatalogContainer.getLakeCatalog(),
- lakeTableTieringManager,
lakeCatalogContext);
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index f728e91da..8d7b61a08 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -60,14 +60,12 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -327,7 +325,6 @@ public class MetadataManager {
TablePath tablePath,
List<TableChange> schemaChanges,
boolean ignoreIfNotExists,
- @Nullable LakeCatalog lakeCatalog,
LakeCatalog.Context lakeCatalogContext)
throws TableNotExistException, TableNotPartitionedException {
try {
@@ -339,8 +336,7 @@ public class MetadataManager {
Schema newSchema = SchemaUpdate.applySchemaChanges(table,
schemaChanges);
// Lake First: sync to Lake before updating Fluss schema
- syncSchemaChangesToLake(
- tablePath, table, schemaChanges, lakeCatalog, lakeCatalogContext);
+ syncSchemaChangesToLake(tablePath, table, schemaChanges, lakeCatalogContext);
// Update Fluss schema (ZK) after Lake sync succeeds
if (!newSchema.equals(table.getSchema())) {
@@ -370,12 +366,13 @@ public class MetadataManager {
TablePath tablePath,
TableInfo tableInfo,
List<TableChange> schemaChanges,
- @Nullable LakeCatalog lakeCatalog,
LakeCatalog.Context lakeCatalogContext) {
if (!isDataLakeEnabled(tableInfo.toTableDescriptor())) {
return;
}
+ LakeCatalog lakeCatalog =
+ lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
if (lakeCatalog == null) {
throw new InvalidAlterTableException(
"Cannot alter schema for datalake enabled table "
@@ -399,8 +396,6 @@ public class MetadataManager {
List<TableChange> tableChanges,
TablePropertyChanges tablePropertyChanges,
boolean ignoreIfNotExists,
- @Nullable LakeCatalog lakeCatalog,
- LakeTableTieringManager lakeTableTieringManager,
LakeCatalog.Context lakeCatalogContext) {
try {
// it throws TableNotExistException if the table or database not
exists
@@ -431,22 +426,12 @@ public class MetadataManager {
tableDescriptor,
newDescriptor,
tableChanges,
- lakeCatalog,
lakeCatalogContext);
// update the table to zk
TableRegistration updatedTableRegistration =
tableReg.newProperties(
newDescriptor.getProperties(),
newDescriptor.getCustomProperties());
zookeeperClient.updateTable(tablePath,
updatedTableRegistration);
-
- // post alter table properties, e.g. add the table to lake table tiering manager if
- // it's to enable datalake for the table
- postAlterTableProperties(
- tablePath,
- schemaInfo,
- tableDescriptor,
- updatedTableRegistration,
- lakeTableTieringManager);
} else {
LOG.info(
"No properties changed when alter table {}, skip
update table.", tablePath);
@@ -471,8 +456,10 @@ public class MetadataManager {
TableDescriptor tableDescriptor,
TableDescriptor newDescriptor,
List<TableChange> tableChanges,
- LakeCatalog lakeCatalog,
LakeCatalog.Context lakeCatalogContext) {
+ LakeCatalog lakeCatalog =
+ lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
+
if (isDataLakeEnabled(newDescriptor)) {
if (lakeCatalog == null) {
throw new InvalidAlterTableException(
@@ -511,41 +498,6 @@ public class MetadataManager {
}
}
- private void postAlterTableProperties(
- TablePath tablePath,
- SchemaInfo schemaInfo,
- TableDescriptor oldTableDescriptor,
- TableRegistration newTableRegistration,
- LakeTableTieringManager lakeTableTieringManager) {
-
- boolean dataLakeEnabled = isDataLakeEnabled(newTableRegistration.properties);
- boolean toEnableDataLake = !isDataLakeEnabled(oldTableDescriptor) && dataLakeEnabled;
- boolean toDisableDataLake = isDataLakeEnabled(oldTableDescriptor) && !dataLakeEnabled;
-
- if (toEnableDataLake) {
- TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo);
- // if the table is lake table, we need to add it to lake table tiering manager
- lakeTableTieringManager.addNewLakeTable(newTableInfo);
- } else if (toDisableDataLake) {
- lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
- } else if (dataLakeEnabled) {
- // The table is still a lake table, check if freshness has changed
- Duration oldFreshness =
- Configuration.fromMap(oldTableDescriptor.getProperties())
- .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
- Duration newFreshness =
- Configuration.fromMap(newTableRegistration.properties)
- .get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
-
- // Check if freshness has changed
- if (!Objects.equals(oldFreshness, newFreshness)) {
- lakeTableTieringManager.updateTableLakeFreshness(
- newTableRegistration.tableId, newFreshness.toMillis());
- }
- }
- // more post-alter actions can be added here
- }
-
/**
* Get a new TableDescriptor with updated properties.
*
@@ -587,11 +539,6 @@ public class MetadataManager {
return Boolean.parseBoolean(dataLakeEnabledValue);
}
- private boolean isDataLakeEnabled(Map<String, String> properties) {
- String dataLakeEnabledValue = properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
- return Boolean.parseBoolean(dataLakeEnabledValue);
- }
-
public void removeSensitiveTableOptions(Map<String, String>
tableLakeOptions) {
if (tableLakeOptions == null || tableLakeOptions.isEmpty()) {
return;
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java
new file mode 100644
index 000000000..6d128cfa7
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/TableRegistrationChangeEvent.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.zk.data.TableRegistration;
+
+import java.util.Objects;
+
+/** An event for table registration change. */
+public class TableRegistrationChangeEvent implements CoordinatorEvent {
+ private final TablePath tablePath;
+ private final TableRegistration newTableRegistration;
+
+ public TableRegistrationChangeEvent(
+ TablePath tablePath, TableRegistration newTableRegistration) {
+ this.tablePath = tablePath;
+ this.newTableRegistration = newTableRegistration;
+ }
+
+ public TablePath getTablePath() {
+ return tablePath;
+ }
+
+ public TableRegistration getNewTableRegistration() {
+ return newTableRegistration;
+ }
+
+ @Override
+ public String toString() {
+ return "TablePropertiesChangeEvent{"
+ + "tablePath="
+ + tablePath
+ + ", tableRegistration="
+ + newTableRegistration
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableRegistrationChangeEvent that = (TableRegistrationChangeEvent) o;
+ return Objects.equals(tablePath, that.tablePath)
+ && Objects.equals(newTableRegistration,
that.newTableRegistration);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tablePath, newTableRegistration);
+ }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
index 09c273e7d..5b4821f19 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcher.java
@@ -30,6 +30,7 @@ import org.apache.fluss.server.coordinator.event.DropPartitionEvent;
import org.apache.fluss.server.coordinator.event.DropTableEvent;
import org.apache.fluss.server.coordinator.event.EventManager;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
+import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.PartitionAssignment;
import org.apache.fluss.server.zk.data.TableAssignment;
@@ -127,7 +128,16 @@ public class TableChangeWatcher {
if (tablePath == null) {
break;
}
- processCreateTable(tablePath, newData);
+ // Distinguish between table creation and properties change.
+ // If oldData exists and contains valid table registration data,
+ // it's a properties change; otherwise, it's a table creation.
+ if (oldData != null
+ && oldData.getData() != null
+ && oldData.getData().length > 0) {
+ processTableRegistrationChange(tablePath, newData);
+ } else {
+ processCreateTable(tablePath, newData);
+ }
}
break;
}
@@ -241,6 +251,11 @@ public class TableChangeWatcher {
new CreatePartitionEvent(
tablePath, tableId, partitionId, partitionName,
partitionAssignment));
}
+
+ private void processTableRegistrationChange(TablePath tablePath, ChildData newData) {
+ TableRegistration newTable = TableZNode.decode(newData.getData());
+ eventManager.put(new TableRegistrationChangeEvent(tablePath, newTable));
+ }
}
private void processSchemaChange(TablePath tablePath, int schemaId) {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index a4d7668f9..3f333dc10 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -529,8 +529,6 @@ public final class LogTablet {
public void updateLakeTableSnapshotId(long snapshotId) {
if (snapshotId > this.lakeTableSnapshotId) {
this.lakeTableSnapshotId = snapshotId;
- // it means the data lake is enabled if we have got the snapshot id
- this.isDataLakeEnabled = true;
}
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 9314901fe..ee158b7a4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -560,6 +560,32 @@ public final class Replica {
logTablet.updateLeaderEndOffsetSnapshot();
}
+ public void updateIsDataLakeEnabled(boolean isDataLakeEnabled) {
+ boolean old = logTablet.isDataLakeEnabled();
+ if (old == isDataLakeEnabled) {
+ return;
+ }
+
+ logTablet.updateIsDataLakeEnabled(isDataLakeEnabled);
+
+ if (isLeader()) {
+ if (isDataLakeEnabled) {
+ registerLakeTieringMetrics();
+ } else {
+ if (lakeTieringMetricGroup != null) {
+ lakeTieringMetricGroup.close();
+ lakeTieringMetricGroup = null;
+ }
+ }
+ }
+
+ LOG.info(
+ "Replica for {} isDataLakeEnabled changed from {} to {}",
+ tableBucket,
+ old,
+ isDataLakeEnabled);
+ }
+
private void createKv() {
try {
// create a closeable registry for the closable related to kv
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 421db4a06..76e3ea186 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -84,6 +84,7 @@ import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
import org.apache.fluss.server.log.remote.RemoteLogManager;
import org.apache.fluss.server.metadata.ClusterMetadata;
+import org.apache.fluss.server.metadata.TableMetadata;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
import org.apache.fluss.server.metrics.UserMetrics;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
@@ -459,9 +460,37 @@ public class ReplicaManager {
// check or apply coordinator epoch.
validateAndApplyCoordinatorEpoch(coordinatorEpoch,
"updateMetadataCache");
metadataCache.updateClusterMetadata(clusterMetadata);
+ updateReplicaTableConfig(clusterMetadata);
});
}
+ private void updateReplicaTableConfig(ClusterMetadata clusterMetadata) {
+ Map<Long, Boolean> tableIdToLakeFlag = new HashMap<>();
+ for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
+ TableInfo tableInfo = tableMetadata.getTableInfo();
+ if (tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
+ long tableId = tableInfo.getTableId();
+ boolean dataLakeEnabled = tableInfo.getTableConfig().isDataLakeEnabled();
+ tableIdToLakeFlag.put(tableId, dataLakeEnabled);
+ }
+ }
+
+ if (tableIdToLakeFlag.isEmpty()) {
+ return;
+ }
+
+ for (Map.Entry<TableBucket, HostedReplica> entry :
allReplicas.entrySet()) {
+ HostedReplica hostedReplica = entry.getValue();
+ if (hostedReplica instanceof OnlineReplica) {
+ Replica replica = ((OnlineReplica) hostedReplica).getReplica();
+ long tableId = replica.getTableBucket().getTableId();
+ if (tableIdToLakeFlag.containsKey(tableId)) {
+ replica.updateIsDataLakeEnabled(tableIdToLakeFlag.get(tableId));
+ }
+ }
+ }
+ }
+
/**
* Append log records to leader replicas of the buckets, and wait for them
to be replicated to
* other replicas.
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index dfb4088e4..d4ec8f8db 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -53,6 +53,7 @@ import org.apache.fluss.server.coordinator.statemachine.ReplicaState;
import org.apache.fluss.server.entity.AdjustIsrResultForBucket;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.CommitRemoteLogManifestData;
+import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
import org.apache.fluss.server.metadata.BucketMetadata;
@@ -906,6 +907,70 @@ class CoordinatorEventProcessorTest {
3, new TableMetadata(tableInfo2,
Collections.emptyList())));
}
+ @Test
+ void testTableRegistrationChange() throws Exception {
+ // make sure all request to gateway should be successful
+ initCoordinatorChannel();
+
+ // create a table
+ TablePath t1 = TablePath.of(defaultDatabase, "test_table_registration_change");
+ int nBuckets = 1;
+ int replicationFactor = 3;
+ TableAssignment tableAssignment =
+ generateAssignment(
+ nBuckets,
+ replicationFactor,
+ new TabletServerInfo[] {
+ new TabletServerInfo(0, "rack0"),
+ new TabletServerInfo(1, "rack1"),
+ new TabletServerInfo(2, "rack2")
+ });
+ // create table
+ List<Integer> replicas =
tableAssignment.getBucketAssignment(0).getReplicas();
+ metadataManager.createTable(t1, TEST_TABLE, tableAssignment, false);
+ TableInfo tableInfo = metadataManager.getTable(t1);
+
+ retry(
+ Duration.ofMinutes(1),
+ () ->
+ verifyMetadataUpdateRequest(
+ 3,
+ new TableMetadata(
+ tableInfo,
+ Collections.singletonList(
+ new BucketMetadata(
+ 0, replicas.get(0), 0, replicas)))));
+
+ // alter table properties (custom property)
+ TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
+ builder.setCustomProperty("custom.key", "custom.value");
+ TablePropertyChanges tablePropertyChanges = builder.build();
+ metadataManager.alterTableProperties(
+ t1, Collections.emptyList(), tablePropertyChanges, false, null);
+
+ // get updated table info and verify metadata update request is sent
+ TableInfo updatedTableInfo = metadataManager.getTable(t1);
+ assertThat(updatedTableInfo.getCustomProperties().toMap())
+ .containsEntry("custom.key", "custom.value");
+
+ retry(
+ Duration.ofMinutes(1),
+ () ->
+ verifyMetadataUpdateRequest(
+ 3, new TableMetadata(updatedTableInfo, Collections.emptyList())));
+
+ // verify the table info in coordinator context is updated
+ retryVerifyContext(
+ ctx -> {
+ Long tableId = ctx.getTableIdByPath(t1);
+ assertThat(tableId).isNotNull();
+ TableInfo tableInfoInCtx = ctx.getTableInfoById(tableId);
+ assertThat(tableInfoInCtx).isNotNull();
+ assertThat(tableInfoInCtx.getCustomProperties().toMap())
+ .containsEntry("custom.key", "custom.value");
+ });
+ }
+
@Test
void testDoBucketReassignment() throws Exception {
zookeeperClient.registerTabletServer(
@@ -1305,7 +1370,7 @@ class CoordinatorEventProcessorTest {
}
private void alterTable(TablePath tablePath, List<TableChange>
schemaChanges) {
- metadataManager.alterTableSchema(tablePath, schemaChanges, true, null, null);
+ metadataManager.alterTableSchema(tablePath, schemaChanges, true, null);
}
private TableDescriptor getPartitionedTable() {
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index 2472d5041..dd25e0ef1 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -156,7 +156,8 @@ class LakeTableManagerITCase {
.alterTable(
newAlterTableRequest(
tablePath,
- alterTableProperties(setProperties, new ArrayList<>()),
+ setProperties,
+ Collections.emptyList(),
Collections.emptyList(),
false))
.get();
@@ -185,7 +186,8 @@ class LakeTableManagerITCase {
.alterTable(
newAlterTableRequest(
tablePath,
- alterTableProperties(new HashMap<>(),
resetProperties),
+ Collections.emptyMap(),
+ resetProperties,
Collections.emptyList(),
false))
.get();
@@ -214,7 +216,8 @@ class LakeTableManagerITCase {
.alterTable(
newAlterTableRequest(
tablePath,
- alterTableProperties(new HashMap<>(),
resetProperties2),
+ Collections.emptyMap(),
+ resetProperties2,
Collections.emptyList(),
false))
.get();
@@ -280,7 +283,8 @@ class LakeTableManagerITCase {
.alterTable(
newAlterTableRequest(
tablePath,
- alterTableProperties(setProperties, new ArrayList<>()),
+ setProperties,
+ Collections.emptyList(),
Collections.emptyList(),
false))
.get();
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index f21d5bdd8..155b2c4fe 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -23,7 +23,6 @@ import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.AutoPartitionTimeUnit;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -51,7 +50,6 @@ import org.apache.fluss.rpc.messages.ListDatabasesRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.MetadataResponse;
import org.apache.fluss.rpc.messages.PbAddColumn;
-import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbBucketMetadata;
import org.apache.fluss.rpc.messages.PbPartitionMetadata;
import org.apache.fluss.rpc.messages.PbServerNode;
@@ -303,7 +301,8 @@ class TableManagerITCase {
.alterTable(
newAlterTableRequest(
tablePath,
- alterTableProperties(setProperties, resetProperties),
+ setProperties,
+ resetProperties,
Collections.emptyList(),
false))
.get();
@@ -679,7 +678,11 @@ class TableManagerITCase {
adminGateway
.alterTable(
newAlterTableRequest(
- tablePath, Collections.emptyList(), alterTableAddColumns(), false))
+ tablePath,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ alterTableAddColumns(),
+ false))
.get();
// restart coordinatorServer
@@ -820,28 +823,6 @@ class TableManagerITCase {
.build();
}
- private static List<PbAlterConfig> alterTableProperties(
- Map<String, String> setProperties, List<String> resetProperties) {
- List<PbAlterConfig> res = new ArrayList<>();
-
- for (Map.Entry<String, String> entry : setProperties.entrySet()) {
- PbAlterConfig info = new PbAlterConfig();
- info.setConfigKey(entry.getKey());
- info.setConfigValue(entry.getValue());
- info.setOpType(AlterConfigOpType.SET.value());
- res.add(info);
- }
-
- for (String resetProperty : resetProperties) {
- PbAlterConfig info = new PbAlterConfig();
- info.setConfigKey(resetProperty);
- info.setOpType(AlterConfigOpType.DELETE.value());
- res.add(info);
- }
-
- return res;
- }
-
private static List<PbAddColumn> alterTableAddColumns() {
List<PbAddColumn> addColumns = new ArrayList<>();
PbAddColumn newNestedColumn = new PbAddColumn();
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
index 691f0d9ce..76634b970 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java
@@ -35,12 +35,15 @@ import org.apache.fluss.server.coordinator.event.CreateTableEvent;
import org.apache.fluss.server.coordinator.event.DropPartitionEvent;
import org.apache.fluss.server.coordinator.event.DropTableEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
+import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.coordinator.event.TestingEventManager;
+import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperExtension;
import org.apache.fluss.server.zk.data.PartitionAssignment;
import org.apache.fluss.server.zk.data.TableAssignment;
+import org.apache.fluss.server.zk.data.TableRegistration;
import org.apache.fluss.testutils.common.AllCallbackWrapper;
import org.apache.fluss.types.DataTypes;
@@ -309,7 +312,6 @@ class TableChangeWatcherTest {
null,
TableChange.ColumnPosition.last())),
false,
- null,
null);
Schema newSchema =
Schema.newBuilder()
@@ -329,4 +331,64 @@ class TableChangeWatcherTest {
assertThat(eventManager.getEvents())
.containsExactlyInAnyOrderElementsOf(allEvents));
}
+
+ @Test
+ void testTableRegistrationChange() {
+ // create a table
+ TablePath tablePath = TablePath.of(DEFAULT_DB, "table_registration_change");
+ TableAssignment tableAssignment =
+ generateAssignment(
+ 3,
+ 3,
+ new TabletServerInfo[] {
+ new TabletServerInfo(0, "rack0"),
+ new TabletServerInfo(1, "rack1"),
+ new TabletServerInfo(2, "rack2")
+ });
+ long tableId = metadataManager.createTable(tablePath, TEST_TABLE, tableAssignment, false);
+ SchemaInfo schemaInfo = metadataManager.getLatestSchema(tablePath);
+ long currentMillis = System.currentTimeMillis();
+
+ List<CoordinatorEvent> expectedEvents = new ArrayList<>();
+ expectedEvents.add(
+ new CreateTableEvent(
+ TableInfo.of(
+ tablePath,
+ tableId,
+ schemaInfo.getSchemaId(),
+ TEST_TABLE,
+ currentMillis,
+ currentMillis),
+ tableAssignment));
+ expectedEvents.add(new SchemaChangeEvent(tablePath, schemaInfo));
+
+ retry(
+ Duration.ofMinutes(1),
+ () ->
+ assertThat(eventManager.getEvents())
+ .containsExactlyInAnyOrderElementsOf(expectedEvents));
+
+ // alter table properties (custom property)
+ TablePropertyChanges.Builder builder = TablePropertyChanges.builder();
+ builder.setCustomProperty("custom.key", "custom.value");
+ TablePropertyChanges tablePropertyChanges = builder.build();
+ metadataManager.alterTableProperties(
+ tablePath, Collections.emptyList(), tablePropertyChanges, false, null);
+
+ // get the updated table registration
+ TableRegistration updatedTableRegistration = metadataManager.getTableRegistration(tablePath);
+
+ // verify TableRegistrationChangeEvent is generated
+ expectedEvents.add(new TableRegistrationChangeEvent(tablePath, updatedTableRegistration));
+
+ metadataManager.dropTable(tablePath, false);
+ expectedEvents.add(new DropTableEvent(tableId, false, false));
+
+ retry(
+ Duration.ofMinutes(1),
+ () ->
+ assertThat(eventManager.getEvents())
+ .containsExactlyInAnyOrderElementsOf(expectedEvents));
+ }
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
index 3187e5db5..90dfa0914 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogITCase.java
@@ -22,17 +22,24 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.remote.RemoteLogSegment;
import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.server.entity.FetchReqInfo;
import org.apache.fluss.server.log.FetchParams;
+import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.tablet.TabletServer;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.utils.FlussPaths;
+import org.apache.fluss.utils.clock.ManualClock;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -41,14 +48,19 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import static org.apache.fluss.record.TestData.DATA1;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.createTable;
+import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableRequest;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest;
import static org.apache.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
@@ -58,11 +70,15 @@ import static org.assertj.core.api.Assertions.assertThat;
/** ITCase for remote log. */
public class RemoteLogITCase {
+
+ private static final ManualClock MANUAL_CLOCK = new ManualClock(System.currentTimeMillis());
+
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
FlussClusterExtension.builder()
.setNumOfTabletServers(3)
.setClusterConf(initConfig())
+ .setClock(MANUAL_CLOCK)
.build();
private TableBucket setupTableBucket() throws Exception {
@@ -75,6 +91,11 @@ public class RemoteLogITCase {
private void produceRecordsAndWaitRemoteLogCopy(
TabletServerGateway leaderGateway, TableBucket tb) throws
Exception {
+ produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 0L);
+ }
+
+ private void produceRecordsAndWaitRemoteLogCopy(
+ TabletServerGateway leaderGateway, TableBucket tb, long baseOffset) throws Exception {
for (int i = 0; i < 10; i++) {
assertProduceLogResponse(
leaderGateway
@@ -86,7 +107,7 @@ public class RemoteLogITCase {
genMemoryLogRecordsByObject(DATA1)))
.get(),
0,
- i * 10L);
+ baseOffset + i * 10L);
}
FLUSS_CLUSTER_EXTENSION.waitUntilSomeLogSegmentsCopyToRemote(
new TableBucket(tb.getTableId(), 0));
@@ -202,6 +223,164 @@ public class RemoteLogITCase {
FLUSS_CLUSTER_EXTENSION.waitUntilReplicaExpandToIsr(tb, follower);
}
+ @Test
+ void testRemoteLogTTLWithDynamicLakeToggle() throws Exception {
+ TablePath tablePath = TablePath.of("fluss",
"test_remote_log_ttl_dynamic_lake");
+ // ==================== Stage A: Lake Disabled (Initial)
====================
+ // Create table without data lake enabled
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(DATA1_SCHEMA)
+ .distributedBy(1)
+ // Set a short TTL for testing (1 hour)
+ .property(ConfigOptions.TABLE_LOG_TTL,
Duration.ofHours(1))
+ .build();
+
+ long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath,
tableDescriptor);
+ TableBucket tb = new TableBucket(tableId, 0);
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb);
+
+ int leaderId = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb);
+ TabletServerGateway leaderGateway = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderId);
+
+ // Produce records to create remote log segments
+ produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb);
+
+ TabletServer tabletServer = FLUSS_CLUSTER_EXTENSION.getTabletServerById(leaderId);
+ RemoteLogManager remoteLogManager = tabletServer.getReplicaManager().getRemoteLogManager();
+ RemoteLogTablet remoteLogTablet = remoteLogManager.remoteLogTablet(tb);
+
+ assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0);
+ assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0L);
+
+ Replica leaderReplica = FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tb);
+ LogTablet logTablet = leaderReplica.getLogTablet();
+ assertThat(logTablet.isDataLakeEnabled()).isFalse();
+
+ // Advance time past TTL (1 hour + buffer)
+ MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
+
+ // Since lake is disabled, expired segments should be deleted directly
+ retry(
+ Duration.ofMinutes(2),
+ () -> {
+ assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
+ assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
+ });
+
+ // ==================== Stage B: Dynamic Enable & Retention ====================
+ // Dynamically enable data lake
+ CoordinatorGateway coordinatorGateway = FLUSS_CLUSTER_EXTENSION.newCoordinatorClient();
+ coordinatorGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ Collections.singletonMap(
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ false))
+ .get();
+ retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isTrue());
+
+ // Produce new data after enabling lake (baseOffset = 100 from Stage A)
+ produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 100L);
+ retry(
+ Duration.ofMinutes(2),
+ () -> assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0));
+ int stageBSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
+ long stageBRemoteLogEndOffset = remoteLogTablet.getRemoteLogEndOffset().orElse(-1L);
+ assertThat(stageBRemoteLogEndOffset).isGreaterThan(0L);
+
+ // Advance time past TTL (1 hour + buffer)
+ MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
+
+ // Since lake is enabled but no data has been tiered (lakeLogEndOffset = -1),
+ // the expired segments should NOT be deleted
+ assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(stageBSegmentCount);
+
+ // ==================== Stage C: Lake Progress Update (Cleanup) ====================
+ // Step 1: Partially update lake log end offset to trigger partial cleanup
+ // Get segments sorted by remoteLogEndOffset and use middle segment's end offset
+ List<RemoteLogSegment> sortedSegments =
+ remoteLogTablet.allRemoteLogSegments().stream()
+ .sorted(Comparator.comparingLong(RemoteLogSegment::remoteLogEndOffset))
+ .collect(Collectors.toList());
+ assertThat(sortedSegments.size()).isGreaterThanOrEqualTo(2);
+
+ // Use the end offset of a middle segment to ensure partial deletion
+ int midIndex = sortedSegments.size() / 2;
+ long partialLakeOffset = sortedSegments.get(midIndex).remoteLogEndOffset();
+ logTablet.updateLakeLogEndOffset(partialLakeOffset);
+
+ final int expectedRemainingSegments = sortedSegments.size() - midIndex - 1;
+ // The new remoteLogStartOffset should be the start offset of the first remaining segment
+ final long expectedNewStartOffset = sortedSegments.get(midIndex + 1).remoteLogStartOffset();
+
+ // Wait for partial cleanup - only segments that have been tiered should be deleted
+ retry(
+ Duration.ofMinutes(2),
+ () -> {
+ // Some segments should be deleted (those with endOffset <= partialLakeOffset)
+ int currentSegmentCount = remoteLogTablet.allRemoteLogSegments().size();
+ assertThat(currentSegmentCount).isEqualTo(expectedRemainingSegments);
+ // Remote log start offset should be updated to the first remaining segment's
+ // start
+ assertThat(remoteLogTablet.getRemoteLogStartOffset())
+ .isEqualTo(expectedNewStartOffset);
+ // Remaining segments should have remoteLogEndOffset > partialLakeOffset
+ assertThat(remoteLogTablet.allRemoteLogSegments())
+ .allSatisfy(
+ segment ->
+ assertThat(segment.remoteLogEndOffset())
+ .isGreaterThan(partialLakeOffset));
+ });
+
+ // Step 2: Fully update lake log end offset to trigger complete cleanup
+ logTablet.updateLakeLogEndOffset(stageBRemoteLogEndOffset);
+
+ // Wait for complete cleanup - all segments should be deleted
+ retry(
+ Duration.ofMinutes(2),
+ () -> {
+ assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
+ assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
+ });
+
+ // ==================== Stage D: Dynamic Disable ====================
+ // Dynamically disable data lake
+ coordinatorGateway
+ .alterTable(
+ newAlterTableRequest(
+ tablePath,
+ Collections.singletonMap(
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ false))
+ .get();
+ retry(Duration.ofMinutes(1), () -> assertThat(logTablet.isDataLakeEnabled()).isFalse());
+
+ // Produce new data after disabling lake (baseOffset = 200 from Stage A + B)
+ produceRecordsAndWaitRemoteLogCopy(leaderGateway, tb, 200L);
+ retry(
+ Duration.ofMinutes(2),
+ () -> assertThat(remoteLogTablet.allRemoteLogSegments().size()).isGreaterThan(0));
+
+ // Advance time past TTL (1 hour + buffer)
+ MANUAL_CLOCK.advanceTime(Duration.ofHours(1).plusMinutes(30));
+
+ // Since lake is disabled, expired segments should be deleted directly,
+ // ignoring the lake offset status
+ retry(
+ Duration.ofMinutes(2),
+ () -> {
+ assertThat(remoteLogTablet.allRemoteLogSegments()).isEmpty();
+ assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(Long.MAX_VALUE);
+ });
+ }
+
private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_BUCKET_NUMBER, 1);
@@ -213,6 +392,8 @@ public class RemoteLogITCase {
// set a shorter max log time to allow replica shrink from isr. Don't
be too low, otherwise
// normal follower synchronization will also be affected
conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME,
Duration.ofSeconds(5));
+
+ conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON);
return conf;
}
}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index c8e26ea7b..fd487a0bc 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -752,6 +752,28 @@ final class ReplicaTest extends ReplicaTestBase {
verifyGetKeyValues(kvTablet, expectedKeyValues);
}
+ @Test
+ void testUpdateIsDataLakeEnabled() throws Exception {
+ Replica logReplica =
+ makeLogReplica(DATA1_PHYSICAL_TABLE_PATH, new TableBucket(DATA1_TABLE_ID, 1));
+ makeLogReplicaAsLeader(logReplica);
+
+ // initial state should be false
+ assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse();
+
+ // update to true
+ logReplica.updateIsDataLakeEnabled(true);
+ assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue();
+
+ // update with same value should not change anything (no-op)
+ logReplica.updateIsDataLakeEnabled(true);
+ assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isTrue();
+
+ // update to false
+ logReplica.updateIsDataLakeEnabled(false);
+ assertThat(logReplica.getLogTablet().isDataLakeEnabled()).isFalse();
+ }
+
private void makeLogReplicaAsLeader(Replica replica) throws Exception {
makeLeaderReplica(
replica,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index b567eff1f..5055105e9 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -17,6 +17,7 @@
package org.apache.fluss.server.testutils;
+import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.TableDescriptor;
@@ -75,6 +76,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -147,9 +149,25 @@ public class RpcMessageTestUtils {
public static AlterTableRequest newAlterTableRequest(
TablePath tablePath,
- List<PbAlterConfig> alterConfigs,
+ Map<String, String> setProperties,
+ List<String> resetProperties,
List<PbAddColumn> addColumns,
boolean ignoreIfExists) {
+ List<PbAlterConfig> alterConfigs = new ArrayList<>();
+ for (Map.Entry<String, String> entry : setProperties.entrySet()) {
+ PbAlterConfig info = new PbAlterConfig();
+ info.setConfigKey(entry.getKey());
+ info.setConfigValue(entry.getValue());
+ info.setOpType(AlterConfigOpType.SET.value());
+ alterConfigs.add(info);
+ }
+ for (String resetProperty : resetProperties) {
+ PbAlterConfig info = new PbAlterConfig();
+ info.setConfigKey(resetProperty);
+ info.setOpType(AlterConfigOpType.DELETE.value());
+ alterConfigs.add(info);
+ }
+
AlterTableRequest request = new AlterTableRequest();
request.addAllConfigChanges(alterConfigs)
.addAllAddColumns(addColumns)