Aitozi commented on code in PR #21522:
URL: https://github.com/apache/flink/pull/21522#discussion_r1129605975


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java:
##########
@@ -1966,61 +1982,60 @@ private Operation convertAlterTableModifyCols(
         }
 
         ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
-        CatalogTable oldTable = (CatalogTable) alteredTable;
+        ResolvedCatalogTable oldTable =
+                catalogManager.resolveCatalogTable((CatalogTable) alteredTable);
 
         // prepare properties
         Map<String, String> props = new HashMap<>(oldTable.getOptions());
         props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name());
         if (isCascade) {
             props.put(ALTER_COL_CASCADE, "true");
         }
-        TableSchema oldSchema = oldTable.getSchema();
+        ResolvedSchema oldSchema = oldTable.getResolvedSchema();
         final int numPartCol = oldTable.getPartitionKeys().size();
-        TableSchema.Builder builder = TableSchema.builder();
         // add existing non-part col if we're not replacing
+        List<Column> newColumns = new ArrayList<>();
         if (!replace) {
-            List<TableColumn> nonPartCols =
-                    oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
-            for (TableColumn column : nonPartCols) {
-                builder.add(column);
-            }
-            setWatermarkAndPK(builder, oldSchema);
+            List<Column> nonPartCols =
+                    oldSchema.getColumns().subList(0, oldSchema.getColumnCount() - numPartCol);
+
+            newColumns.addAll(nonPartCols);
         }
         // add new cols
         for (FieldSchema col : newCols) {
-            builder.add(
-                    TableColumn.physical(
+            newColumns.add(
+                    Column.physical(
                             col.getName(),
                             HiveTypeUtil.toFlinkType(
                                     TypeInfoUtils.getTypeInfoFromTypeString(col.getType()))));
         }
         // add part cols
-        List<TableColumn> partCols =
+        List<Column> partCols =
                 oldSchema
-                        .getTableColumns()
-                        .subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount());
-        for (TableColumn column : partCols) {
-            builder.add(column);
+                        .getColumns()
+                        .subList(
+                                oldSchema.getColumnCount() - numPartCol,
+                                oldSchema.getColumnCount());
+        newColumns.addAll(partCols);
+        ResolvedSchema newSchema;
+        if (!replace) {
+            newSchema =
+                    new ResolvedSchema(
+                            newColumns,
+                            oldSchema.getWatermarkSpecs(),
+                            oldSchema.getPrimaryKey().orElse(null));
+        } else {
+            newSchema = ResolvedSchema.of(newColumns);
         }
         return new AlterTableSchemaOperation(
                 tableIdentifier,
-                new CatalogTableImpl(
-                        builder.build(),
-                        oldTable.getPartitionKeys(),
-                        props,
-                        oldTable.getComment()));
-    }
-
-    private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) {
-        for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
-            builder.watermark(watermarkSpec);
-        }
-        schema.getPrimaryKey()
-                .ifPresent(
-                        pk -> {
-                            builder.primaryKey(
-                                    pk.getName(), pk.getColumns().toArray(new String[0]));
-                        });
+                new ResolvedCatalogTable(
+                        CatalogTable.of(
+                                Schema.newBuilder().fromResolvedSchema(newSchema).build(),
+                                oldTable.getComment(),
+                                oldTable.getPartitionKeys(),
+                                props),
+                        newSchema));
     }

Review Comment:
   Removed
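
   For reference, the new version of `convertAlterTableModifyCols` assembles the
   `ResolvedSchema` by hand instead of going through `TableSchema.Builder`. A
   minimal standalone sketch of that pattern follows; the column names and the
   single partition column are made up for illustration and are not taken from
   the PR.

   ```java
   // Standalone sketch of rebuilding a ResolvedSchema when columns are added
   // to a partitioned table. Column names here are hypothetical.
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.catalog.Column;
   import org.apache.flink.table.catalog.ResolvedSchema;

   import java.util.ArrayList;
   import java.util.List;

   public class ModifyColsSketch {
       public static void main(String[] args) {
           // Hypothetical existing schema; the last column is the partition column.
           ResolvedSchema oldSchema =
                   ResolvedSchema.of(
                           Column.physical("id", DataTypes.INT()),
                           Column.physical("name", DataTypes.STRING()),
                           Column.physical("dt", DataTypes.STRING()));
           int numPartCol = 1;

           // Columns added by ALTER TABLE ... ADD COLUMNS (hypothetical).
           List<Column> addedCols = new ArrayList<>();
           addedCols.add(Column.physical("price", DataTypes.DOUBLE()));

           // Keep the existing non-partition columns, append the new columns,
           // then re-append the partition columns at the end.
           List<Column> newColumns = new ArrayList<>();
           newColumns.addAll(
                   oldSchema.getColumns().subList(0, oldSchema.getColumnCount() - numPartCol));
           newColumns.addAll(addedCols);
           newColumns.addAll(
                   oldSchema
                           .getColumns()
                           .subList(
                                   oldSchema.getColumnCount() - numPartCol,
                                   oldSchema.getColumnCount()));

           // In the non-replace case the old watermark specs and primary key
           // are carried over, as in the diff above.
           ResolvedSchema newSchema =
                   new ResolvedSchema(
                           newColumns,
                           oldSchema.getWatermarkSpecs(),
                           oldSchema.getPrimaryKey().orElse(null));

           System.out.println(newSchema);
       }
   }
   ```

   In the `replace` case the diff instead calls `ResolvedSchema.of(newColumns)`,
   which drops the old watermark specs and primary key rather than carrying them
   over.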


