godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935155784
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation
convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
compileAndExecutePlan.getOperandList().get(0)));
}
+ private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+ UnresolvedIdentifier unresolvedIdentifier =
+ UnresolvedIdentifier.of(analyzeTable.fullTableName());
+ ObjectIdentifier tableIdentifier =
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+ Optional<ContextResolvedTable> optionalCatalogTable =
+ catalogManager.getTable(tableIdentifier);
+ if (!optionalCatalogTable.isPresent()) {
+ throw new ValidationException(
+ String.format("Table %s doesn't exist.", tableIdentifier));
+ }
+
+ CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+ if (baseTable instanceof CatalogView) {
+ throw new ValidationException("ANALYZE TABLE for a view is not
allowed");
+ }
+ CatalogTable table = (CatalogTable) baseTable;
+ ResolvedSchema schema =
+
baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+ LinkedHashMap<String, String> partitions =
analyzeTable.getPartitions();
+ List<CatalogPartitionSpec> targetPartitionSpecs = null;
+ if (table.isPartitioned()) {
+ if (!new
ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+ throw new ValidationException(
+ String.format(
+ "For partition table, all partition keys
should be specified explicitly. "
+ + "The given partition keys: [%s] are
not match the target partition keys: [%s]",
+ String.join(",", partitions.keySet()),
+ String.join(",", table.getPartitionKeys())));
+ }
+
+ try {
+ targetPartitionSpecs = getPartitionSpecs(tableIdentifier,
schema, partitions);
+ } catch (Exception e) {
+ throw new ValidationException(e.getMessage(), e);
+ }
+ } else if (!partitions.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Table: %s is not a partition table, while
partition values is given",
+ tableIdentifier));
+ }
+
+ List<String> origColumns =
+ ((RowType)
schema.toPhysicalRowDataType().getLogicalType()).getFieldNames();
+ String[] columns = analyzeTable.getColumnNames();
+ List<String> targetColumns;
+ if (analyzeTable.isAllColumns()) {
Review Comment:
compute column and metadata column will be excluded, I will throw more clear
exception when the given columns contain compute column and metadata column.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]