This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 9df7b723e [server] Support alter table properties (#1625)
9df7b723e is described below
commit 9df7b723e930d7d33877c5827d7b4986f7c7b8f2
Author: Yang Guo <[email protected]>
AuthorDate: Wed Sep 24 12:46:33 2025 +0800
[server] Support alter table properties (#1625)
---
.../java/org/apache/fluss/client/admin/Admin.java | 23 ++++
.../org/apache/fluss/client/admin/FlussAdmin.java | 24 ++++
.../utils/FlussTableChangeProtoConverter.java | 44 +++++++
.../fluss/client/admin/FlussAdminITCase.java | 67 +++++++++++
.../org/apache/fluss/config/FlussConfigUtils.java | 5 +
.../exception/InvalidAlterTableException.java | 31 +++++
.../fluss/metadata/AlterTableConfigsOpType.java | 52 ++++++++
.../org/apache/fluss/metadata/TableChange.java | 129 ++++++++++++++++++++
.../apache/fluss/flink/catalog/FlinkCatalog.java | 39 +++++-
.../utils/FlinkTableChangeToFlussTableChange.java | 52 ++++++++
.../fluss/flink/catalog/FlinkCatalogITCase.java | 55 +++++++++
.../fluss/flink/catalog/FlinkCatalogTest.java | 3 -
.../org/apache/fluss/rpc/gateway/AdminGateway.java | 11 ++
.../org/apache/fluss/rpc/protocol/ApiKeys.java | 3 +-
.../java/org/apache/fluss/rpc/protocol/Errors.java | 5 +-
fluss-rpc/src/main/proto/FlussApi.proto | 18 +++
.../server/coordinator/CoordinatorService.java | 76 ++++++++++++
.../fluss/server/coordinator/MetadataManager.java | 134 +++++++++++++++++++++
.../fluss/server/utils/ServerRpcMessageUtils.java | 23 ++++
.../server/utils/TableDescriptorValidation.java | 18 +++
.../fluss/server/zk/data/TableRegistration.java | 14 +++
.../server/coordinator/TableManagerITCase.java | 48 ++++++++
.../server/coordinator/TestCoordinatorGateway.java | 8 ++
.../server/testutils/RpcMessageTestUtils.java | 16 +++
24 files changed, 891 insertions(+), 7 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index baaab3507..d00094acf 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -26,6 +26,7 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidDatabaseException;
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidReplicationFactorException;
@@ -48,6 +49,7 @@ import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -235,6 +237,27 @@ public interface Admin extends AutoCloseable {
*/
CompletableFuture<List<String>> listTables(String databaseName);
+ /**
+ * Alter a table with the given {@code tableChanges}.
+ *
+ * <p>The following exceptions can be anticipated when calling {@code
get()} on returned future.
+ *
+ * <ul>
+ * <li>{@link DatabaseNotExistException} when the database does not
exist.
+ * <li>{@link TableNotExistException} when the table does not exist, and
ignoreIfNotExists is
+ * false.
+ * <li>{@link InvalidAlterTableException} if the alter operation is
invalid, such as alter set
+ * a table option which is not supported to modify currently.
+ * </ul>
+ *
+ * @param tablePath The table path of the table.
+ * @param tableChanges The table changes.
+ * @param ignoreIfNotExists if it is true, do nothing if table does not
exist. If false, throw a
+ * TableNotExistException.
+ */
+ CompletableFuture<Void> alterTable(
+ TablePath tablePath, List<TableChange> tableChanges, boolean
ignoreIfNotExists);
+
/**
* List all partitions in the given table in fluss cluster asynchronously.
*
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 0f9c007aa..d04829acc 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -22,6 +22,7 @@ import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
+import org.apache.fluss.client.utils.FlussTableChangeProtoConverter;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.exception.LeaderNotAvailableException;
@@ -33,6 +34,7 @@ import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -41,6 +43,7 @@ import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.AdminGateway;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -62,6 +65,7 @@ import org.apache.fluss.rpc.messages.ListOffsetsRequest;
import org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
import org.apache.fluss.rpc.messages.ListTablesRequest;
import org.apache.fluss.rpc.messages.ListTablesResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
import org.apache.fluss.rpc.messages.PbPartitionSpec;
import org.apache.fluss.rpc.messages.PbTablePath;
@@ -81,6 +85,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
import static
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -235,6 +240,25 @@ public class FlussAdmin implements Admin {
return gateway.createTable(request).thenApply(r -> null);
}
+ @Override
+ public CompletableFuture<Void> alterTable(
+ TablePath tablePath, List<TableChange> tableChanges, boolean
ignoreIfNotExists) {
+ tablePath.validate();
+ AlterTableConfigsRequest request = new AlterTableConfigsRequest();
+
+ List<PbAlterConfigsRequestInfo> pbFlussTableChanges =
+ tableChanges.stream()
+
.map(FlussTableChangeProtoConverter::toPbAlterConfigsRequestInfo)
+ .collect(Collectors.toList());
+
+ request.addAllConfigChanges(pbFlussTableChanges)
+ .setIgnoreIfNotExists(ignoreIfNotExists)
+ .setTablePath()
+ .setDatabaseName(tablePath.getDatabaseName())
+ .setTableName(tablePath.getTableName());
+ return gateway.alterTableConfigs(request).thenApply(r -> null);
+ }
+
@Override
public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
GetTableInfoRequest request = new GetTableInfoRequest();
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/utils/FlussTableChangeProtoConverter.java
b/fluss-client/src/main/java/org/apache/fluss/client/utils/FlussTableChangeProtoConverter.java
new file mode 100644
index 000000000..2a9f59612
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/utils/FlussTableChangeProtoConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.metadata.AlterTableConfigsOpType;
+import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
+
+/** Convert {@link TableChange} to proto. */
+public class FlussTableChangeProtoConverter {
+
+ public static PbAlterConfigsRequestInfo
toPbAlterConfigsRequestInfo(TableChange tableChange) {
+ PbAlterConfigsRequestInfo info = new PbAlterConfigsRequestInfo();
+ if (tableChange instanceof TableChange.SetOption) {
+ TableChange.SetOption setOption = (TableChange.SetOption)
tableChange;
+ info.setConfigKey(setOption.getKey());
+ info.setConfigValue(setOption.getValue());
+ info.setOpType(AlterTableConfigsOpType.SET.value());
+ } else if (tableChange instanceof TableChange.ResetOption) {
+ TableChange.ResetOption resetOption = (TableChange.ResetOption)
tableChange;
+ info.setConfigKey(resetOption.getKey());
+ info.setOpType(AlterTableConfigsOpType.DELETE.value());
+ } else {
+ throw new IllegalArgumentException(
+ "Unsupported table change: " + tableChange.getClass());
+ }
+ return info;
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 0422396c1..d4a117bab 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -55,6 +55,7 @@ import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -197,6 +198,72 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
.isBetween(timestampBeforeCreate, timestampAfterCreate);
}
+ @Test
+ void testAlterTable() throws Exception {
+ // create table
+ TablePath tablePath = TablePath.of("test_db", "alter_table_1");
+ admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
+
+ TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+
+ TableDescriptor existingTableDescriptor =
tableInfo.toTableDescriptor();
+ Map<String, String> updateProperties =
+ new HashMap<>(existingTableDescriptor.getProperties());
+ Map<String, String> updateCustomProperties =
+ new HashMap<>(existingTableDescriptor.getCustomProperties());
+ updateCustomProperties.put("client.connect-timeout", "240s");
+
+ TableDescriptor newTableDescriptor =
+ TableDescriptor.builder()
+ .schema(existingTableDescriptor.getSchema())
+
.comment(existingTableDescriptor.getComment().orElse("test table"))
+
.partitionedBy(existingTableDescriptor.getPartitionKeys())
+ .distributedBy(
+ existingTableDescriptor
+ .getTableDistribution()
+ .get()
+ .getBucketCount()
+ .orElse(3),
+ existingTableDescriptor.getBucketKeys())
+ .properties(updateProperties)
+ .customProperties(updateCustomProperties)
+ .build();
+
+ List<TableChange> tableChanges = new ArrayList<>();
+ TableChange tableChange = TableChange.set("client.connect-timeout",
"240s");
+ tableChanges.add(tableChange);
+ // alter table
+ admin.alterTable(tablePath, tableChanges, false).get();
+
+ TableInfo alteredTableInfo = admin.getTableInfo(tablePath).get();
+ TableDescriptor alteredTableDescriptor =
alteredTableInfo.toTableDescriptor();
+ assertThat(alteredTableDescriptor).isEqualTo(newTableDescriptor);
+
+ // throw exception if table not exist
+ assertThatThrownBy(
+ () ->
+ admin.alterTable(
+ TablePath.of("test_db",
"alter_table_not_exist"),
+ tableChanges,
+ false)
+ .get())
+ .cause()
+ .isInstanceOf(TableNotExistException.class);
+
+ // throw exception if database not exist
+ assertThatThrownBy(
+ () ->
+ admin.alterTable(
+ TablePath.of(
+ "test_db_not_exist",
+
"alter_table_not_exist"),
+ tableChanges,
+ false)
+ .get())
+ .cause()
+ .isInstanceOf(DatabaseNotExistException.class);
+ }
+
@Test
void testCreateInvalidDatabaseAndTable() {
assertThatThrownBy(
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index 38118397b..18fff5e64 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -21,7 +21,9 @@ import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** Utilities of Fluss {@link ConfigOptions}. */
@@ -33,9 +35,12 @@ public class FlussConfigUtils {
public static final String CLIENT_PREFIX = "client.";
public static final String CLIENT_SECURITY_PREFIX = "client.security.";
+ public static final List<String> ALTERABLE_TABLE_CONFIG;
+
static {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
+ ALTERABLE_TABLE_CONFIG = Collections.emptyList();
}
@VisibleForTesting
diff --git
a/fluss-common/src/main/java/org/apache/fluss/exception/InvalidAlterTableException.java
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidAlterTableException.java
new file mode 100644
index 000000000..411a68d77
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/exception/InvalidAlterTableException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/** Exception for invalid alter table operation. */
+public class InvalidAlterTableException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidAlterTableException(String message) {
+ this(message, null);
+ }
+
+ public InvalidAlterTableException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java
new file mode 100644
index 000000000..9307d38e8
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/** The operation type of altering table configurations. */
+public enum AlterTableConfigsOpType {
+ SET(0),
+ DELETE(1),
+ APPEND(2),
+ SUBTRACT(3);
+
+ public final int value;
+
+ AlterTableConfigsOpType(int value) {
+ this.value = value;
+ }
+
+ public static AlterTableConfigsOpType from(int opType) {
+ switch (opType) {
+ case 0:
+ return SET;
+ case 1:
+ return DELETE;
+ case 2:
+ return APPEND;
+ case 3:
+ return SUBTRACT;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported AlterTableConfigsOpType: " + opType);
+ }
+ }
+
+ public int value() {
+ return this.value;
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
new file mode 100644
index 000000000..98e907e5f
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/** {@link TableChange} represents the modification of the Fluss Table. */
+public interface TableChange {
+
+ static SetOption set(String key, String value) {
+ return new SetOption(key, value);
+ }
+
+ static ResetOption reset(String key) {
+ return new ResetOption(key);
+ }
+
+ /**
+ * A table change to set the table option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> SET '<key>' = '<value>';
+ * </pre>
+ */
+ class SetOption implements TableChange {
+
+ private final String key;
+ private final String value;
+
+ private SetOption(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /** Returns the Option key to set. */
+ public String getKey() {
+ return key;
+ }
+
+ /** Returns the Option value to set. */
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SetOption)) {
+ return false;
+ }
+ SetOption setOption = (SetOption) o;
+ return Objects.equals(key, setOption.key) && Objects.equals(value,
setOption.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+
+ @Override
+ public String toString() {
+ return "SetOption{" + "key='" + key + '\'' + ", value='" + value +
'\'' + '}';
+ }
+ }
+
+ /**
+ * A table change to reset the table option.
+ *
+ * <p>It is equal to the following statement:
+ *
+ * <pre>
+ * ALTER TABLE <table_name> RESET '<key>'
+ * </pre>
+ */
+ class ResetOption implements TableChange {
+
+ private final String key;
+
+ public ResetOption(String key) {
+ this.key = key;
+ }
+
+ /** Returns the Option key to reset. */
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ResetOption)) {
+ return false;
+ }
+ ResetOption that = (ResetOption) o;
+ return Objects.equals(key, that.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key);
+ }
+
+ @Override
+ public String toString() {
+ return "ResetOption{" + "key='" + key + '\'' + '}';
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 82a58bd43..051e7249e 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -29,9 +29,11 @@ import org.apache.fluss.flink.procedure.ProcedureManager;
import org.apache.fluss.flink.utils.CatalogExceptionUtils;
import org.apache.fluss.flink.utils.DataLakeUtils;
import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.flink.utils.FlinkTableChangeToFlussTableChange;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.PartitionSpec;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -78,7 +80,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
@@ -398,9 +402,40 @@ public class FlinkCatalog extends AbstractCatalog {
}
@Override
- public void alterTable(ObjectPath objectPath, CatalogBaseTable
catalogBaseTable, boolean b)
+ public void alterTable(
+ ObjectPath objectPath,
+ CatalogBaseTable newTable,
+ List<org.apache.flink.table.catalog.TableChange> tableChanges,
+ boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ TablePath tablePath = toTablePath(objectPath);
+
+ List<TableChange> flussTableChanges =
+ tableChanges.stream()
+ .filter(Objects::nonNull)
+
.map(FlinkTableChangeToFlussTableChange::toFlussTableChange)
+ .collect(Collectors.toList());
+ try {
+ admin.alterTable(tablePath, flussTableChanges,
ignoreIfNotExists).get();
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isTableNotExist(t)) {
+ throw new TableNotExistException(getName(), objectPath);
+ } else if (isTableInvalid(t)) {
+ throw new InvalidTableException(t.getMessage());
+ } else {
+ throw new CatalogException(
+ String.format("Failed to alter table %s in %s",
objectPath, getName()), t);
+ }
+ }
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath objectPath, CatalogBaseTable newTable, boolean
ignoreIfNotExist)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
+ "alterTable(objectPath, newTable, ignoreIfNotExist) method is
not supported, please upgrade your Flink to 1.18+. ");
}
@SuppressWarnings("checkstyle:WhitespaceAround")
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkTableChangeToFlussTableChange.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkTableChangeToFlussTableChange.java
new file mode 100644
index 000000000..9e3548e53
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkTableChangeToFlussTableChange.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.utils;
+
+import org.apache.fluss.metadata.TableChange;
+
+/** convert Flink's TableChange class to {@link TableChange}. */
+public class FlinkTableChangeToFlussTableChange {
+
+ public static TableChange toFlussTableChange(
+ org.apache.flink.table.catalog.TableChange tableChange) {
+ TableChange flussTableChange;
+ if (tableChange instanceof
org.apache.flink.table.catalog.TableChange.SetOption) {
+ flussTableChange =
+ convertSetOption(
+
(org.apache.flink.table.catalog.TableChange.SetOption) tableChange);
+ } else if (tableChange instanceof
org.apache.flink.table.catalog.TableChange.ResetOption) {
+ flussTableChange =
+ convertResetOption(
+
(org.apache.flink.table.catalog.TableChange.ResetOption) tableChange);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unsupported flink table change: %s.",
tableChange));
+ }
+ return flussTableChange;
+ }
+
+ private static TableChange.SetOption convertSetOption(
+ org.apache.flink.table.catalog.TableChange.SetOption
flinkSetOption) {
+ return TableChange.set(flinkSetOption.getKey(),
flinkSetOption.getValue());
+ }
+
+ private static TableChange.ResetOption convertResetOption(
+ org.apache.flink.table.catalog.TableChange.ResetOption
flinkResetOption) {
+ return TableChange.reset(flinkResetOption.getKey());
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 948ec3635..5ee54ac21 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.catalog;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -183,6 +184,60 @@ abstract class FlinkCatalogITCase {
assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
}
+ @Test
+ void testAlterTable() throws Exception {
+ String ddl =
+ "create table test_alter_table_append_only ("
+ + "a string, "
+ + "b int) "
+ + "with ('bucket.num' = '5')";
+ tEnv.executeSql(ddl);
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("a", DataTypes.STRING()).column("b",
DataTypes.INT());
+ Schema expectedSchema = schemaBuilder.build();
+ CatalogTable table =
+ (CatalogTable)
+ catalog.getTable(
+ new ObjectPath(DEFAULT_DB,
"test_alter_table_append_only"));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+ Map<String, String> expectedOptions = new HashMap<>();
+ expectedOptions.put("bucket.num", "5");
+ assertOptionsEqual(table.getOptions(), expectedOptions);
+
+ // alter table
+ String dml =
+ "alter table test_alter_table_append_only set
('client.connect-timeout' = '240s')";
+ tEnv.executeSql(dml);
+ table =
+ (CatalogTable)
+ catalog.getTable(
+ new ObjectPath(DEFAULT_DB,
"test_alter_table_append_only"));
+ assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
+ expectedOptions = new HashMap<>();
+
+ // bucket.num is unchanged, but timeout should change
+ expectedOptions.put("bucket.num", "5");
+ expectedOptions.put("client.connect-timeout", "240s"); // updated
+ assertOptionsEqual(table.getOptions(), expectedOptions);
+
+ // alter table set an unsupported modification option should throw
exception
+ String unSupportedDml1 =
+ "alter table test_alter_table_append_only set
('table.auto-partition.enabled' = 'true')";
+
+ assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml1))
+ .rootCause()
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessage(
+ "The option 'table.auto-partition.enabled' is not
supported to alter set.");
+
+ String unSupportedDml2 =
+ "alter table test_alter_table_append_only set ('k1' = 'v1',
'table.kv.format' = 'indexed')";
+ assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml2))
+ .rootCause()
+ .isInstanceOf(InvalidAlterTableException.class)
+ .hasMessage("The option 'table.kv.format' is not supported to
alter set.");
+ }
+
@Test
void testCreateUnSupportedTable() {
// test invalid property
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index dec2601a2..0f0621cbf 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -256,9 +256,6 @@ class FlinkCatalogTest {
assertThatThrownBy(() -> catalog.renameTable(this.tableInDefaultDb,
"newName", false))
.isInstanceOf(UnsupportedOperationException.class);
- assertThatThrownBy(() -> catalog.alterTable(this.tableInDefaultDb,
null, false))
- .isInstanceOf(UnsupportedOperationException.class);
-
// Test lake table handling - should throw TableNotExistException for
non-existent lake
// table
ObjectPath lakePath = new ObjectPath(DEFAULT_DB, "regularTable$lake");
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
index bda096dfe..1ec13ffd7 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
@@ -17,6 +17,8 @@
package org.apache.fluss.rpc.gateway;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterTableConfigsResponse;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
import org.apache.fluss.rpc.messages.CreateAclsResponse;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -64,6 +66,15 @@ public interface AdminGateway extends AdminReadOnlyGateway {
@RPC(api = ApiKeys.CREATE_TABLE)
CompletableFuture<CreateTableResponse> createTable(CreateTableRequest
request);
+ /**
+ * Alter a table.
+ *
+ * @param request the request to alter the configs of a table.
+ */
+ @RPC(api = ApiKeys.ALTER_TABLE_CONFIGS)
+ CompletableFuture<AlterTableConfigsResponse> alterTableConfigs(
+ AlterTableConfigsRequest request);
+
/**
* Drop a table.
*
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index 8526581ae..e39dc1b34 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -71,7 +71,8 @@ public enum ApiKeys {
LIST_ACLS(1040, 0, 0, PUBLIC),
DROP_ACLS(1041, 0, 0, PUBLIC),
LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
- CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE);
+ CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE),
+ ALTER_TABLE_CONFIGS(1044, 0, 0, PUBLIC);
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 5ec892a69..3b899baee 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -29,6 +29,7 @@ import org.apache.fluss.exception.DuplicateSequenceException;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.FencedTieringEpochException;
import org.apache.fluss.exception.IneligibleReplicaException;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidColumnProjectionException;
import org.apache.fluss.exception.InvalidConfigException;
import org.apache.fluss.exception.InvalidCoordinatorException;
@@ -222,7 +223,9 @@ public enum Errors {
INELIGIBLE_REPLICA_EXCEPTION(
55,
"The new ISR contains at least one ineligible replica.",
- IneligibleReplicaException::new);
+ IneligibleReplicaException::new),
+ INVALID_ALTER_TABLE_EXCEPTION(
+ 56, "The alter table is invalid.",
InvalidAlterTableException::new);
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index 200b8b520..00797b93a 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -108,6 +108,24 @@ message CreateTableRequest {
message CreateTableResponse {
}
+// alter table request and response
+message AlterTableConfigsRequest {
+ required PbTablePath table_path = 1;
+ required bool ignore_if_not_exists = 2;
+ repeated PbAlterConfigsRequestInfo config_changes = 3;
+}
+
+message PbAlterConfigsRequestInfo {
+ required string config_key = 1;
+ optional string config_value = 2;
+ required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
+}
+
+message AlterTableConfigsResponse {
+}
+
+
+
// get table request and response
message GetTableInfoRequest {
required PbTablePath table_path = 1;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index dd3eb62ab..e944105e3 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -21,6 +21,7 @@ import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.cluster.TabletServerInfo;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidAlterTableException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidDatabaseException;
import org.apache.fluss.exception.InvalidTableException;
@@ -36,6 +37,7 @@ import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePartition;
@@ -43,6 +45,8 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterTableConfigsResponse;
import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -71,6 +75,7 @@ import
org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable;
import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable;
import org.apache.fluss.rpc.netty.server.Session;
@@ -98,6 +103,7 @@ import org.apache.fluss.server.metadata.BucketMetadata;
import org.apache.fluss.server.metadata.PartitionMetadata;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metadata.TableMetadata;
+import org.apache.fluss.server.utils.ServerRpcMessageUtils;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.BucketAssignment;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
@@ -114,10 +120,13 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
import static
org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
@@ -294,6 +303,73 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
return CompletableFuture.completedFuture(new CreateTableResponse());
}
+ @Override
+ public CompletableFuture<AlterTableConfigsResponse> alterTableConfigs(
+ AlterTableConfigsRequest request) {
+ TablePath tablePath = toTablePath(request.getTablePath());
+ tablePath.validate();
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.ALTER,
Resource.table(tablePath));
+ }
+
+ AlterTableConfigsResponse alterTableResponse = new
AlterTableConfigsResponse();
+
+ handleFlussTableChanges(
+ tablePath, request.getConfigChangesList(),
request.isIgnoreIfNotExists());
+
+ return CompletableFuture.completedFuture(alterTableResponse);
+ }
+
+ private void handleFlussTableChanges(
+ TablePath tablePath,
+ List<PbAlterConfigsRequestInfo> configsRequestInfos,
+ boolean ignoreIfNotExists) {
+ if (configsRequestInfos.isEmpty()) {
+ return;
+ }
+
+ List<TableChange> tableChanges =
+ configsRequestInfos.stream()
+ .filter(Objects::nonNull)
+ .map(ServerRpcMessageUtils::toFlussTableChange)
+ .collect(Collectors.toList());
+
+ MetadataManager.TablePropertyChanges.Builder propertyChangesBuilder =
+ MetadataManager.TablePropertyChanges.builder();
+
+ for (TableChange tableChange : tableChanges) {
+ if (tableChange instanceof TableChange.SetOption) {
+ TableChange.SetOption setOption = (TableChange.SetOption)
tableChange;
+ String optionKey = setOption.getKey();
+ if (TABLE_OPTIONS.containsKey(optionKey)) {
+ // currently, we don't support alter any table options
understand by Fluss
+ throw new InvalidAlterTableException(
+ "The option '" + optionKey + "' is not supported
to alter set.");
+ } else {
+ // otherwise, it's considered as custom property
+ propertyChangesBuilder.setCustomProperty(optionKey,
setOption.getValue());
+ }
+ } else if (tableChange instanceof TableChange.ResetOption) {
+ TableChange.ResetOption resetOption =
(TableChange.ResetOption) tableChange;
+ String optionKey = resetOption.getKey();
+ if (TABLE_OPTIONS.containsKey(optionKey)) {
+ // currently, we don't support alter any table options
understand by Fluss
+ throw new InvalidAlterTableException(
+ "The option " + optionKey + " is not supported to
alter reset.");
+ } else {
+ // otherwise, it's considered as custom property
+ propertyChangesBuilder.resetCustomProperty(optionKey);
+ }
+ } else {
+ throw new InvalidAlterTableException(
+ "Unsupported alter table change: " + tableChange);
+ }
+ }
+
+ metadataManager.alterTableProperties(
+ tablePath, propertyChangesBuilder.build(), ignoreIfNotExists);
+ }
+
private TableDescriptor applySystemDefaults(TableDescriptor
tableDescriptor) {
TableDescriptor newDescriptor = tableDescriptor;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index a216a7eb1..1c6a18527 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -57,12 +57,14 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
+import static
org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties;
import static
org.apache.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;
/** A manager for metadata. */
@@ -302,6 +304,80 @@ public class MetadataManager {
"Fail to create table " + tablePath);
}
+ public void alterTableProperties(
+ TablePath tablePath,
+ TablePropertyChanges tablePropertyChanges,
+ boolean ignoreIfNotExists) {
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(
+ "Database " + tablePath.getDatabaseName() + " does not
exist.");
+ }
+ if (!tableExists(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new TableNotExistException("Table " + tablePath + " does
not exists.");
+ }
+ }
+
+ try {
+ TableRegistration updatedTableRegistration =
+ getUpdatedTableRegistration(tablePath,
tablePropertyChanges);
+ if (updatedTableRegistration != null) {
+ zookeeperClient.updateTable(tablePath,
updatedTableRegistration);
+ } else {
+ LOG.debug(
+ "No properties changed when alter table {}, skip
update table.", tablePath);
+ }
+ } catch (Exception e) {
+ if (e instanceof KeeperException.NoNodeException) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableNotExistException("Table " + tablePath + " does
not exists.");
+ } else {
+ throw new FlussRuntimeException("Failed to alter table: " +
tablePath, e);
+ }
+ }
+ }
+
+ /**
+ * Get a new TableRegistration with updated properties.
+ *
+ * @param tablePath the table path.
+ * @param tablePropertyChanges the changes for the table properties
+ * @return the updated TableRegistration, or null if no properties updated.
+ */
+ private @Nullable TableRegistration getUpdatedTableRegistration(
+ TablePath tablePath, TablePropertyChanges tablePropertyChanges) {
+ TableRegistration existTableReg = getTableRegistration(tablePath);
+
+ Map<String, String> newProperties = new
HashMap<>(existTableReg.properties);
+ Map<String, String> newCustomProperties = new
HashMap<>(existTableReg.customProperties);
+
+ // set properties
+ newProperties.putAll(tablePropertyChanges.tablePropertiesToSet);
+ newCustomProperties.putAll(tablePropertyChanges.customPropertiesToSet);
+
+ // reset properties
+ for (String key : tablePropertyChanges.tablePropertiesToReset) {
+ newProperties.remove(key);
+ }
+
+ for (String key : tablePropertyChanges.customPropertiesToReset) {
+ newCustomProperties.remove(key);
+ }
+
+ // no properties change happen
+ if (newProperties.equals(existTableReg.properties)
+ && newCustomProperties.equals(existTableReg.customProperties))
{
+ return null;
+ }
+
+ validateAlterTableProperties(newProperties);
+ return existTableReg.newProperties(newProperties, newCustomProperties);
+ }
+
public TableInfo getTable(TablePath tablePath) throws
TableNotExistException {
Optional<TableRegistration> optionalTable;
try {
@@ -551,4 +627,62 @@ public class MetadataManager {
throw new FlussRuntimeException(errorMsg, e);
}
}
+
+ /** To describe the changes of the properties of a table. */
+ public static class TablePropertyChanges {
+
+ private final Map<String, String> tablePropertiesToSet;
+ private final Set<String> tablePropertiesToReset;
+
+ private final Map<String, String> customPropertiesToSet;
+ private final Set<String> customPropertiesToReset;
+
+ private TablePropertyChanges(
+ Map<String, String> tablePropertiesToSet,
+ Set<String> tablePropertiesToReset,
+ Map<String, String> customPropertiesToSet,
+ Set<String> customPropertiesToReset) {
+ this.tablePropertiesToSet = tablePropertiesToSet;
+ this.tablePropertiesToReset = tablePropertiesToReset;
+ this.customPropertiesToSet = customPropertiesToSet;
+ this.customPropertiesToReset = customPropertiesToReset;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** The builder for {@link TablePropertyChanges}. */
+ public static class Builder {
+ private final Map<String, String> tablePropertiesToSet = new
HashMap<>();
+ private final Set<String> tablePropertiesToReset = new HashSet<>();
+
+ private final Map<String, String> customPropertiesToSet = new
HashMap<>();
+ private final Set<String> customPropertiesToReset = new
HashSet<>();
+
+ public void setTableProperty(String key, String value) {
+ tablePropertiesToSet.put(key, value);
+ }
+
+ public void resetTableProperty(String key) {
+ tablePropertiesToReset.add(key);
+ }
+
+ public void setCustomProperty(String key, String value) {
+ customPropertiesToSet.put(key, value);
+ }
+
+ public void resetCustomProperty(String key) {
+ customPropertiesToReset.add(key);
+ }
+
+ public TablePropertyChanges build() {
+ return new TablePropertyChanges(
+ tablePropertiesToSet,
+ tablePropertiesToReset,
+ customPropertiesToSet,
+ customPropertiesToReset);
+ }
+ }
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 88126f266..0769e464f 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -23,10 +23,12 @@ import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.AlterTableConfigsOpType;
import org.apache.fluss.metadata.PartitionSpec;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
@@ -80,6 +82,7 @@ import org.apache.fluss.rpc.messages.PbAdjustIsrReqForBucket;
import org.apache.fluss.rpc.messages.PbAdjustIsrReqForTable;
import org.apache.fluss.rpc.messages.PbAdjustIsrRespForBucket;
import org.apache.fluss.rpc.messages.PbAdjustIsrRespForTable;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
import org.apache.fluss.rpc.messages.PbBucketMetadata;
import org.apache.fluss.rpc.messages.PbCreateAclRespInfo;
import org.apache.fluss.rpc.messages.PbDropAclsFilterResult;
@@ -238,6 +241,26 @@ public class ServerRpcMessageUtils {
pbServerNode.hasRack() ? pbServerNode.getRack() : null);
}
+ public static TableChange toFlussTableChange(
+ PbAlterConfigsRequestInfo pbAlterConfigsRequestInfo) {
+ AlterTableConfigsOpType opType =
+
AlterTableConfigsOpType.from(pbAlterConfigsRequestInfo.getOpType());
+ switch (opType) {
+ case SET: // SET_OPTION
+ return TableChange.set(
+ pbAlterConfigsRequestInfo.getConfigKey(),
+ pbAlterConfigsRequestInfo.getConfigValue());
+ case DELETE: // RESET_OPTION
+ return
TableChange.reset(pbAlterConfigsRequestInfo.getConfigKey());
+ case APPEND:
+ case SUBTRACT:
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported alter configs op type "
+ + pbAlterConfigsRequestInfo.getOpType());
+ }
+ }
+
public static MetadataResponse buildMetadataResponse(
@Nullable ServerNode coordinatorServer,
Set<ServerNode> aliveTabletServers,
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 1f4cd014a..4c11dc0cf 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -39,6 +39,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -93,6 +94,23 @@ public class TableDescriptorValidation {
checkSystemColumns(schema);
}
+ public static void validateAlterTableProperties(Map<String, String>
properties) {
+ Configuration tableConf = Configuration.fromMap(properties);
+ // check properties should only contain table.* options,
+ // and this cluster know it,
+ // and value is valid
+ for (String key : tableConf.keySet()) {
+ if (!TABLE_OPTIONS.containsKey(key)) {
+ throw new InvalidConfigException(
+ String.format(
+ "'%s' is not a Fluss table property. Please
use '.customProperty(..)' to set custom properties.",
+ key));
+ }
+ ConfigOption<?> option = TABLE_OPTIONS.get(key);
+ validateOptionValue(tableConf, option);
+ }
+ }
+
private static void checkSystemColumns(RowType schema) {
List<String> fieldNames = schema.getFieldNames();
List<String> unsupportedColumns =
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
index bbbecdc83..547326f6a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java
@@ -132,6 +132,20 @@ public class TableRegistration {
currentMillis);
}
+ public TableRegistration newProperties(
+ Map<String, String> newProperties, Map<String, String>
newCustomProperties) {
+ final long currentMillis = System.currentTimeMillis();
+ return new TableRegistration(
+ tableId,
+ comment,
+ partitionKeys,
+ new TableDistribution(bucketCount, bucketKeys),
+ newProperties,
+ newCustomProperties,
+ createdTime,
+ currentMillis);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
index 93352b20c..440ef9a64 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java
@@ -32,6 +32,7 @@ import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.metadata.AlterTableConfigsOpType;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
@@ -48,6 +49,7 @@ import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
import org.apache.fluss.rpc.messages.ListDatabasesRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.MetadataResponse;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
import org.apache.fluss.rpc.messages.PbBucketMetadata;
import org.apache.fluss.rpc.messages.PbPartitionMetadata;
import org.apache.fluss.rpc.messages.PbServerNode;
@@ -86,6 +88,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.fluss.config.ConfigOptions.DEFAULT_LISTENER_NAME;
+import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newAlterTableConfigsRequest;
import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateDatabaseRequest;
import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newCreateTableRequest;
import static
org.apache.fluss.server.testutils.RpcMessageTestUtils.newDatabaseExistsRequest;
@@ -284,6 +287,29 @@ class TableManagerITCase {
TableDescriptor gottenTable =
TableDescriptor.fromJsonBytes(response.getTableJson());
assertThat(gottenTable).isEqualTo(tableDescriptor.withReplicationFactor(1));
+ // alter table
+ Map<String, String> setProperties = new HashMap<>();
+ setProperties.put("client.connect-timeout", "240s");
+
+ List<String> resetProperties = new ArrayList<>();
+
+ adminGateway
+ .alterTableConfigs(
+ newAlterTableConfigsRequest(
+ tablePath,
+ alterTableProperties(setProperties,
resetProperties),
+ false))
+ .get();
+ // get the table and check it
+ GetTableInfoResponse responseAfterAlter =
+ gateway.getTableInfo(newGetTableInfoRequest(tablePath)).get();
+ TableDescriptor gottenTableAfterAlter =
+
TableDescriptor.fromJsonBytes(responseAfterAlter.getTableJson());
+
+ String valueAfterAlter =
+
gottenTableAfterAlter.getCustomProperties().get("client.connect-timeout");
+ assertThat(valueAfterAlter).isEqualTo("240s");
+
// check assignment, just check replica numbers, don't care about
actual assignment
checkAssignmentWithReplicaFactor(
zkClient.getTableAssignment(response.getTableId()).get(),
@@ -733,6 +759,28 @@ class TableManagerITCase {
.build();
}
+ private static List<PbAlterConfigsRequestInfo> alterTableProperties(
+ Map<String, String> setProperties, List<String> resetProperties) {
+ List<PbAlterConfigsRequestInfo> res = new ArrayList<>();
+
+ for (Map.Entry<String, String> entry : setProperties.entrySet()) {
+ PbAlterConfigsRequestInfo info = new PbAlterConfigsRequestInfo();
+ info.setConfigKey(entry.getKey());
+ info.setConfigValue(entry.getValue());
+ info.setOpType(AlterTableConfigsOpType.SET.value());
+ res.add(info);
+ }
+
+ for (String resetProperty : resetProperties) {
+ PbAlterConfigsRequestInfo info = new PbAlterConfigsRequestInfo();
+ info.setConfigKey(resetProperty);
+ info.setOpType(AlterTableConfigsOpType.DELETE.value());
+ res.add(info);
+ }
+
+ return res;
+ }
+
private static Schema newPkSchema() {
return Schema.newBuilder()
.column("a", DataTypes.INT())
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 5ef93cd71..15222305f 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -23,6 +23,8 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterTableConfigsResponse;
import org.apache.fluss.rpc.messages.ApiVersionsRequest;
import org.apache.fluss.rpc.messages.ApiVersionsResponse;
import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
@@ -139,6 +141,12 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
throw new UnsupportedOperationException();
}
+ @Override
+ public CompletableFuture<AlterTableConfigsResponse> alterTableConfigs(
+ AlterTableConfigsRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public CompletableFuture<DropTableResponse> dropTable(DropTableRequest
request) {
throw new UnsupportedOperationException();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
index a3d6d4492..fa0482b10 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/RpcMessageTestUtils.java
@@ -27,6 +27,7 @@ import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.record.bytesview.MemorySegmentBytesView;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
+import org.apache.fluss.rpc.messages.AlterTableConfigsRequest;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
import org.apache.fluss.rpc.messages.CreatePartitionRequest;
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -45,6 +46,7 @@ import
org.apache.fluss.rpc.messages.ListPartitionInfosRequest;
import org.apache.fluss.rpc.messages.ListTablesRequest;
import org.apache.fluss.rpc.messages.LookupRequest;
import org.apache.fluss.rpc.messages.MetadataRequest;
+import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo;
import org.apache.fluss.rpc.messages.PbFetchLogReqForBucket;
import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
@@ -139,6 +141,20 @@ public class RpcMessageTestUtils {
return createTableRequest;
}
+ public static AlterTableConfigsRequest newAlterTableConfigsRequest(
+ TablePath tablePath,
+ List<PbAlterConfigsRequestInfo> pbAlterConfigsRequestInfos,
+ boolean ignoreIfExists) {
+ AlterTableConfigsRequest alterTableConfigsRequest = new
AlterTableConfigsRequest();
+ alterTableConfigsRequest
+ .addAllConfigChanges(pbAlterConfigsRequestInfos)
+ .setIgnoreIfNotExists(ignoreIfExists)
+ .setTablePath()
+ .setDatabaseName(tablePath.getDatabaseName())
+ .setTableName(tablePath.getTableName());
+ return alterTableConfigsRequest;
+ }
+
public static MetadataRequest newMetadataRequest(List<TablePath>
tablePaths) {
MetadataRequest metadataRequest = new MetadataRequest();
metadataRequest.addAllTablePaths(