This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 96b17a63c51d39a83f24945f4ae15a1280f4f5a8 Author: Jark Wu <[email protected]> AuthorDate: Sun Mar 19 18:10:22 2023 +0800 [FLINK-31507][table] Move execution logic of ShowOperation out from TableEnvironmentImpl This closes #22215 --- .../flink/table/api/internal/ShowCreateUtil.java | 136 ++++++------ .../table/api/internal/TableEnvironmentImpl.java | 236 --------------------- .../flink/table/api/internal/TableResultUtils.java | 66 ++++++ .../table/operations/ShowCatalogsOperation.java | 11 + .../table/operations/ShowColumnsOperation.java | 130 +++++++++++- .../table/operations/ShowCreateTableOperation.java | 24 +++ .../table/operations/ShowCreateViewOperation.java | 25 +++ .../operations/ShowCurrentCatalogOperation.java | 10 + .../operations/ShowCurrentDatabaseOperation.java | 11 + .../table/operations/ShowDatabasesOperation.java | 14 ++ .../table/operations/ShowFunctionsOperation.java | 25 +++ .../table/operations/ShowModulesOperation.java | 29 +++ .../flink/table/operations/ShowOperation.java | 2 +- .../table/operations/ShowPartitionsOperation.java | 44 ++++ .../table/operations/ShowTablesOperation.java | 37 ++++ .../flink/table/operations/ShowViewsOperation.java | 11 + .../operations/command/ShowJarsOperation.java | 13 ++ .../operations/command/ShowJobsOperation.java | 9 + 18 files changed, 526 insertions(+), 307 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java index 16feaa2571c..31d85124395 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java @@ -38,10 +38,77 @@ import java.util.stream.Collectors; /** SHOW CREATE statement Util. */ @Internal -class ShowCreateUtil { +public class ShowCreateUtil { private ShowCreateUtil() {} + public static String buildShowCreateTableRow( + ResolvedCatalogBaseTable<?> table, + ObjectIdentifier tableIdentifier, + boolean isTemporary) { + if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) { + throw new TableException( + String.format( + "SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.", + tableIdentifier.asSerializableString())); + } + final String printIndent = " "; + StringBuilder sb = + new StringBuilder() + .append(buildCreateFormattedPrefix("TABLE", isTemporary, tableIdentifier)); + sb.append(extractFormattedColumns(table, printIndent)); + extractFormattedWatermarkSpecs(table, printIndent) + .ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs)); + extractFormattedPrimaryKey(table, printIndent).ifPresent(pk -> sb.append(",\n").append(pk)); + sb.append("\n) "); + extractFormattedComment(table) + .ifPresent( + c -> sb.append(String.format("COMMENT '%s'%s", c, System.lineSeparator()))); + extractFormattedPartitionedInfo((ResolvedCatalogTable) table) + .ifPresent( + partitionedInfoFormatted -> + sb.append("PARTITIONED BY (") + .append(partitionedInfoFormatted) + .append(")\n")); + extractFormattedOptions(table, printIndent) + .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n")); + return sb.toString(); + } + + /** Show create view statement only for views. */ + public static String buildShowCreateViewRow( + ResolvedCatalogBaseTable<?> view, + ObjectIdentifier viewIdentifier, + boolean isTemporary) { + if (view.getTableKind() != CatalogBaseTable.TableKind.VIEW) { + throw new TableException( + String.format( + "SHOW CREATE VIEW is only supported for views, but %s is a table. Please use SHOW CREATE TABLE instead.", + viewIdentifier.asSerializableString())); + } + StringBuilder stringBuilder = new StringBuilder(); + if (view.getOrigin() instanceof QueryOperationCatalogView) { + throw new TableException( + "SHOW CREATE VIEW is not supported for views registered by Table API."); + } else { + stringBuilder.append( + String.format( + "CREATE %sVIEW %s%s as%s%s", + isTemporary ? "TEMPORARY " : "", + viewIdentifier.asSerializableString(), + String.format("(%s)", extractFormattedColumnNames(view)), + System.lineSeparator(), + ((CatalogView) view.getOrigin()).getExpandedQuery())); + } + extractFormattedComment(view) + .ifPresent( + c -> + stringBuilder.append( + String.format( + " COMMENT '%s'%s", c, System.lineSeparator()))); + return stringBuilder.toString(); + } + static String buildCreateFormattedPrefix( String tableType, boolean isTemporary, ObjectIdentifier identifier) { return String.format( @@ -153,71 +220,4 @@ class ShowCreateUtil { .map(EncodingUtils::escapeIdentifier) .collect(Collectors.joining(", ")); } - - static String buildShowCreateTableRow( - ResolvedCatalogBaseTable<?> table, - ObjectIdentifier tableIdentifier, - boolean isTemporary) { - if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) { - throw new TableException( - String.format( - "SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.", - tableIdentifier.asSerializableString())); - } - final String printIndent = " "; - StringBuilder sb = - new StringBuilder() - .append(buildCreateFormattedPrefix("TABLE", isTemporary, tableIdentifier)); - sb.append(extractFormattedColumns(table, printIndent)); - extractFormattedWatermarkSpecs(table, printIndent) - .ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs)); - extractFormattedPrimaryKey(table, printIndent).ifPresent(pk -> sb.append(",\n").append(pk)); - sb.append("\n) "); - extractFormattedComment(table) - .ifPresent( - c -> sb.append(String.format("COMMENT '%s'%s", c, System.lineSeparator()))); - extractFormattedPartitionedInfo((ResolvedCatalogTable) table) - .ifPresent( - partitionedInfoFormatted -> - sb.append("PARTITIONED BY (") - .append(partitionedInfoFormatted) - .append(")\n")); - extractFormattedOptions(table, printIndent) - .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n")); - return sb.toString(); - } - - /** Show create view statement only for views. */ - static String buildShowCreateViewRow( - ResolvedCatalogBaseTable<?> view, - ObjectIdentifier viewIdentifier, - boolean isTemporary) { - if (view.getTableKind() != CatalogBaseTable.TableKind.VIEW) { - throw new TableException( - String.format( - "SHOW CREATE VIEW is only supported for views, but %s is a table. Please use SHOW CREATE TABLE instead.", - viewIdentifier.asSerializableString())); - } - StringBuilder stringBuilder = new StringBuilder(); - if (view.getOrigin() instanceof QueryOperationCatalogView) { - throw new TableException( - "SHOW CREATE VIEW is not supported for views registered by Table API."); - } else { - stringBuilder.append( - String.format( - "CREATE %sVIEW %s%s as%s%s", - isTemporary ? "TEMPORARY " : "", - viewIdentifier.asSerializableString(), - String.format("(%s)", extractFormattedColumnNames(view)), - System.lineSeparator(), - ((CatalogView) view.getOrigin()).getExpandedQuery())); - } - extractFormattedComment(view) - .ifPresent( - c -> - stringBuilder.append( - String.format( - " COMMENT '%s'%s", c, System.lineSeparator()))); - return stringBuilder.toString(); - } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index f9cb8beabd5..1e75fce6e34 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -46,7 +46,6 @@ import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ConnectorCatalogTable; import org.apache.flink.table.catalog.ContextResolvedTable; @@ -54,14 +53,12 @@ import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.FunctionLanguage; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.ExtendedOperationExecutor; @@ -73,7 +70,6 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.functions.ScalarFunction; -import org.apache.flink.table.functions.SqlLikeUtils; import org.apache.flink.table.functions.UserDefinedFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; import org.apache.flink.table.module.Module; @@ -91,18 +87,6 @@ import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.NopOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.operations.ShowCatalogsOperation; -import org.apache.flink.table.operations.ShowColumnsOperation; -import org.apache.flink.table.operations.ShowCreateTableOperation; -import org.apache.flink.table.operations.ShowCreateViewOperation; -import org.apache.flink.table.operations.ShowCurrentCatalogOperation; -import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; -import org.apache.flink.table.operations.ShowDatabasesOperation; -import org.apache.flink.table.operations.ShowFunctionsOperation; -import org.apache.flink.table.operations.ShowModulesOperation; -import org.apache.flink.table.operations.ShowPartitionsOperation; -import org.apache.flink.table.operations.ShowTablesOperation; -import org.apache.flink.table.operations.ShowViewsOperation; import org.apache.flink.table.operations.SinkModifyOperation; import org.apache.flink.table.operations.SourceQueryOperation; import org.apache.flink.table.operations.StatementSetOperation; @@ -110,7 +94,6 @@ import org.apache.flink.table.operations.TableSourceQueryOperation; import org.apache.flink.table.operations.UnloadModuleOperation; import org.apache.flink.table.operations.command.AddJarOperation; import org.apache.flink.table.operations.command.ExecutePlanOperation; -import org.apache.flink.table.operations.command.ShowJarsOperation; import org.apache.flink.table.operations.ddl.AnalyzeTableOperation; import org.apache.flink.table.operations.ddl.CompilePlanOperation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; @@ -479,13 +462,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } } - // TODO: Maybe we should expose listJars as tEnv's API later. - private String[] listJars() { - return resourceManager.getResources().keySet().stream() - .map(ResourceUri::getUri) - .toArray(String[]::new); - } - @Override public void createTemporaryTable(String path, TableDescriptor descriptor) { Preconditions.checkNotNull(path, "Path must not be null."); @@ -971,164 +947,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { return executeInternal(((StatementSetOperation) operation).getOperations()); } else if (operation instanceof AddJarOperation) { return addJar((AddJarOperation) operation); - } else if (operation instanceof ShowJarsOperation) { - return buildShowResult("jars", listJars()); } else if (operation instanceof LoadModuleOperation) { return loadModule((LoadModuleOperation) operation); } else if (operation instanceof UnloadModuleOperation) { return unloadModule((UnloadModuleOperation) operation); - } else if (operation instanceof ShowCatalogsOperation) { - return buildShowResult("catalog name", listCatalogs()); - } else if (operation instanceof ShowCreateTableOperation) { - ShowCreateTableOperation showCreateTableOperation = - (ShowCreateTableOperation) operation; - ContextResolvedTable table = - catalogManager - .getTable(showCreateTableOperation.getTableIdentifier()) - .orElseThrow( - () -> - new ValidationException( - String.format( - "Could not execute SHOW CREATE TABLE. Table with identifier %s does not exist.", - showCreateTableOperation - .getTableIdentifier() - .asSerializableString()))); - - return TableResultImpl.builder() - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) - .data( - Collections.singletonList( - Row.of( - ShowCreateUtil.buildShowCreateTableRow( - table.getResolvedTable(), - showCreateTableOperation.getTableIdentifier(), - table.isTemporary())))) - .build(); - - } else if (operation instanceof ShowCreateViewOperation) { - ShowCreateViewOperation showCreateViewOperation = (ShowCreateViewOperation) operation; - final ContextResolvedTable table = - catalogManager - .getTable(showCreateViewOperation.getViewIdentifier()) - .orElseThrow( - () -> - new ValidationException( - String.format( - "Could not execute SHOW CREATE VIEW. View with identifier %s does not exist.", - showCreateViewOperation - .getViewIdentifier() - .asSerializableString()))); - - return TableResultImpl.builder() - .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) - .data( - Collections.singletonList( - Row.of( - ShowCreateUtil.buildShowCreateViewRow( - table.getResolvedTable(), - showCreateViewOperation.getViewIdentifier(), - table.isTemporary())))) - .build(); - } else if (operation instanceof ShowCurrentCatalogOperation) { - return buildShowResult( - "current catalog name", new String[] {catalogManager.getCurrentCatalog()}); - } else if (operation instanceof ShowDatabasesOperation) { - return buildShowResult("database name", listDatabases()); - } else if (operation instanceof ShowCurrentDatabaseOperation) { - return buildShowResult( - "current database name", new String[] {catalogManager.getCurrentDatabase()}); - } else if (operation instanceof ShowModulesOperation) { - ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation; - if (showModulesOperation.requireFull()) { - return buildShowFullModulesResult(listFullModules()); - } else { - return buildShowResult("module name", listModules()); - } - } else if (operation instanceof ShowTablesOperation) { - ShowTablesOperation showTablesOperation = (ShowTablesOperation) operation; - if (showTablesOperation.getPreposition() == null) { - return buildShowTablesResult(listTables(), showTablesOperation); - } - final String catalogName = showTablesOperation.getCatalogName(); - final String databaseName = showTablesOperation.getDatabaseName(); - Catalog catalog = getCatalogOrThrowException(catalogName); - if (catalog.databaseExists(databaseName)) { - return buildShowTablesResult( - listTables(catalogName, databaseName), showTablesOperation); - } else { - throw new ValidationException( - String.format( - "Database '%s'.'%s' doesn't exist.", catalogName, databaseName)); - } - } else if (operation instanceof ShowFunctionsOperation) { - ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation; - String[] functionNames = null; - ShowFunctionsOperation.FunctionScope functionScope = - showFunctionsOperation.getFunctionScope(); - switch (functionScope) { - case USER: - functionNames = listUserDefinedFunctions(); - break; - case ALL: - functionNames = listFunctions(); - break; - default: - throw new UnsupportedOperationException( - String.format( - "SHOW FUNCTIONS with %s scope is not supported.", - functionScope)); - } - return buildShowResult("function name", functionNames); - } else if (operation instanceof ShowViewsOperation) { - return buildShowResult("view name", listViews()); - } else if (operation instanceof ShowColumnsOperation) { - ShowColumnsOperation showColumnsOperation = (ShowColumnsOperation) operation; - Optional<ContextResolvedTable> result = - catalogManager.getTable(showColumnsOperation.getTableIdentifier()); - if (result.isPresent()) { - return buildShowColumnsResult( - result.get().getResolvedSchema(), showColumnsOperation); - } else { - throw new ValidationException( - String.format( - "Tables or views with the identifier '%s' doesn't exist.", - showColumnsOperation.getTableIdentifier().asSummaryString())); - } - } else if (operation instanceof ShowPartitionsOperation) { - String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString()); - try { - ShowPartitionsOperation showPartitionsOperation = - (ShowPartitionsOperation) operation; - Catalog catalog = - getCatalogOrThrowException( - showPartitionsOperation.getTableIdentifier().getCatalogName()); - ObjectPath tablePath = showPartitionsOperation.getTableIdentifier().toObjectPath(); - CatalogPartitionSpec partitionSpec = showPartitionsOperation.getPartitionSpec(); - List<CatalogPartitionSpec> partitionSpecs = - partitionSpec == null - ? catalog.listPartitions(tablePath) - : catalog.listPartitions(tablePath, partitionSpec); - List<String> partitionNames = new ArrayList<>(partitionSpecs.size()); - for (CatalogPartitionSpec spec : partitionSpecs) { - List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size()); - for (Map.Entry<String, String> partitionKV : - spec.getPartitionSpec().entrySet()) { - String partitionValue = - partitionKV.getValue() == null - ? showPartitionsOperation.getDefaultPartitionName() - : partitionKV.getValue(); - partitionKVs.add(partitionKV.getKey() + "=" + partitionValue); - } - partitionNames.add(String.join("/", partitionKVs)); - } - return buildShowResult("partition name", partitionNames.toArray(new String[0])); - } catch (TableNotExistException e) { - throw new ValidationException(exMsg, e); - } catch (Exception e) { - throw new TableException(exMsg, e); - } } else if (operation instanceof ExplainOperation) { ExplainOperation explainOperation = (ExplainOperation) operation; ExplainDetail[] explainDetails = @@ -1246,13 +1068,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } } - private TableResultInternal buildShowResult(String columnName, String[] objects) { - return buildResult( - new String[] {columnName}, - new DataType[] {DataTypes.STRING()}, - Arrays.stream(objects).map((c) -> new String[] {c}).toArray(String[][]::new)); - } - private TableResultInternal buildDescribeResult(ResolvedSchema schema) { Object[][] rows = buildTableColumns(schema); boolean nonComments = isSchemaNonColumnComments(schema); @@ -1288,57 +1103,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { return result.toArray(new String[0]); } - private TableResultInternal buildShowTablesResult( - String[] tableList, ShowTablesOperation showTablesOp) { - String[] rows = tableList.clone(); - if (showTablesOp.isUseLike()) { - rows = - Arrays.stream(tableList) - .filter( - row -> - showTablesOp.isNotLike() - != SqlLikeUtils.like( - row, - showTablesOp.getLikePattern(), - "\\")) - .toArray(String[]::new); - } - return buildShowResult("table name", rows); - } - - private TableResultInternal buildShowColumnsResult( - ResolvedSchema schema, ShowColumnsOperation showColumnsOp) { - Object[][] rows = buildTableColumns(schema); - if (showColumnsOp.isUseLike()) { - rows = - Arrays.stream(rows) - .filter( - row -> - showColumnsOp.isNotLike() - != SqlLikeUtils.like( - row[0].toString(), - showColumnsOp.getLikePattern(), - "\\")) - .toArray(Object[][]::new); - } - boolean nonComments = isSchemaNonColumnComments(schema); - return buildResult( - generateTableColumnsNames(nonComments), - generateTableColumnsDataTypes(nonComments), - rows); - } - - private TableResultInternal buildShowFullModulesResult(ModuleEntry[] moduleEntries) { - Object[][] rows = - Arrays.stream(moduleEntries) - .map(entry -> new Object[] {entry.name(), entry.used()}) - .toArray(Object[][]::new); - return buildResult( - new String[] {"module name", "used"}, - new DataType[] {DataTypes.STRING(), DataTypes.BOOLEAN()}, - rows); - } - private Object[][] buildTableColumns(ResolvedSchema schema) { Map<String, String> fieldToWatermark = schema.getWatermarkSpecs().stream() diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultUtils.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultUtils.java new file mode 100644 index 00000000000..3e0869cc615 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.internal; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.utils.print.PrintStyle; +import org.apache.flink.types.Row; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** Utilities to build {@link TableResultInternal}. */ +public class TableResultUtils { + + /** + * Build a {@link TableResultInternal} for {@code String[]} values with single column. + * + * @param columnName the column name of the result values. + * @param values the string values of the result. + */ + public static TableResultInternal buildStringArrayResult(String columnName, String[] values) { + return buildTableResult( + new String[] {columnName}, + new DataType[] {DataTypes.STRING()}, + Arrays.stream(values).map((c) -> new String[] {c}).toArray(String[][]::new)); + } + + public static TableResultInternal buildTableResult( + String[] headers, DataType[] types, Object[][] rows) { + ResolvedSchema schema = ResolvedSchema.physical(headers, types); + ResultProvider provider = + new StaticResultProvider( + Arrays.stream(rows).map(Row::of).collect(Collectors.toList())); + return TableResultImpl.builder() + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .schema(ResolvedSchema.physical(headers, types)) + .resultProvider(provider) + .setPrintStyle( + PrintStyle.tableauWithDataInferredColumnWidths( + schema, + provider.getRowDataStringConverter(), + Integer.MAX_VALUE, + true, + false)) + .build(); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java index 053c4682046..e608c3fb5f6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java @@ -18,6 +18,10 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultInternal; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW CATALOGS statement. */ public class ShowCatalogsOperation implements ShowOperation { @@ -25,4 +29,11 @@ public class ShowCatalogsOperation implements ShowOperation { public String asSummaryString() { return "SHOW CATALOGS"; } + + @Override + public TableResultInternal execute(Context ctx) { + String[] catalogs = + ctx.getCatalogManager().listCatalogs().stream().sorted().toArray(String[]::new); + return buildStringArrayResult("catalog name", catalogs); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java index b001ff88d77..71fdc6f1a11 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java @@ -18,7 +18,29 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.functions.SqlLikeUtils; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult; /** Show columns from [[catalog.]database.]table. */ public class ShowColumnsOperation implements ShowOperation { @@ -26,12 +48,12 @@ public class ShowColumnsOperation implements ShowOperation { private final ObjectIdentifier tableIdentifier; private final boolean useLike; private final boolean notLike; - private final String likePattern; + private final @Nullable String likePattern; private final String preposition; public ShowColumnsOperation( ObjectIdentifier tableIdentifier, - String likePattern, + @Nullable String likePattern, boolean useLike, boolean notLike, String preposition) { @@ -76,4 +98,108 @@ public class ShowColumnsOperation implements ShowOperation { } return String.format("SHOW COLUMNS %s %s", preposition, tableIdentifier.asSummaryString()); } + + @Override + public TableResultInternal execute(Context ctx) { + Optional<ContextResolvedTable> result = ctx.getCatalogManager().getTable(tableIdentifier); + if (!result.isPresent()) { + throw new ValidationException( + String.format( + "Tables or views with the identifier '%s' doesn't exist.", + tableIdentifier.asSummaryString())); + } + + ResolvedSchema schema = result.get().getResolvedSchema(); + Object[][] rows = generateTableColumnsRows(schema); + if (useLike) { + rows = + Arrays.stream(rows) + .filter( + row -> + notLike + != SqlLikeUtils.like( + row[0].toString(), likePattern, "\\")) + .toArray(Object[][]::new); + } + boolean nonComments = isSchemaNonColumnComments(schema); + return buildTableResult( + generateTableColumnsNames(nonComments), + generateTableColumnsDataTypes(nonComments), + rows); + } + + private Object[][] generateTableColumnsRows(ResolvedSchema schema) { + Map<String, String> fieldToWatermark = + schema.getWatermarkSpecs().stream() + .collect( + Collectors.toMap( + WatermarkSpec::getRowtimeAttribute, + spec -> spec.getWatermarkExpression().asSummaryString())); + + Map<String, String> fieldToPrimaryKey = new HashMap<>(); + schema.getPrimaryKey() + .ifPresent( + (p) -> { + List<String> columns = p.getColumns(); + columns.forEach( + (c) -> + fieldToPrimaryKey.put( + c, + String.format( + "PRI(%s)", + String.join(", ", columns)))); + }); + boolean nonComments = isSchemaNonColumnComments(schema); + return schema.getColumns().stream() + .map( + (c) -> { + final LogicalType logicalType = c.getDataType().getLogicalType(); + final ArrayList<Object> result = + new ArrayList<>( + Arrays.asList( + c.getName(), + logicalType.copy(true).asSummaryString(), + logicalType.isNullable(), + fieldToPrimaryKey.getOrDefault( + c.getName(), null), + c.explainExtras().orElse(null), + fieldToWatermark.getOrDefault( + c.getName(), null))); + if (!nonComments) { + result.add(c.getComment().orElse(null)); + } + return result.toArray(); + }) + .toArray(Object[][]::new); + } + + private boolean isSchemaNonColumnComments(ResolvedSchema schema) { + return schema.getColumns().stream().map(Column::getComment).noneMatch(Optional::isPresent); + } + + private String[] generateTableColumnsNames(boolean nonComments) { + final ArrayList<String> result = + new ArrayList<>( + Arrays.asList("name", "type", "null", "key", "extras", "watermark")); + if (!nonComments) { + result.add("comment"); + } + return result.toArray(new String[0]); + } + + private DataType[] generateTableColumnsDataTypes(boolean nonComments) { + final ArrayList<DataType> result = + new ArrayList<>( + Arrays.asList( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.BOOLEAN(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING())); + if (!nonComments) { + result.add(DataTypes.STRING()); + } + return result.toArray(new DataType[0]); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java index 47f6b3eed7d..5514fc1c2e4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java @@ -18,8 +18,14 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.ShowCreateUtil; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW CREATE TABLE statement. */ public class ShowCreateTableOperation implements ShowOperation { @@ -37,4 +43,22 @@ public class ShowCreateTableOperation implements ShowOperation { public String asSummaryString() { return String.format("SHOW CREATE TABLE %s", tableIdentifier.asSummaryString()); } + + @Override + public TableResultInternal execute(Context ctx) { + ContextResolvedTable table = + ctx.getCatalogManager() + .getTable(tableIdentifier) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Could not execute SHOW CREATE TABLE. Table with identifier %s does not exist.", + tableIdentifier.asSerializableString()))); + String resultRow = + ShowCreateUtil.buildShowCreateTableRow( + table.getResolvedTable(), tableIdentifier, table.isTemporary()); + + return buildStringArrayResult("result", new String[] {resultRow}); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java index 312c305cc56..5bfb091f22e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java @@ -18,8 +18,14 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.ShowCreateUtil; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW CREATE VIEW statement. */ public class ShowCreateViewOperation implements ShowOperation { @@ -37,4 +43,23 @@ public class ShowCreateViewOperation implements ShowOperation { public String asSummaryString() { return String.format("SHOW CREATE VIEW %s", viewIdentifier.asSummaryString()); } + + @Override + public TableResultInternal execute(Context ctx) { + final ContextResolvedTable table = + ctx.getCatalogManager() + .getTable(viewIdentifier) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Could not execute SHOW CREATE VIEW. View with identifier %s does not exist.", + viewIdentifier.asSerializableString()))); + + String resultRow = + ShowCreateUtil.buildShowCreateViewRow( + table.getResolvedTable(), viewIdentifier, table.isTemporary()); + + return buildStringArrayResult("result", new String[] {resultRow}); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java index be4cb69cea6..3468f173c2b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java @@ -18,10 +18,20 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultInternal; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW CURRENT CATALOG statement. */ public class ShowCurrentCatalogOperation implements ShowOperation { @Override public String asSummaryString() { return "SHOW CURRENT CATALOG"; } + + @Override + public TableResultInternal execute(Context ctx) { + return buildStringArrayResult( + "current catalog name", new String[] {ctx.getCatalogManager().getCurrentCatalog()}); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java index 6d4573ddc09..6ad47d4950e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java @@ -18,10 +18,21 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultInternal; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe SHOW CURRENT DATABASE operation. */ public class ShowCurrentDatabaseOperation implements ShowOperation { @Override public String asSummaryString() { return "SHOW CURRENT DATABASE"; } + + @Override + public TableResultInternal execute(Context ctx) { + return buildStringArrayResult( + "current database name", + new String[] {ctx.getCatalogManager().getCurrentDatabase()}); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java index b2c7136c52f..8e2cd2d41b1 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java @@ -18,6 +18,10 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultInternal; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW DATABASES statement. */ public class ShowDatabasesOperation implements ShowOperation { @@ -25,4 +29,14 @@ public class ShowDatabasesOperation implements ShowOperation { public String asSummaryString() { return "SHOW DATABASES"; } + + @Override + public TableResultInternal execute(Context ctx) { + String[] databases = + ctx.getCatalogManager() + .getCatalogOrThrowException(ctx.getCatalogManager().getCurrentCatalog()) + .listDatabases() + .toArray(new String[0]); + return buildStringArrayResult("database name", databases); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java index 466b0e7e89c..b36ac6b0fdc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java @@ -18,6 +18,12 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultInternal; + +import java.util.Arrays; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW [USER] FUNCTIONS statement. */ public class ShowFunctionsOperation implements ShowOperation { @@ -57,4 +63,23 @@ public class ShowFunctionsOperation implements ShowOperation { public FunctionScope getFunctionScope() { return functionScope; } + + @Override + public TableResultInternal execute(Context ctx) { + final String[] functionNames; + switch (functionScope) { + case USER: + functionNames = ctx.getFunctionCatalog().getUserDefinedFunctions(); + break; + case ALL: + functionNames = ctx.getFunctionCatalog().getFunctions(); + break; + default: + throw new UnsupportedOperationException( + String.format( + "SHOW FUNCTIONS with %s scope is not supported.", functionScope)); + } + Arrays.sort(functionNames); + return buildStringArrayResult("function name", functionNames); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java index 582026cbbad..59fb5db51fd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java @@ -18,6 +18,16 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.module.ModuleEntry; +import org.apache.flink.table.types.DataType; + +import java.util.Arrays; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; +import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult; + /** Operation to describe a SHOW [FULL] MODULES statement. */ public class ShowModulesOperation implements ShowOperation { private final boolean requireFull; @@ -34,4 +44,23 @@ public class ShowModulesOperation implements ShowOperation { public boolean requireFull() { return requireFull; } + + @Override + public TableResultInternal execute(Context ctx) { + if (requireFull) { + ModuleEntry[] fullModules = + ctx.getModuleManager().listFullModules().toArray(new ModuleEntry[0]); + Object[][] rows = + Arrays.stream(fullModules) + .map(entry -> new Object[] {entry.name(), entry.used()}) + .toArray(Object[][]::new); + return buildTableResult( + new String[] {"module name", "used"}, + new DataType[] {DataTypes.STRING(), DataTypes.BOOLEAN()}, + rows); + } else { + String[] modules = ctx.getModuleManager().listModules().toArray(new String[0]); + return buildStringArrayResult("module name", modules); + } + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java index 6f911f812d4..ff902ab6b52 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java @@ -25,4 +25,4 @@ import org.apache.flink.annotation.Internal; * TABLES, SHOW FUNCTIONS, SHOW PARTITIONS. */ @Internal -public interface ShowOperation extends Operation {} +public interface ShowOperation extends Operation, ExecutableOperation {} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java index 08a3ca7ea18..1add82526cc 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java @@ -18,11 +18,23 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW PARTITIONS statement. */ public class ShowPartitionsOperation implements ShowOperation { @@ -70,4 +82,36 @@ public class ShowPartitionsOperation implements ShowOperation { } return builder.toString(); } + + @Override + public TableResultInternal execute(Context ctx) { + try { + final ObjectPath tablePath = tableIdentifier.toObjectPath(); + Catalog catalog = + ctx.getCatalogManager() + .getCatalogOrThrowException(tableIdentifier.getCatalogName()); + List<CatalogPartitionSpec> partitionSpecs = + partitionSpec == null + ? catalog.listPartitions(tablePath) + : catalog.listPartitions(tablePath, partitionSpec); + List<String> partitionNames = new ArrayList<>(partitionSpecs.size()); + for (CatalogPartitionSpec spec : partitionSpecs) { + List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size()); + for (Map.Entry<String, String> partitionKV : spec.getPartitionSpec().entrySet()) { + String partitionValue = + partitionKV.getValue() == null + ? defaultPartitionName + : partitionKV.getValue(); + partitionKVs.add(partitionKV.getKey() + "=" + partitionValue); + } + partitionNames.add(String.join("/", partitionKVs)); + } + return buildStringArrayResult("partition name", partitionNames.toArray(new String[0])); + } catch (TableNotExistException e) { + throw new ValidationException( + String.format("Could not execute %s", asSummaryString()), e); + } catch (Exception e) { + throw new TableException(String.format("Could not execute %s", asSummaryString()), e); + } + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java index faba1f2e122..03f8c63408c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java @@ -18,6 +18,14 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.functions.SqlLikeUtils; + +import java.util.Set; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; import static org.apache.flink.util.Preconditions.checkNotNull; /** Operation to describe a SHOW TABLES statement. */ @@ -104,4 +112,33 @@ public class ShowTablesOperation implements ShowOperation { } return builder.toString(); } + + @Override + public TableResultInternal execute(Context ctx) { + final Set<String> tables; + if (preposition == null) { + tables = ctx.getCatalogManager().listTables(); + } else { + Catalog catalog = ctx.getCatalogManager().getCatalogOrThrowException(catalogName); + if (catalog.databaseExists(databaseName)) { + tables = ctx.getCatalogManager().listTables(catalogName, databaseName); + } else { + throw new ValidationException( + String.format( + "Database '%s'.'%s' doesn't exist.", catalogName, databaseName)); + } + } + + final String[] rows; + if (useLike) { + rows = + tables.stream() + .filter(row -> notLike != SqlLikeUtils.like(row, likePattern, "\\")) + .sorted() + .toArray(String[]::new); + } else { + rows = tables.stream().sorted().toArray(String[]::new); + } + return buildStringArrayResult("table name", rows); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java index 5cb25dd7e96..e181fd3b7f0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java @@ -18,6 +18,10 @@ package org.apache.flink.table.operations; +import org.apache.flink.table.api.internal.TableResultInternal; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + /** Operation to describe a SHOW VIEWS statement. */ public class ShowViewsOperation implements ShowOperation { @@ -25,4 +29,11 @@ public class ShowViewsOperation implements ShowOperation { public String asSummaryString() { return "SHOW VIEWS"; } + + @Override + public TableResultInternal execute(Context ctx) { + String[] views = + ctx.getCatalogManager().listViews().stream().sorted().toArray(String[]::new); + return buildStringArrayResult("view name", views); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java index d0995c208a7..2e28fb82130 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java @@ -18,7 +18,11 @@ package org.apache.flink.table.operations.command; +import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.operations.ShowOperation; +import org.apache.flink.table.resource.ResourceUri; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; /** Operation to describe a SHOW JARS statement. */ public class ShowJarsOperation implements ShowOperation { @@ -27,4 +31,13 @@ public class ShowJarsOperation implements ShowOperation { public String asSummaryString() { return "SHOW JARS"; } + + @Override + public TableResultInternal execute(Context ctx) { + String[] jars = + ctx.getResourceManager().getResources().keySet().stream() + .map(ResourceUri::getUri) + .toArray(String[]::new); + return buildStringArrayResult("jars", jars); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java index 24da96ca30d..f666f59629b 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java @@ -18,6 +18,7 @@ package org.apache.flink.table.operations.command; +import org.apache.flink.table.api.internal.TableResultInternal; import org.apache.flink.table.operations.ShowOperation; /** Operation to describe a SHOW JOBS statement. */ @@ -27,4 +28,12 @@ public class ShowJobsOperation implements ShowOperation { public String asSummaryString() { return "SHOW JOBS"; } + + @Override + public TableResultInternal execute(Context ctx) { + // TODO: We may need to migrate the execution for ShowJobsOperation from SQL Gateway + // OperationExecutor to here. + throw new UnsupportedOperationException( + "ShowJobsOperation doesn't support ExecutableOperation yet."); + } }
