lirui-apache commented on a change in pull request #16416:
URL: https://github.com/apache/flink/pull/16416#discussion_r666656070
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1388,23 +1723,41 @@ private Serializable analyzeAlterTableRenameCol(
}
String tblName = HiveParserBaseSemanticAnalyzer.getDotName(qualified);
- return HiveParserAlterTableDesc.changeColumn(
- tblName,
- HiveParserBaseSemanticAnalyzer.unescapeIdentifier(oldColName),
- HiveParserBaseSemanticAnalyzer.unescapeIdentifier(newColName),
- newType,
- newComment,
- first,
- flagCol,
- isCascade);
+
+ ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
+ CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier);
+ if (catalogBaseTable instanceof CatalogView) {
Review comment:
Can we do this check in the outer method so that we don't have to repeat it in each alter table method?
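Something along these lines is what I have in mind (just a sketch reusing the `parseObjectIdentifier`/`getCatalogBaseTable` helpers this PR adds; `convertAlterTableOp` and its parameter list are made up for illustration):

```java
// Sketch: resolve the table and reject views once in the outer ALTER TABLE handler,
// then pass the resolved table down so each convertAlterTableXxx method can drop
// its own instanceof CatalogView check.
ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier);
if (catalogBaseTable instanceof CatalogView) {
    throw new ValidationException("ALTER TABLE for a view is not allowed");
}
return convertAlterTableOp(tableIdentifier, (CatalogTable) catalogBaseTable, ast);
```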
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1441,16 +1853,24 @@ private Serializable analyzeAlterTableDropParts(
validateAlterTableType(tab, AlterTableDesc.AlterTableTypes.DROPPARTITION, expectView);
- return new DropPartitionDesc(qualified[0], qualified[1], partSpecs, ifExists);
+ ObjectIdentifier tableIdentifier =
+ catalogManager.qualifyIdentifier(
+ UnresolvedIdentifier.of(qualified[0], qualified[1]));
+ CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier);
+ if (catalogBaseTable instanceof CatalogView) {
+ throw new ValidationException("DROP PARTITION for a view is not supported");
+ }
+ List<CatalogPartitionSpec> specs =
+ partSpecs.stream().map(CatalogPartitionSpec::new).collect(Collectors.toList());
+ return new DropPartitionsOperation(tableIdentifier, ifExists, specs);
}
/**
* Add one or more partitions to a table. Useful when the data has been copied to the right
* location by some other process.
*/
- private Serializable analyzeAlterTableAddParts(
- String[] qualified, CommonTree ast, boolean expectView) throws SemanticException {
-
+ private Operation convertAlterTableAddParts(
+ String[] qualified, CommonTree ast, boolean expectView) {
Review comment:
We don't support partitions for views. So let's remove the `expectView`
parameter.
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -841,25 +1141,36 @@ private Serializable analyzeDropDatabase(HiveParserASTNode ast) {
ifCascade = true;
}
- return new HiveParserDropDatabaseDesc(dbName, ifExists, ifCascade);
+ return new DropDatabaseOperation(
+ catalogManager.getCurrentCatalog(), dbName, ifExists, ifCascade);
}
- private Serializable analyzeSwitchDatabase(HiveParserASTNode ast) {
+ private Operation convertSwitchDatabase(HiveParserASTNode ast) {
String dbName =
HiveParserBaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0).getText());
- SwitchDatabaseDesc switchDatabaseDesc = new SwitchDatabaseDesc(dbName);
- return new DDLWork(new HashSet<>(), new HashSet<>(), switchDatabaseDesc);
+ return new UseDatabaseOperation(catalogManager.getCurrentCatalog(), dbName);
}
- private Serializable analyzeDropTable(HiveParserASTNode ast, TableType expectedType) {
+ private Operation convertDropTable(HiveParserASTNode ast, TableType expectedType) {
String tableName =
HiveParserBaseSemanticAnalyzer.getUnescapedName(
(HiveParserASTNode) ast.getChild(0));
boolean ifExists = (ast.getFirstChildWithType(HiveASTParser.TOK_IFEXISTS) != null);
- boolean ifPurge = (ast.getFirstChildWithType(HiveASTParser.KW_PURGE) != null);
- return new HiveParserDropTableDesc(
- tableName, expectedType == TableType.VIRTUAL_VIEW, ifExists, ifPurge);
+ ObjectIdentifier identifier = parseObjectIdentifier(tableName);
+ CatalogBaseTable baseTable = getCatalogBaseTable(identifier, true);
+
+ if (expectedType == TableType.VIRTUAL_VIEW) {
+ if (baseTable instanceof CatalogTable) {
+ throw new ValidationException("DROP VIEW for a table is not allowed");
+ }
+ return new DropViewOperation(identifier, ifExists, false);
+ } else {
+ if (baseTable instanceof CatalogView) {
+ throw new ValidationException("DROP TABLE for a view is not allowed");
+ }
+ return new DropTableOperation(identifier, ifExists, false);
+ }
}
private void validateAlterTableType(
Review comment:
This method can be simplified to only check whether a table is non-native.
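Something like this is all that should remain (a sketch; the exact error message is up to you):

```java
// Sketch: the only check left -- reject non-native (storage-handler backed) tables.
private void validateAlterTableType(Table tab) {
    if (tab.isNonNative()) {
        throw new ValidationException(
                "ALTER TABLE cannot be used for a non-native table " + tab.getTableName());
    }
}
```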
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1414,10 +1767,69 @@ private Serializable analyzeAlterTableModifyCols(
isCascade = true;
}
- return HiveParserAlterTableDesc.addReplaceColumns(tblName, newCols, replace, isCascade);
+ ObjectIdentifier tableIdentifier = parseObjectIdentifier(tblName);
+ CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier);
+ if (catalogBaseTable instanceof CatalogView) {
+ throw new ValidationException("ALTER TABLE for a view is not allowed");
+ }
+ CatalogTable oldTable = (CatalogTable) catalogBaseTable;
+
+ // prepare properties
+ Map<String, String> props = new HashMap<>(oldTable.getOptions());
+ props.put(ALTER_TABLE_OP, ALTER_COLUMNS.name());
+ if (isCascade) {
+ props.put(ALTER_COL_CASCADE, "true");
+ }
+ TableSchema oldSchema = oldTable.getSchema();
+ final int numPartCol = oldTable.getPartitionKeys().size();
+ TableSchema.Builder builder = TableSchema.builder();
+ // add existing non-part col if we're not replacing
+ if (!replace) {
+ List<TableColumn> nonPartCols =
+ oldSchema.getTableColumns().subList(0, oldSchema.getFieldCount() - numPartCol);
+ for (TableColumn column : nonPartCols) {
+ builder.add(column);
+ }
+ setWatermarkAndPK(builder, oldSchema);
+ }
+ // add new cols
+ for (FieldSchema col : newCols) {
+ builder.add(
+ TableColumn.physical(
+ col.getName(),
+ HiveTypeUtil.toFlinkType(
+ TypeInfoUtils.getTypeInfoFromTypeString(col.getType()))));
+ }
+ // add part cols
+ List<TableColumn> partCols =
+ oldSchema
+ .getTableColumns()
+ .subList(oldSchema.getFieldCount() - numPartCol, oldSchema.getFieldCount());
+ for (TableColumn column : partCols) {
+ builder.add(column);
+ }
+ return new AlterTableSchemaOperation(
+ tableIdentifier,
+ new CatalogTableImpl(
+ builder.build(),
+ oldTable.getPartitionKeys(),
+ props,
+ oldTable.getComment()));
+ }
+
+ private static void setWatermarkAndPK(TableSchema.Builder builder, TableSchema schema) {
+ for (WatermarkSpec watermarkSpec : schema.getWatermarkSpecs()) {
+ builder.watermark(watermarkSpec);
+ }
+ schema.getPrimaryKey()
+ .ifPresent(
+ pk -> {
+ builder.primaryKey(
+ pk.getName(), pk.getColumns().toArray(new String[0]));
+ });
}
- private Serializable analyzeAlterTableDropParts(
+ private Operation convertAlterTableDropParts(
Review comment:
Same here, remove `expectView`
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1495,25 +1920,96 @@ private Serializable analyzeAlterTableAddParts(
// add the last one
if (currentPart != null) {
- addPartitionDesc.addPartition(currentPart, currentLocation);
+ specs.add(new CatalogPartitionSpec(currentPart));
+ Map<String, String> props = new HashMap<>();
+ if (currentLocation != null) {
+ props.put(TABLE_LOCATION_URI, currentLocation);
+ }
+ partitions.add(new CatalogPartitionImpl(props, null));
}
- return new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
+ ObjectIdentifier tableIdentifier =
+ tab.getDbName() == null
+ ? parseObjectIdentifier(tab.getTableName())
+ : catalogManager.qualifyIdentifier(
+ UnresolvedIdentifier.of(tab.getDbName(), tab.getTableName()));
+ CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier);
+ if (catalogBaseTable instanceof CatalogView) {
Review comment:
No need to do the check here. If it's a view, we would already have thrown "ALTER TABLE for a view is not allowed".
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1495,25 +1920,96 @@ private Serializable analyzeAlterTableAddParts(
// add the last one
if (currentPart != null) {
- addPartitionDesc.addPartition(currentPart, currentLocation);
+ specs.add(new CatalogPartitionSpec(currentPart));
+ Map<String, String> props = new HashMap<>();
+ if (currentLocation != null) {
+ props.put(TABLE_LOCATION_URI, currentLocation);
+ }
+ partitions.add(new CatalogPartitionImpl(props, null));
}
- return new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
+ ObjectIdentifier tableIdentifier =
+ tab.getDbName() == null
Review comment:
Can we just get the DB and table name from the `qualified` parameter?
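i.e. something like this (a sketch, mirroring what `convertAlterTableDropParts` already does above):

```java
// Sketch: derive the identifier from the qualified parameter instead of the Table object.
ObjectIdentifier tableIdentifier =
        catalogManager.qualifyIdentifier(
                UnresolvedIdentifier.of(qualified[0], qualified[1]));
```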
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1285,59 +1647,32 @@ private Serializable analyzeShowTables(HiveParserASTNode ast, boolean expectView
*
* @param ast The parsed command tree.
*/
- private Serializable analyzeShowFunctions(HiveParserASTNode ast) {
- ShowFunctionsDesc showFuncsDesc;
- if (ast.getChildCount() == 1) {
- String funcNames =
- HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText());
- showFuncsDesc = new ShowFunctionsDesc(getResFile(), funcNames);
- } else if (ast.getChildCount() == 2) {
+ private Operation convertShowFunctions(HiveParserASTNode ast) {
+ if (ast.getChildCount() == 2) {
assert (ast.getChild(0).getType() == HiveASTParser.KW_LIKE);
throw new ValidationException("SHOW FUNCTIONS LIKE is not supported yet");
- } else {
- showFuncsDesc = new ShowFunctionsDesc(getResFile());
- }
- return new DDLWork(getInputs(), getOutputs(), showFuncsDesc);
- }
-
- /**
- * Add the task according to the parsed command tree. This is used for the CLI command "DESCRIBE
- * FUNCTION;".
- *
- * @param ast The parsed command tree.
- */
- private Serializable analyzeDescFunction(HiveParserASTNode ast) {
- String funcName;
- boolean isExtended;
-
- if (ast.getChildCount() == 1) {
- funcName = HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText());
- isExtended = false;
- } else if (ast.getChildCount() == 2) {
- funcName = HiveParserBaseSemanticAnalyzer.stripQuotes(ast.getChild(0).getText());
- isExtended = true;
- } else {
- throw new ValidationException("Unexpected Tokens at DESCRIBE FUNCTION");
}
-
- DescFunctionDesc descFuncDesc = new DescFunctionDesc(getResFile(), funcName, isExtended);
- return new DDLWork(getInputs(), getOutputs(), descFuncDesc);
+ return new ShowFunctionsOperation();
}
- private Serializable analyzeAlterTableRename(
+ private Operation convertAlterTableRename(
String[] source, HiveParserASTNode ast, boolean expectView) throws SemanticException {
String[] target =
HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
(HiveParserASTNode) ast.getChild(0));
String sourceName = HiveParserBaseSemanticAnalyzer.getDotName(source);
String targetName = HiveParserBaseSemanticAnalyzer.getDotName(target);
+ ObjectIdentifier objectIdentifier = parseObjectIdentifier(sourceName);
+ checkAlterTableLegal(objectIdentifier, expectView);
- return HiveParserAlterTableDesc.rename(sourceName, targetName, expectView);
+ return expectView
+ ? new AlterViewRenameOperation(objectIdentifier, parseObjectIdentifier(targetName))
+ : new AlterTableRenameOperation(
+ objectIdentifier, parseObjectIdentifier(targetName));
}
- private Serializable analyzeAlterTableRenameCol(
- String[] qualified, HiveParserASTNode ast, HashMap<String, String> partSpec)
+ private Operation convertAlterTableRenameCol(String[] qualified, HiveParserASTNode ast)
Review comment:
It does more than just rename. So let's call it
`convertAlterTableChangeCol`
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -716,29 +836,188 @@ private Serializable analyzeCreateTable(HiveParserASTNode ast) throws SemanticEx
case ctas: // create table as select
tblProps = addDefaultProperties(tblProps);
- HiveParserCreateTableDesc createTableDesc =
- new HiveParserCreateTableDesc(
- dbDotTab,
- isExt,
- ifNotExists,
- isTemporary,
- cols,
- partCols,
- comment,
- location,
- tblProps,
- rowFormatParams,
- storageFormat,
- primaryKeys,
- notNulls);
- return new CreateTableASDesc(createTableDesc, selectStmt);
+ // analyze the query
+ HiveParserCalcitePlanner calcitePlanner =
+ hiveParser.createCalcitePlanner(context, queryState, hiveShim);
+ calcitePlanner.setCtasCols(cols);
+ RelNode queryRelNode = calcitePlanner.genLogicalPlan(selectStmt);
+ // create a table to represent the dest table
+ String[] dbTblName = dbDotTab.split("\\.");
+ Table destTable = new Table(Table.getEmptyTable(dbTblName[0], dbTblName[1]));
+ destTable.getSd().setCols(cols);
+ // create the insert operation
+ CatalogSinkModifyOperation insertOperation =
+ dmlHelper.createInsertOperation(
+ queryRelNode,
+ destTable,
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ false);
+
+ CreateTableOperation createTableOperation =
+ (CreateTableOperation)
Review comment:
Make `convertCreateTable` return `CreateTableOperation`, so there's no need to do the cast.
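For example (a sketch; the parameter list below is illustrative, not the method's actual signature):

```java
// Sketch: declare the concrete operation type so the CTAS branch can use the result directly.
private CreateTableOperation convertCreateTable(
        String compoundName, CatalogTable catalogTable, boolean ifNotExists, boolean isTemporary) {
    ObjectIdentifier identifier = parseObjectIdentifier(compoundName);
    return new CreateTableOperation(identifier, catalogTable, ifNotExists, isTemporary);
}
```

Then the CTAS branch can simply do `CreateTableOperation createTableOperation = convertCreateTable(...);` without the cast.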
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -449,15 +527,24 @@ private Serializable analyzerCreateFunction(HiveParserASTNode ast) {
"Temporary function cannot be created with a qualified name.");
}
- CreateFunctionDesc desc = new CreateFunctionDesc();
- desc.setFunctionName(functionName);
- desc.setTemp(isTemporaryFunction);
- desc.setClassName(className);
- desc.setResources(Collections.emptyList());
- return new FunctionWork(desc);
+ if (isTemporaryFunction) {
+ // hive's temporary function is more like flink's temp system function, e.g. doesn't
+ // belong to a catalog/db
+ // the DDL analyzer makes sure temp function name is not a compound one
Review comment:
this comment should be deleted
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1466,14 +1886,19 @@ private Serializable analyzeAlterTableAddParts(
Map<String, String> currentPart = null;
Review comment:
rename it to `currentSpec`
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -295,52 +375,46 @@ public Serializable analyzeInternal(HiveParserASTNode input) throws SemanticExce
break;
}
case HiveASTParser.TOK_DROPTABLE:
- res = analyzeDropTable(ast, null);
+ res = convertDropTable(ast, null);
break;
case HiveASTParser.TOK_DESCTABLE:
- res = analyzeDescribeTable(ast);
+ res = convertDescribeTable(ast);
break;
case HiveASTParser.TOK_SHOWDATABASES:
- res = analyzeShowDatabases(ast);
+ res = convertShowDatabases();
break;
case HiveASTParser.TOK_SHOWTABLES:
- res = analyzeShowTables(ast, false);
+ res = convertShowTables(ast, false);
break;
case HiveASTParser.TOK_SHOWFUNCTIONS:
- res = analyzeShowFunctions(ast);
+ res = convertShowFunctions(ast);
break;
case HiveASTParser.TOK_SHOWVIEWS:
- res = analyzeShowTables(ast, true);
- break;
- case HiveASTParser.TOK_DESCFUNCTION:
- res = analyzeDescFunction(ast);
- break;
- case HiveASTParser.TOK_DESCDATABASE:
- res = analyzeDescDatabase(ast);
+ res = convertShowTables(ast, true);
break;
case HiveASTParser.TOK_DROPVIEW:
- res = analyzeDropTable(ast, TableType.VIRTUAL_VIEW);
+ res = convertDropTable(ast, TableType.VIRTUAL_VIEW);
break;
case HiveASTParser.TOK_ALTERVIEW:
Review comment:
extract this into a separate method
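Roughly like this (a sketch; the token/child layout is assumed from Hive's ALTER VIEW grammar, and only the rename case is shown as an example):

```java
// Sketch: handle the whole TOK_ALTERVIEW subtree in one place instead of in the
// big top-level switch.
private Operation convertAlterView(HiveParserASTNode ast) throws SemanticException {
    String[] qualified =
            HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
                    (HiveParserASTNode) ast.getChild(0));
    HiveParserASTNode op = (HiveParserASTNode) ast.getChild(1);
    if (op.getType() == HiveASTParser.TOK_ALTERVIEW_RENAME) {
        return convertAlterTableRename(qualified, op, true);
    }
    // ... the remaining ALTER VIEW variants dispatch here the same way
    throw new ValidationException("Unsupported ALTER VIEW operation");
}
```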
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -1224,23 +1590,19 @@ private Serializable analyzeShowPartitions(HiveParserASTNode ast) throws Semanti
validateTable(tableName, null);
Review comment:
We don't really need this method. The catalog will check whether the table exists when the operation is executed.
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
##########
@@ -197,18 +287,9 @@ private ObjectPath toObjectPath(String name) throws SemanticException {
return new ObjectPath(parts[0], parts[1]);
}
- private HashSet<ReadEntity> getInputs() {
- return new HashSet<>();
- }
-
- private HashSet<WriteEntity> getOutputs() {
- return new HashSet<>();
- }
-
- public Serializable analyzeInternal(HiveParserASTNode input) throws SemanticException {
-
+ public Operation convertToOperation(HiveParserASTNode input) throws SemanticException {
HiveParserASTNode ast = input;
- Serializable res = null;
+ Operation res = null;
switch (ast.getType()) {
case HiveASTParser.TOK_ALTERTABLE:
Review comment:
Let's extract this into a separate method to handle all the ALTER TABLE
operations
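Roughly (a sketch; the child indexes follow Hive's existing ALTER TABLE AST layout, and the per-op method signatures assume the `expectView` cleanups suggested above):

```java
// Sketch: one method owns the TOK_ALTERTABLE subtree so convertToOperation's switch stays small.
private Operation convertAlterTable(HiveParserASTNode input) throws SemanticException {
    String[] qualified =
            HiveParserBaseSemanticAnalyzer.getQualifiedTableName(
                    (HiveParserASTNode) input.getChild(0).getChild(0));
    HiveParserASTNode op = (HiveParserASTNode) input.getChild(1);
    switch (op.getType()) {
        case HiveASTParser.TOK_ALTERTABLE_RENAME:
            return convertAlterTableRename(qualified, op, false);
        case HiveASTParser.TOK_ALTERTABLE_ADDPARTS:
            return convertAlterTableAddParts(qualified, op);
        case HiveASTParser.TOK_ALTERTABLE_DROPPARTS:
            return convertAlterTableDropParts(qualified, op);
        // ... the remaining TOK_ALTERTABLE_* tokens dispatch the same way
        default:
            throw new ValidationException("Unsupported ALTER TABLE operation");
    }
}
```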
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]