godfreyhe commented on code in PR #20363:
URL: https://github.com/apache/flink/pull/20363#discussion_r935155784


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java:
##########
@@ -1247,6 +1268,178 @@ private Operation convertCompileAndExecutePlan(SqlCompileAndExecutePlan compileA
                         compileAndExecutePlan.getOperandList().get(0)));
     }
 
+    private Operation convertAnalyzeTable(SqlAnalyzeTable analyzeTable) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(analyzeTable.fullTableName());
+        ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        Optional<ContextResolvedTable> optionalCatalogTable =
+                catalogManager.getTable(tableIdentifier);
+        if (!optionalCatalogTable.isPresent()) {
+            throw new ValidationException(
+                    String.format("Table %s doesn't exist.", tableIdentifier));
+        }
+
+        CatalogBaseTable baseTable = optionalCatalogTable.get().getTable();
+        if (baseTable instanceof CatalogView) {
+            throw new ValidationException("ANALYZE TABLE for a view is not allowed");
+        }
+        CatalogTable table = (CatalogTable) baseTable;
+        ResolvedSchema schema =
+                baseTable.getUnresolvedSchema().resolve(catalogManager.getSchemaResolver());
+
+        LinkedHashMap<String, String> partitions = analyzeTable.getPartitions();
+        List<CatalogPartitionSpec> targetPartitionSpecs = null;
+        if (table.isPartitioned()) {
+            if (!new ArrayList<>(partitions.keySet()).equals(table.getPartitionKeys())) {
+                throw new ValidationException(
+                        String.format(
+                                "For partition table, all partition keys should be specified explicitly. "
+                                        + "The given partition keys: [%s] are not match the target partition keys: [%s]",
+                                String.join(",", partitions.keySet()),
+                                String.join(",", table.getPartitionKeys())));
+            }
+
+            try {
+                targetPartitionSpecs = getPartitionSpecs(tableIdentifier, schema, partitions);
+            } catch (Exception e) {
+                throw new ValidationException(e.getMessage(), e);
+            }
+        } else if (!partitions.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table: %s is not a partition table, while partition values is given",
+                            tableIdentifier));
+        }
+
+        List<String> origColumns =
+                ((RowType) schema.toPhysicalRowDataType().getLogicalType()).getFieldNames();
+        String[] columns = analyzeTable.getColumnNames();
+        List<String> targetColumns;
+        if (analyzeTable.isAllColumns()) {

Review Comment:
   Computed columns and metadata columns will be excluded. I will throw a clearer
exception when the given columns contain a computed column or a metadata column.
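
To make the intended behavior concrete, here is a minimal sketch of the check in plain Java. It is not the code in this PR: `AnalyzeColumnCheck`, `Kind`, and `resolveTargetColumns` are hypothetical names, the schema is modeled as a simple map instead of Flink's `ResolvedSchema`, and the exception type and messages are placeholders. It assumes that "all columns" means only the physical columns, and that an explicitly listed computed or metadata column is rejected with a message naming the column and its kind.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration only; none of these names are Flink APIs.
final class AnalyzeColumnCheck {

    // Stand-in for the column kinds a table schema can contain.
    enum Kind { PHYSICAL, COMPUTED, METADATA }

    // Returns the columns to analyze. With allColumns set, computed and
    // metadata columns are silently skipped; when columns are listed
    // explicitly, a non-physical or unknown column is rejected.
    static List<String> resolveTargetColumns(
            Map<String, Kind> schema, String[] requested, boolean allColumns) {
        if (allColumns) {
            List<String> physical = new ArrayList<>();
            for (Map.Entry<String, Kind> entry : schema.entrySet()) {
                if (entry.getValue() == Kind.PHYSICAL) {
                    physical.add(entry.getKey());
                }
            }
            return physical;
        }

        List<String> target = new ArrayList<>();
        for (String name : requested) {
            Kind kind = schema.get(name);
            if (kind == null) {
                throw new IllegalArgumentException(
                        String.format("Column: %s does not exist in the table.", name));
            }
            if (kind != Kind.PHYSICAL) {
                throw new IllegalArgumentException(
                        String.format(
                                "Column: %s is a %s column, ANALYZE TABLE only supports physical columns.",
                                name, kind.name().toLowerCase(Locale.ROOT)));
            }
            target.add(name);
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, Kind> schema = new LinkedHashMap<>();
        schema.put("id", Kind.PHYSICAL);
        schema.put("name", Kind.PHYSICAL);
        schema.put("total", Kind.COMPUTED);
        schema.put("ts", Kind.METADATA);

        // All columns: only the physical ones are kept.
        System.out.println(resolveTargetColumns(schema, new String[0], true));

        // Explicit column list containing a computed column: rejected.
        try {
            resolveTargetColumns(schema, new String[] {"id", "total"}, false);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real converter the same distinction would presumably be drawn from the resolved schema's column types and reported through `ValidationException`, as the rest of this method already does.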


