This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 452986dd05b193d492e0fc7b817f61b9695871d2
Author: Jark Wu <[email protected]>
AuthorDate: Sun Mar 19 18:13:17 2023 +0800

    [FLINK-31507][table] Move execution logic of DescribeTableOperation out from TableEnvironmentImpl
    
    This closes #22215
---
 .../src/test/resources/sql/table.q                 |   4 +-
 .../src/test/resources/sql/table.q                 |   4 +-
 .../table/api/internal/TableEnvironmentImpl.java   | 118 ---------------------
 .../table/operations/DescribeTableOperation.java   |  11 +-
 4 files changed, 14 insertions(+), 123 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index 1fb69431a81..301c9dc51c7 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -31,12 +31,12 @@ org.apache.flink.table.api.ValidationException: Table with 
identifier 'default_c
 
 describe non_exist;
 [ERROR] Could not execute SQL statement. Reason:
-org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist
+org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist.
 !error
 
 desc non_exist;
 [ERROR] Could not execute SQL statement. Reason:
-org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist
+org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist.
 !error
 
 alter table non_exist rename to non_exist2;
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
index 821f679fa36..f338864fddd 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
@@ -31,12 +31,12 @@ org.apache.flink.table.api.ValidationException: Table with 
identifier 'default_c
 
 describe non_exist;
 !output
-org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist
+org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist.
 !error
 
 desc non_exist;
 !output
-org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist
+org.apache.flink.table.api.ValidationException: Tables or views with the 
identifier 'default_catalog.default_database.non_exist' doesn't exist.
 !error
 
 alter table non_exist rename to non_exist2;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 1e75fce6e34..644cf578776 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -57,7 +57,6 @@ import 
org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
@@ -79,7 +78,6 @@ import 
org.apache.flink.table.operations.CollectModifyOperation;
 import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
 import org.apache.flink.table.operations.CreateTableASOperation;
 import org.apache.flink.table.operations.DeleteFromFilterOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExecutableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
@@ -106,7 +104,6 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.utils.print.PrintStyle;
 import org.apache.flink.types.Row;
@@ -970,18 +967,6 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                     .schema(ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING())))
                     .data(Collections.singletonList(Row.of(explanation)))
                     .build();
-        } else if (operation instanceof DescribeTableOperation) {
-            DescribeTableOperation describeTableOperation = 
(DescribeTableOperation) operation;
-            Optional<ContextResolvedTable> result =
-                    
catalogManager.getTable(describeTableOperation.getSqlIdentifier());
-            if (result.isPresent()) {
-                return buildDescribeResult(result.get().getResolvedSchema());
-            } else {
-                throw new ValidationException(
-                        String.format(
-                                "Tables or views with the identifier '%s' 
doesn't exist",
-                                
describeTableOperation.getSqlIdentifier().asSummaryString()));
-            }
         } else if (operation instanceof QueryOperation) {
             return executeQueryOperation((QueryOperation) operation);
         } else if (operation instanceof ExecutePlanOperation) {
@@ -1068,109 +1053,6 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         }
     }
 
