This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 96b17a63c51d39a83f24945f4ae15a1280f4f5a8
Author: Jark Wu <[email protected]>
AuthorDate: Sun Mar 19 18:10:22 2023 +0800

    [FLINK-31507][table] Move execution logic of ShowOperation out from 
TableEnvironmentImpl
    
    This closes #22215
---
 .../flink/table/api/internal/ShowCreateUtil.java   | 136 ++++++------
 .../table/api/internal/TableEnvironmentImpl.java   | 236 ---------------------
 .../flink/table/api/internal/TableResultUtils.java |  66 ++++++
 .../table/operations/ShowCatalogsOperation.java    |  11 +
 .../table/operations/ShowColumnsOperation.java     | 130 +++++++++++-
 .../table/operations/ShowCreateTableOperation.java |  24 +++
 .../table/operations/ShowCreateViewOperation.java  |  25 +++
 .../operations/ShowCurrentCatalogOperation.java    |  10 +
 .../operations/ShowCurrentDatabaseOperation.java   |  11 +
 .../table/operations/ShowDatabasesOperation.java   |  14 ++
 .../table/operations/ShowFunctionsOperation.java   |  25 +++
 .../table/operations/ShowModulesOperation.java     |  29 +++
 .../flink/table/operations/ShowOperation.java      |   2 +-
 .../table/operations/ShowPartitionsOperation.java  |  44 ++++
 .../table/operations/ShowTablesOperation.java      |  37 ++++
 .../flink/table/operations/ShowViewsOperation.java |  11 +
 .../operations/command/ShowJarsOperation.java      |  13 ++
 .../operations/command/ShowJobsOperation.java      |   9 +
 18 files changed, 526 insertions(+), 307 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index 16feaa2571c..31d85124395 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -38,10 +38,77 @@ import java.util.stream.Collectors;
 
 /** SHOW CREATE statement Util. */
 @Internal
