luoyuxia commented on code in PR #1625:
URL: https://github.com/apache/fluss/pull/1625#discussion_r2370978665
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -235,6 +236,25 @@ CompletableFuture<Void> createTable(
*/
CompletableFuture<List<String>> listTables(String databaseName);
+ /**
+ * Alter a table.
+ *
+ * <p>The following exceptions can be anticipated when calling {@code
get()} on returned future.
+ *
+ * <ul>
+ * <li>{@link DatabaseNotExistException} when the database does not
exist.
+ * <li>{@link TableNotExistException} when the table does not exist, if
ignoreIfNotExists is
Review Comment:
```suggestion
* <li>{@link TableNotExistException} when the table does not exist,
and ignoreIfNotExists is
```
##########
fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/** The operation type of altering table configurations. */
+public enum AlterTableConfigsOpType {
+ SET(1),
Review Comment:
As the
[FIP](https://cwiki.apache.org/confluence/display/FLUSS/FIP-15%3A+Alter+Table+Interface)
says:
```
// SET=0, DELETE=1, APPEND=2, SUBTRACT=3
```
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -235,6 +236,25 @@ CompletableFuture<Void> createTable(
*/
CompletableFuture<List<String>> listTables(String databaseName);
+ /**
+ * Alter a table.
Review Comment:
nit:
```suggestion
* Alter a table with the given {@code tableChanges}.
```
##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -33,9 +35,15 @@ public class FlussConfigUtils {
public static final String CLIENT_PREFIX = "client.";
public static final String CLIENT_SECURITY_PREFIX = "client.security.";
+ public static final List<String> ALTERABLE_TABLE_CONFIG;
+ public static final List<String> ALTERABLE_CLIENT_OPTIONS;
+
static {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
+ ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+ ALTERABLE_CLIENT_OPTIONS =
Review Comment:
Why add these options here? IIUC, this PR just introduces an interface that
allows clients to alter table properties; deciding which properties can be
changed shouldn't happen in this PR.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java:
##########
@@ -398,11 +402,39 @@ public void createTable(ObjectPath objectPath,
CatalogBaseTable table, boolean i
}
@Override
- public void alterTable(ObjectPath objectPath, CatalogBaseTable
catalogBaseTable, boolean b)
+ public void alterTable(
+ ObjectPath objectPath,
+ CatalogBaseTable newTable,
+ List<org.apache.flink.table.catalog.TableChange> tableChanges,
+ boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ TablePath tablePath = toTablePath(objectPath);
+
+ List<TableChange> flussTableChanges =
+ tableChanges.stream()
+ .filter(Objects::nonNull)
+
.map(FlinkTableChangeToFlussTableChange::toFlussTableChange)
+ .collect(Collectors.toList());
+ try {
+ admin.alterTable(tablePath, flussTableChanges,
ignoreIfNotExists).get();
+ } catch (Exception e) {
+ Throwable t = ExceptionUtils.stripExecutionException(e);
+ if (CatalogExceptionUtils.isTableNotExist(t)) {
+ throw new TableNotExistException(getName(), objectPath);
+ } else if (isTableInvalid(t)) {
+ throw new InvalidTableException(t.getMessage());
+ } else {
+ throw new CatalogException(
+ String.format("Failed to create table %s in %s",
objectPath, getName()), t);
+ }
+ }
}
+ @Override
+ public void alterTable(
Review Comment:
Why override this method with an empty body?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -302,6 +306,101 @@ public long createTable(
"Fail to create table " + tablePath);
}
+ public void alterTableProperties(
+ TablePath tablePath,
+ List<TableChange.SetOption> setOptions,
+ List<TableChange.ResetOption> resetOptions,
+ boolean ignoreIfNotExists) {
+
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(
+ "Database " + tablePath.getDatabaseName() + " does not
exist.");
+ }
+ if (!tableExists(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new TableNotExistException("Table " + tablePath + " does
not exists.");
+ }
+ }
+
+ try {
+ TableRegistration updatedTableRegistration =
+ getUpdatedTableRegistration(tablePath, setOptions,
resetOptions);
+ if (updatedTableRegistration != null) {
+ zookeeperClient.updateTable(tablePath,
updatedTableRegistration);
+ } else {
+ LOG.info(
+ "No properties changed when alter table {}, skip
update table.", tablePath);
+ }
+ } catch (Exception e) {
+ if (e instanceof KeeperException.NoNodeException) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableNotExistException("Table " + tablePath + " does
not exists.");
+ } else {
+ throw new FlussRuntimeException("Failed to alter table: " +
tablePath, e);
+ }
+ }
+ }
+
+ private TableRegistration getUpdatedTableRegistration(
+ TablePath tablePath,
+ List<TableChange.SetOption> setOptions,
+ List<TableChange.ResetOption> resetOptions) {
+
+ TableRegistration existTableReg = getTableRegistration(tablePath);
+
+ Map<String, String> newProperties = new
HashMap<>(existTableReg.properties);
+ Map<String, String> newCustomProperties = new
HashMap<>(existTableReg.customProperties);
+
+ boolean propertiesChanged = false;
+ boolean customPropertiesChanged = false;
+ for (TableChange.SetOption setOption : setOptions) {
+ String key = setOption.getKey();
+ if (ALTERABLE_TABLE_CONFIG.contains(key)) {
+ // only alterable configs can be updated, other properties
keep unchanged.
+ String curValue = newProperties.get(key);
+ String updatedValue = setOption.getValue();
+ if (!updatedValue.equals(curValue)) {
+ propertiesChanged = true;
+ newProperties.put(key, updatedValue);
+ }
+ } else if (ALTERABLE_CLIENT_OPTIONS.contains(key)) {
Review Comment:
Why should we take `ALTERABLE_CLIENT_OPTIONS` as a branch? If the property
key is not in `TABLE_OPTIONS`, then it can be considered as custom options.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -108,6 +108,24 @@ message CreateTableRequest {
message CreateTableResponse {
}
+// alter table request and response
+message AlterTableConfigsRequest {
+ required PbTablePath table_path = 1;
+ required bool ignore_if_not_exists = 2;
+ repeated PbAlterConfigsRequestInfo config_changes = 3;
+}
+
+message PbAlterConfigsRequestInfo {
+ required string config_key = 1;
+ optional string config_value = 2;
+ required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
+}
+
+message AlterTablePropertiesResponse {
Review Comment:
```suggestion
message AlterTableConfigsResponse {
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -294,6 +301,54 @@ public CompletableFuture<CreateTableResponse>
createTable(CreateTableRequest req
return CompletableFuture.completedFuture(new CreateTableResponse());
}
+ @Override
+ public CompletableFuture<AlterTablePropertiesResponse> alterTable(
+ AlterTableConfigsRequest request) {
+ TablePath tablePath = toTablePath(request.getTablePath());
+ tablePath.validate();
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.ALTER,
Resource.table(tablePath));
+ }
+
+ AlterTablePropertiesResponse alterTableResponse = new
AlterTablePropertiesResponse();
+
+ handleFlussTableChanges(
+ tablePath, request.getConfigChangesList(),
request.isIgnoreIfNotExists());
+
+ return CompletableFuture.completedFuture(alterTableResponse);
+ }
+
+ private void handleFlussTableChanges(
+ TablePath tablePath,
+ List<PbAlterConfigsRequestInfo> configsRequestInfos,
+ boolean ignoreIfNotExists) {
+
+ List<TableChange> tableChanges =
+ configsRequestInfos.stream()
+ .filter(Objects::nonNull)
+ .map(ServerRpcMessageUtils::toFlussTableChange)
+ .collect(Collectors.toList());
+
+ List<TableChange.SetOption> setOptions = new ArrayList<>();
+ List<TableChange.ResetOption> resetOptions = new ArrayList<>();
+
+ for (TableChange tableChange : tableChanges) {
+ if (tableChange instanceof TableChange.SetOption) {
+ setOptions.add((TableChange.SetOption) tableChange);
+ } else if (tableChange instanceof TableChange.ResetOption) {
+ resetOptions.add((TableChange.ResetOption) tableChange);
+ }
+ // add more FlussTableChange type
Review Comment:
I suggest defining an error type in Errors:
```
INVALID_ALTER_TABLE_EXCEPTION(
--
55, "The alter table is invalid.", InvalidAlterTableException::new);
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -294,6 +301,54 @@ public CompletableFuture<CreateTableResponse>
createTable(CreateTableRequest req
return CompletableFuture.completedFuture(new CreateTableResponse());
}
+ @Override
+ public CompletableFuture<AlterTablePropertiesResponse> alterTable(
+ AlterTableConfigsRequest request) {
+ TablePath tablePath = toTablePath(request.getTablePath());
+ tablePath.validate();
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.ALTER,
Resource.table(tablePath));
+ }
+
+ AlterTablePropertiesResponse alterTableResponse = new
AlterTablePropertiesResponse();
+
+ handleFlussTableChanges(
+ tablePath, request.getConfigChangesList(),
request.isIgnoreIfNotExists());
+
+ return CompletableFuture.completedFuture(alterTableResponse);
+ }
+
+ private void handleFlussTableChanges(
+ TablePath tablePath,
+ List<PbAlterConfigsRequestInfo> configsRequestInfos,
+ boolean ignoreIfNotExists) {
+
+ List<TableChange> tableChanges =
+ configsRequestInfos.stream()
+ .filter(Objects::nonNull)
+ .map(ServerRpcMessageUtils::toFlussTableChange)
+ .collect(Collectors.toList());
+
+ List<TableChange.SetOption> setOptions = new ArrayList<>();
+ List<TableChange.ResetOption> resetOptions = new ArrayList<>();
+
+ for (TableChange tableChange : tableChanges) {
+ if (tableChange instanceof TableChange.SetOption) {
+ setOptions.add((TableChange.SetOption) tableChange);
+ } else if (tableChange instanceof TableChange.ResetOption) {
+ resetOptions.add((TableChange.ResetOption) tableChange);
+ }
+ // add more FlussTableChange type
Review Comment:
I suggest throwing an exception when encountering any unsupported table change
instead of ignoring it. Otherwise the alter statement returns success even though
the alter never happens, which is really strange.
##########
fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/** The operation type of altering table configurations. */
+public enum AlterTableConfigsOpType {
+ SET(1),
+ DELETE(2),
+ APPEND(3),
+ SUBTRACT(4);
+
+ public final int value;
+
+ AlterTableConfigsOpType(int value) {
+ this.value = value;
+ }
+
+ public static AlterTableConfigsOpType fromInt(int opType) {
+ switch (opType) {
+ case 1:
+ return SET;
+ case 2:
+ return DELETE;
+ case 3:
+ return APPEND;
+ case 4:
+ return SUBTRACT;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported AlterTableConfigsOpType: " + opType);
+ }
+ }
+
+ public int toInt() {
Review Comment:
```suggestion
public int value() {
```
?
##########
fluss-common/src/main/java/org/apache/fluss/metadata/TableChange.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/** {@link TableChange} represents the modification of the Fluss Table. */
+public interface TableChange {
Review Comment:
Add the `@PublicEvolving` annotation and a `@since` tag in the Javadoc.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -302,6 +306,101 @@ public long createTable(
"Fail to create table " + tablePath);
}
+ public void alterTableProperties(
+ TablePath tablePath,
+ List<TableChange.SetOption> setOptions,
+ List<TableChange.ResetOption> resetOptions,
+ boolean ignoreIfNotExists) {
+
+ if (!databaseExists(tablePath.getDatabaseName())) {
+ throw new DatabaseNotExistException(
+ "Database " + tablePath.getDatabaseName() + " does not
exist.");
+ }
+ if (!tableExists(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new TableNotExistException("Table " + tablePath + " does
not exists.");
+ }
+ }
+
+ try {
+ TableRegistration updatedTableRegistration =
+ getUpdatedTableRegistration(tablePath, setOptions,
resetOptions);
+ if (updatedTableRegistration != null) {
+ zookeeperClient.updateTable(tablePath,
updatedTableRegistration);
+ } else {
+ LOG.info(
+ "No properties changed when alter table {}, skip
update table.", tablePath);
+ }
+ } catch (Exception e) {
+ if (e instanceof KeeperException.NoNodeException) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+ throw new TableNotExistException("Table " + tablePath + " does
not exists.");
+ } else {
+ throw new FlussRuntimeException("Failed to alter table: " +
tablePath, e);
+ }
+ }
+ }
+
+ private TableRegistration getUpdatedTableRegistration(
Review Comment:
@Nullable
Add Javadoc explaining when it will return null.
##########
fluss-common/src/main/java/org/apache/fluss/metadata/AlterTableConfigsOpType.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+/** The operation type of altering table configurations. */
+public enum AlterTableConfigsOpType {
+ SET(1),
+ DELETE(2),
+ APPEND(3),
+ SUBTRACT(4);
+
+ public final int value;
+
+ AlterTableConfigsOpType(int value) {
+ this.value = value;
+ }
+
+ public static AlterTableConfigsOpType fromInt(int opType) {
Review Comment:
Not used in this PR; can it be removed?
##########
fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java:
##########
@@ -33,9 +35,15 @@ public class FlussConfigUtils {
public static final String CLIENT_PREFIX = "client.";
public static final String CLIENT_SECURITY_PREFIX = "client.security.";
+ public static final List<String> ALTERABLE_TABLE_CONFIG;
+ public static final List<String> ALTERABLE_CLIENT_OPTIONS;
+
static {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
+ ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+ ALTERABLE_CLIENT_OPTIONS =
Review Comment:
Also, we don't store client options in the table.
##########
fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java:
##########
@@ -238,6 +240,22 @@ public static ServerNode toServerNode(PbServerNode
pbServerNode, ServerType serv
pbServerNode.hasRack() ? pbServerNode.getRack() : null);
}
+ public static TableChange toFlussTableChange(
+ PbAlterConfigsRequestInfo pbAlterConfigsRequestInfo) {
+ switch (pbAlterConfigsRequestInfo.getOpType()) {
+ case 1: // SET_OPTION
Review Comment:
case 0 should be SET_OPTION
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]