This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 452986dd05b193d492e0fc7b817f61b9695871d2 Author: Jark Wu <[email protected]> AuthorDate: Sun Mar 19 18:13:17 2023 +0800 [FLINK-31507][table] Move execution logic of DescribeTableOperation out from TableEnvironmentImpl This closes #22215 --- .../src/test/resources/sql/table.q | 4 +- .../src/test/resources/sql/table.q | 4 +- .../table/api/internal/TableEnvironmentImpl.java | 118 --------------------- .../table/operations/DescribeTableOperation.java | 11 +- 4 files changed, 14 insertions(+), 123 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q index 1fb69431a81..301c9dc51c7 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/table.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q @@ -31,12 +31,12 @@ org.apache.flink.table.api.ValidationException: Table with identifier 'default_c describe non_exist; [ERROR] Could not execute SQL statement. Reason: -org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist +org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist. !error desc non_exist; [ERROR] Could not execute SQL statement. Reason: -org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist +org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist. !error alter table non_exist rename to non_exist2; diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q index 821f679fa36..f338864fddd 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q @@ -31,12 +31,12 @@ org.apache.flink.table.api.ValidationException: Table with identifier 'default_c describe non_exist; !output -org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist +org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist. !error desc non_exist; !output -org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist +org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist. !error alter table non_exist rename to non_exist2; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 1e75fce6e34..644cf578776 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -57,7 +57,6 @@ import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; -import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; @@ -79,7 +78,6 @@ import org.apache.flink.table.operations.CollectModifyOperation; import org.apache.flink.table.operations.CompileAndExecutePlanOperation; import org.apache.flink.table.operations.CreateTableASOperation; import org.apache.flink.table.operations.DeleteFromFilterOperation; -import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExecutableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; @@ -106,7 +104,6 @@ import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceValidation; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.table.utils.print.PrintStyle; import org.apache.flink.types.Row; @@ -970,18 +967,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) .data(Collections.singletonList(Row.of(explanation))) .build(); - } else if (operation instanceof DescribeTableOperation) { - DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation; - Optional<ContextResolvedTable> result = - catalogManager.getTable(describeTableOperation.getSqlIdentifier()); - if (result.isPresent()) { - return buildDescribeResult(result.get().getResolvedSchema()); - } else { - throw new ValidationException( - String.format( - "Tables or views with the identifier '%s' doesn't exist", - describeTableOperation.getSqlIdentifier().asSummaryString())); - } } else if (operation instanceof QueryOperation) { return executeQueryOperation((QueryOperation) operation); } else if (operation instanceof ExecutePlanOperation) { @@ -1068,109 +1053,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } } - private TableResultInternal buildDescribeResult(ResolvedSchema schema) { - Object[][] rows = buildTableColumns(schema); - boolean nonComments = isSchemaNonColumnComments(schema); - return buildResult( - generateTableColumnsNames(nonComments), - generateTableColumnsDataTypes(nonComments), - rows); - } - - private DataType[] generateTableColumnsDataTypes(boolean nonComments) { - final ArrayList<DataType> result = - new ArrayList<>( - Arrays.asList( - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.BOOLEAN(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING())); - if (!nonComments) { - result.add(DataTypes.STRING()); - } - return result.toArray(new DataType[0]); - } - - private String[] generateTableColumnsNames(boolean nonComments) { - final ArrayList<String> result = - new ArrayList<>( - Arrays.asList("name", "type", "null", "key", "extras", "watermark")); - if (!nonComments) { - result.add("comment"); - } - return result.toArray(new String[0]); - } - - private Object[][] buildTableColumns(ResolvedSchema schema) { - Map<String, String> fieldToWatermark = - schema.getWatermarkSpecs().stream() - .collect( - Collectors.toMap( - WatermarkSpec::getRowtimeAttribute, - spec -> spec.getWatermarkExpression().asSummaryString())); - - Map<String, String> fieldToPrimaryKey = new HashMap<>(); - schema.getPrimaryKey() - .ifPresent( - (p) -> { - List<String> columns = p.getColumns(); - columns.forEach( - (c) -> - fieldToPrimaryKey.put( - c, - String.format( - "PRI(%s)", - String.join(", ", columns)))); - }); - boolean nonComments = isSchemaNonColumnComments(schema); - return schema.getColumns().stream() - .map( - (c) -> { - final LogicalType logicalType = c.getDataType().getLogicalType(); - final ArrayList<Object> result = - new ArrayList<>( - Arrays.asList( - c.getName(), - logicalType.copy(true).asSummaryString(), - logicalType.isNullable(), - fieldToPrimaryKey.getOrDefault( - c.getName(), null), - c.explainExtras().orElse(null), - fieldToWatermark.getOrDefault( - c.getName(), null))); - if (!nonComments) { - result.add(c.getComment().orElse(null)); - } - return result.toArray(); - }) - .toArray(Object[][]::new); - } - - private boolean isSchemaNonColumnComments(ResolvedSchema schema) { - return schema.getColumns().stream().map(Column::getComment).noneMatch(Optional::isPresent); - } - - private TableResultInternal buildResult(String[] headers, DataType[] types, Object[][] rows) { - ResolvedSchema schema = ResolvedSchema.physical(headers, types); - ResultProvider provider = - new StaticResultProvider( - Arrays.stream(rows).map(Row::of).collect(Collectors.toList())); - return TableResultImpl.builder() - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .schema(ResolvedSchema.physical(headers, types)) - .resultProvider(provider) - .setPrintStyle( - PrintStyle.tableauWithDataInferredColumnWidths( - schema, - provider.getRowDataStringConverter(), - Integer.MAX_VALUE, - true, - false)) - .build(); - } - /** * extract sink identifier names from {@link ModifyOperation}s and deduplicate them with {@link * #deduplicateSinkIdentifierNames(List)}. diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java index 37118f874b2..c4b92cbd6d3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeTableOperation.java @@ -18,6 +18,7 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.catalog.ObjectIdentifier; import java.util.Collections; @@ -28,7 +29,7 @@ import java.util.Map; * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier * statement. */ -public class DescribeTableOperation implements Operation { +public class DescribeTableOperation implements Operation, ExecutableOperation { private final ObjectIdentifier sqlIdentifier; private final boolean isExtended; @@ -54,4 +55,12 @@ public class DescribeTableOperation implements Operation { return OperationUtils.formatWithChildren( "DESCRIBE", params, Collections.emptyList(), Operation::asSummaryString); } + + @Override + public TableResultInternal execute(Context ctx) { + // DESCRIBE <table> is a synonym for SHOW COLUMNS without LIKE pattern. + ShowColumnsOperation showColumns = + new ShowColumnsOperation(sqlIdentifier, null, false, false, "FROM"); + return showColumns.execute(ctx); + } }
