This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d0a5023f989 [FLINK-28632][sql-gateway][hive] Allow to 
GetColumns/GetPrimaryKeys/GetTableTypes in the HiveServer2 Endpoint
d0a5023f989 is described below

commit d0a5023f9896663552e4b8df3b12efb1c8a8c814
Author: yuzelin <[email protected]>
AuthorDate: Tue Aug 9 10:46:52 2022 +0800

    [FLINK-28632][sql-gateway][hive] Allow to 
GetColumns/GetPrimaryKeys/GetTableTypes in the HiveServer2 Endpoint
    
    This closes #20493
---
 .../table/endpoint/hive/HiveServer2Endpoint.java   |  69 ++++-
 .../table/endpoint/hive/HiveServer2Schemas.java    |  71 +++++
 .../hive/util/OperationExecutorFactory.java        | 266 ++++++++++++++++--
 .../endpoint/hive/HiveServer2EndpointITCase.java   | 297 +++++++++++++++++----
 .../flink/table/gateway/api/SqlGatewayService.java |  13 +
 .../gateway/api/utils/MockedSqlGatewayService.java |   9 +
 .../gateway/service/SqlGatewayServiceImpl.java     |  14 +
 .../service/operation/OperationExecutor.java       |   8 +
 .../gateway/service/SqlGatewayServiceITCase.java   |  18 ++
 .../table/catalog/ResolvedCatalogBaseTable.java    |   2 +
 10 files changed, 691 insertions(+), 76 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 2786c8996d9..6c6335d07b3 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -126,10 +126,13 @@ import static 
