lirui-apache commented on a change in pull request #16416:
URL: https://github.com/apache/flink/pull/16416#discussion_r666656070



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1388,23 +1723,41 @@ private Serializable analyzeAlterTableRenameCol(
         }
 
         String tblName = HiveParserBaseSemanticAnalyzer.getDotName(qualified);
-        return HiveParserAlterTableDesc.changeColumn(
-                tblName,
-                HiveParserBaseSemanticAnalyzer.unescapeIdentifier(oldColName),
-                HiveParserBaseSemanticAnalyzer.unescapeIdentifier(newColName),
-                newType,
-                newComment,
-                first,
-                flagCol,
-                isCascade);
+
+        ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
+        CatalogBaseTable catalogBaseTable = 
getCatalogBaseTable(tableIdentifier);
+        if (catalogBaseTable instanceof CatalogView) {

Review comment:
       Can we do this check in the outer method so that we don't have to do 
this in each of the alter table methods?

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1441,16 +1853,24 @@ private Serializable analyzeAlterTableDropParts(
 
         validateAlterTableType(tab, 
AlterTableDesc.AlterTableTypes.DROPPARTITION, expectView);
 
-        return new DropPartitionDesc(qualified[0], qualified[1], partSpecs, 
ifExists);
+        ObjectIdentifier tableIdentifier =
+                catalogManager.qualifyIdentifier(
+                        UnresolvedIdentifier.of(qualified[0], qualified[1]));
+        CatalogBaseTable catalogBaseTable = 
getCatalogBaseTable(tableIdentifier);
+        if (catalogBaseTable instanceof CatalogView) {
+            throw new ValidationException("DROP PARTITION for a view is not 
supported");
+        }
+        List<CatalogPartitionSpec> specs =
+                
partSpecs.stream().map(CatalogPartitionSpec::new).collect(Collectors.toList());
+        return new DropPartitionsOperation(tableIdentifier, ifExists, specs);
     }
 
     /**
      * Add one or more partitions to a table. Useful when the data has been 
copied to the right
      * location by some other process.
      */
-    private Serializable analyzeAlterTableAddParts(
-            String[] qualified, CommonTree ast, boolean expectView) throws 
SemanticException {
-
+    private Operation convertAlterTableAddParts(
+            String[] qualified, CommonTree ast, boolean expectView) {

Review comment:
       We don't support partitions for views. So let's remove the `expectView` 
parameter.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -841,25 +1141,36 @@ private Serializable 
analyzeDropDatabase(HiveParserASTNode ast) {
             ifCascade = true;
         }
 
-        return new HiveParserDropDatabaseDesc(dbName, ifExists, ifCascade);
+        return new DropDatabaseOperation(
+                catalogManager.getCurrentCatalog(), dbName, ifExists, 
ifCascade);
     }
 
-    private Serializable analyzeSwitchDatabase(HiveParserASTNode ast) {
+    private Operation convertSwitchDatabase(HiveParserASTNode ast) {
         String dbName =
                 
HiveParserBaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getText());
-        SwitchDatabaseDesc switchDatabaseDesc = new SwitchDatabaseDesc(dbName);
-        return new DDLWork(new HashSet<>(), new HashSet<>(), 
switchDatabaseDesc);
+        return new UseDatabaseOperation(catalogManager.getCurrentCatalog(), 
dbName);
     }
 
-    private Serializable analyzeDropTable(HiveParserASTNode ast, TableType 
expectedType) {
+    private Operation convertDropTable(HiveParserASTNode ast, TableType 
expectedType) {
         String tableName =
                 HiveParserBaseSemanticAnalyzer.getUnescapedName(
                         (HiveParserASTNode) ast.getChild(0));
         boolean ifExists = 
(ast.getFirstChildWithType(HiveASTParser.TOK_IFEXISTS) != null);
 
-        boolean ifPurge = (ast.getFirstChildWithType(HiveASTParser.KW_PURGE) 
!= null);
-        return new HiveParserDropTableDesc(
-                tableName, expectedType == TableType.VIRTUAL_VIEW, ifExists, 
ifPurge);
+        ObjectIdentifier identifier = parseObjectIdentifier(tableName);
+        CatalogBaseTable baseTable = getCatalogBaseTable(identifier, true);
+
+        if (expectedType == TableType.VIRTUAL_VIEW) {
+            if (baseTable instanceof CatalogTable) {
+                throw new ValidationException("DROP VIEW for a table is not 
allowed");
+            }
+            return new DropViewOperation(identifier, ifExists, false);
+        } else {
+            if (baseTable instanceof CatalogView) {
+                throw new ValidationException("DROP TABLE for a view is not 
allowed");
+            }
+            return new DropTableOperation(identifier, ifExists, false);
+        }
     }
 
     private void validateAlterTableType(

Review comment:
       This method can be simplified to only check whether a table is 
non-native.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1414,10 +1767,69 @@ private Serializable analyzeAlterTableModifyCols(
             isCascade = true;
         }
 
-        return HiveParserAlterTableDesc.addReplaceColumns(tblName, newCols, 
replace, isCascade);
+        ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
+        CatalogBaseTable catalogBaseTable = 
getCatalogBaseTable(tableIdentifier);
+        if (catalogBaseTable instanceof CatalogView) {
+            throw new ValidationException("ALTER TABLE for a view is not 
allowed");
+        }
+        CatalogTable oldTable = (CatalogTable) catalogBaseTable;
+
+        // prepare properties
+        Map<String, String> props = new HashMap<>(oldTable.getOptions());
+        props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name());
+        if (isCascade) {
+            props.put(ALTER_COL_CASCADE, "true");
+        }
+        TableSchema oldSchema = oldTable.getSchema();
+        final int numPartCol = oldTable.getPartitionKeys().size();
+        TableSchema.Builder builder = TableSchema.builder();
+        // add existing non-part col if we're not replacing
+        if (!replace) {
+            List<TableColumn> nonPartCols =
+                    oldSchema.getTableColumns().subList(0, 
oldSchema.getFieldCount() - numPartCol);
+            for (TableColumn column : nonPartCols) {
+                builder.add(column);
+            }
+            setWatermarkAndPK(builder, oldSchema);
+        }
+        // add new cols
+        for (FieldSchema col : newCols) {
+            builder.add(
+                    TableColumn.physical(
+                            col.getName(),
+                            HiveTypeUtil.toFlinkType(
+                                    
TypeInfoUtils.getTypeInfoFromTypeString(col.getType()))));
+        }
+        // add part cols
+        List<TableColumn> partCols =
+                oldSchema
+                        .getTableColumns()
+                        .subList(oldSchema.getFieldCount() - numPartCol, 
oldSchema.getFieldCount());
+        for (TableColumn column : partCols) {
+            builder.add(column);
+        }
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                new CatalogTableImpl(
+                        builder.build(),
+                        oldTable.getPartitionKeys(),
+                        props,
+                        oldTable.getComment()));
+    }
+
+    private static void setWatermarkAndPK(TableSchema.Builder builder, 
TableSchema schema) {
+        for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
+            builder.watermark(watermarkSpec);
+        }
+        schema.getPrimaryKey()
+                .ifPresent(
+                        pk -> {
+                            builder.primaryKey(
+                                    pk.getName(), pk.getColumns().toArray(new 
String[0]));
+                        });
     }
 
-    private Serializable analyzeAlterTableDropParts(
+    private Operation convertAlterTableDropParts(

Review comment:
       Same here, remove `expectView`

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1495,25 +1920,96 @@ private Serializable analyzeAlterTableAddParts(
 
         // add the last one
         if (currentPart != null) {
-            addPartitionDesc.addPartition(currentPart, currentLocation);
+            specs.add(new CatalogPartitionSpec(currentPart));
+            Map<String, String> props = new HashMap<>();
+            if (currentLocation != null) {
+                props.put(TABLE_LOCATION_URI, currentLocation);
+            }
+            partitions.add(new CatalogPartitionImpl(props, null));
         }
 
-        return new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
+        ObjectIdentifier tableIdentifier =
+                tab.getDbName() == null
+                        ? parseObjectIdentifier(tab.getTableName())
+                        : catalogManager.qualifyIdentifier(
+                                UnresolvedIdentifier.of(tab.getDbName(), 
tab.getTableName()));
+        CatalogBaseTable catalogBaseTable = 
getCatalogBaseTable(tableIdentifier);
+        if (catalogBaseTable instanceof CatalogView) {

Review comment:
       No need to do the check here. If it's a view, we should already have 
thrown "alter table for view is not allowed".

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1495,25 +1920,96 @@ private Serializable analyzeAlterTableAddParts(
 
         // add the last one
         if (currentPart != null) {
-            addPartitionDesc.addPartition(currentPart, currentLocation);
+            specs.add(new CatalogPartitionSpec(currentPart));
+            Map<String, String> props = new HashMap<>();
+            if (currentLocation != null) {
+                props.put(TABLE_LOCATION_URI, currentLocation);
+            }
+            partitions.add(new CatalogPartitionImpl(props, null));
         }
 
-        return new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
+        ObjectIdentifier tableIdentifier =
+                tab.getDbName() == null

Review comment:
       Can we just get the DB and table name from the `qualified` parameter?

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1285,59 +1647,32 @@ private Serializable 
analyzeShowTables(HiveParserASTNode ast, boolean expectView
      *
      * @param ast The parsed command tree.
      */
-    private Serializable analyzeShowFunctions(HiveParserASTNode ast) {
-        ShowFunctionsDesc showFuncsDesc;
-        if (ast.getChildCount() == 1) {
-            String funcNames =
-                    
HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText());
-            showFuncsDesc = new ShowFunctionsDesc(getResFile(), funcNames);
-        } else if (ast.getChildCount() == 2) {
+    private Operation convertShowFunctions(HiveParserASTNode ast) {
+        if (ast.getChildCount() == 2) {
             assert (ast.getChild(0).getType() == HiveASTParser.KW_LIKE);
             throw new ValidationException("SHOW FUNCTIONS LIKE is not 
supported yet");
-        } else {
-            showFuncsDesc = new ShowFunctionsDesc(getResFile());
-        }
-        return new DDLWork(getInputs(), getOutputs(), showFuncsDesc);
-    }
-
-    /**
-     * Add the task according to the parsed command tree. This is used for the 
CLI command "DESCRIBE
-     * FUNCTION;".
-     *
-     * @param ast The parsed command tree.
-     */
-    private Serializable analyzeDescFunction(HiveParserASTNode ast) {
-        String funcName;
-        boolean isExtended;
-
-        if (ast.getChildCount() == 1) {
-            funcName = 
HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText());
-            isExtended = false;
-        } else if (ast.getChildCount() == 2) {
-            funcName = 
HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText());
-            isExtended = true;
-        } else {
-            throw new ValidationException("Unexpected Tokens at DESCRIBE 
FUNCTION");
         }
-
-        DescFunctionDesc descFuncDesc = new DescFunctionDesc(getResFile(), 
funcName, isExtended);
-        return new DDLWork(getInputs(), getOutputs(), descFuncDesc);
+        return new ShowFunctionsOperation();
     }
 
-    private Serializable analyzeAlterTableRename(
+    private Operation convertAlterTableRename(
             String[] source, HiveParserASTNode ast, boolean expectView) throws 
SemanticException {
         String[] target =
                 HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
                         (HiveParserASTNode) ast.getChild(0));
 
         String sourceName = HiveParserBaseSemanticAnalyzer.getDotName(source);
         String targetName = HiveParserBaseSemanticAnalyzer.getDotName(target);
+        ObjectIdentifier objectIdentifier = parseObjectIdentifier(sourceName);
+        checkAlterTableLegal(objectIdentifier, expectView);
 
-        return HiveParserAlterTableDesc.rename(sourceName, targetName, 
expectView);
+        return expectView
+                ? new AlterViewRenameOperation(objectIdentifier, 
parseObjectIdentifier(targetName))
+                : new AlterTableRenameOperation(
+                        objectIdentifier, parseObjectIdentifier(targetName));
     }
 
-    private Serializable analyzeAlterTableRenameCol(
-            String[] qualified, HiveParserASTNode ast, HashMap<String, String> 
partSpec)
+    private Operation convertAlterTableRenameCol(String[] qualified, 
HiveParserASTNode ast)

Review comment:
       It does more than just rename. So let's call it 
`convertAlterTableChangeCol`

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -716,29 +836,188 @@ private Serializable 
analyzeCreateTable(HiveParserASTNode ast) throws SemanticEx
             case ctas: // create table as select
                 tblProps = addDefaultProperties(tblProps);
 
-                HiveParserCreateTableDesc createTableDesc =
-                        new HiveParserCreateTableDesc(
-                                dbDotTab,
-                                isExt,
-                                ifNotExists,
-                                isTemporary,
-                                cols,
-                                partCols,
-                                comment,
-                                location,
-                                tblProps,
-                                rowFormatParams,
-                                storageFormat,
-                                primaryKeys,
-                                notNulls);
-                return new CreateTableASDesc(createTableDesc, selectStmt);
+                // analyze the query
+                HiveParserCalcitePlanner calcitePlanner =
+                        hiveParser.createCalcitePlanner(context, queryState, 
hiveShim);
+                calcitePlanner.setCtasCols(cols);
+                RelNode queryRelNode = 
calcitePlanner.genLogicalPlan(selectStmt);
+                // create a table to represent the dest table
+                String[] dbTblName = dbDotTab.split("\\.");
+                Table destTable = new Table(Table.getEmptyTable(dbTblName[0], 
dbTblName[1]));
+                destTable.getSd().setCols(cols);
+                // create the insert operation
+                CatalogSinkModifyOperation insertOperation =
+                        dmlHelper.createInsertOperation(
+                                queryRelNode,
+                                destTable,
+                                Collections.emptyMap(),
+                                Collections.emptyList(),
+                                false);
+
+                CreateTableOperation createTableOperation =
+                        (CreateTableOperation)

Review comment:
       Make `convertCreateTable` return `CreateTableOperation` so that there is 
no need to do the cast.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -449,15 +527,24 @@ private Serializable 
analyzerCreateFunction(HiveParserASTNode ast) {
                     "Temporary function cannot be created with a qualified 
name.");
         }
 
-        CreateFunctionDesc desc = new CreateFunctionDesc();
-        desc.setFunctionName(functionName);
-        desc.setTemp(isTemporaryFunction);
-        desc.setClassName(className);
-        desc.setResources(Collections.emptyList());
-        return new FunctionWork(desc);
+        if (isTemporaryFunction) {
+            // hive's temporary function is more like flink's temp system 
function, e.g. doesn't
+            // belong to a catalog/db
+            // the DDL analyzer makes sure temp function name is not a 
compound one

Review comment:
       this comment should be deleted

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1466,14 +1886,19 @@ private Serializable analyzeAlterTableAddParts(
         Map<String, String> currentPart = null;

Review comment:
       Rename `currentPart` to `currentSpec`.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -295,52 +375,46 @@ public Serializable analyzeInternal(HiveParserASTNode 
input) throws SemanticExce
                     break;
                 }
             case HiveASTParser.TOK_DROPTABLE:
-                res = analyzeDropTable(ast, null);
+                res = convertDropTable(ast, null);
                 break;
             case HiveASTParser.TOK_DESCTABLE:
-                res = analyzeDescribeTable(ast);
+                res = convertDescribeTable(ast);
                 break;
             case HiveASTParser.TOK_SHOWDATABASES:
-                res = analyzeShowDatabases(ast);
+                res = convertShowDatabases();
                 break;
             case HiveASTParser.TOK_SHOWTABLES:
-                res = analyzeShowTables(ast, false);
+                res = convertShowTables(ast, false);
                 break;
             case HiveASTParser.TOK_SHOWFUNCTIONS:
-                res = analyzeShowFunctions(ast);
+                res = convertShowFunctions(ast);
                 break;
             case HiveASTParser.TOK_SHOWVIEWS:
-                res = analyzeShowTables(ast, true);
-                break;
-            case HiveASTParser.TOK_DESCFUNCTION:
-                res = analyzeDescFunction(ast);
-                break;
-            case HiveASTParser.TOK_DESCDATABASE:
-                res = analyzeDescDatabase(ast);
+                res = convertShowTables(ast, true);
                 break;
             case HiveASTParser.TOK_DROPVIEW:
-                res = analyzeDropTable(ast, TableType.VIRTUAL_VIEW);
+                res = convertDropTable(ast, TableType.VIRTUAL_VIEW);
                 break;
             case HiveASTParser.TOK_ALTERVIEW:

Review comment:
       extract this into a separate method

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1224,23 +1590,19 @@ private Serializable 
analyzeShowPartitions(HiveParserASTNode ast) throws Semanti
 
         validateTable(tableName, null);

Review comment:
       We don't really need this method. The catalog will check whether the 
table exists when the operation is executed.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -197,18 +287,9 @@ private ObjectPath toObjectPath(String name) throws 
SemanticException {
         return new ObjectPath(parts[0], parts[1]);
     }
 
-    private HashSet<ReadEntity> getInputs() {
-        return new HashSet<>();
-    }
-
-    private HashSet<WriteEntity> getOutputs() {
-        return new HashSet<>();
-    }
-
-    public Serializable analyzeInternal(HiveParserASTNode input) throws 
SemanticException {
-
+    public Operation convertToOperation(HiveParserASTNode input) throws 
SemanticException {
         HiveParserASTNode ast = input;
-        Serializable res = null;
+        Operation res = null;
         switch (ast.getType()) {
             case HiveASTParser.TOK_ALTERTABLE:

Review comment:
       Let's extract this into a separate method to handle all the ALTER TABLE 
operations




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to