-    private TableResultInternal buildDescribeResult(ResolvedSchema schema) {
-        Object[][] rows = buildTableColumns(schema);
-        boolean nonComments = isSchemaNonColumnComments(schema);
-        return buildResult(
-                generateTableColumnsNames(nonComments),
-                generateTableColumnsDataTypes(nonComments),
-                rows);
-    }
-
-    private DataType[] generateTableColumnsDataTypes(boolean nonComments) {
-        final ArrayList<DataType> result =
-                new ArrayList<>(
-                        Arrays.asList(
-                                DataTypes.STRING(),
-                                DataTypes.STRING(),
-                                DataTypes.BOOLEAN(),
-                                DataTypes.STRING(),
-                                DataTypes.STRING(),
-                                DataTypes.STRING()));
-        if (!nonComments) {
-            result.add(DataTypes.STRING());
-        }
-        return result.toArray(new DataType[0]);
-    }
-
-    private String[] generateTableColumnsNames(boolean nonComments) {
-        final ArrayList<String> result =
-                new ArrayList<>(
-                        Arrays.asList("name", "type", "null", "key", "extras", 
"watermark"));
-        if (!nonComments) {
-            result.add("comment");
-        }
-        return result.toArray(new String[0]);
-    }
-
-    private Object[][] buildTableColumns(ResolvedSchema schema) {
-        Map<String, String> fieldToWatermark =
-                schema.getWatermarkSpecs().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        WatermarkSpec::getRowtimeAttribute,
-                                        spec -> 
spec.getWatermarkExpression().asSummaryString()));
-
-        Map<String, String> fieldToPrimaryKey = new HashMap<>();
-        schema.getPrimaryKey()
-                .ifPresent(
-                        (p) -> {
-                            List<String> columns = p.getColumns();
-                            columns.forEach(
-                                    (c) ->
-                                            fieldToPrimaryKey.put(
-                                                    c,
-                                                    String.format(
-                                                            "PRI(%s)",
-                                                            String.join(", ", 
columns))));
-                        });
-        boolean nonComments = isSchemaNonColumnComments(schema);
-        return schema.getColumns().stream()
-                .map(
-                        (c) -> {
-                            final LogicalType logicalType = 
c.getDataType().getLogicalType();
-                            final ArrayList<Object> result =
-                                    new ArrayList<>(
-                                            Arrays.asList(
-                                                    c.getName(),
-                                                    
logicalType.copy(true).asSummaryString(),
-                                                    logicalType.isNullable(),
-                                                    
fieldToPrimaryKey.getOrDefault(
-                                                            c.getName(), null),
-                                                    
c.explainExtras().orElse(null),
-                                                    
fieldToWatermark.getOrDefault(
-                                                            c.getName(), 
null)));
-                            if (!nonComments) {
-                                result.add(c.getComment().orElse(null));
-                            }
-                            return result.toArray();
-                        })
-                .toArray(Object[][]::new);
-    }
-
-    private boolean isSchemaNonColumnComments(ResolvedSchema schema) {
-        return 
schema.getColumns().stream().map(Column::getComment).noneMatch(Optional::isPresent);
-    }
-
-    private TableResultInternal buildResult(String[] headers, DataType[] 
types, Object[][] rows) {
-        ResolvedSchema schema = ResolvedSchema.physical(headers, types);
-        ResultProvider provider =
-                new StaticResultProvider(
-                        
Arrays.stream(rows).map(Row::of).collect(Collectors.toList()));
-        return TableResultImpl.builder()
-                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-                .schema(ResolvedSchema.physical(headers, types))
-                .resultProvider(provider)
-                .setPrintStyle(
-                        PrintStyle.tableauWithDataInferredColumnWidths(
-                                schema,
-                                provider.getRowDataStringConverter(),
-                                Integer.MAX_VALUE,
-                                true,
-                                false))
-                .build();
-    }
-
     /**
      * extract sink identifier names from {@link ModifyOperation}s and 
deduplicate them with {@link
      * #deduplicateSinkIdentifierNames(List)}.
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
index 37118f874b2..c4b92cbd6d3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
 import java.util.Collections;
@@ -28,7 +29,7 @@ import java.util.Map;
  * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] 
dataBasesName].sqlIdentifier
  * statement.
  */
-public class DescribeTableOperation implements Operation {
+public class DescribeTableOperation implements Operation, ExecutableOperation {
 
     private final ObjectIdentifier sqlIdentifier;
     private final boolean isExtended;
@@ -54,4 +55,12 @@ public class DescribeTableOperation implements Operation {
         return OperationUtils.formatWithChildren(
                 "DESCRIBE", params, Collections.emptyList(), 
Operation::asSummaryString);
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // DESCRIBE <table> is a synonym for SHOW COLUMNS without LIKE pattern.
+        ShowColumnsOperation showColumns =
+                new ShowColumnsOperation(sqlIdentifier, null, false, false, 
"FROM");
+        return showColumns.execute(ctx);
+    }
 }

Reply via email to