org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HI
 import static 
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
 import static 
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
+import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetColumnsExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetFunctionsExecutor;
+import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetPrimaryKeys;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
-import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableInfoExecutor;
+import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableTypesExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTablesExecutor;
+import static 
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTypeInfoExecutor;
 import static 
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
 import static 
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFlinkTableKinds;
 import static 
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
@@ -424,7 +427,7 @@ public class HiveServer2Endpoint implements 
TCLIService.Iface, SqlGatewayEndpoin
         try {
             SessionHandle sessionHandle = 
toSessionHandle(tGetTypeInfoReq.getSessionHandle());
             OperationHandle operationHandle =
-                    service.submitOperation(sessionHandle, 
createGetTableInfoExecutor());
+                    service.submitOperation(sessionHandle, 
createGetTypeInfoExecutor());
             resp.setStatus(OK_STATUS);
             resp.setOperationHandle(
                     toTOperationHandle(
@@ -509,12 +512,46 @@ public class HiveServer2Endpoint implements 
TCLIService.Iface, SqlGatewayEndpoin
 
     @Override
     public TGetTableTypesResp GetTableTypes(TGetTableTypesReq 
tGetTableTypesReq) throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetTableTypesResp resp = new TGetTableTypesResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetTableTypesReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(sessionHandle, 
createGetTableTypesExecutor());
+
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(sessionHandle, operationHandle, 
TOperationType.GET_TABLE_TYPES)); // tag the handle with this call's own type
+        } catch (Throwable t) {
+            LOG.error("Failed to GetTableTypes.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
     public TGetColumnsResp GetColumns(TGetColumnsReq tGetColumnsReq) throws 
TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetColumnsResp resp = new TGetColumnsResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetColumnsReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            createGetColumnsExecutor(
+                                    service,
+                                    sessionHandle,
+                                    tGetColumnsReq.getCatalogName(),
+                                    tGetColumnsReq.getSchemaName(),
+                                    tGetColumnsReq.getTableName(),
+                                    tGetColumnsReq.getColumnName()));
+
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(sessionHandle, operationHandle, 
TOperationType.GET_COLUMNS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetColumns.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
@@ -545,7 +582,29 @@ public class HiveServer2Endpoint implements 
TCLIService.Iface, SqlGatewayEndpoin
     @Override
     public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq 
tGetPrimaryKeysReq)
             throws TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetPrimaryKeysResp resp = new TGetPrimaryKeysResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetPrimaryKeysReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            createGetPrimaryKeys(
+                                    service,
+                                    sessionHandle,
+                                    tGetPrimaryKeysReq.getCatalogName(),
+                                    tGetPrimaryKeysReq.getSchemaName(),
+                                    tGetPrimaryKeysReq.getTableName()));
+
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(
+                            // hive's implementation use "GET_FUNCTIONS" here
+                            sessionHandle, operationHandle, 
TOperationType.GET_FUNCTIONS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetPrimaryKeys.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index d1879ef9062..31c76b2d212 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -85,6 +85,77 @@ public class HiveServer2Schemas {
                             .withComment(
                                     "The name which uniquely identifies this 
function within its schema"));
 
+    /** Schema for {@link HiveServer2Endpoint#GetColumns}. */
+    public static final ResolvedSchema GET_COLUMNS_SCHEMA =
+            buildSchema(
+                    Column.physical("TABLE_CAT", DataTypes.STRING())
+                            .withComment("Catalog name. NULL if not 
applicable."),
+                    Column.physical("TABLE_SCHEM", 
DataTypes.STRING()).withComment("Schema name."),
+                    Column.physical("TABLE_NAME", 
DataTypes.STRING()).withComment("Table name."),
+                    Column.physical("COLUMN_NAME", 
DataTypes.STRING()).withComment("Column name."),
+                    Column.physical("DATA_TYPE", DataTypes.INT())
+                            .withComment("SQL type from java.sql.Types."),
+                    Column.physical("TYPE_NAME", DataTypes.STRING())
+                            .withComment(
+                                    "Data source dependent type name, for a 
UDT the type name is fully qualified."),
+                    Column.physical("COLUMN_SIZE", DataTypes.INT())
+                            .withComment(
+                                    "Column size. For char or date types this 
is the maximum number of characters, for numeric or decimal types this is 
precision."),
+                    Column.physical("BUFFER_LENGTH", 
DataTypes.TINYINT()).withComment("Unused."),
+                    Column.physical("DECIMAL_DIGITS", DataTypes.INT())
+                            .withComment("The number of fractional digits."),
+                    Column.physical("NUM_PREC_RADIX", DataTypes.INT())
+                            .withComment("Radix (typically either 10 or 2)."),
+                    Column.physical("NULLABLE", 
DataTypes.INT()).withComment("Is NULL allowed."),
+                    Column.physical("REMARKS", DataTypes.STRING())
+                            .withComment("Comment describing column (may be 
null)."),
+                    Column.physical("COLUMN_DEF", DataTypes.STRING())
+                            .withComment("Default value (may be null)."),
+                    Column.physical("SQL_DATA_TYPE", 
DataTypes.INT()).withComment("Unused."),
+                    Column.physical("SQL_DATETIME_SUB", 
DataTypes.INT()).withComment("Unused."),
+                    Column.physical("CHAR_OCTET_LENGTH", DataTypes.INT())
+                            .withComment(
+                                    "For char types the maximum number of 
bytes in the column."),
+                    Column.physical("ORDINAL_POSITION", DataTypes.INT())
+                            .withComment("Index of column in table (starting 
at 1)."),
+                    Column.physical("IS_NULLABLE", DataTypes.STRING())
+                            .withComment(
+                                    "\"NO\" means column definitely does not 
allow NULL values; \"YES\" means the column might allow NULL values. An empty 
string means nobody knows."),
+                    Column.physical("SCOPE_CATALOG", DataTypes.STRING())
+                            .withComment(
+                                    "Catalog of table that is the scope of a 
reference attribute (null if DATA_TYPE isn't REF)."),
+                    Column.physical("SCOPE_SCHEMA", DataTypes.STRING())
+                            .withComment(
+                                    "Schema of table that is the scope of a 
reference attribute (null if the DATA_TYPE isn't REF)."),
+                    Column.physical("SCOPE_TABLE", DataTypes.STRING())
+                            .withComment(
+                                    "Table name that this the scope of a 
reference attribute (null if the DATA_TYPE isn't REF)."),
+                    Column.physical("SOURCE_DATA_TYPE", DataTypes.SMALLINT())
+                            .withComment(
+                                    "Source type of a distinct type or 
user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't 
DISTINCT or user-generated REF)."),
+                    Column.physical("IS_AUTO_INCREMENT", DataTypes.STRING())
+                            .withComment("Indicates whether this column is 
auto incremented."));
+
+    /** Schema for {@link HiveServer2Endpoint#GetTableTypes}. */
+    public static final ResolvedSchema GET_TABLE_TYPES_SCHEMA =
+            buildSchema(
+                    Column.physical("TABLE_TYPE", DataTypes.STRING())
+                            .withComment("Table type name."));
+
+    /** Schema for {@link HiveServer2Endpoint#GetPrimaryKeys}. */
+    public static final ResolvedSchema GET_PRIMARY_KEYS_SCHEMA =
+            buildSchema(
+                    Column.physical("TABLE_CAT", DataTypes.STRING())
+                            .withComment("Table catalog (may be null)."),
+                    Column.physical("TABLE_SCHEM", DataTypes.STRING())
+                            .withComment("Table schema (may be null)."),
+                    Column.physical("TABLE_NAME", 
DataTypes.STRING()).withComment("Table name."),
+                    Column.physical("COLUMN_NAME", 
DataTypes.STRING()).withComment("Column name."),
+                    Column.physical("KEY_SEQ", DataTypes.INT())
+                            .withComment("Sequence number within primary 
key."),
+                    Column.physical("PK_NAME", DataTypes.STRING())
+                            .withComment("Primary key name (may be null)."));
+
     /** Schema for {@link HiveServer2Endpoint#GetTypeInfo}. */
     public static final ResolvedSchema GET_TYPE_INFO_SCHEMA =
             buildSchema(
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index d6397700e13..4052ac4dd53 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -19,9 +19,13 @@
 package org.apache.flink.table.endpoint.hive.util;
 
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -39,27 +43,35 @@ import 
org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.hive.serde2.thrift.Type;
 
 import javax.annotation.Nullable;
 
 import java.sql.DatabaseMetaData;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_COLUMNS_SCHEMA;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_FUNCTIONS_SCHEMA;
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_PRIMARY_KEYS_SCHEMA;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLES_SCHEMA;
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLE_TYPES_SCHEMA;
 import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TYPE_INFO_SCHEMA;
 import static 
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
 import static org.apache.hadoop.hive.serde2.thrift.Type.ARRAY_TYPE;
@@ -111,6 +123,37 @@ public class OperationExecutorFactory {
                         service, sessionHandle, catalogName, schemaName, 
tableName, tableKinds);
     }
 
+    public static Callable<ResultSet> createGetColumnsExecutor(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String schemaName,
+            @Nullable String tableName,
+            @Nullable String columnsName) {
+        return () ->
+                executeGetColumns(
+                        service, sessionHandle, catalogName, schemaName, 
tableName, columnsName);
+    }
+
+    public static Callable<ResultSet> createGetTableTypesExecutor() {
+        return () ->
+                buildResultSet(
+                        GET_TABLE_TYPES_SCHEMA,
+                        Arrays.stream(TableKind.values())
+                                .map(kind -> wrap(kind.name()))
+                                .collect(Collectors.toList()));
+    }
+
+    public static Callable<ResultSet> createGetPrimaryKeys(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String schemaName,
+            @Nullable String tableName) {
+        return () ->
+                executeGetPrimaryKeys(service, sessionHandle, catalogName, 
schemaName, tableName);
+    }
+
     public static Callable<ResultSet> createGetFunctionsExecutor(
             SqlGatewayService service,
             SessionHandle sessionHandle,
@@ -122,7 +165,7 @@ public class OperationExecutorFactory {
                         service, sessionHandle, catalogName, databasePattern, 
functionNamePattern);
     }
 
-    public static Callable<ResultSet> createGetTableInfoExecutor() {
+    public static Callable<ResultSet> createGetTypeInfoExecutor() {
         return () ->
                 buildResultSet(
                         GET_TYPE_INFO_SCHEMA,
@@ -175,7 +218,8 @@ public class OperationExecutorFactory {
         String specifiedCatalogName =
                 isNullOrEmpty(catalogName) ? 
service.getCurrentCatalog(sessionHandle) : catalogName;
         Set<String> databaseNames =
-                filter(service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName);
+                filterAndSort(
+                        service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName);
         return buildResultSet(
                 GET_SCHEMAS_SCHEMA,
                 databaseNames.stream()
@@ -190,21 +234,30 @@ public class OperationExecutorFactory {
             @Nullable String schemaName,
             @Nullable String tableName,
             Set<TableKind> tableKinds) {
-        Set<TableInfo> tableInfos = new HashSet<>();
+        Set<TableInfo> tableInfos = new LinkedHashSet<>();
+        Set<TableInfo> viewInfos = new LinkedHashSet<>();
+
         String specifiedCatalogName =
                 isNullOrEmpty(catalogName) ? 
service.getCurrentCatalog(sessionHandle) : catalogName;
         for (String schema :
-                filter(service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName)) {
-            tableInfos.addAll(
-                    filter(
+                filterAndSort(
+                        service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName)) {
+            for (TableInfo tableInfo :
+                    filterAndSort(
                             service.listTables(
                                     sessionHandle, specifiedCatalogName, 
schema, tableKinds),
                             candidate -> 
candidate.getIdentifier().getObjectName(),
-                            tableName));
+                            tableName)) {
+                if (tableInfo.getTableKind().equals(TableKind.TABLE)) {
+                    tableInfos.add(tableInfo);
+                } else {
+                    viewInfos.add(tableInfo);
+                }
+            }
         }
         return buildResultSet(
                 GET_TABLES_SCHEMA,
-                tableInfos.stream()
+                Stream.concat(tableInfos.stream(), viewInfos.stream())
                         .map(
                                 info ->
                                         wrap(
@@ -212,9 +265,9 @@ public class OperationExecutorFactory {
                                                 
info.getIdentifier().getDatabaseName(),
                                                 
info.getIdentifier().getObjectName(),
                                                 info.getTableKind().name(),
-                                                // It requires to load the 
CatalogFunction from the
+                                                // It requires to load the 
CatalogTable from the
                                                 // remote server, which is 
time wasted.
-                                                "",
+                                                null,
                                                 null,
                                                 null,
                                                 null,
@@ -223,6 +276,127 @@ public class OperationExecutorFactory {
                         .collect(Collectors.toList()));
     }
 
+    private static ResultSet executeGetColumns(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String schemaName,
+            @Nullable String tableName,
+            @Nullable String columnName) {
+        String specifiedCatalogName =
+                isNullOrEmpty(catalogName) ? 
service.getCurrentCatalog(sessionHandle) : catalogName;
+        Set<String> schemaNames =
+                filterAndSort(
+                        service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName);
+        Set<TableKind> tableKinds = new 
HashSet<>(Arrays.asList(TableKind.values()));
+
+        List<RowData> results = new ArrayList<>();
+        for (String schema : schemaNames) {
+            Set<TableInfo> tableInfos =
+                    filterAndSort(
+                            service.listTables(
+                                    sessionHandle, specifiedCatalogName, 
schema, tableKinds),
+                            candidates -> 
candidates.getIdentifier().getObjectName(),
+                            tableName);
+
+            for (TableInfo tableInfo : tableInfos) {
+                ResolvedCatalogBaseTable<?> table =
+                        service.getTable(sessionHandle, 
tableInfo.getIdentifier());
+                List<Column> columns = table.getResolvedSchema().getColumns();
+
+                Set<String> matchedColumnNames =
+                        filterAndSort(
+                                new 
HashSet<>(table.getResolvedSchema().getColumnNames()),
+                                columnName);
+                for (int i = 0; i < columns.size(); i++) {
+                    Column column = columns.get(i);
+                    if (!matchedColumnNames.contains(column.getName())) {
+                        continue;
+                    }
+                    Type hiveColumnType =
+                            
Type.getType(HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false));
+                    LogicalType flinkColumnType = 
column.getDataType().getLogicalType();
+                    results.add(
+                            wrap(
+                                    specifiedCatalogName,
+                                    
tableInfo.getIdentifier().getDatabaseName(),
+                                    tableInfo.getIdentifier().getObjectName(),
+                                    column.getName(),
+                                    hiveColumnType.toJavaSQLType(),
+                                    hiveColumnType.getName(),
+                                    getColumnSize(flinkColumnType),
+                                    null, // BUFFER_LENGTH, unused
+                                    getDecimalDigits(flinkColumnType),
+                                    hiveColumnType.getNumPrecRadix(),
+                                    flinkColumnType.isNullable()
+                                            ? DatabaseMetaData.columnNullable
+                                            : DatabaseMetaData.columnNoNulls,
+                                    column.getComment().orElse(null),
+                                    null, // COLUMN_DEF
+                                    null, // SQL_DATA_TYPE
+                                    null, // SQL_DATETIME_SUB
+                                    null, // CHAR_OCTET_LENGTH
+                                    i + 1,
+                                    flinkColumnType.isNullable() ? "YES" : 
"NO",
+                                    null, // SCOPE_CATALOG
+                                    null, // SCOPE_SCHEMA
+                                    null, // SCOPE_TABLE
+                                    null, // SOURCE_DATA_TYPE
+                                    "NO"));
+                }
+            }
+        }
+        return buildResultSet(GET_COLUMNS_SCHEMA, results);
+    }
+
+    private static ResultSet executeGetPrimaryKeys(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            @Nullable String catalogName,
+            @Nullable String schemaName,
+            @Nullable String tableName) {
+        String specifiedCatalogName =
+                isNullOrEmpty(catalogName) ? 
service.getCurrentCatalog(sessionHandle) : catalogName;
+        Set<String> schemaNames =
+                filterAndSort(
+                        service.listDatabases(sessionHandle, 
specifiedCatalogName), schemaName);
+        List<RowData> primaryKeys = new ArrayList<>();
+
+        for (String schema : schemaNames) {
+            Set<TableInfo> tableInfos =
+                    filterAndSort(
+                            service.listTables(
+                                    sessionHandle,
+                                    specifiedCatalogName,
+                                    schema,
+                                    new 
HashSet<>(Arrays.asList(TableKind.values()))),
+                            candidate -> 
candidate.getIdentifier().getObjectName(),
+                            tableName);
+
+            for (TableInfo tableInfo : tableInfos) {
+                ResolvedCatalogBaseTable<?> table =
+                        service.getTable(sessionHandle, 
tableInfo.getIdentifier());
+                UniqueConstraint primaryKey =
+                        table.getResolvedSchema().getPrimaryKey().orElse(null);
+                if (primaryKey == null) {
+                    continue;
+                }
+
+                for (int i = 0; i < primaryKey.getColumns().size(); i++) {
+                    primaryKeys.add(
+                            wrap(
+                                    specifiedCatalogName,
+                                    
tableInfo.getIdentifier().getDatabaseName(),
+                                    tableInfo.getIdentifier().getObjectName(),
+                                    primaryKey.getColumns().get(i),
+                                    i + 1,
+                                    primaryKey.getName()));
+                }
+            }
+        }
+        return buildResultSet(GET_PRIMARY_KEYS_SCHEMA, primaryKeys);
+    }
+
     private static ResultSet executeGetFunctions(
             SqlGatewayService service,
             SessionHandle sessionHandle,
@@ -235,7 +409,7 @@ public class OperationExecutorFactory {
         Set<FunctionInfo> candidates = new HashSet<>();
         // Add user defined functions
         for (String databaseName :
-                filter(
+                filterAndSort(
                         service.listDatabases(sessionHandle, 
specifiedCatalogName),
                         databasePattern)) {
             candidates.addAll(
@@ -248,7 +422,7 @@ public class OperationExecutorFactory {
         }
         // Filter out unmatched functions
         Set<FunctionInfo> matchedFunctions =
-                filter(
+                filterAndSort(
                         candidates,
                         candidate -> 
candidate.getIdentifier().getFunctionName(),
                         functionNamePattern);
@@ -302,18 +476,19 @@ public class OperationExecutorFactory {
         return input == null || input.isEmpty();
     }
 
-    private static Set<String> filter(Set<String> candidates, @Nullable String 
pattern) {
-        return filter(candidates, Function.identity(), pattern);
+    private static Set<String> filterAndSort(Set<String> candidates, @Nullable 
String pattern) {
+        return filterAndSort(candidates, Function.identity(), pattern);
     }
 
-    private static <T> Set<T> filter(
+    private static <T> Set<T> filterAndSort(
             Set<T> candidates, Function<T, String> featureGetter, @Nullable 
String pattern) {
         Pattern compiledPattern = convertNamePattern(pattern);
         return candidates.stream()
                 .filter(
                         candidate ->
                                 
compiledPattern.matcher(featureGetter.apply(candidate)).matches())
-                .collect(Collectors.toSet());
+                .sorted(Comparator.comparing(v -> 
featureGetter.apply(v).toLowerCase()))
+                .collect(Collectors.toCollection(LinkedHashSet::new));
     }
 
     /**
@@ -431,4 +606,63 @@ public class OperationExecutorFactory {
             return definition.getClass().getCanonicalName();
         }
     }
+
+    /**
+     * The column size for this type. For numeric data this is the maximum 
precision. For character
+     * data this is the length in characters. For datetime types this is the 
length in characters of
+     * the String representation (assuming the maximum allowed precision of 
the fractional seconds
+     * component). For binary data this is the length in bytes. Null is 
returned for data types
+     * where the column size is not applicable.
+     */
+    private static @Nullable Integer getColumnSize(LogicalType columnType) {
+        switch (columnType.getTypeRoot()) {
+            case TINYINT:
+                return 3; // decimal digits of Byte.MAX_VALUE
+            case SMALLINT:
+                return 5; // decimal digits of Short.MAX_VALUE
+            case INTEGER:
+            case DATE:
+                return 10; // digits of Integer.MAX_VALUE / characters of a date string
+            case BIGINT:
+                return 19; // decimal digits of Long.MAX_VALUE
+            case FLOAT:
+                return 7;
+            case DOUBLE:
+                return 15;
+            case DECIMAL:
+                return ((DecimalType) columnType).getPrecision(); // precision, not scale
+            case VARCHAR:
+            case BINARY:
+                return Integer.MAX_VALUE;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return 29; // characters of the timestamp's string representation
+            default:
+                return null; // column size is not applicable to the remaining types
+        }
+    }
+
+    /**
+     * The number of fractional digits for this type. Null is returned for 
data types where this is
+     * not applicable.
+     */
+    private static @Nullable Integer getDecimalDigits(LogicalType columnType) {
+        switch (columnType.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                return 0; // these types carry no fractional digits
+            case FLOAT:
+                return 7; // same figure getColumnSize reports for FLOAT
+            case DOUBLE:
+                return 15; // same figure getColumnSize reports for DOUBLE
+            case DECIMAL:
+                return ((DecimalType) columnType).getScale(); // scale declared on the type
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return 9; // NOTE(review): presumably nanosecond fractional seconds - confirm
+            default:
+                return null; // not applicable for any other type root
+        }
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index ab02e770e36..4f474bf4254 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
@@ -44,7 +45,7 @@ import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
-import org.apache.hive.jdbc.JdbcColumn;
+import org.apache.hadoop.hive.serde2.thrift.Type;
 import org.apache.hive.service.rpc.thrift.TCLIService;
 import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
 import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -68,16 +69,14 @@ import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.ResultSetMetaData;
 import java.sql.Statement;
+import java.sql.Types;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -212,8 +211,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 connection -> connection.getMetaData().getCatalogs(),
                 ResolvedSchema.of(Column.physical("TABLE_CAT", 
DataTypes.STRING())),
                 Arrays.asList(
-                        Collections.singletonList("hive"),
-                        Collections.singletonList("default_catalog")));
+                        Collections.singletonList("default_catalog"),
+                        Collections.singletonList("hive")));
     }
 
     @Test
@@ -222,10 +221,10 @@ public class HiveServer2EndpointITCase extends TestLogger 
{
                 connection -> 
connection.getMetaData().getSchemas("default_catalog", null),
                 getExpectedGetSchemasOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("default_database", "default_catalog"),
+                        Arrays.asList("db_diff", "default_catalog"),
                         Arrays.asList("db_test1", "default_catalog"),
                         Arrays.asList("db_test2", "default_catalog"),
-                        Arrays.asList("db_diff", "default_catalog")));
+                        Arrays.asList("default_database", "default_catalog")));
     }
 
     @Test
@@ -251,16 +250,16 @@ public class HiveServer2EndpointITCase extends TestLogger 
{
                                         new String[] {"MANAGED_TABLE", 
"VIRTUAL_VIEW"}),
                 getExpectedGetTablesOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("default_catalog", "db_test1", "tbl_1", 
"TABLE", ""),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_2", 
"TABLE", ""),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_3", 
"VIEW", ""),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_4", 
"VIEW", ""),
-                        Arrays.asList("default_catalog", "db_test2", "tbl_1", 
"TABLE", ""),
-                        Arrays.asList("default_catalog", "db_test2", "diff_1", 
"TABLE", ""),
-                        Arrays.asList("default_catalog", "db_test2", "tbl_2", 
"VIEW", ""),
-                        Arrays.asList("default_catalog", "db_test2", "diff_2", 
"VIEW", ""),
-                        Arrays.asList("default_catalog", "db_diff", "tbl_1", 
"TABLE", ""),
-                        Arrays.asList("default_catalog", "db_diff", "tbl_2", 
"VIEW", "")));
+                        Arrays.asList("default_catalog", "db_diff", "tbl_1", 
"TABLE"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_1", 
"TABLE"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_2", 
"TABLE"),
+                        Arrays.asList("default_catalog", "db_test2", "diff_1", 
"TABLE"),
+                        Arrays.asList("default_catalog", "db_test2", "tbl_1", 
"TABLE"),
+                        Arrays.asList("default_catalog", "db_diff", "tbl_2", 
"VIEW"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_3", 
"VIEW"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_4", 
"VIEW"),
+                        Arrays.asList("default_catalog", "db_test2", "diff_2", 
"VIEW"),
+                        Arrays.asList("default_catalog", "db_test2", "tbl_2", 
"VIEW")));
     }
 
     @Test
@@ -276,9 +275,152 @@ public class HiveServer2EndpointITCase extends TestLogger 
{
                                         new String[] {"VIRTUAL_VIEW"}),
                 getExpectedGetTablesOperationSchema(),
                 Arrays.asList(
-                        Arrays.asList("default_catalog", "db_test1", "tbl_3", 
"VIEW", ""),
-                        Arrays.asList("default_catalog", "db_test1", "tbl_4", 
"VIEW", ""),
-                        Arrays.asList("default_catalog", "db_test2", "tbl_2", 
"VIEW", "")));
+                        Arrays.asList("default_catalog", "db_test1", "tbl_3", 
"VIEW"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_4", 
"VIEW"),
+                        Arrays.asList("default_catalog", "db_test2", "tbl_2", 
"VIEW")));
+    }
+
+    @Test
+    public void testGetTableTypes() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getTableTypes(),
+                ResolvedSchema.of(Column.physical("TABLE_TYPE", 
DataTypes.STRING())),
+                Arrays.stream(TableKind.values())
+                        .map(kind -> Collections.singletonList((Object) 
kind.name()))
+                        .collect(Collectors.toList()));
+    }
+
+    @Test
+    public void testGetColumns() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getColumns(null, null, 
null, null),
+                getExpectedGetColumnsOperationSchema(),
+                rows -> // only the identifying fields of each row are compared
+                        assertThat(
+                                        rows.stream()
+                                                .map(
+                                                        row ->
+                                                                Arrays.asList(
+                                                                        
row.get(0), // CATALOG NAME
+                                                                        
row.get(1), // SCHEMA NAME
+                                                                        
row.get(2), // TABLE NAME
+                                                                        
row.get(3), // COLUMN NAME
+                                                                        
row.get(5))) // TYPE NAME
+                                                .collect(Collectors.toList()))
+                                .isEqualTo(
+                                        Arrays.asList(
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_diff",
+                                                        "tbl_2",
+                                                        "EXPR$0",
+                                                        "INT"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_1",
+                                                        "user",
+                                                        "BIGINT"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_1",
+                                                        "product",
+                                                        "STRING"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_1",
+                                                        "amount",
+                                                        "INT"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_2",
+                                                        "user",
+                                                        "STRING"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_2",
+                                                        "id",
+                                                        "BIGINT"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_2",
+                                                        "timestamp",
+                                                        "TIMESTAMP"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_3",
+                                                        "EXPR$0",
+                                                        "INT"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test1",
+                                                        "tbl_4",
+                                                        "EXPR$0",
+                                                        "INT"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test2",
+                                                        "diff_2",
+                                                        "EXPR$0",
+                                                        "INT"),
+                                                Arrays.asList(
+                                                        "default_catalog",
+                                                        "db_test2",
+                                                        "tbl_2",
+                                                        "EXPR$0",
+                                                        "INT"))));
+    }
+
+    @Test
+    public void testGetColumnsWithPattern() throws Exception {
+        runGetObjectTest(
+                connection ->
+                        connection
+                                .getMetaData()
+                                .getColumns("default_catalog", "db\\_test_", 
"tbl\\_1", "user"),
+                getExpectedGetColumnsOperationSchema(),
+                Collections.singletonList(
+                        Arrays.asList(
+                                "default_catalog",
+                                "db_test1",
+                                "tbl_1",
+                                "user",
+                                Types.BIGINT,
+                                "BIGINT",
+                                String.valueOf(Long.MAX_VALUE).length(),
+                                0, // digits number
+                                10, // radix
+                                0, // nullable
+                                1, // position
+                                "NO", // isNullable
+                                "NO"))); // isAutoIncrement
+    }
+
+    @Test
+    public void testGetPrimaryKey() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getPrimaryKeys(null, 
null, null),
+                getExpectedGetPrimaryKeysOperationSchema(),
+                Arrays.asList(
+                        Arrays.asList("default_catalog", "db_test1", "tbl_1", 
"user", 1, "pk"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_2", 
"user", 1, "pk"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_2", 
"id", 2, "pk")));
+    }
+
+    @Test
+    public void testGetPrimaryKeyWithPattern() throws Exception {
+        runGetObjectTest(
+                connection -> connection.getMetaData().getPrimaryKeys(null, 
null, "tbl_2"),
+                getExpectedGetPrimaryKeysOperationSchema(),
+                Arrays.asList(
+                        Arrays.asList("default_catalog", "db_test1", "tbl_2", 
"user", 1, "pk"),
+                        Arrays.asList("default_catalog", "db_test1", "tbl_2", 
"id", 2, "pk")));
     }
 
     @Test
@@ -393,34 +535,42 @@ public class HiveServer2EndpointITCase extends TestLogger 
{
 
     private Connection getInitializedConnection() throws Exception {
         Connection connection = ENDPOINT_EXTENSION.getConnection();
-        Statement statement = connection.createStatement();
-        statement.execute("SET table.sql-dialect=default");
-        statement.execute("USE CATALOG `default_catalog`");
-
-        // default_catalog: db_test1 | db_test2 | db_diff | default
-        //     db_test1: temporary table tbl_1, table tbl_2, temporary view 
tbl_3, view tbl_4
-        //     db_test2: table tbl_1, table diff_1, view tbl_2, view diff_2
-        //     db_diff:  table tbl_1, view tbl_2
-
-        statement.execute("CREATE DATABASE db_test1");
-        statement.execute("CREATE DATABASE db_test2");
-        statement.execute("CREATE DATABASE db_diff");
-
-        statement.execute("CREATE TEMPORARY TABLE db_test1.tbl_1 COMMENT 
'temporary table tbl_1'");
-        statement.execute("CREATE TABLE db_test1.tbl_2 COMMENT 'table tbl_2'");
-        statement.execute(
-                "CREATE TEMPORARY VIEW db_test1.tbl_3 COMMENT 'temporary view 
tbl_3' AS SELECT 1");
-        statement.execute("CREATE VIEW db_test1.tbl_4 COMMENT 'view tbl_4' AS 
SELECT 1");
-
-        statement.execute("CREATE TABLE db_test2.tbl_1 COMMENT 'table tbl_1'");
-        statement.execute("CREATE TABLE db_test2.diff_1 COMMENT 'table 
diff_1'");
-        statement.execute("CREATE VIEW db_test2.tbl_2 COMMENT 'view tbl_2' AS 
SELECT 1");
-        statement.execute("CREATE VIEW db_test2.diff_2 COMMENT 'view diff_2' 
AS SELECT 1");
-
-        statement.execute("CREATE TABLE db_diff.tbl_1 COMMENT 'table tbl_1'");
-        statement.execute("CREATE VIEW db_diff.tbl_2 COMMENT 'view tbl_2' AS 
SELECT 1");
-
-        statement.close();
+        try (Statement statement = connection.createStatement()) {
+            statement.execute("SET table.sql-dialect=default");
+            statement.execute("USE CATALOG `default_catalog`");
+
+            // default_catalog: db_test1 | db_test2 | db_diff | default_database
+            //     db_test1: temporary table tbl_1, table tbl_2, temporary 
view tbl_3, view tbl_4
+            //     db_test2: table tbl_1, table diff_1, view tbl_2, view diff_2
+            //     db_diff:  table tbl_1, view tbl_2
+
+            statement.execute("CREATE DATABASE db_test1");
+            statement.execute("CREATE DATABASE db_test2");
+            statement.execute("CREATE DATABASE db_diff");
+
+            statement.execute(
+                    "CREATE TEMPORARY TABLE db_test1.tbl_1(\n"
+                            + "`user` BIGINT CONSTRAINT `pk` PRIMARY KEY 
COMMENT 'user id.',\n"
+                            + "`product` STRING NOT NULL,\n"
+                            + "`amount`  INT) COMMENT 'temporary table 
tbl_1'");
+            statement.execute(
+                    "CREATE TABLE db_test1.tbl_2(\n"
+                            + "`user` STRING COMMENT 'user name.',\n"
+                            + "`id` BIGINT COMMENT 'user id.',\n"
+                            + "`timestamp` TIMESTAMP,"
+                            + "CONSTRAINT `pk` PRIMARY KEY(`user`, `id`) NOT 
ENFORCED) COMMENT 'table tbl_2'");
+            statement.execute(
+                    "CREATE TEMPORARY VIEW db_test1.tbl_3 COMMENT 'temporary 
view tbl_3' AS SELECT 1");
+            statement.execute("CREATE VIEW db_test1.tbl_4 COMMENT 'view tbl_4' 
AS SELECT 1");
+
+            statement.execute("CREATE TABLE db_test2.tbl_1 COMMENT 'table 
tbl_1'");
+            statement.execute("CREATE TABLE db_test2.diff_1 COMMENT 'table 
diff_1'");
+            statement.execute("CREATE VIEW db_test2.tbl_2 COMMENT 'view tbl_2' 
AS SELECT 1");
+            statement.execute("CREATE VIEW db_test2.diff_2 COMMENT 'view 
diff_2' AS SELECT 1");
+
+            statement.execute("CREATE TABLE db_diff.tbl_1 COMMENT 'table 
tbl_1'");
+            statement.execute("CREATE VIEW db_diff.tbl_2 COMMENT 'view tbl_2' 
AS SELECT 1");
+        }
         return connection;
     }
 
@@ -432,13 +582,13 @@ public class HiveServer2EndpointITCase extends TestLogger 
{
         runGetObjectTest(
                 resultSetSupplier,
                 expectedSchema,
-                result -> assertThat(result).isEqualTo(new 
HashSet<>(expectedResults)));
+                result -> assertThat(result).isEqualTo(expectedResults));
     }
 
     private void runGetObjectTest(
             FunctionWithException<Connection, java.sql.ResultSet, Exception> 
resultSetSupplier,
             ResolvedSchema expectedSchema,
-            Consumer<Set<List<Object>>> validator)
+            Consumer<List<List<Object>>> validator)
             throws Exception {
         try (Connection connection = getInitializedConnection();
                 java.sql.ResultSet result = 
resultSetSupplier.apply(connection)) {
@@ -506,6 +656,43 @@ public class HiveServer2EndpointITCase extends TestLogger {
                 Column.physical("REF_GENERATION", DataTypes.STRING()));
     }
 
+    private ResolvedSchema getExpectedGetColumnsOperationSchema() {
+        return ResolvedSchema.of(
+                Column.physical("TABLE_CAT", DataTypes.STRING()),
+                Column.physical("TABLE_SCHEM", DataTypes.STRING()),
+                Column.physical("TABLE_NAME", DataTypes.STRING()),
+                Column.physical("COLUMN_NAME", DataTypes.STRING()),
+                Column.physical("DATA_TYPE", DataTypes.INT()),
+                Column.physical("TYPE_NAME", DataTypes.STRING()),
+                Column.physical("COLUMN_SIZE", DataTypes.INT()),
+                Column.physical("BUFFER_LENGTH", DataTypes.TINYINT()),
+                Column.physical("DECIMAL_DIGITS", DataTypes.INT()),
+                Column.physical("NUM_PREC_RADIX", DataTypes.INT()),
+                Column.physical("NULLABLE", DataTypes.INT()),
+                Column.physical("REMARKS", DataTypes.STRING()),
+                Column.physical("COLUMN_DEF", DataTypes.STRING()),
+                Column.physical("SQL_DATA_TYPE", DataTypes.INT()),
+                Column.physical("SQL_DATETIME_SUB", DataTypes.INT()),
+                Column.physical("CHAR_OCTET_LENGTH", DataTypes.INT()),
+                Column.physical("ORDINAL_POSITION", DataTypes.INT()),
+                Column.physical("IS_NULLABLE", DataTypes.STRING()),
+                Column.physical("SCOPE_CATALOG", DataTypes.STRING()),
+                Column.physical("SCOPE_SCHEMA", DataTypes.STRING()),
+                Column.physical("SCOPE_TABLE", DataTypes.STRING()),
+                Column.physical("SOURCE_DATA_TYPE", DataTypes.SMALLINT()),
+                Column.physical("IS_AUTO_INCREMENT", DataTypes.STRING()));
+    }
+
+    private ResolvedSchema getExpectedGetPrimaryKeysOperationSchema() {
+        return ResolvedSchema.of(
+                Column.physical("TABLE_CAT", DataTypes.STRING()),
+                Column.physical("TABLE_SCHEM", DataTypes.STRING()),
+                Column.physical("TABLE_NAME", DataTypes.STRING()),
+                Column.physical("COLUMN_NAME", DataTypes.STRING()),
+                Column.physical("KEY_SEQ", DataTypes.INT()),
+                Column.physical("PK_NAME", DataTypes.STRING()));
+    }
+
     private ResolvedSchema getExpectedGetTypeInfoSchema() {
         return ResolvedSchema.of(
                 Column.physical("TYPE_NAME", DataTypes.STRING()),
@@ -537,13 +724,13 @@ public class HiveServer2EndpointITCase extends TestLogger 
{
                             .orElseThrow(() -> new RuntimeException("Can not 
get column."));
             assertThat(metaData.getColumnName(i)).isEqualTo(column.getName());
             int jdbcType =
-                    JdbcColumn.hiveTypeToSqlType(
-                            HiveTypeUtil.toHiveTypeInfo(column.getDataType(), 
false).getTypeName());
+                    
Type.getType(HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false))
+                            .toJavaSQLType();
             assertThat(metaData.getColumnType(i)).isEqualTo(jdbcType);
         }
     }
 
-    private Set<List<Object>> collectAndCompact(java.sql.ResultSet result, int 
columnCount)
+    private List<List<Object>> collectAndCompact(java.sql.ResultSet result, 
int columnCount)
             throws Exception {
         List<List<Object>> actual = new ArrayList<>();
         while (result.next()) {
@@ -558,6 +745,6 @@ public class HiveServer2EndpointITCase extends TestLogger {
             }
             actual.add(row);
         }
-        return new LinkedHashSet<>(actual);
+        return actual;
     }
 }
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index f12c44250c4..9bdc9c751b7 100644
--- 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.gateway.api;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -237,6 +239,17 @@ public interface SqlGatewayService {
             Set<TableKind> tableKinds)
             throws SqlGatewayException;
 
+    /**
+     * Return table of the given fully qualified name.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param tableIdentifier fully qualified name of the table.
+     * @return information of the table.
+     */
+    ResolvedCatalogBaseTable<?> getTable(
+            SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
+            throws SqlGatewayException;
+
     /**
      * List all user defined functions.
      *
diff --git 
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 8508edbd23a..a9c6bd77cc7 100644
--- 
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.gateway.api.utils;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -170,4 +172,11 @@ public class MockedSqlGatewayService implements 
SqlGatewayService {
     public GatewayInfo getGatewayInfo() {
         throw new UnsupportedOperationException();
     }
+
+    @Override
+    public ResolvedCatalogBaseTable<?> getTable(
+            SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
 }
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 8114fc5b79a..4bdbafe5ab7 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.gateway.service;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -255,6 +257,18 @@ public class SqlGatewayServiceImpl implements 
SqlGatewayService {
         }
     }
 
+    @Override
+    public ResolvedCatalogBaseTable<?> getTable(
+            SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
+            throws SqlGatewayException {
+        try { // look the session up, then delegate to the executor created for it
+            return 
getSession(sessionHandle).createExecutor().getTable(tableIdentifier);
+        } catch (Throwable t) { // every failure is logged and rethrown with its cause
+            LOG.error("Failed to getTable.", t);
+            throw new SqlGatewayException("Failed to getTable.", t);
+        }
+    }
+
     @Override
     public Set<FunctionInfo> listUserDefinedFunctions(
             SessionHandle sessionHandle, String catalogName, String 
databaseName)
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 87c4661f24d..569e838c3a2 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.data.GenericRowData;
@@ -156,6 +157,13 @@ public class OperationExecutor {
         }
     }
 
+    public ResolvedCatalogBaseTable<?> getTable(ObjectIdentifier 
tableIdentifier) {
+        return getTableEnvironment() // lookup goes through the table environment
+                .getCatalogManager()
+                .getTableOrError(tableIdentifier) // NOTE(review): presumably throws if absent - confirm
+                .getResolvedTable(); // only the resolved table is handed back
+    }
+
     public Set<FunctionInfo> listUserDefinedFunctions(String catalogName, 
String databaseName) {
         return sessionContext.getSessionState().functionCatalog
                 .getUserDefinedFunctions(catalogName, databaseName).stream()
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 6447771a33d..685bd04cb30 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -468,6 +470,22 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                                 "table_func0"))));
     }
 
+    @Test
+    public void testGetTable() { // getTable must return the resolved form of both a table and a view
+        SessionHandle sessionHandle = createInitializedSession(); // presumably registers cat1.db1.tbl1 and tbl3 -- fixture is outside this hunk
+        ResolvedCatalogTable actualTable =
+                (ResolvedCatalogTable) // the cast itself fails the test if tbl1 is not resolved as a table
+                        service.getTable(sessionHandle, ObjectIdentifier.of("cat1", "db1", "tbl1"));
+        assertThat(actualTable.getResolvedSchema()).isEqualTo(ResolvedSchema.of()); // schema built from no columns
+        assertThat(actualTable.getOptions())
+                .isEqualTo(Collections.singletonMap("connector", "values"));
+
+        ResolvedCatalogView actualView =
+                (ResolvedCatalogView) // tbl3 is expected to come back as a view
+                        service.getTable(sessionHandle, ObjectIdentifier.of("cat1", "db1", "tbl3"));
+        assertThat(actualView.getOriginalQuery()).isEqualTo("SELECT 1");
+    }
+
     // --------------------------------------------------------------------------------------------
     // Concurrent tests
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java
index 27f40a353c4..840374eeaa6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.TableSchema;
 
 /**
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.TableSchema;
  *
  * @param <T> {@link CatalogTable} or {@link CatalogView}
  */
+@PublicEvolving
 public interface ResolvedCatalogBaseTable<T extends CatalogBaseTable> extends 
CatalogBaseTable {
 
     /**

Reply via email to