JingsongLi commented on a change in pull request #12108:
URL: https://github.com/apache/flink/pull/12108#discussion_r426141533



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -328,6 +329,17 @@ public CatalogBaseTable getTable() {
                return Optional.empty();
        }
 
+       public Optional<CatalogPartition> getPartition(ObjectIdentifier 
tableIdentifier, CatalogPartitionSpec partitionSpec) {

Review comment:
       Please add a Javadoc comment to this public method.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -231,82 +239,102 @@ private Operation convertDropTable(SqlDropTable 
sqlDropTable) {
        private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
                UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlAlterTable.fullTableName());
                ObjectIdentifier tableIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+               Optional<CatalogManager.TableLookupResult> optionalCatalogTable 
= catalogManager.getTable(tableIdentifier);
+               if (!optionalCatalogTable.isPresent() || 
optionalCatalogTable.get().isTemporary()) {
+                       throw new ValidationException(String.format("Table %s 
doesn't exist or is a temporary table.",
+                                       tableIdentifier.toString()));
+               }
+               CatalogBaseTable baseTable = 
optionalCatalogTable.get().getTable();
                if (sqlAlterTable instanceof SqlAlterTableRename) {
                        UnresolvedIdentifier newUnresolvedIdentifier =
                                UnresolvedIdentifier.of(((SqlAlterTableRename) 
sqlAlterTable).fullNewTableName());
                        ObjectIdentifier newTableIdentifier = 
catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
                        return new AlterTableRenameOperation(tableIdentifier, 
newTableIdentifier);
                } else if (sqlAlterTable instanceof SqlAlterTableProperties) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable = catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               CatalogTable originalCatalogTable = 
(CatalogTable) optionalCatalogTable.get().getTable();
-                               Map<String, String> properties = new 
HashMap<>(originalCatalogTable.getOptions());
-                               ((SqlAlterTableProperties) 
sqlAlterTable).getPropertyList().getList().forEach(p ->
-                                       properties.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
-                               CatalogTable catalogTable = new 
CatalogTableImpl(
-                                       originalCatalogTable.getSchema(),
-                                       originalCatalogTable.getPartitionKeys(),
-                                       properties,
-                                       originalCatalogTable.getComment());
-                               return new 
AlterTablePropertiesOperation(tableIdentifier, catalogTable);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                       tableIdentifier.toString()));
-                       }
+                       return convertAlterTableProperties(
+                                       tableIdentifier,
+                                       baseTable,
+                                       (SqlAlterTableProperties) 
sqlAlterTable);
                } else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) 
{
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
-                                               .getConstraint();
-                               validateTableConstraint(constraint);
-                               TableSchema oriSchema = 
optionalCatalogTable.get().getTable().getSchema();
-                               // Sanity check for constraint.
-                               TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
-                               if (constraint.getConstraintName().isPresent()) 
{
-                                       builder.primaryKey(
-                                                       
constraint.getConstraintName().get(),
-                                                       
constraint.getColumnNames());
-                               } else {
-                                       
builder.primaryKey(constraint.getColumnNames());
-                               }
-                               builder.build();
-                               return new AlterTableAddConstraintOperation(
-                                               tableIdentifier,
-                                               
constraint.getConstraintName().orElse(null),
+                       SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
+                                       .getConstraint();
+                       validateTableConstraint(constraint);
+                       TableSchema oriSchema = baseTable.getSchema();
+                       // Sanity check for constraint.
+                       TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
+                       if (constraint.getConstraintName().isPresent()) {
+                               builder.primaryKey(
+                                               
constraint.getConstraintName().get(),
                                                constraint.getColumnNames());
                        } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                               builder.primaryKey(constraint.getColumnNames());
                        }
+                       builder.build();
+                       return new AlterTableAddConstraintOperation(
+                                       tableIdentifier,
+                                       
constraint.getConstraintName().orElse(null),
+                                       constraint.getColumnNames());
                } else if (sqlAlterTable instanceof 
SqlAlterTableDropConstraint) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
-                               String constraintName = 
dropConstraint.getConstraintName().getSimple();
-                               CatalogTable oriCatalogTable = (CatalogTable) 
optionalCatalogTable.get().getTable();
-                               TableSchema oriSchema = 
oriCatalogTable.getSchema();
-                               if (!oriSchema.getPrimaryKey()
-                                               .filter(pk -> 
pk.getName().equals(constraintName))
-                                               .isPresent()) {
-                                       throw new ValidationException(
-                                                       
String.format("CONSTRAINT [%s] does not exist", constraintName));
-                               }
-                               return new AlterTableDropConstraintOperation(
-                                               tableIdentifier,
-                                               constraintName);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                       SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
+                       String constraintName = 
dropConstraint.getConstraintName().getSimple();
+                       TableSchema oriSchema = baseTable.getSchema();
+                       if (!oriSchema.getPrimaryKey()
+                                       .filter(pk -> 
pk.getName().equals(constraintName))
+                                       .isPresent()) {
+                               throw new ValidationException(
+                                               String.format("CONSTRAINT [%s] 
does not exist", constraintName));
                        }
+                       return new AlterTableDropConstraintOperation(
+                                       tableIdentifier,
+                                       constraintName);
+               } else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
+                       return OperationConverterUtils.convertAddReplaceColumns(
+                                       tableIdentifier,
+                                       (SqlAddReplaceColumns) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
+               } else if (sqlAlterTable instanceof SqlChangeColumn) {
+                       return OperationConverterUtils.convertChangeColumn(
+                                       tableIdentifier,
+                                       (SqlChangeColumn) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
                } else {
                        throw new ValidationException(
                                        String.format("[%s] needs to implement",
                                                        
sqlAlterTable.toSqlString(CalciteSqlDialect.DEFAULT)));
                }
        }
 
+       private Operation convertAlterTableProperties(ObjectIdentifier 
tableIdentifier, CatalogBaseTable baseTable,
+                       SqlAlterTableProperties alterTableProperties) {
+               LinkedHashMap<String, String> partitionKVs = 
alterTableProperties.getPartitionKVs();
+               // it's altering partitions
+               if (partitionKVs != null) {
+                       CatalogPartitionSpec partitionSpec = new 
CatalogPartitionSpec(partitionKVs);
+                       CatalogPartition catalogPartition = 
catalogManager.getPartition(tableIdentifier, partitionSpec)
+                                       .orElseThrow(() -> new 
ValidationException(String.format("Partition %s of table %s doesn't exist",
+                                                       
partitionSpec.getPartitionSpec(), tableIdentifier)));
+                       Map<String, String> props = 
catalogPartition.getProperties();
+                       
alterTableProperties.getPropertyList().getList().forEach(p ->
+                                       props.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
+                       return new 
AlterPartitionPropertiesOperation(tableIdentifier, partitionSpec, 
catalogPartition);
+               }
+               // it's altering a table
+               if (baseTable instanceof CatalogTable) {
+                       CatalogTable oldTable = (CatalogTable) baseTable;
+                       Map<String, String> newProperties = new 
HashMap<>(oldTable.getProperties());
+                       
newProperties.putAll(OperationConverterUtils.extractProperties(alterTableProperties.getPropertyList()));
+                       CatalogTable newTable = new CatalogTableImpl(

Review comment:
       Consider using `oldTable.copy` here instead of constructing a new `CatalogTableImpl` directly.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -231,82 +239,102 @@ private Operation convertDropTable(SqlDropTable 
sqlDropTable) {
        private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
                UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlAlterTable.fullTableName());
                ObjectIdentifier tableIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+               Optional<CatalogManager.TableLookupResult> optionalCatalogTable 
= catalogManager.getTable(tableIdentifier);
+               if (!optionalCatalogTable.isPresent() || 
optionalCatalogTable.get().isTemporary()) {
+                       throw new ValidationException(String.format("Table %s 
doesn't exist or is a temporary table.",
+                                       tableIdentifier.toString()));
+               }
+               CatalogBaseTable baseTable = 
optionalCatalogTable.get().getTable();
                if (sqlAlterTable instanceof SqlAlterTableRename) {
                        UnresolvedIdentifier newUnresolvedIdentifier =
                                UnresolvedIdentifier.of(((SqlAlterTableRename) 
sqlAlterTable).fullNewTableName());
                        ObjectIdentifier newTableIdentifier = 
catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
                        return new AlterTableRenameOperation(tableIdentifier, 
newTableIdentifier);
                } else if (sqlAlterTable instanceof SqlAlterTableProperties) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable = catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               CatalogTable originalCatalogTable = 
(CatalogTable) optionalCatalogTable.get().getTable();
-                               Map<String, String> properties = new 
HashMap<>(originalCatalogTable.getOptions());
-                               ((SqlAlterTableProperties) 
sqlAlterTable).getPropertyList().getList().forEach(p ->
-                                       properties.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
-                               CatalogTable catalogTable = new 
CatalogTableImpl(
-                                       originalCatalogTable.getSchema(),
-                                       originalCatalogTable.getPartitionKeys(),
-                                       properties,
-                                       originalCatalogTable.getComment());
-                               return new 
AlterTablePropertiesOperation(tableIdentifier, catalogTable);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                       tableIdentifier.toString()));
-                       }
+                       return convertAlterTableProperties(
+                                       tableIdentifier,
+                                       baseTable,
+                                       (SqlAlterTableProperties) 
sqlAlterTable);
                } else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) 
{
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
-                                               .getConstraint();
-                               validateTableConstraint(constraint);
-                               TableSchema oriSchema = 
optionalCatalogTable.get().getTable().getSchema();
-                               // Sanity check for constraint.
-                               TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
-                               if (constraint.getConstraintName().isPresent()) 
{
-                                       builder.primaryKey(
-                                                       
constraint.getConstraintName().get(),
-                                                       
constraint.getColumnNames());
-                               } else {
-                                       
builder.primaryKey(constraint.getColumnNames());
-                               }
-                               builder.build();
-                               return new AlterTableAddConstraintOperation(
-                                               tableIdentifier,
-                                               
constraint.getConstraintName().orElse(null),
+                       SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
+                                       .getConstraint();
+                       validateTableConstraint(constraint);
+                       TableSchema oriSchema = baseTable.getSchema();
+                       // Sanity check for constraint.
+                       TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
+                       if (constraint.getConstraintName().isPresent()) {
+                               builder.primaryKey(
+                                               
constraint.getConstraintName().get(),
                                                constraint.getColumnNames());
                        } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                               builder.primaryKey(constraint.getColumnNames());
                        }
+                       builder.build();
+                       return new AlterTableAddConstraintOperation(
+                                       tableIdentifier,
+                                       
constraint.getConstraintName().orElse(null),
+                                       constraint.getColumnNames());
                } else if (sqlAlterTable instanceof 
SqlAlterTableDropConstraint) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
-                               String constraintName = 
dropConstraint.getConstraintName().getSimple();
-                               CatalogTable oriCatalogTable = (CatalogTable) 
optionalCatalogTable.get().getTable();
-                               TableSchema oriSchema = 
oriCatalogTable.getSchema();
-                               if (!oriSchema.getPrimaryKey()
-                                               .filter(pk -> 
pk.getName().equals(constraintName))
-                                               .isPresent()) {
-                                       throw new ValidationException(
-                                                       
String.format("CONSTRAINT [%s] does not exist", constraintName));
-                               }
-                               return new AlterTableDropConstraintOperation(
-                                               tableIdentifier,
-                                               constraintName);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                       SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
+                       String constraintName = 
dropConstraint.getConstraintName().getSimple();
+                       TableSchema oriSchema = baseTable.getSchema();
+                       if (!oriSchema.getPrimaryKey()
+                                       .filter(pk -> 
pk.getName().equals(constraintName))
+                                       .isPresent()) {
+                               throw new ValidationException(
+                                               String.format("CONSTRAINT [%s] 
does not exist", constraintName));
                        }
+                       return new AlterTableDropConstraintOperation(
+                                       tableIdentifier,
+                                       constraintName);
+               } else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
+                       return OperationConverterUtils.convertAddReplaceColumns(
+                                       tableIdentifier,
+                                       (SqlAddReplaceColumns) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
+               } else if (sqlAlterTable instanceof SqlChangeColumn) {
+                       return OperationConverterUtils.convertChangeColumn(
+                                       tableIdentifier,
+                                       (SqlChangeColumn) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
                } else {
                        throw new ValidationException(
                                        String.format("[%s] needs to implement",
                                                        
sqlAlterTable.toSqlString(CalciteSqlDialect.DEFAULT)));
                }
        }
 
+       private Operation convertAlterTableProperties(ObjectIdentifier 
tableIdentifier, CatalogBaseTable baseTable,
+                       SqlAlterTableProperties alterTableProperties) {
+               LinkedHashMap<String, String> partitionKVs = 
alterTableProperties.getPartitionKVs();
+               // it's altering partitions
+               if (partitionKVs != null) {
+                       CatalogPartitionSpec partitionSpec = new 
CatalogPartitionSpec(partitionKVs);
+                       CatalogPartition catalogPartition = 
catalogManager.getPartition(tableIdentifier, partitionSpec)
+                                       .orElseThrow(() -> new 
ValidationException(String.format("Partition %s of table %s doesn't exist",
+                                                       
partitionSpec.getPartitionSpec(), tableIdentifier)));
+                       Map<String, String> props = 
catalogPartition.getProperties();
+                       
alterTableProperties.getPropertyList().getList().forEach(p ->
+                                       props.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
+                       return new 
AlterPartitionPropertiesOperation(tableIdentifier, partitionSpec, 
catalogPartition);
+               }
+               // it's altering a table
+               if (baseTable instanceof CatalogTable) {

Review comment:
       Use `else if` here so that it chains with the preceding `if (partitionKVs != null)` branch.

##########
File path: 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionUtils.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.util.NlsString;
+
+import java.util.LinkedHashMap;
+
+/**
+ * Utils methods for partition DDLs.
+ */
+public class SqlPartitionUtils {
+
+       private SqlPartitionUtils() {
+       }
+
+       /** Get static partition key value pair as strings.
+        *
+        * <p>For character literals we return the unquoted and unescaped 
values.
+        * For other types we use {@link SqlLiteral#toString()} to get
+        * the string format of the value literal.
+        *
+        * @return the mapping of column names to values of partition 
specifications,
+        * returns an empty map if there is no partition specifications.
+        */
+       public static LinkedHashMap<String, String> getPartitionKVs(SqlNodeList 
partitionSpec) {

Review comment:
       Should this method be named `getPartitionSpec` instead of `getPartitionKVs`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -231,82 +239,102 @@ private Operation convertDropTable(SqlDropTable 
sqlDropTable) {
        private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
                UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlAlterTable.fullTableName());
                ObjectIdentifier tableIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+               Optional<CatalogManager.TableLookupResult> optionalCatalogTable 
= catalogManager.getTable(tableIdentifier);
+               if (!optionalCatalogTable.isPresent() || 
optionalCatalogTable.get().isTemporary()) {
+                       throw new ValidationException(String.format("Table %s 
doesn't exist or is a temporary table.",
+                                       tableIdentifier.toString()));
+               }
+               CatalogBaseTable baseTable = 
optionalCatalogTable.get().getTable();
                if (sqlAlterTable instanceof SqlAlterTableRename) {
                        UnresolvedIdentifier newUnresolvedIdentifier =
                                UnresolvedIdentifier.of(((SqlAlterTableRename) 
sqlAlterTable).fullNewTableName());
                        ObjectIdentifier newTableIdentifier = 
catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
                        return new AlterTableRenameOperation(tableIdentifier, 
newTableIdentifier);
                } else if (sqlAlterTable instanceof SqlAlterTableProperties) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable = catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               CatalogTable originalCatalogTable = 
(CatalogTable) optionalCatalogTable.get().getTable();
-                               Map<String, String> properties = new 
HashMap<>(originalCatalogTable.getOptions());
-                               ((SqlAlterTableProperties) 
sqlAlterTable).getPropertyList().getList().forEach(p ->
-                                       properties.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
-                               CatalogTable catalogTable = new 
CatalogTableImpl(
-                                       originalCatalogTable.getSchema(),
-                                       originalCatalogTable.getPartitionKeys(),
-                                       properties,
-                                       originalCatalogTable.getComment());
-                               return new 
AlterTablePropertiesOperation(tableIdentifier, catalogTable);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                       tableIdentifier.toString()));
-                       }
+                       return convertAlterTableProperties(
+                                       tableIdentifier,
+                                       baseTable,
+                                       (SqlAlterTableProperties) 
sqlAlterTable);
                } else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) 
{
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
-                                               .getConstraint();
-                               validateTableConstraint(constraint);
-                               TableSchema oriSchema = 
optionalCatalogTable.get().getTable().getSchema();
-                               // Sanity check for constraint.
-                               TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
-                               if (constraint.getConstraintName().isPresent()) 
{
-                                       builder.primaryKey(
-                                                       
constraint.getConstraintName().get(),
-                                                       
constraint.getColumnNames());
-                               } else {
-                                       
builder.primaryKey(constraint.getColumnNames());
-                               }
-                               builder.build();
-                               return new AlterTableAddConstraintOperation(
-                                               tableIdentifier,
-                                               
constraint.getConstraintName().orElse(null),
+                       SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
+                                       .getConstraint();
+                       validateTableConstraint(constraint);
+                       TableSchema oriSchema = baseTable.getSchema();
+                       // Sanity check for constraint.
+                       TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
+                       if (constraint.getConstraintName().isPresent()) {
+                               builder.primaryKey(
+                                               
constraint.getConstraintName().get(),
                                                constraint.getColumnNames());
                        } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                               builder.primaryKey(constraint.getColumnNames());
                        }
+                       builder.build();
+                       return new AlterTableAddConstraintOperation(
+                                       tableIdentifier,
+                                       
constraint.getConstraintName().orElse(null),
+                                       constraint.getColumnNames());
                } else if (sqlAlterTable instanceof 
SqlAlterTableDropConstraint) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
-                               String constraintName = 
dropConstraint.getConstraintName().getSimple();
-                               CatalogTable oriCatalogTable = (CatalogTable) 
optionalCatalogTable.get().getTable();
-                               TableSchema oriSchema = 
oriCatalogTable.getSchema();
-                               if (!oriSchema.getPrimaryKey()
-                                               .filter(pk -> 
pk.getName().equals(constraintName))
-                                               .isPresent()) {
-                                       throw new ValidationException(
-                                                       
String.format("CONSTRAINT [%s] does not exist", constraintName));
-                               }
-                               return new AlterTableDropConstraintOperation(
-                                               tableIdentifier,
-                                               constraintName);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                       SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
+                       String constraintName = 
dropConstraint.getConstraintName().getSimple();
+                       TableSchema oriSchema = baseTable.getSchema();
+                       if (!oriSchema.getPrimaryKey()
+                                       .filter(pk -> 
pk.getName().equals(constraintName))
+                                       .isPresent()) {
+                               throw new ValidationException(
+                                               String.format("CONSTRAINT [%s] 
does not exist", constraintName));
                        }
+                       return new AlterTableDropConstraintOperation(
+                                       tableIdentifier,
+                                       constraintName);
+               } else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
+                       return OperationConverterUtils.convertAddReplaceColumns(
+                                       tableIdentifier,
+                                       (SqlAddReplaceColumns) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
+               } else if (sqlAlterTable instanceof SqlChangeColumn) {
+                       return OperationConverterUtils.convertChangeColumn(
+                                       tableIdentifier,
+                                       (SqlChangeColumn) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
                } else {
                        throw new ValidationException(
                                        String.format("[%s] needs to implement",
                                                        
sqlAlterTable.toSqlString(CalciteSqlDialect.DEFAULT)));
                }
        }
 
+       private Operation convertAlterTableProperties(ObjectIdentifier 
tableIdentifier, CatalogBaseTable baseTable,
+                       SqlAlterTableProperties alterTableProperties) {
+               LinkedHashMap<String, String> partitionKVs = 
alterTableProperties.getPartitionKVs();
+               // it's altering partitions
+               if (partitionKVs != null) {
+                       CatalogPartitionSpec partitionSpec = new 
CatalogPartitionSpec(partitionKVs);
+                       CatalogPartition catalogPartition = 
catalogManager.getPartition(tableIdentifier, partitionSpec)
+                                       .orElseThrow(() -> new 
ValidationException(String.format("Partition %s of table %s doesn't exist",
+                                                       
partitionSpec.getPartitionSpec(), tableIdentifier)));
+                       Map<String, String> props = 
catalogPartition.getProperties();
+                       
alterTableProperties.getPropertyList().getList().forEach(p ->
+                                       props.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
+                       return new 
AlterPartitionPropertiesOperation(tableIdentifier, partitionSpec, 
catalogPartition);
+               }
+               // it's altering a table
+               if (baseTable instanceof CatalogTable) {
+                       CatalogTable oldTable = (CatalogTable) baseTable;
+                       Map<String, String> newProperties = new 
HashMap<>(oldTable.getProperties());
+                       
newProperties.putAll(OperationConverterUtils.extractProperties(alterTableProperties.getPropertyList()));
+                       CatalogTable newTable = new CatalogTableImpl(
+                                       oldTable.getSchema(),
+                                       oldTable.getPartitionKeys(),
+                                       newProperties,
+                                       oldTable.getComment());
+                       return new 
AlterTablePropertiesOperation(tableIdentifier, newTable);
+               }
+               throw new ValidationException("Unsupported CatalogBaseTable 
type: " + baseTable.getClass().getName());

Review comment:
       Put this `throw` in an `else` branch.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utils methods for converting sql to operations.
+ */
+public class OperationConverterUtils {
+
+       private OperationConverterUtils() {
+       }
+
+       public static Operation convertAddReplaceColumns(ObjectIdentifier 
tableIdentifier,
+                       SqlAddReplaceColumns addReplaceColumns, CatalogTable 
catalogTable, SqlValidator sqlValidator) {
+               // verify partitions columns appear last in the schema
+               TableSchema oldSchema = catalogTable.getSchema();
+               int numPartCol = catalogTable.getPartitionKeys().size();

Review comment:
       This is not Flink partition style; this is Hive partition style.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.sql.parser.ddl.SqlAddReplaceColumns;
+import org.apache.flink.sql.parser.ddl.SqlChangeColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.AlterTableSchemaOperation;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utils methods for converting sql to operations.
+ */
+public class OperationConverterUtils {
+
+       private OperationConverterUtils() {
+       }
+
+       public static Operation convertAddReplaceColumns(ObjectIdentifier 
tableIdentifier,
+                       SqlAddReplaceColumns addReplaceColumns, CatalogTable 
catalogTable, SqlValidator sqlValidator) {

Review comment:
       One parameter per line.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
##########
@@ -231,82 +239,102 @@ private Operation convertDropTable(SqlDropTable 
sqlDropTable) {
        private Operation convertAlterTable(SqlAlterTable sqlAlterTable) {
                UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(sqlAlterTable.fullTableName());
                ObjectIdentifier tableIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+               Optional<CatalogManager.TableLookupResult> optionalCatalogTable 
= catalogManager.getTable(tableIdentifier);
+               if (!optionalCatalogTable.isPresent() || 
optionalCatalogTable.get().isTemporary()) {
+                       throw new ValidationException(String.format("Table %s 
doesn't exist or is a temporary table.",
+                                       tableIdentifier.toString()));
+               }
+               CatalogBaseTable baseTable = 
optionalCatalogTable.get().getTable();
                if (sqlAlterTable instanceof SqlAlterTableRename) {
                        UnresolvedIdentifier newUnresolvedIdentifier =
                                UnresolvedIdentifier.of(((SqlAlterTableRename) 
sqlAlterTable).fullNewTableName());
                        ObjectIdentifier newTableIdentifier = 
catalogManager.qualifyIdentifier(newUnresolvedIdentifier);
                        return new AlterTableRenameOperation(tableIdentifier, 
newTableIdentifier);
                } else if (sqlAlterTable instanceof SqlAlterTableProperties) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable = catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               CatalogTable originalCatalogTable = 
(CatalogTable) optionalCatalogTable.get().getTable();
-                               Map<String, String> properties = new 
HashMap<>(originalCatalogTable.getOptions());
-                               ((SqlAlterTableProperties) 
sqlAlterTable).getPropertyList().getList().forEach(p ->
-                                       properties.put(((SqlTableOption) 
p).getKeyString(), ((SqlTableOption) p).getValueString()));
-                               CatalogTable catalogTable = new 
CatalogTableImpl(
-                                       originalCatalogTable.getSchema(),
-                                       originalCatalogTable.getPartitionKeys(),
-                                       properties,
-                                       originalCatalogTable.getComment());
-                               return new 
AlterTablePropertiesOperation(tableIdentifier, catalogTable);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                       tableIdentifier.toString()));
-                       }
+                       return convertAlterTableProperties(
+                                       tableIdentifier,
+                                       baseTable,
+                                       (SqlAlterTableProperties) 
sqlAlterTable);
                } else if (sqlAlterTable instanceof SqlAlterTableAddConstraint) 
{
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
-                                               .getConstraint();
-                               validateTableConstraint(constraint);
-                               TableSchema oriSchema = 
optionalCatalogTable.get().getTable().getSchema();
-                               // Sanity check for constraint.
-                               TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
-                               if (constraint.getConstraintName().isPresent()) 
{
-                                       builder.primaryKey(
-                                                       
constraint.getConstraintName().get(),
-                                                       
constraint.getColumnNames());
-                               } else {
-                                       
builder.primaryKey(constraint.getColumnNames());
-                               }
-                               builder.build();
-                               return new AlterTableAddConstraintOperation(
-                                               tableIdentifier,
-                                               
constraint.getConstraintName().orElse(null),
+                       SqlTableConstraint constraint = 
((SqlAlterTableAddConstraint) sqlAlterTable)
+                                       .getConstraint();
+                       validateTableConstraint(constraint);
+                       TableSchema oriSchema = baseTable.getSchema();
+                       // Sanity check for constraint.
+                       TableSchema.Builder builder = 
TableSchemaUtils.builderWithGivenSchema(oriSchema);
+                       if (constraint.getConstraintName().isPresent()) {
+                               builder.primaryKey(
+                                               
constraint.getConstraintName().get(),
                                                constraint.getColumnNames());
                        } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                               builder.primaryKey(constraint.getColumnNames());
                        }
+                       builder.build();
+                       return new AlterTableAddConstraintOperation(
+                                       tableIdentifier,
+                                       
constraint.getConstraintName().orElse(null),
+                                       constraint.getColumnNames());
                } else if (sqlAlterTable instanceof 
SqlAlterTableDropConstraint) {
-                       Optional<CatalogManager.TableLookupResult> 
optionalCatalogTable =
-                                       
catalogManager.getTable(tableIdentifier);
-                       if (optionalCatalogTable.isPresent() && 
!optionalCatalogTable.get().isTemporary()) {
-                               SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
-                               String constraintName = 
dropConstraint.getConstraintName().getSimple();
-                               CatalogTable oriCatalogTable = (CatalogTable) 
optionalCatalogTable.get().getTable();
-                               TableSchema oriSchema = 
oriCatalogTable.getSchema();
-                               if (!oriSchema.getPrimaryKey()
-                                               .filter(pk -> 
pk.getName().equals(constraintName))
-                                               .isPresent()) {
-                                       throw new ValidationException(
-                                                       
String.format("CONSTRAINT [%s] does not exist", constraintName));
-                               }
-                               return new AlterTableDropConstraintOperation(
-                                               tableIdentifier,
-                                               constraintName);
-                       } else {
-                               throw new 
ValidationException(String.format("Table %s doesn't exist or is a temporary 
table.",
-                                               tableIdentifier.toString()));
+                       SqlAlterTableDropConstraint dropConstraint = 
((SqlAlterTableDropConstraint) sqlAlterTable);
+                       String constraintName = 
dropConstraint.getConstraintName().getSimple();
+                       TableSchema oriSchema = baseTable.getSchema();
+                       if (!oriSchema.getPrimaryKey()
+                                       .filter(pk -> 
pk.getName().equals(constraintName))
+                                       .isPresent()) {
+                               throw new ValidationException(
+                                               String.format("CONSTRAINT [%s] 
does not exist", constraintName));
                        }
+                       return new AlterTableDropConstraintOperation(
+                                       tableIdentifier,
+                                       constraintName);
+               } else if (sqlAlterTable instanceof SqlAddReplaceColumns) {
+                       return OperationConverterUtils.convertAddReplaceColumns(
+                                       tableIdentifier,
+                                       (SqlAddReplaceColumns) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
+               } else if (sqlAlterTable instanceof SqlChangeColumn) {
+                       return OperationConverterUtils.convertChangeColumn(
+                                       tableIdentifier,
+                                       (SqlChangeColumn) sqlAlterTable,
+                                       (CatalogTable) baseTable,
+                                       flinkPlanner.getOrCreateSqlValidator());
                } else {
                        throw new ValidationException(
                                        String.format("[%s] needs to implement",
                                                        
sqlAlterTable.toSqlString(CalciteSqlDialect.DEFAULT)));
                }
        }
 
+       private Operation convertAlterTableProperties(ObjectIdentifier 
tableIdentifier, CatalogBaseTable baseTable,
+                       SqlAlterTableProperties alterTableProperties) {
+               LinkedHashMap<String, String> partitionKVs = 
alterTableProperties.getPartitionKVs();
+               // it's altering partitions
+               if (partitionKVs != null) {
+                       CatalogPartitionSpec partitionSpec = new 
CatalogPartitionSpec(partitionKVs);
+                       CatalogPartition catalogPartition = 
catalogManager.getPartition(tableIdentifier, partitionSpec)
+                                       .orElseThrow(() -> new 
ValidationException(String.format("Partition %s of table %s doesn't exist",
+                                                       
partitionSpec.getPartitionSpec(), tableIdentifier)));
+                       Map<String, String> props = 
catalogPartition.getProperties();

Review comment:
       Should create a new `CatalogPartition` with a new Map.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to