This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 14f0f1283 [server] Support alter database comment and custom
properties (#1172)
14f0f1283 is described below
commit 14f0f1283ec8f45aa1ac532dc98db322c5e9a269
Author: Liebing <[email protected]>
AuthorDate: Thu Mar 19 11:07:27 2026 +0800
[server] Support alter database comment and custom properties (#1172)
---
.../java/org/apache/fluss/client/admin/Admin.java | 19 +++
.../org/apache/fluss/client/admin/FlussAdmin.java | 12 ++
.../fluss/client/utils/ClientRpcMessageUtils.java | 43 +++++
.../fluss/client/admin/FlussAdminITCase.java | 87 ++++++++++
.../org/apache/fluss/metadata/DatabaseChange.java | 178 +++++++++++++++++++++
.../apache/fluss/flink/catalog/FlinkCatalog.java | 51 +++++-
.../fluss/flink/catalog/FlinkCatalogITCase.java | 32 ++++
.../fluss/flink/catalog/FlinkCatalogTest.java | 76 ++++++++-
.../flink/sink/testutils/TestAdminAdapter.java | 7 +
.../org/apache/fluss/rpc/gateway/AdminGateway.java | 10 ++
.../org/apache/fluss/rpc/protocol/ApiKeys.java | 3 +-
fluss-rpc/src/main/proto/FlussApi.proto | 11 ++
.../server/coordinator/CoordinatorService.java | 47 ++++++
.../fluss/server/coordinator/MetadataManager.java | 94 ++++++++++-
.../server/entity/DatabasePropertyChanges.java | 72 +++++++++
.../fluss/server/utils/ServerRpcMessageUtils.java | 34 +++-
.../apache/fluss/server/zk/ZooKeeperClient.java | 7 +
.../fluss/server/zk/data/DatabaseRegistration.java | 8 +
.../server/coordinator/TestCoordinatorGateway.java | 7 +
19 files changed, 786 insertions(+), 12 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index d8fe57660..ea28a246b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -53,6 +53,7 @@ import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.exception.TooManyBucketsException;
import org.apache.fluss.exception.TooManyPartitionsException;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
@@ -137,6 +138,24 @@ public interface Admin extends AutoCloseable {
CompletableFuture<Void> createDatabase(
String databaseName, DatabaseDescriptor databaseDescriptor,
boolean ignoreIfExists);
+ /**
+ * Alter a database with the given {@code databaseChanges}.
+ *
+ * <p>The following exceptions can be anticipated when calling {@code
get()} on returned future.
+ *
+ * <ul>
+ * <li>{@link DatabaseNotExistException} when the database does not
exist and {@code
+ * ignoreIfNotExists} is false.
+ * </ul>
+ *
+ * @param databaseName The name of the database.
+ * @param databaseChanges The database changes.
+ * @param ignoreIfNotExists if it is true, do nothing if database does not
exist. If false,
+ * throw a {@link DatabaseNotExistException}.
+ */
+ CompletableFuture<Void> alterDatabase(
+ String databaseName, List<DatabaseChange> databaseChanges, boolean
ignoreIfNotExists);
+
/**
* Get the database with the given database name asynchronously.
*
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 8f39a6e42..e72b96ef8 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -32,6 +32,7 @@ import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
@@ -53,6 +54,7 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AddServerTagRequest;
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
@@ -109,6 +111,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterDatabaseRequest;
import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -283,6 +286,15 @@ public class FlussAdmin implements Admin {
return gateway.alterTable(request).thenApply(r -> null);
}
+ @Override
+ public CompletableFuture<Void> alterDatabase(
+ String databaseName, List<DatabaseChange> databaseChanges, boolean
ignoreIfNotExists) {
+ TablePath.validateDatabaseName(databaseName);
+ AlterDatabaseRequest request =
+ makeAlterDatabaseRequest(databaseName, databaseChanges,
ignoreIfNotExists);
+ return gateway.alterDatabase(request).thenApply(r -> null);
+ }
+
@Override
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
GetTableInfoRequest request = new GetTableInfoRequest();
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index a7dbec47c..4c2e236c5 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -37,6 +37,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
@@ -46,6 +47,7 @@ import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.CreatePartitionRequest;
import org.apache.fluss.rpc.messages.DropPartitionRequest;
@@ -420,6 +422,47 @@ public class ClientRpcMessageUtils {
return request;
}
+ public static AlterDatabaseRequest makeAlterDatabaseRequest(
+ String databaseName, List<DatabaseChange> databaseChanges, boolean
ignoreIfNotExists) {
+ AlterDatabaseRequest request = new AlterDatabaseRequest();
+
+ List<PbAlterConfig> pbDatabaseChanges = new
ArrayList<>(databaseChanges.size());
+ String comment = null;
+ for (DatabaseChange databaseChange : databaseChanges) {
+ PbAlterConfig info = new PbAlterConfig();
+ if (databaseChange instanceof DatabaseChange.SetOption) {
+ DatabaseChange.SetOption setOption =
(DatabaseChange.SetOption) databaseChange;
+ info.setConfigKey(setOption.getKey());
+ info.setConfigValue(setOption.getValue());
+ info.setOpType(AlterConfigOpType.SET.value());
+ pbDatabaseChanges.add(info);
+ } else if (databaseChange instanceof DatabaseChange.ResetOption) {
+ DatabaseChange.ResetOption resetOption =
+ (DatabaseChange.ResetOption) databaseChange;
+ info.setConfigKey(resetOption.getKey());
+ info.setOpType(AlterConfigOpType.DELETE.value());
+ pbDatabaseChanges.add(info);
+ } else if (databaseChange instanceof DatabaseChange.UpdateComment)
{
+ DatabaseChange.UpdateComment updateComment =
+ (DatabaseChange.UpdateComment) databaseChange;
+ comment = updateComment.getComment();
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported database change: " +
databaseChange.getClass());
+ }
+ }
+
+ request.addAllConfigChanges(pbDatabaseChanges)
+ .setDatabaseName(databaseName)
+ .setIgnoreIfNotExists(ignoreIfNotExists);
+
+ if (comment != null) {
+ request.setComment(comment);
+ }
+
+ return request;
+ }
+
public static AcquireKvSnapshotLeaseRequest
makeAcquireKvSnapshotLeaseRequest(
String leaseId, Map<TableBucket, Long> snapshotIds, long
leaseDuration) {
AcquireKvSnapshotLeaseRequest request = new
AcquireKvSnapshotLeaseRequest();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index f8ae1dbf6..ad3bfd775 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -56,6 +56,7 @@ import org.apache.fluss.exception.TooManyPartitionsException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
@@ -181,6 +182,92 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
.isBetween(timestampBeforeCreate, timestampAfterCreate);
}
+ @Test
+ void testAlterDatabase() throws Exception {
+ // create database
+ String dbName = "test_alter_db";
+ admin.createDatabase(
+ dbName,
+ DatabaseDescriptor.builder()
+ .comment("original comment")
+ .customProperty("key1", "value1")
+ .customProperty("key2", "value2")
+ .build(),
+ false)
+ .get();
+
+ DatabaseInfo databaseInfo = admin.getDatabaseInfo(dbName).get();
+ DatabaseDescriptor existingDescriptor =
databaseInfo.getDatabaseDescriptor();
+
+ // Verify initial state
+ assertThat(existingDescriptor.getComment().get()).isEqualTo("original
comment");
+
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key1",
"value1");
+
assertThat(existingDescriptor.getCustomProperties()).containsEntry("key2",
"value2");
+
+ // Alter database: add and modify custom properties
+ List<DatabaseChange> databaseChanges = new ArrayList<>();
+ databaseChanges.add(DatabaseChange.set("key3", "value3"));
+ databaseChanges.add(DatabaseChange.set("key1", "updated_value1"));
+ databaseChanges.add(DatabaseChange.updateComment("updated comment"));
+ admin.alterDatabase(dbName, databaseChanges, false).get();
+
+ // Verify alterations
+ DatabaseInfo alteredDatabaseInfo = admin.getDatabaseInfo(dbName).get();
+ DatabaseDescriptor alteredDescriptor =
alteredDatabaseInfo.getDatabaseDescriptor();
+ assertThat(alteredDescriptor.getComment().get()).isEqualTo("updated
comment");
+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key1",
"updated_value1");
+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key2",
"value2");
+
assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key3",
"value3");
+ assertThat(alteredDescriptor.getCustomProperties()).hasSize(3);
+
+ // Alter database: reset a property
+ databaseChanges = new ArrayList<>();
+ databaseChanges.add(DatabaseChange.reset("key2"));
+ admin.alterDatabase(dbName, databaseChanges, false).get();
+
+ // Verify reset
+ DatabaseInfo resetDatabaseInfo = admin.getDatabaseInfo(dbName).get();
+ DatabaseDescriptor resetDescriptor =
resetDatabaseInfo.getDatabaseDescriptor();
+ assertThat(resetDescriptor.getComment().get()).isEqualTo("updated
comment");
+
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key1",
"updated_value1");
+
assertThat(resetDescriptor.getCustomProperties()).containsEntry("key3",
"value3");
+
assertThat(resetDescriptor.getCustomProperties()).doesNotContainKey("key2");
+ assertThat(resetDescriptor.getCustomProperties()).hasSize(2);
+
+ // Alter database: reset comment
+ databaseChanges = new ArrayList<>();
+ // Empty string means reset comment
+ databaseChanges.add(DatabaseChange.updateComment(""));
+ admin.alterDatabase(dbName, databaseChanges, false).get();
+
+ // Verify reset
+ DatabaseInfo resetCommentDatabaseInfo =
admin.getDatabaseInfo(dbName).get();
+ DatabaseDescriptor resetCommentDescriptor =
+ resetCommentDatabaseInfo.getDatabaseDescriptor();
+ assertThat(resetCommentDescriptor.getComment()).isEmpty();
+ assertThat(resetCommentDescriptor.getCustomProperties())
+ .containsEntry("key1", "updated_value1");
+
assertThat(resetCommentDescriptor.getCustomProperties()).containsEntry("key3",
"value3");
+
assertThat(resetCommentDescriptor.getCustomProperties()).doesNotContainKey("key2");
+ assertThat(resetCommentDescriptor.getCustomProperties()).hasSize(2);
+
+ // throw exception if database not exist
+ List<DatabaseChange> finalDatabaseChanges = databaseChanges;
+ assertThatThrownBy(
+ () ->
+ admin.alterDatabase(
+ "test_alter_db_not_exist",
+ finalDatabaseChanges,
+ false)
+ .get())
+ .cause()
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessage(String.format("Database %s not exists.",
"test_alter_db_not_exist"));
+
+ // should success if ignore not exist
+ admin.alterDatabase("test_alter_db_not_exist", databaseChanges,
true).get();
+ }
+
@Test
void testGetTableInfoAndSchema() throws Exception {
SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseChange.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseChange.java
new file mode 100644
index 000000000..a228e613b
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseChange.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/** {@link DatabaseChange} represents the modification of the Fluss Database.
*/
+public interface DatabaseChange {
+
+ static SetOption set(String key, String value) {
+ return new SetOption(key, value);
+ }
+
+ static ResetOption reset(String key) {
+ return new ResetOption(key);
+ }
+
+ static UpdateComment updateComment(String comment) {
+ return new UpdateComment(comment);
+ }
+
+ /**
+ * A database change to set the database option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER DATABASE <database_name> SET '<key>' =
'<value>';
+ * </pre>
+ */
+ class SetOption implements DatabaseChange {
+
+ private final String key;
+ private final String value;
+
+ private SetOption(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /** Returns the Option key to set. */
+ public String getKey() {
+ return key;
+ }
+
+ /** Returns the Option value to set. */
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SetOption)) {
+ return false;
+ }
+ SetOption setOption = (SetOption) o;
+ return Objects.equals(key, setOption.key) && Objects.equals(value,
setOption.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+
+ @Override
+ public String toString() {
+ return "SetOption{" + "key='" + key + '\'' + ", value='" + value +
'\'' + '}';
+ }
+ }
+
+ /**
+ * A database change to reset the database option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER DATABASE <database_name> RESET '<key>'
+ * </pre>
+ */
+ class ResetOption implements DatabaseChange {
+
+ private final String key;
+
+ public ResetOption(String key) {
+ this.key = key;
+ }
+
+ /** Returns the Option key to reset. */
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ResetOption)) {
+ return false;
+ }
+ ResetOption that = (ResetOption) o;
+ return Objects.equals(key, that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key);
+ }
+
+ @Override
+ public String toString() {
+ return "ResetOption{" + "key='" + key + '\'' + '}';
+ }
+ }
+
+ /**
+ * A database change to set the database comment.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER DATABASE <database_name> SET COMMENT '<comment>';
+ * </pre>
+ */
+ class UpdateComment implements DatabaseChange {
+
+ private final String comment;
+
+ private UpdateComment(String comment) {
+ this.comment = comment;
+ }
+
+ /** Returns the comment to set. */
+ public String getComment() {
+ return comment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UpdateComment)) {
+ return false;
+ }
+ UpdateComment that = (UpdateComment) o;
+ return Objects.equals(comment, that.comment);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(comment);
+ }
+
+ @Override
+ public String toString() {
+ return "UpdateComment{" + "comment='" + comment + '\'' + '}';
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 5d81c5e5e..3cb46aa78 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -29,6 +29,7 @@ import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.procedure.ProcedureManager;
import org.apache.fluss.flink.utils.CatalogExceptionUtils;
import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
@@ -278,9 +279,55 @@ public class FlinkCatalog extends AbstractCatalog {
}
@Override
- public void alterDatabase(String databaseName, CatalogDatabase
catalogDatabase, boolean b)
+ public void alterDatabase(
+ String databaseName, CatalogDatabase catalogDatabase, boolean
ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try {
+ // Get current database info
+ DatabaseDescriptor currentDescriptor =
+
admin.getDatabaseInfo(databaseName).get().getDatabaseDescriptor();
+
+ List<DatabaseChange> databaseChanges = new ArrayList<>();
+
+ // Check comment changes
+ String oldComment = currentDescriptor.getComment().orElse(null);
+ String newComment = catalogDatabase.getComment();
+ if (!Objects.equals(oldComment, newComment)) {
+ databaseChanges.add(DatabaseChange.updateComment(newComment));
+ }
+
+ // Check custom properties changes
+ Map<String, String> oldProps =
currentDescriptor.getCustomProperties();
+ Map<String, String> newProps = catalogDatabase.getProperties();
+
+ newProps.forEach(
+ (k, v) -> {
+ if (!oldProps.containsKey(k) ||
!oldProps.get(k).equals(v)) {
+ databaseChanges.add(DatabaseChange.set(k, v));
+ }
+ });
+
+ oldProps.keySet()
+ .forEach(
+ (k) -> {
+ if (!newProps.containsKey(k)) {
+
databaseChanges.add(DatabaseChange.reset(k));
+ }
+ });
+
+ admin.alterDatabase(databaseName, databaseChanges,
ignoreIfNotExists).get();
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(getName(),
databaseName);
+ }
+ } else {
+ throw new CatalogException(
+ String.format("Failed to alter database %s in %s",
databaseName, getName()),
+ t);
+ }
+ }
}
@Override
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index af1e2eaaa..8f3ed9d3c 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -691,6 +692,37 @@ abstract class FlinkCatalogITCase {
assertThat(databases.toString()).isEqualTo(String.format("[+I[%s]]",
DEFAULT_DB));
}
+ @Test
+ void testAlterDatabase() throws Exception {
+ String dbName = "test_alter_db";
+ // Create database with initial properties
+ tEnv.executeSql(
+ String.format(
+ "create database %s comment 'initial comment' with
('key1' = 'value1', 'key2' = 'value2')",
+ dbName));
+
+ // Verify initial state
+ CatalogDatabase currentDb = catalog.getDatabase(dbName);
+ assertThat(currentDb.getProperties()).containsEntry("key1", "value1");
+ assertThat(currentDb.getProperties()).containsEntry("key2", "value2");
+ assertThat(currentDb.getComment()).isEqualTo("initial comment");
+
+ // Alter database: add new property and update existing property
+ String alterSql1 =
+ "alter database " + dbName + " set ('key3' = 'value3', 'key1'
= 'updated_value1')";
+ tEnv.executeSql(alterSql1);
+
+ // Verify first alteration
+ CatalogDatabase alteredDb1 = catalog.getDatabase(dbName);
+ assertThat(alteredDb1.getProperties()).containsEntry("key1",
"updated_value1");
+ assertThat(alteredDb1.getProperties()).containsEntry("key2", "value2");
+ assertThat(alteredDb1.getProperties()).containsEntry("key3", "value3");
+ assertThat(alteredDb1.getComment()).isEqualTo("initial comment");
+
+ // Drop database for cleanup
+ tEnv.executeSql("drop database " + dbName);
+ }
+
@Test
void testFactoryCannotFindForCreateTemporaryTable() {
// create fluss temporary table is not supported
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 775182b5d..0c8d91918 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -636,9 +636,83 @@ class FlinkCatalogTest {
.isInstanceOf(DatabaseNotExistException.class)
.hasMessage("Database %s does not exist in Catalog %s.",
"unknown", CATALOG_NAME);
assertThatThrownBy(() -> catalog.alterDatabase("db2", null, false))
- .isInstanceOf(UnsupportedOperationException.class);
+ .isInstanceOf(DatabaseNotExistException.class);
assertThat(catalog.getDefaultDatabase()).isEqualTo(DEFAULT_DB);
+ }
+
+ @Test
+ void testAlterDatabase() throws Exception {
+ // Create database with initial properties
+ String dbName = "test_alter_db";
+ Map<String, String> initialProps = new HashMap<>();
+ initialProps.put("key1", "value1");
+ initialProps.put("key2", "value2");
+
+ catalog.createDatabase(dbName, new CatalogDatabaseImpl(initialProps,
null), false);
+
+ // Verify initial state
+ CatalogDatabase currentDb = catalog.getDatabase(dbName);
+ assertThat(currentDb.getProperties()).containsEntry("key1", "value1");
+ assertThat(currentDb.getProperties()).containsEntry("key2", "value2");
+ assertThat(currentDb.getComment()).isNull();
+
+ // Alter database: add new property and update existing property
+ Map<String, String> newProps1 = new
HashMap<>(currentDb.getProperties());
+ newProps1.put("key3", "value3");
+ newProps1.put("key1", "updated_value1");
+
+ CatalogDatabase newDb1 = new CatalogDatabaseImpl(newProps1, null);
+ catalog.alterDatabase(dbName, newDb1, false);
+
+ // Verify first alteration
+ CatalogDatabase alteredDb1 = catalog.getDatabase(dbName);
+ assertThat(alteredDb1.getProperties()).containsEntry("key1",
"updated_value1");
+ assertThat(alteredDb1.getProperties()).containsEntry("key2", "value2");
+ assertThat(alteredDb1.getProperties()).containsEntry("key3", "value3");
+ assertThat(alteredDb1.getComment()).isNull();
+
+ // Alter database: add comment
+ Map<String, String> newProps2 = new
HashMap<>(alteredDb1.getProperties());
+ CatalogDatabase newDb2 = new CatalogDatabaseImpl(newProps2, "test
comment");
+ catalog.alterDatabase(dbName, newDb2, false);
+
+ // Verify comment change
+ CatalogDatabase alteredDb2 = catalog.getDatabase(dbName);
+ assertThat(alteredDb2.getProperties()).containsEntry("key1",
"updated_value1");
+ assertThat(alteredDb2.getProperties()).containsEntry("key2", "value2");
+ assertThat(alteredDb2.getProperties()).containsEntry("key3", "value3");
+ assertThat(alteredDb2.getComment()).isEqualTo("test comment");
+
+ // Alter database: reset a property (remove key2)
+ Map<String, String> newProps3 = new HashMap<>();
+ newProps3.put("key1", "updated_value1");
+ newProps3.put("key3", "value3");
+
+ CatalogDatabase newDb3 = new CatalogDatabaseImpl(newProps3, "test
comment");
+ catalog.alterDatabase(dbName, newDb3, false);
+
+ // Verify reset
+ CatalogDatabase alteredDb3 = catalog.getDatabase(dbName);
+ assertThat(alteredDb3.getProperties()).containsEntry("key1",
"updated_value1");
+ assertThat(alteredDb3.getProperties()).containsEntry("key3", "value3");
+ assertThat(alteredDb3.getProperties()).doesNotContainKey("key2");
+ assertThat(alteredDb3.getComment()).isEqualTo("test comment");
+
+ // Test database not exist
+ assertThatThrownBy(() -> catalog.alterDatabase("non_exist_db", newDb3,
false))
+ .isInstanceOf(DatabaseNotExistException.class)
+ .hasMessage(
+ "Database %s does not exist in Catalog %s.",
"non_exist_db", CATALOG_NAME);
+ // Test with ignoreIfNotExists = true
+ catalog.alterDatabase("non_exist_db", newDb3, true);
+
+ // Clean up
+ catalog.dropDatabase(dbName, false, true);
+ }
+
+ @Test
+ void testCatalogWithNullDefaultDatabase() throws Exception {
// Test catalog with null default database
Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
assertThatThrownBy(
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
index c8dbc0dc0..708226545 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
@@ -34,6 +34,7 @@ import org.apache.fluss.cluster.rebalance.RebalanceProgress;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.ConfigEntry;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
@@ -100,6 +101,12 @@ public class TestAdminAdapter implements Admin {
throw new UnsupportedOperationException("Not implemented in
TestAdminAdapter");
}
+ @Override
+ public CompletableFuture<Void> alterDatabase(
+ String databaseName, List<DatabaseChange> databaseChanges, boolean
ignoreIfNotExists) {
+ throw new UnsupportedOperationException("Not implemented in
TestAdminAdapter");
+ }
+
@Override
public CompletableFuture<DatabaseInfo> getDatabaseInfo(String
databaseName) {
throw new UnsupportedOperationException("Not implemented in
TestAdminAdapter");
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
index d8ef3fde6..21d286aa4 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
@@ -23,6 +23,8 @@ import org.apache.fluss.rpc.messages.AddServerTagRequest;
import org.apache.fluss.rpc.messages.AddServerTagResponse;
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseResponse;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.AlterTableResponse;
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
@@ -74,6 +76,14 @@ public interface AdminGateway extends AdminReadOnlyGateway {
@RPC(api = ApiKeys.CREATE_DATABASE)
CompletableFuture<CreateDatabaseResponse>
createDatabase(CreateDatabaseRequest request);
+ /**
+ * Alter a database.
+ *
+ * @param request the request to alter a database.
+ */
+ @RPC(api = ApiKeys.ALTER_DATABASE)
+ CompletableFuture<AlterDatabaseResponse>
alterDatabase(AlterDatabaseRequest request);
+
/**
* Drop a database.
*
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index f3fadec64..605967f67 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -101,7 +101,8 @@ public enum ApiKeys {
ACQUIRE_KV_SNAPSHOT_LEASE(1056, 0, 0, PUBLIC),
RELEASE_KV_SNAPSHOT_LEASE(1057, 0, 0, PUBLIC),
DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC),
- GET_TABLE_STATS(1059, 0, 0, PUBLIC);
+ GET_TABLE_STATS(1059, 0, 0, PUBLIC),
+ ALTER_DATABASE(1060, 0, 0, PUBLIC);
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index 20b699fb1..db78f65fb 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -64,6 +64,17 @@ message CreateDatabaseRequest {
message CreateDatabaseResponse {
}
+// alter database request and response
+message AlterDatabaseRequest {
+ required string database_name = 1;
+ required bool ignore_if_not_exists = 2;
+ repeated PbAlterConfig config_changes = 3;
+ optional string comment = 4;
+}
+
+message AlterDatabaseResponse {
+}
+
// get table request and response
message GetDatabaseInfoRequest {
required string database_name = 1;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 7c6040d8c..83016cb52 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -44,6 +44,7 @@ import org.apache.fluss.fs.FsPath;
import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
@@ -63,6 +64,8 @@ import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseResponse;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.AlterTableResponse;
import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
@@ -152,6 +155,7 @@ import
org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager;
import org.apache.fluss.server.coordinator.producer.ProducerOffsetsManager;
import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
+import org.apache.fluss.server.entity.DatabasePropertyChanges;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -209,6 +213,7 @@ import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeCreateAcls
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableConfigChanges;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableSchemaChanges;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toDatabaseChanges;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucketOffsets;
import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
import static
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
@@ -365,6 +370,48 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
return CompletableFuture.completedFuture(response);
}
+ @Override
+ public CompletableFuture<AlterDatabaseResponse>
alterDatabase(AlterDatabaseRequest request) {
+ String databaseName = request.getDatabaseName();
+ if (authorizer != null) {
+ authorizer.authorize(
+ currentSession(), OperationType.ALTER,
Resource.database(databaseName));
+ }
+
+ List<DatabaseChange> databaseChanges = toDatabaseChanges(request);
+ DatabasePropertyChanges databasePropertyChanges =
+ toDatabasePropertyChanges(databaseChanges);
+
+ metadataManager.alterDatabaseProperties(
+ databaseName, databasePropertyChanges,
request.isIgnoreIfNotExists());
+
+ return CompletableFuture.completedFuture(new AlterDatabaseResponse());
+ }
+
+ private DatabasePropertyChanges toDatabasePropertyChanges(
+ List<DatabaseChange> databaseChanges) {
+ DatabasePropertyChanges.Builder builder =
DatabasePropertyChanges.builder();
+ if (databaseChanges.isEmpty()) {
+ return builder.build();
+ }
+
+ for (DatabaseChange databaseChange : databaseChanges) {
+ if (databaseChange instanceof DatabaseChange.SetOption) {
+ DatabaseChange.SetOption setOption =
(DatabaseChange.SetOption) databaseChange;
+ builder.setCustomProperty(setOption.getKey(),
setOption.getValue());
+ } else if (databaseChange instanceof DatabaseChange.ResetOption) {
+ DatabaseChange.ResetOption resetOption =
+ (DatabaseChange.ResetOption) databaseChange;
+ builder.resetCustomProperty(resetOption.getKey());
+ } else if (databaseChange instanceof DatabaseChange.UpdateComment)
{
+ DatabaseChange.UpdateComment updateComment =
+ (DatabaseChange.UpdateComment) databaseChange;
+ builder.setComment(updateComment.getComment());
+ }
+ }
+ return builder.build();
+ }
+
@Override
public CompletableFuture<DropDatabaseResponse>
dropDatabase(DropDatabaseRequest request) {
authorizeDatabase(OperationType.DROP, request.getDatabaseName());
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index cc3808d87..43029c3fc 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -47,6 +47,7 @@ import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.DatabasePropertyChanges;
import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.DatabaseRegistration;
@@ -136,8 +137,93 @@ public class MetadataManager {
}
}
+ public void alterDatabaseProperties(
+ String databaseName,
+ DatabasePropertyChanges databasePropertyChanges,
+ boolean ignoreIfNotExists) {
+ try {
+ // Check if database exists
+ if (!databaseExists(databaseName)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new DatabaseNotExistException("Database " + databaseName
+ " not exists.");
+ }
+
+ DatabaseRegistration databaseRegistration =
getDatabaseRegistration(databaseName);
+ DatabaseDescriptor currentDescriptor =
databaseRegistration.toDatabaseDescriptor();
+
+ // Create updated descriptor
+ DatabaseDescriptor newDescriptor =
+ getUpdatedDatabaseDescriptor(currentDescriptor,
databasePropertyChanges);
+
+ if (newDescriptor != null) {
+ // Update the database in ZooKeeper
+ DatabaseRegistration updatedRegistration =
+ databaseRegistration.newProperties(newDescriptor);
+ zookeeperClient.updateDatabase(databaseName,
updatedRegistration);
+ LOG.info("Successfully altered database properties for
database: {}", databaseName);
+ } else {
+ LOG.info(
+ "No properties changed when alter database {}, skip
update.", databaseName);
+ }
+ } catch (Exception e) {
+ if (e instanceof DatabaseNotExistException) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw (DatabaseNotExistException) e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ } else {
+ throw new FlussRuntimeException("Failed to alter database: " +
databaseName, e);
+ }
+ }
+ }
+
+ @Nullable
+ private DatabaseDescriptor getUpdatedDatabaseDescriptor(
+ DatabaseDescriptor currentDescriptor, DatabasePropertyChanges
changes) {
+ Map<String, String> newCustomProperties =
+ new HashMap<>(currentDescriptor.getCustomProperties());
+ // set properties
+ newCustomProperties.putAll(changes.customPropertiesToSet);
+ // reset properties
+
newCustomProperties.keySet().removeAll(changes.customPropertiesToReset);
+
+ if (newCustomProperties.equals(currentDescriptor.getCustomProperties())
+ && changes.commentToSet == null) {
+ return null;
+ }
+
+ String newComment;
+ if (changes.commentToSet != null) {
+ // If comment is set to empty string, it means to reset the comment
+ if (changes.commentToSet.isEmpty()) {
+ newComment = null;
+ } else {
+ newComment = changes.commentToSet;
+ }
+ } else {
+ newComment = currentDescriptor.getComment().orElse(null);
+ }
+
+ return DatabaseDescriptor.builder()
+ .customProperties(newCustomProperties)
+ .comment(newComment)
+ .build();
+ }
+
public DatabaseInfo getDatabase(String databaseName) throws
DatabaseNotExistException {
+ DatabaseRegistration databaseReg =
getDatabaseRegistration(databaseName);
+ return new DatabaseInfo(
+ databaseName,
+ databaseReg.toDatabaseDescriptor(),
+ databaseReg.createdTime,
+ databaseReg.modifiedTime);
+ }
+ public DatabaseRegistration getDatabaseRegistration(String databaseName) {
Optional<DatabaseRegistration> optionalDB;
try {
optionalDB = zookeeperClient.getDatabase(databaseName);
@@ -149,13 +235,7 @@ public class MetadataManager {
if (!optionalDB.isPresent()) {
throw new DatabaseNotExistException("Database '" + databaseName +
"' does not exist.");
}
-
- DatabaseRegistration databaseReg = optionalDB.get();
- return new DatabaseInfo(
- databaseName,
- databaseReg.toDatabaseDescriptor(),
- databaseReg.createdTime,
- databaseReg.modifiedTime);
+ return optionalDB.get();
}
public boolean databaseExists(String databaseName) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/entity/DatabasePropertyChanges.java
b/fluss-server/src/main/java/org/apache/fluss/server/entity/DatabasePropertyChanges.java
new file mode 100644
index 000000000..01fe96ec7
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/entity/DatabasePropertyChanges.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.entity;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/** To describe the changes of the properties of a database. */
+public class DatabasePropertyChanges {
+
+ public final Map<String, String> customPropertiesToSet;
+ public final Set<String> customPropertiesToReset;
+
+ public final String commentToSet;
+
+ protected DatabasePropertyChanges(
+ Map<String, String> customPropertiesToSet,
+ Set<String> customPropertiesToReset,
+ @Nullable String commentToSet) {
+ this.customPropertiesToSet = customPropertiesToSet;
+ this.customPropertiesToReset = customPropertiesToReset;
+ this.commentToSet = commentToSet;
+ }
+
+ public static DatabasePropertyChanges.Builder builder() {
+ return new DatabasePropertyChanges.Builder();
+ }
+
+ /** The builder for {@link DatabasePropertyChanges}. */
+ public static class Builder {
+ private final Map<String, String> customPropertiesToSet = new
HashMap<>();
+ private final Set<String> customPropertiesToReset = new HashSet<>();
+
+ private String commentToSet = null;
+
+ public void setCustomProperty(String key, String value) {
+ customPropertiesToSet.put(key, value);
+ }
+
+ public void resetCustomProperty(String key) {
+ customPropertiesToReset.add(key);
+ }
+
+ public void setComment(String comment) {
+ this.commentToSet = comment;
+ }
+
+ public DatabasePropertyChanges build() {
+ return new DatabasePropertyChanges(
+ customPropertiesToSet, customPropertiesToReset,
commentToSet);
+ }
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 6f5672ff4..bb48bbec1 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -30,6 +30,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -62,6 +63,7 @@ import
org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -292,7 +294,7 @@ public class ServerRpcMessageUtils {
case SUBTRACT:
default:
throw new IllegalArgumentException(
- "Unsupported alter configs op type " +
pbAlterConfig.getOpType());
+ "Unsupported alter table configs op type " +
pbAlterConfig.getOpType());
}
}
@@ -303,6 +305,36 @@ public class ServerRpcMessageUtils {
.collect(Collectors.toList());
}
+ private static DatabaseChange toDatabaseChange(PbAlterConfig
pbAlterConfig) {
+ AlterConfigOpType opType =
AlterConfigOpType.from(pbAlterConfig.getOpType());
+ String configKey = pbAlterConfig.getConfigKey();
+ switch (opType) {
+ case SET: // SET_OPTION or SET_COMMENT
+ return DatabaseChange.set(configKey,
pbAlterConfig.getConfigValue());
+ case DELETE: // RESET_OPTION or RESET_COMMENT
+ return DatabaseChange.reset(configKey);
+ case APPEND:
+ case SUBTRACT:
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported alter database configs op type " +
pbAlterConfig.getOpType());
+ }
+ }
+
+ public static List<DatabaseChange> toDatabaseChanges(AlterDatabaseRequest
request) {
+ List<DatabaseChange> databaseChanges =
+ request.getConfigChangesList().stream()
+ .filter(Objects::nonNull)
+ .map(ServerRpcMessageUtils::toDatabaseChange)
+ .collect(Collectors.toList());
+
+ if (request.hasComment()) {
+
databaseChanges.add(DatabaseChange.updateComment(request.getComment()));
+ }
+
+ return databaseChanges;
+ }
+
public static List<TableChange>
toAlterTableSchemaChanges(AlterTableRequest request) {
List<TableChange> alterTableSchemaChanges = new ArrayList<>();
alterTableSchemaChanges.addAll(toAddColumns(request.getAddColumnsList()));
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index e6efb053e..037e06479 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -472,6 +472,13 @@ public class ZooKeeperClient implements AutoCloseable {
LOG.info("Registered database {}", database);
}
+ public void updateDatabase(String database, DatabaseRegistration
databaseRegistration)
+ throws Exception {
+ String path = DatabaseZNode.path(database);
+ zkClient.setData().forPath(path,
DatabaseZNode.encode(databaseRegistration));
+ LOG.info("Updated database {}", database);
+ }
+
/** Get the database in ZK. */
public Optional<DatabaseRegistration> getDatabase(String database) throws
Exception {
String path = DatabaseZNode.path(database);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
index 89d2c69d2..9986c7eb5 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
@@ -54,6 +54,14 @@ public class DatabaseRegistration {
return builder.build();
}
+ public DatabaseRegistration newProperties(DatabaseDescriptor
databaseDescriptor) {
+ return new DatabaseRegistration(
+ databaseDescriptor.getComment().orElse(null),
+ databaseDescriptor.getCustomProperties(),
+ createdTime,
+ System.currentTimeMillis());
+ }
+
public static DatabaseRegistration of(DatabaseDescriptor
databaseDescriptor) {
final long currentMillis = System.currentTimeMillis();
return new DatabaseRegistration(
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 6aeee7137..6fc51fc44 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -30,6 +30,8 @@ import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseResponse;
import org.apache.fluss.rpc.messages.AlterTableRequest;
import org.apache.fluss.rpc.messages.AlterTableResponse;
import org.apache.fluss.rpc.messages.ApiVersionsRequest;
@@ -162,6 +164,11 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
throw new UnsupportedOperationException();
}
+ @Override
+ public CompletableFuture<AlterDatabaseResponse>
alterDatabase(AlterDatabaseRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public CompletableFuture<DropDatabaseResponse>
dropDatabase(DropDatabaseRequest request) {
throw new UnsupportedOperationException();