wuchong commented on code in PR #2010:
URL: https://github.com/apache/fluss/pull/2010#discussion_r2555097557


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java:
##########
@@ -323,59 +325,86 @@ public long createTable(
 
     public void alterTableProperties(
             TablePath tablePath,
+            @Nullable Integer schemaId,
             List<TableChange> tableChanges,
             TablePropertyChanges tablePropertyChanges,
+            List<TableChange.SchemaChange> schemaChanges,
             boolean ignoreIfNotExists,
             @Nullable LakeCatalog lakeCatalog,
             @Nullable DataLakeFormat dataLakeFormat,
             LakeTableTieringManager lakeTableTieringManager,
             LakeCatalog.Context lakeCatalogContext) {
         try {
+
             // it throws TableNotExistException if the table or database not 
exists
             TableRegistration tableReg = getTableRegistration(tablePath);
             SchemaInfo schemaInfo = getLatestSchema(tablePath);
-            // we can't use MetadataManager#getTable here, because it will add 
the default
+            // we can't use MetadataManager#getTable here, because it willadd 
the default
             // lake options to the table properties, which may cause the 
validation failure
             TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo);
 
-            // validate the changes
+            // validate the table properties changes
             validateAlterTableProperties(
                     tableInfo,
                     tablePropertyChanges.tableKeysToChange(),
                     tablePropertyChanges.customKeysToChange());
 
             TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
+            // validate the table column changes
+            Schema newSchema = null;
+            if (!schemaChanges.isEmpty()) {
+                SchemaUpdate schemaUpdate = new SchemaUpdate(tableInfo);
+                for (TableChange.SchemaChange schemaChange : schemaChanges) {
+                    schemaUpdate.applySchemaChange(schemaChange);
+                }
+                newSchema = schemaUpdate.getSchema();
+                if (schemaId == null) {
+                    throw new InvalidTableException("Can't alter table schema 
without schema id");
+                }
+            }

Review Comment:
   Let's have a separate method `addColumn` to handle the schema changes. 
Mixing them together makes this logic very complex and error-prone. 
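
   A minimal sketch of the split suggested above, assuming a simplified 
stand-in for `MetadataManager`. `MetadataManagerSketch`, its fields, and the 
property key are hypothetical and not the Fluss API; the only point is that 
property changes and schema changes take separate entry points, each with its 
own validation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for MetadataManager (not the Fluss class).
final class MetadataManagerSketch {
    private final Map<String, String> properties = new LinkedHashMap<>();
    private final List<String> columns = new ArrayList<>();

    MetadataManagerSketch(List<String> initialColumns) {
        columns.addAll(initialColumns);
    }

    // Property changes only: no schema logic on this path.
    void alterTableProperties(Map<String, String> propertiesToSet) {
        properties.putAll(propertiesToSet);
    }

    // Schema changes only: validate first, then apply.
    void addColumn(String columnName) {
        if (columnName == null || columnName.isEmpty()) {
            throw new IllegalArgumentException("Column name must not be empty");
        }
        if (columns.contains(columnName)) {
            throw new IllegalArgumentException("Column already exists: " + columnName);
        }
        columns.add(columnName);
    }

    List<String> columns() {
        return Collections.unmodifiableList(columns);
    }

    Map<String, String> properties() {
        return Collections.unmodifiableMap(properties);
    }

    public static void main(String[] args) {
        MetadataManagerSketch manager =
                new MetadataManagerSketch(Collections.singletonList("id"));
        // "table.example.key" is a made-up key used only for illustration.
        manager.alterTableProperties(Collections.singletonMap("table.example.key", "value"));
        manager.addColumn("name");
        System.out.println(manager.columns() + " " + manager.properties());
    }
}
```

   With this shape, a caller that only changes properties never touches the 
schema path, and the schema path can grow its own checks without affecting 
property validation.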



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -915,6 +917,16 @@ message PbAlterConfig {
   required int32 op_type = 3; // SET=0, DELETE=1, APPEND=2, SUBTRACT=3
 }
 
+message PbAlterColumn{
+  required int32 alter_type = 1; // ADD Column=0, DROP COLUMN =1, ALTER 
COLUMN=2
+  optional string modified_column_name = 2;
+  // ColumnPosition
+  optional bytes column_json = 3;
+  optional int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
+  optional string column_after_name = 5;
+}

Review Comment:
   This mixes different alter-column operations into one message, which looks 
very complex (it is unclear which fields are required for which alter type).  
   
   Can we just introduce `PbAddColumn`, `PbRenameColumn`, `PbDropColumn`, 
etc...? This makes each operation clear. And they can be directly put in 
`AlterTableRequest` as `repeated` fields. 
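
   To illustrate why one type per operation is clearer, here is a rough Java 
analogue of the idea (not protobuf, and not the generated Fluss classes). All 
names ending in `Sketch` are hypothetical. Each operation carries only the 
fields it needs, and all of them are mandatory, so there is no question of 
which optional field applies to which alter type.

```java
import java.util.Objects;

// Hypothetical Java analogue of per-operation messages; names are illustrative only.
interface ColumnChangeSketch {
    String describe();
}

// Adding a column needs a name and a type, nothing else.
final class AddColumnSketch implements ColumnChangeSketch {
    private final String columnName;
    private final String dataType;

    AddColumnSketch(String columnName, String dataType) {
        this.columnName = Objects.requireNonNull(columnName, "columnName");
        this.dataType = Objects.requireNonNull(dataType, "dataType");
    }

    @Override
    public String describe() {
        return "ADD " + columnName + " " + dataType;
    }
}

// Dropping a column needs only the name.
final class DropColumnSketch implements ColumnChangeSketch {
    private final String columnName;

    DropColumnSketch(String columnName) {
        this.columnName = Objects.requireNonNull(columnName, "columnName");
    }

    @Override
    public String describe() {
        return "DROP " + columnName;
    }
}

// Renaming needs the old and the new name.
final class RenameColumnSketch implements ColumnChangeSketch {
    private final String oldName;
    private final String newName;

    RenameColumnSketch(String oldName, String newName) {
        this.oldName = Objects.requireNonNull(oldName, "oldName");
        this.newName = Objects.requireNonNull(newName, "newName");
    }

    @Override
    public String describe() {
        return "RENAME " + oldName + " TO " + newName;
    }
}
```

   In the proto file, the equivalent would be one message per operation, each 
referenced from `AlterTableRequest` as its own `repeated` field.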



##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -113,6 +113,8 @@ message AlterTableRequest {
   required PbTablePath table_path = 1;
   required bool ignore_if_not_exists = 2;
   repeated PbAlterConfig config_changes = 3;
+  optional int32 schema_id = 4;

Review Comment:
   The schema id is not necessary; the latest schema id should be determined by 
the server. 
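
   A minimal sketch of server-side id assignment, assuming an in-memory 
registry. `SchemaRegistrySketch` is hypothetical, and starting ids at 1 is an 
assumption made for the example. The request carries only the change; the 
server looks up the latest schema and hands out the next id itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory registry; the real server would persist schemas elsewhere.
final class SchemaRegistrySketch {
    // Index i holds the columns of schema id i + 1 (ids start at 1 by assumption).
    private final List<List<String>> schemas = new ArrayList<>();

    SchemaRegistrySketch(List<String> initialColumns) {
        schemas.add(new ArrayList<>(initialColumns));
    }

    synchronized int latestSchemaId() {
        return schemas.size();
    }

    // The caller passes no schema id: the server derives the new schema from
    // the latest one and returns the id it assigned.
    synchronized int addColumn(String columnName) {
        List<String> next = new ArrayList<>(schemas.get(schemas.size() - 1));
        if (next.contains(columnName)) {
            throw new IllegalArgumentException("Column already exists: " + columnName);
        }
        next.add(columnName);
        schemas.add(next);
        return schemas.size();
    }

    synchronized List<String> columnsOf(int schemaId) {
        return Collections.unmodifiableList(schemas.get(schemaId - 1));
    }

    public static void main(String[] args) {
        SchemaRegistrySketch registry =
                new SchemaRegistrySketch(Collections.singletonList("id"));
        int newId = registry.addColumn("name");
        System.out.println(newId + " " + registry.columnsOf(newId));
    }
}
```

   Because the lookup and the update happen under the same lock, two 
concurrent requests cannot both base their change on the same stale schema id.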



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -248,19 +249,44 @@ public CompletableFuture<Void> createTable(
     public CompletableFuture<Void> alterTable(
             TablePath tablePath, List<TableChange> tableChanges, boolean 
ignoreIfNotExists) {
         tablePath.validate();
-        AlterTableRequest request = new AlterTableRequest();
-
-        List<PbAlterConfig> pbFlussTableChanges =
-                tableChanges.stream()
-                        .map(ClientRpcMessageUtils::toPbAlterConfigs)
-                        .collect(Collectors.toList());
-
-        request.addAllConfigChanges(pbFlussTableChanges)
-                .setIgnoreIfNotExists(ignoreIfNotExists)
-                .setTablePath()
-                .setDatabaseName(tablePath.getDatabaseName())
-                .setTableName(tablePath.getTableName());
-        return gateway.alterTable(request).thenApply(r -> null);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        CompletableFuture.runAsync(

Review Comment:
   Why run in async mode? We should avoid using `CompletableFuture.runAsync`, 
because it uses a global thread pool that is outside our management. 
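
   A minimal sketch of the alternative, assuming the gateway call already 
returns a `CompletableFuture` (as the removed `gateway.alterTable(request)` 
line did). `AlterTableClientSketch` and the `String` request/response types 
are hypothetical placeholders. Without an explicit executor, 
`CompletableFuture.runAsync(Runnable)` normally runs on 
`ForkJoinPool.commonPool()`; chaining on the gateway future avoids submitting 
any extra task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical client; the gateway is modeled as a function returning a future.
final class AlterTableClientSketch {
    private final Function<String, CompletableFuture<String>> gateway;

    AlterTableClientSketch(Function<String, CompletableFuture<String>> gateway) {
        this.gateway = gateway;
    }

    CompletableFuture<Void> alterTable(String request) {
        // Report validation failures through the returned future instead of
        // wrapping the whole method body in runAsync.
        if (request == null || request.isEmpty()) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("Empty request"));
            return failed;
        }
        // Chain on the gateway's future; no additional thread pool is involved.
        return gateway.apply(request).thenApply(response -> null);
    }

    public static void main(String[] args) {
        AlterTableClientSketch client =
                new AlterTableClientSketch(
                        request -> CompletableFuture.completedFuture("ok"));
        client.alterTable("alter my_db.my_table").join();
        System.out.println("done");
    }
}
```

   If some blocking work really has to happen before the RPC, passing an 
executor owned by the client to `runAsync(Runnable, Executor)` keeps the 
threads under the client's control.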



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