-class ShowCreateUtil {
+public class ShowCreateUtil {
 
     private ShowCreateUtil() {}
 
+    public static String buildShowCreateTableRow(
+            ResolvedCatalogBaseTable<?> table,
+            ObjectIdentifier tableIdentifier,
+            boolean isTemporary) {
+        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE TABLE is only supported for tables, 
but %s is a view. Please use SHOW CREATE VIEW instead.",
+                            tableIdentifier.asSerializableString()));
+        }
+        final String printIndent = "  ";
+        StringBuilder sb =
+                new StringBuilder()
+                        .append(buildCreateFormattedPrefix("TABLE", 
isTemporary, tableIdentifier));
+        sb.append(extractFormattedColumns(table, printIndent));
+        extractFormattedWatermarkSpecs(table, printIndent)
+                .ifPresent(watermarkSpecs -> 
sb.append(",\n").append(watermarkSpecs));
+        extractFormattedPrimaryKey(table, printIndent).ifPresent(pk -> 
sb.append(",\n").append(pk));
+        sb.append("\n) ");
+        extractFormattedComment(table)
+                .ifPresent(
+                        c -> sb.append(String.format("COMMENT '%s'%s", c, 
System.lineSeparator())));
+        extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
+                .ifPresent(
+                        partitionedInfoFormatted ->
+                                sb.append("PARTITIONED BY (")
+                                        .append(partitionedInfoFormatted)
+                                        .append(")\n"));
+        extractFormattedOptions(table, printIndent)
+                .ifPresent(v -> sb.append("WITH 
(\n").append(v).append("\n)\n"));
+        return sb.toString();
+    }
+
+    /** Show create view statement only for views. */
+    public static String buildShowCreateViewRow(
+            ResolvedCatalogBaseTable<?> view,
+            ObjectIdentifier viewIdentifier,
+            boolean isTemporary) {
+        if (view.getTableKind() != CatalogBaseTable.TableKind.VIEW) {
+            throw new TableException(
+                    String.format(
+                            "SHOW CREATE VIEW is only supported for views, but 
%s is a table. Please use SHOW CREATE TABLE instead.",
+                            viewIdentifier.asSerializableString()));
+        }
+        StringBuilder stringBuilder = new StringBuilder();
+        if (view.getOrigin() instanceof QueryOperationCatalogView) {
+            throw new TableException(
+                    "SHOW CREATE VIEW is not supported for views registered by 
Table API.");
+        } else {
+            stringBuilder.append(
+                    String.format(
+                            "CREATE %sVIEW %s%s as%s%s",
+                            isTemporary ? "TEMPORARY " : "",
+                            viewIdentifier.asSerializableString(),
+                            String.format("(%s)", 
extractFormattedColumnNames(view)),
+                            System.lineSeparator(),
+                            ((CatalogView) 
view.getOrigin()).getExpandedQuery()));
+        }
+        extractFormattedComment(view)
+                .ifPresent(
+                        c ->
+                                stringBuilder.append(
+                                        String.format(
+                                                " COMMENT '%s'%s", c, 
System.lineSeparator())));
+        return stringBuilder.toString();
+    }
+
     static String buildCreateFormattedPrefix(
             String tableType, boolean isTemporary, ObjectIdentifier 
identifier) {
         return String.format(
@@ -153,71 +220,4 @@ class ShowCreateUtil {
                 .map(EncodingUtils::escapeIdentifier)
                 .collect(Collectors.joining(", "));
     }
-
-    static String buildShowCreateTableRow(
-            ResolvedCatalogBaseTable<?> table,
-            ObjectIdentifier tableIdentifier,
-            boolean isTemporary) {
-        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
-            throw new TableException(
-                    String.format(
-                            "SHOW CREATE TABLE is only supported for tables, 
but %s is a view. Please use SHOW CREATE VIEW instead.",
-                            tableIdentifier.asSerializableString()));
-        }
-        final String printIndent = "  ";
-        StringBuilder sb =
-                new StringBuilder()
-                        .append(buildCreateFormattedPrefix("TABLE", 
isTemporary, tableIdentifier));
-        sb.append(extractFormattedColumns(table, printIndent));
-        extractFormattedWatermarkSpecs(table, printIndent)
-                .ifPresent(watermarkSpecs -> 
sb.append(",\n").append(watermarkSpecs));
-        extractFormattedPrimaryKey(table, printIndent).ifPresent(pk -> 
sb.append(",\n").append(pk));
-        sb.append("\n) ");
-        extractFormattedComment(table)
-                .ifPresent(
-                        c -> sb.append(String.format("COMMENT '%s'%s", c, 
System.lineSeparator())));
-        extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
-                .ifPresent(
-                        partitionedInfoFormatted ->
-                                sb.append("PARTITIONED BY (")
-                                        .append(partitionedInfoFormatted)
-                                        .append(")\n"));
-        extractFormattedOptions(table, printIndent)
-                .ifPresent(v -> sb.append("WITH 
(\n").append(v).append("\n)\n"));
-        return sb.toString();
-    }
-
-    /** Show create view statement only for views. */
-    static String buildShowCreateViewRow(
-            ResolvedCatalogBaseTable<?> view,
-            ObjectIdentifier viewIdentifier,
-            boolean isTemporary) {
-        if (view.getTableKind() != CatalogBaseTable.TableKind.VIEW) {
-            throw new TableException(
-                    String.format(
-                            "SHOW CREATE VIEW is only supported for views, but 
%s is a table. Please use SHOW CREATE TABLE instead.",
-                            viewIdentifier.asSerializableString()));
-        }
-        StringBuilder stringBuilder = new StringBuilder();
-        if (view.getOrigin() instanceof QueryOperationCatalogView) {
-            throw new TableException(
-                    "SHOW CREATE VIEW is not supported for views registered by 
Table API.");
-        } else {
-            stringBuilder.append(
-                    String.format(
-                            "CREATE %sVIEW %s%s as%s%s",
-                            isTemporary ? "TEMPORARY " : "",
-                            viewIdentifier.asSerializableString(),
-                            String.format("(%s)", 
extractFormattedColumnNames(view)),
-                            System.lineSeparator(),
-                            ((CatalogView) 
view.getOrigin()).getExpandedQuery()));
-        }
-        extractFormattedComment(view)
-                .ifPresent(
-                        c ->
-                                stringBuilder.append(
-                                        String.format(
-                                                " COMMENT '%s'%s", c, 
System.lineSeparator())));
-        return stringBuilder.toString();
-    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f9cb8beabd5..1e75fce6e34 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -46,7 +46,6 @@ import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ContextResolvedTable;
@@ -54,14 +53,12 @@ import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.ExtendedOperationExecutor;
@@ -73,7 +70,6 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.SqlLikeUtils;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.Module;
@@ -91,18 +87,6 @@ import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowColumnsOperation;
-import org.apache.flink.table.operations.ShowCreateTableOperation;
-import org.apache.flink.table.operations.ShowCreateViewOperation;
-import org.apache.flink.table.operations.ShowCurrentCatalogOperation;
-import org.apache.flink.table.operations.ShowCurrentDatabaseOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowModulesOperation;
-import org.apache.flink.table.operations.ShowPartitionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.ShowViewsOperation;
 import org.apache.flink.table.operations.SinkModifyOperation;
 import org.apache.flink.table.operations.SourceQueryOperation;
 import org.apache.flink.table.operations.StatementSetOperation;
@@ -110,7 +94,6 @@ import 
org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.UnloadModuleOperation;
 import org.apache.flink.table.operations.command.AddJarOperation;
 import org.apache.flink.table.operations.command.ExecutePlanOperation;
-import org.apache.flink.table.operations.command.ShowJarsOperation;
 import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
 import org.apache.flink.table.operations.ddl.CompilePlanOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
@@ -479,13 +462,6 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         }
     }
 
-    // TODO: Maybe we should expose listJars as tEnv's API later.
-    private String[] listJars() {
-        return resourceManager.getResources().keySet().stream()
-                .map(ResourceUri::getUri)
-                .toArray(String[]::new);
-    }
-
     @Override
     public void createTemporaryTable(String path, TableDescriptor descriptor) {
         Preconditions.checkNotNull(path, "Path must not be null.");
@@ -971,164 +947,10 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
             return executeInternal(((StatementSetOperation) 
operation).getOperations());
         } else if (operation instanceof AddJarOperation) {
             return addJar((AddJarOperation) operation);
-        } else if (operation instanceof ShowJarsOperation) {
-            return buildShowResult("jars", listJars());
         } else if (operation instanceof LoadModuleOperation) {
             return loadModule((LoadModuleOperation) operation);
         } else if (operation instanceof UnloadModuleOperation) {
             return unloadModule((UnloadModuleOperation) operation);
-        } else if (operation instanceof ShowCatalogsOperation) {
-            return buildShowResult("catalog name", listCatalogs());
-        } else if (operation instanceof ShowCreateTableOperation) {
-            ShowCreateTableOperation showCreateTableOperation =
-                    (ShowCreateTableOperation) operation;
-            ContextResolvedTable table =
-                    catalogManager
-                            
.getTable(showCreateTableOperation.getTableIdentifier())
-                            .orElseThrow(
-                                    () ->
-                                            new ValidationException(
-                                                    String.format(
-                                                            "Could not execute 
SHOW CREATE TABLE. Table with identifier %s does not exist.",
-                                                            
showCreateTableOperation
-                                                                    
.getTableIdentifier()
-                                                                    
.asSerializableString())));
-
-            return TableResultImpl.builder()
-                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-                    .schema(ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING())))
-                    .data(
-                            Collections.singletonList(
-                                    Row.of(
-                                            
ShowCreateUtil.buildShowCreateTableRow(
-                                                    table.getResolvedTable(),
-                                                    
showCreateTableOperation.getTableIdentifier(),
-                                                    table.isTemporary()))))
-                    .build();
-
-        } else if (operation instanceof ShowCreateViewOperation) {
-            ShowCreateViewOperation showCreateViewOperation = 
(ShowCreateViewOperation) operation;
-            final ContextResolvedTable table =
-                    catalogManager
-                            
.getTable(showCreateViewOperation.getViewIdentifier())
-                            .orElseThrow(
-                                    () ->
-                                            new ValidationException(
-                                                    String.format(
-                                                            "Could not execute 
SHOW CREATE VIEW. View with identifier %s does not exist.",
-                                                            
showCreateViewOperation
-                                                                    
.getViewIdentifier()
-                                                                    
.asSerializableString())));
-
-            return TableResultImpl.builder()
-                    .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-                    .schema(ResolvedSchema.of(Column.physical("result", 
DataTypes.STRING())))
-                    .data(
-                            Collections.singletonList(
-                                    Row.of(
-                                            
ShowCreateUtil.buildShowCreateViewRow(
-                                                    table.getResolvedTable(),
-                                                    
showCreateViewOperation.getViewIdentifier(),
-                                                    table.isTemporary()))))
-                    .build();
-        } else if (operation instanceof ShowCurrentCatalogOperation) {
-            return buildShowResult(
-                    "current catalog name", new String[] 
{catalogManager.getCurrentCatalog()});
-        } else if (operation instanceof ShowDatabasesOperation) {
-            return buildShowResult("database name", listDatabases());
-        } else if (operation instanceof ShowCurrentDatabaseOperation) {
-            return buildShowResult(
-                    "current database name", new String[] 
{catalogManager.getCurrentDatabase()});
-        } else if (operation instanceof ShowModulesOperation) {
-            ShowModulesOperation showModulesOperation = (ShowModulesOperation) 
operation;
-            if (showModulesOperation.requireFull()) {
-                return buildShowFullModulesResult(listFullModules());
-            } else {
-                return buildShowResult("module name", listModules());
-            }
-        } else if (operation instanceof ShowTablesOperation) {
-            ShowTablesOperation showTablesOperation = (ShowTablesOperation) 
operation;
-            if (showTablesOperation.getPreposition() == null) {
-                return buildShowTablesResult(listTables(), 
showTablesOperation);
-            }
-            final String catalogName = showTablesOperation.getCatalogName();
-            final String databaseName = showTablesOperation.getDatabaseName();
-            Catalog catalog = getCatalogOrThrowException(catalogName);
-            if (catalog.databaseExists(databaseName)) {
-                return buildShowTablesResult(
-                        listTables(catalogName, databaseName), 
showTablesOperation);
-            } else {
-                throw new ValidationException(
-                        String.format(
-                                "Database '%s'.'%s' doesn't exist.", 
catalogName, databaseName));
-            }
-        } else if (operation instanceof ShowFunctionsOperation) {
-            ShowFunctionsOperation showFunctionsOperation = 
(ShowFunctionsOperation) operation;
-            String[] functionNames = null;
-            ShowFunctionsOperation.FunctionScope functionScope =
-                    showFunctionsOperation.getFunctionScope();
-            switch (functionScope) {
-                case USER:
-                    functionNames = listUserDefinedFunctions();
-                    break;
-                case ALL:
-                    functionNames = listFunctions();
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            String.format(
-                                    "SHOW FUNCTIONS with %s scope is not 
supported.",
-                                    functionScope));
-            }
-            return buildShowResult("function name", functionNames);
-        } else if (operation instanceof ShowViewsOperation) {
-            return buildShowResult("view name", listViews());
-        } else if (operation instanceof ShowColumnsOperation) {
-            ShowColumnsOperation showColumnsOperation = (ShowColumnsOperation) 
operation;
-            Optional<ContextResolvedTable> result =
-                    
catalogManager.getTable(showColumnsOperation.getTableIdentifier());
-            if (result.isPresent()) {
-                return buildShowColumnsResult(
-                        result.get().getResolvedSchema(), 
showColumnsOperation);
-            } else {
-                throw new ValidationException(
-                        String.format(
-                                "Tables or views with the identifier '%s' 
doesn't exist.",
-                                
showColumnsOperation.getTableIdentifier().asSummaryString()));
-            }
-        } else if (operation instanceof ShowPartitionsOperation) {
-            String exMsg = 
getDDLOpExecuteErrorMsg(operation.asSummaryString());
-            try {
-                ShowPartitionsOperation showPartitionsOperation =
-                        (ShowPartitionsOperation) operation;
-                Catalog catalog =
-                        getCatalogOrThrowException(
-                                
showPartitionsOperation.getTableIdentifier().getCatalogName());
-                ObjectPath tablePath = 
showPartitionsOperation.getTableIdentifier().toObjectPath();
-                CatalogPartitionSpec partitionSpec = 
showPartitionsOperation.getPartitionSpec();
-                List<CatalogPartitionSpec> partitionSpecs =
-                        partitionSpec == null
-                                ? catalog.listPartitions(tablePath)
-                                : catalog.listPartitions(tablePath, 
partitionSpec);
-                List<String> partitionNames = new 
ArrayList<>(partitionSpecs.size());
-                for (CatalogPartitionSpec spec : partitionSpecs) {
-                    List<String> partitionKVs = new 
ArrayList<>(spec.getPartitionSpec().size());
-                    for (Map.Entry<String, String> partitionKV :
-                            spec.getPartitionSpec().entrySet()) {
-                        String partitionValue =
-                                partitionKV.getValue() == null
-                                        ? 
showPartitionsOperation.getDefaultPartitionName()
-                                        : partitionKV.getValue();
-                        partitionKVs.add(partitionKV.getKey() + "=" + 
partitionValue);
-                    }
-                    partitionNames.add(String.join("/", partitionKVs));
-                }
-                return buildShowResult("partition name", 
partitionNames.toArray(new String[0]));
-            } catch (TableNotExistException e) {
-                throw new ValidationException(exMsg, e);
-            } catch (Exception e) {
-                throw new TableException(exMsg, e);
-            }
         } else if (operation instanceof ExplainOperation) {
             ExplainOperation explainOperation = (ExplainOperation) operation;
             ExplainDetail[] explainDetails =
@@ -1246,13 +1068,6 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         }
     }
 
-    private TableResultInternal buildShowResult(String columnName, String[] 
objects) {
-        return buildResult(
-                new String[] {columnName},
-                new DataType[] {DataTypes.STRING()},
-                Arrays.stream(objects).map((c) -> new String[] 
{c}).toArray(String[][]::new));
-    }
-
     private TableResultInternal buildDescribeResult(ResolvedSchema schema) {
         Object[][] rows = buildTableColumns(schema);
         boolean nonComments = isSchemaNonColumnComments(schema);
@@ -1288,57 +1103,6 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         return result.toArray(new String[0]);
     }
 
-    private TableResultInternal buildShowTablesResult(
-            String[] tableList, ShowTablesOperation showTablesOp) {
-        String[] rows = tableList.clone();
-        if (showTablesOp.isUseLike()) {
-            rows =
-                    Arrays.stream(tableList)
-                            .filter(
-                                    row ->
-                                            showTablesOp.isNotLike()
-                                                    != SqlLikeUtils.like(
-                                                            row,
-                                                            
showTablesOp.getLikePattern(),
-                                                            "\\"))
-                            .toArray(String[]::new);
-        }
-        return buildShowResult("table name", rows);
-    }
-
-    private TableResultInternal buildShowColumnsResult(
-            ResolvedSchema schema, ShowColumnsOperation showColumnsOp) {
-        Object[][] rows = buildTableColumns(schema);
-        if (showColumnsOp.isUseLike()) {
-            rows =
-                    Arrays.stream(rows)
-                            .filter(
-                                    row ->
-                                            showColumnsOp.isNotLike()
-                                                    != SqlLikeUtils.like(
-                                                            row[0].toString(),
-                                                            
showColumnsOp.getLikePattern(),
-                                                            "\\"))
-                            .toArray(Object[][]::new);
-        }
-        boolean nonComments = isSchemaNonColumnComments(schema);
-        return buildResult(
-                generateTableColumnsNames(nonComments),
-                generateTableColumnsDataTypes(nonComments),
-                rows);
-    }
-
-    private TableResultInternal buildShowFullModulesResult(ModuleEntry[] 
moduleEntries) {
-        Object[][] rows =
-                Arrays.stream(moduleEntries)
-                        .map(entry -> new Object[] {entry.name(), 
entry.used()})
-                        .toArray(Object[][]::new);
-        return buildResult(
-                new String[] {"module name", "used"},
-                new DataType[] {DataTypes.STRING(), DataTypes.BOOLEAN()},
-                rows);
-    }
-
     private Object[][] buildTableColumns(ResolvedSchema schema) {
         Map<String, String> fieldToWatermark =
                 schema.getWatermarkSpecs().stream()
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultUtils.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultUtils.java
new file mode 100644
index 00000000000..3e0869cc615
--- /dev/null
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.internal;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/** Utilities to build {@link TableResultInternal}. */
+public class TableResultUtils {
+
+    /**
+     * Build a {@link TableResultInternal} for {@code String[]} values with 
single column.
+     *
+     * @param columnName the column name of the result values.
+     * @param values the string values of the result.
+     */
+    public static TableResultInternal buildStringArrayResult(String 
columnName, String[] values) {
+        return buildTableResult(
+                new String[] {columnName},
+                new DataType[] {DataTypes.STRING()},
+                Arrays.stream(values).map((c) -> new String[] 
{c}).toArray(String[][]::new));
+    }
+
+    public static TableResultInternal buildTableResult(
+            String[] headers, DataType[] types, Object[][] rows) {
+        ResolvedSchema schema = ResolvedSchema.physical(headers, types);
+        ResultProvider provider =
+                new StaticResultProvider(
+                        
Arrays.stream(rows).map(Row::of).collect(Collectors.toList()));
+        return TableResultImpl.builder()
+                .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                .schema(ResolvedSchema.physical(headers, types))
+                .resultProvider(provider)
+                .setPrintStyle(
+                        PrintStyle.tableauWithDataInferredColumnWidths(
+                                schema,
+                                provider.getRowDataStringConverter(),
+                                Integer.MAX_VALUE,
+                                true,
+                                false))
+                .build();
+    }
+}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java
index 053c4682046..e608c3fb5f6 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCatalogsOperation.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW CATALOGS statement. */
 public class ShowCatalogsOperation implements ShowOperation {
 
@@ -25,4 +29,11 @@ public class ShowCatalogsOperation implements ShowOperation {
     public String asSummaryString() {
         return "SHOW CATALOGS";
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        String[] catalogs =
+                
ctx.getCatalogManager().listCatalogs().stream().sorted().toArray(String[]::new);
+        return buildStringArrayResult("catalog name", catalogs);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java
index b001ff88d77..71fdc6f1a11 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowColumnsOperation.java
@@ -18,7 +18,29 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.functions.SqlLikeUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildTableResult;
 
 /** Show columns from [[catalog.]database.]table. */
 public class ShowColumnsOperation implements ShowOperation {
@@ -26,12 +48,12 @@ public class ShowColumnsOperation implements ShowOperation {
     private final ObjectIdentifier tableIdentifier;
     private final boolean useLike;
     private final boolean notLike;
-    private final String likePattern;
+    private final @Nullable String likePattern;
     private final String preposition;
 
     public ShowColumnsOperation(
             ObjectIdentifier tableIdentifier,
-            String likePattern,
+            @Nullable String likePattern,
             boolean useLike,
             boolean notLike,
             String preposition) {
@@ -76,4 +98,108 @@ public class ShowColumnsOperation implements ShowOperation {
         }
         return String.format("SHOW COLUMNS %s %s", preposition, 
tableIdentifier.asSummaryString());
     }
+
    @Override
    public TableResultInternal execute(Context ctx) {
        // Resolve the target table or view; a missing identifier is a user-facing
        // validation error rather than an internal failure.
        Optional<ContextResolvedTable> result = ctx.getCatalogManager().getTable(tableIdentifier);
        if (!result.isPresent()) {
            throw new ValidationException(
                    String.format(
                            "Tables or views with the identifier '%s' doesn't exist.",
                            tableIdentifier.asSummaryString()));
        }

        ResolvedSchema schema = result.get().getResolvedSchema();
        Object[][] rows = generateTableColumnsRows(schema);
        // likePattern is only meaningful when useLike is set; for NOT LIKE the match
        // result is inverted via the != with notLike.
        if (useLike) {
            rows =
                    Arrays.stream(rows)
                            .filter(
                                    row ->
                                            notLike
                                                    != SqlLikeUtils.like(
                                                            row[0].toString(), likePattern, "\\"))
                            .toArray(Object[][]::new);
        }
        // The trailing "comment" column is emitted only when at least one column of the
        // schema actually carries a comment.
        boolean nonComments = isSchemaNonColumnComments(schema);
        return buildTableResult(
                generateTableColumnsNames(nonComments),
                generateTableColumnsDataTypes(nonComments),
                rows);
    }
+
    // Builds one result row per schema column:
    // name, type (printed without top-level nullability), null flag, primary-key label,
    // extras (from Column#explainExtras), watermark expression, and — only when some
    // column has a comment — the comment itself.
    private Object[][] generateTableColumnsRows(ResolvedSchema schema) {
        // Map each rowtime attribute to the summary string of its watermark expression.
        Map<String, String> fieldToWatermark =
                schema.getWatermarkSpecs().stream()
                        .collect(
                                Collectors.toMap(
                                        WatermarkSpec::getRowtimeAttribute,
                                        spec -> spec.getWatermarkExpression().asSummaryString()));

        // Every column belonging to the primary key maps to the same
        // "PRI(col1, col2, ...)" label listing all key columns.
        Map<String, String> fieldToPrimaryKey = new HashMap<>();
        schema.getPrimaryKey()
                .ifPresent(
                        (p) -> {
                            List<String> columns = p.getColumns();
                            columns.forEach(
                                    (c) ->
                                            fieldToPrimaryKey.put(
                                                    c,
                                                    String.format(
                                                            "PRI(%s)",
                                                            String.join(", ", columns))));
                        });
        boolean nonComments = isSchemaNonColumnComments(schema);
        return schema.getColumns().stream()
                .map(
                        (c) -> {
                            final LogicalType logicalType = c.getDataType().getLogicalType();
                            // copy(true) prints the type as nullable so the separate
                            // "null" boolean column carries the actual nullability.
                            final ArrayList<Object> result =
                                    new ArrayList<>(
                                            Arrays.asList(
                                                    c.getName(),
                                                    logicalType.copy(true).asSummaryString(),
                                                    logicalType.isNullable(),
                                                    fieldToPrimaryKey.getOrDefault(
                                                            c.getName(), null),
                                                    c.explainExtras().orElse(null),
                                                    fieldToWatermark.getOrDefault(
                                                            c.getName(), null)));
                            if (!nonComments) {
                                result.add(c.getComment().orElse(null));
                            }
                            return result.toArray();
                        })
                .toArray(Object[][]::new);
    }
+
+    private boolean isSchemaNonColumnComments(ResolvedSchema schema) {
+        return 
schema.getColumns().stream().map(Column::getComment).noneMatch(Optional::isPresent);
+    }
+
+    private String[] generateTableColumnsNames(boolean nonComments) {
+        final ArrayList<String> result =
+                new ArrayList<>(
+                        Arrays.asList("name", "type", "null", "key", "extras", 
"watermark"));
+        if (!nonComments) {
+            result.add("comment");
+        }
+        return result.toArray(new String[0]);
+    }
+
+    private DataType[] generateTableColumnsDataTypes(boolean nonComments) {
+        final ArrayList<DataType> result =
+                new ArrayList<>(
+                        Arrays.asList(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.BOOLEAN(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING()));
+        if (!nonComments) {
+            result.add(DataTypes.STRING());
+        }
+        return result.toArray(new DataType[0]);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
index 47f6b3eed7d..5514fc1c2e4 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateTableOperation.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.ShowCreateUtil;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW CREATE TABLE statement. */
 public class ShowCreateTableOperation implements ShowOperation {
 
@@ -37,4 +43,22 @@ public class ShowCreateTableOperation implements 
ShowOperation {
     public String asSummaryString() {
         return String.format("SHOW CREATE TABLE %s", 
tableIdentifier.asSummaryString());
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ContextResolvedTable table =
+                ctx.getCatalogManager()
+                        .getTable(tableIdentifier)
+                        .orElseThrow(
+                                () ->
+                                        new ValidationException(
+                                                String.format(
+                                                        "Could not execute 
SHOW CREATE TABLE. Table with identifier %s does not exist.",
+                                                        
tableIdentifier.asSerializableString())));
+        String resultRow =
+                ShowCreateUtil.buildShowCreateTableRow(
+                        table.getResolvedTable(), tableIdentifier, 
table.isTemporary());
+
+        return buildStringArrayResult("result", new String[] {resultRow});
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java
index 312c305cc56..5bfb091f22e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateViewOperation.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.ShowCreateUtil;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW CREATE VIEW statement. */
 public class ShowCreateViewOperation implements ShowOperation {
 
@@ -37,4 +43,23 @@ public class ShowCreateViewOperation implements 
ShowOperation {
     public String asSummaryString() {
         return String.format("SHOW CREATE VIEW %s", 
viewIdentifier.asSummaryString());
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        final ContextResolvedTable table =
+                ctx.getCatalogManager()
+                        .getTable(viewIdentifier)
+                        .orElseThrow(
+                                () ->
+                                        new ValidationException(
+                                                String.format(
+                                                        "Could not execute 
SHOW CREATE VIEW. View with identifier %s does not exist.",
+                                                        
viewIdentifier.asSerializableString())));
+
+        String resultRow =
+                ShowCreateUtil.buildShowCreateViewRow(
+                        table.getResolvedTable(), viewIdentifier, 
table.isTemporary());
+
+        return buildStringArrayResult("result", new String[] {resultRow});
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java
index be4cb69cea6..3468f173c2b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java
@@ -18,10 +18,20 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW CURRENT CATALOG statement. */
 public class ShowCurrentCatalogOperation implements ShowOperation {
     @Override
     public String asSummaryString() {
         return "SHOW CURRENT CATALOG";
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        return buildStringArrayResult(
+                "current catalog name", new String[] 
{ctx.getCatalogManager().getCurrentCatalog()});
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java
index 6d4573ddc09..6ad47d4950e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java
@@ -18,10 +18,21 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe SHOW CURRENT DATABASE operation. */
 public class ShowCurrentDatabaseOperation implements ShowOperation {
     @Override
     public String asSummaryString() {
         return "SHOW CURRENT DATABASE";
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        return buildStringArrayResult(
+                "current database name",
+                new String[] {ctx.getCatalogManager().getCurrentDatabase()});
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java
index b2c7136c52f..8e2cd2d41b1 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowDatabasesOperation.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW DATABASES statement. */
 public class ShowDatabasesOperation implements ShowOperation {
 
@@ -25,4 +29,14 @@ public class ShowDatabasesOperation implements ShowOperation 
{
     public String asSummaryString() {
         return "SHOW DATABASES";
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        String[] databases =
+                ctx.getCatalogManager()
+                        
.getCatalogOrThrowException(ctx.getCatalogManager().getCurrentCatalog())
+                        .listDatabases()
+                        .toArray(new String[0]);
+        return buildStringArrayResult("database name", databases);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
index 466b0e7e89c..b36ac6b0fdc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowFunctionsOperation.java
@@ -18,6 +18,12 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
+
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW [USER] FUNCTIONS statement. */
 public class ShowFunctionsOperation implements ShowOperation {
 
@@ -57,4 +63,23 @@ public class ShowFunctionsOperation implements ShowOperation 
{
     public FunctionScope getFunctionScope() {
         return functionScope;
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        final String[] functionNames;
+        switch (functionScope) {
+            case USER:
+                functionNames = 
ctx.getFunctionCatalog().getUserDefinedFunctions();
+                break;
+            case ALL:
+                functionNames = ctx.getFunctionCatalog().getFunctions();
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "SHOW FUNCTIONS with %s scope is not 
supported.", functionScope));
+        }
+        Arrays.sort(functionNames);
+        return buildStringArrayResult("function name", functionNames);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java
index 582026cbbad..59fb5db51fd 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowModulesOperation.java
@@ -18,6 +18,16 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.module.ModuleEntry;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildTableResult;
+
 /** Operation to describe a SHOW [FULL] MODULES statement. */
 public class ShowModulesOperation implements ShowOperation {
     private final boolean requireFull;
@@ -34,4 +44,23 @@ public class ShowModulesOperation implements ShowOperation {
     public boolean requireFull() {
         return requireFull;
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        if (requireFull) {
+            ModuleEntry[] fullModules =
+                    ctx.getModuleManager().listFullModules().toArray(new 
ModuleEntry[0]);
+            Object[][] rows =
+                    Arrays.stream(fullModules)
+                            .map(entry -> new Object[] {entry.name(), 
entry.used()})
+                            .toArray(Object[][]::new);
+            return buildTableResult(
+                    new String[] {"module name", "used"},
+                    new DataType[] {DataTypes.STRING(), DataTypes.BOOLEAN()},
+                    rows);
+        } else {
+            String[] modules = 
ctx.getModuleManager().listModules().toArray(new String[0]);
+            return buildStringArrayResult("module name", modules);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java
index 6f911f812d4..ff902ab6b52 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowOperation.java
@@ -25,4 +25,4 @@ import org.apache.flink.annotation.Internal;
  * TABLES, SHOW FUNCTIONS, SHOW PARTITIONS.
  */
 @Internal
-public interface ShowOperation extends Operation {}
+public interface ShowOperation extends Operation, ExecutableOperation {}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
index 08a3ca7ea18..1add82526cc 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java
@@ -18,11 +18,23 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW PARTITIONS statement. */
 public class ShowPartitionsOperation implements ShowOperation {
 
@@ -70,4 +82,36 @@ public class ShowPartitionsOperation implements 
ShowOperation {
         }
         return builder.toString();
     }
+
    @Override
    public TableResultInternal execute(Context ctx) {
        try {
            final ObjectPath tablePath = tableIdentifier.toObjectPath();
            Catalog catalog =
                    ctx.getCatalogManager()
                            .getCatalogOrThrowException(tableIdentifier.getCatalogName());
            // Without a PARTITION clause list all partitions; with one, only those matching
            // the given (possibly partial) spec.
            List<CatalogPartitionSpec> partitionSpecs =
                    partitionSpec == null
                            ? catalog.listPartitions(tablePath)
                            : catalog.listPartitions(tablePath, partitionSpec);
            List<String> partitionNames = new ArrayList<>(partitionSpecs.size());
            for (CatalogPartitionSpec spec : partitionSpecs) {
                List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size());
                for (Map.Entry<String, String> partitionKV : spec.getPartitionSpec().entrySet()) {
                    // A null partition value is rendered with the configured default
                    // partition name.
                    String partitionValue =
                            partitionKV.getValue() == null
                                    ? defaultPartitionName
                                    : partitionKV.getValue();
                    partitionKVs.add(partitionKV.getKey() + "=" + partitionValue);
                }
                // Each partition is rendered as "k1=v1/k2=v2/...".
                partitionNames.add(String.join("/", partitionKVs));
            }
            return buildStringArrayResult("partition name", partitionNames.toArray(new String[0]));
        } catch (TableNotExistException e) {
            // A missing table is a user-facing validation error.
            throw new ValidationException(
                    String.format("Could not execute %s", asSummaryString()), e);
        } catch (Exception e) {
            // Any other catalog failure is surfaced as an internal TableException.
            throw new TableException(String.format("Could not execute %s", asSummaryString()), e);
        }
    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
index faba1f2e122..03f8c63408c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowTablesOperation.java
@@ -18,6 +18,14 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.functions.SqlLikeUtils;
+
+import java.util.Set;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Operation to describe a SHOW TABLES statement. */
@@ -104,4 +112,33 @@ public class ShowTablesOperation implements ShowOperation {
         }
         return builder.toString();
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        final Set<String> tables;
+        if (preposition == null) {
+            tables = ctx.getCatalogManager().listTables();
+        } else {
+            Catalog catalog = 
ctx.getCatalogManager().getCatalogOrThrowException(catalogName);
+            if (catalog.databaseExists(databaseName)) {
+                tables = ctx.getCatalogManager().listTables(catalogName, 
databaseName);
+            } else {
+                throw new ValidationException(
+                        String.format(
+                                "Database '%s'.'%s' doesn't exist.", 
catalogName, databaseName));
+            }
+        }
+
+        final String[] rows;
+        if (useLike) {
+            rows =
+                    tables.stream()
+                            .filter(row -> notLike != SqlLikeUtils.like(row, 
likePattern, "\\"))
+                            .sorted()
+                            .toArray(String[]::new);
+        } else {
+            rows = tables.stream().sorted().toArray(String[]::new);
+        }
+        return buildStringArrayResult("table name", rows);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
index 5cb25dd7e96..e181fd3b7f0 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowViewsOperation.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.table.operations;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
+
 /** Operation to describe a SHOW VIEWS statement. */
 public class ShowViewsOperation implements ShowOperation {
 
@@ -25,4 +29,11 @@ public class ShowViewsOperation implements ShowOperation {
     public String asSummaryString() {
         return "SHOW VIEWS";
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        String[] views =
+                
ctx.getCatalogManager().listViews().stream().sorted().toArray(String[]::new);
+        return buildStringArrayResult("view name", views);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java
index d0995c208a7..2e28fb82130 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJarsOperation.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.table.operations.command;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.operations.ShowOperation;
+import org.apache.flink.table.resource.ResourceUri;
+
+import static 
org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
 
 /** Operation to describe a SHOW JARS statement. */
 public class ShowJarsOperation implements ShowOperation {
@@ -27,4 +31,13 @@ public class ShowJarsOperation implements ShowOperation {
     public String asSummaryString() {
         return "SHOW JARS";
     }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        String[] jars =
+                ctx.getResourceManager().getResources().keySet().stream()
+                        .map(ResourceUri::getUri)
+                        .toArray(String[]::new);
+        return buildStringArrayResult("jars", jars);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
index 24da96ca30d..f666f59629b 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/ShowJobsOperation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.operations.command;
 
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.operations.ShowOperation;
 
 /** Operation to describe a SHOW JOBS statement. */
@@ -27,4 +28,12 @@ public class ShowJobsOperation implements ShowOperation {
     public String asSummaryString() {
         return "SHOW JOBS";
     }
+
    @Override
    public TableResultInternal execute(Context ctx) {
        // TODO: We may need to migrate the execution for ShowJobsOperation from SQL Gateway
        //  OperationExecutor to here.
        // NOTE(review): this operation is still executed by the SQL Gateway's
        // OperationExecutor; calling it through the ExecutableOperation path is
        // deliberately unsupported for now.
        throw new UnsupportedOperationException(
                "ShowJobsOperation doesn't support ExecutableOperation yet.");
    }
 }

Reply via email to