This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d0a5023f989 [FLINK-28632][sql-gateway][hive] Allow to
GetColumns/GetPrimaryKeys/GetTableTypes in the HiveServer2 Endpoint
d0a5023f989 is described below
commit d0a5023f9896663552e4b8df3b12efb1c8a8c814
Author: yuzelin <[email protected]>
AuthorDate: Tue Aug 9 10:46:52 2022 +0800
[FLINK-28632][sql-gateway][hive] Allow to
GetColumns/GetPrimaryKeys/GetTableTypes in the HiveServer2 Endpoint
This closes #20493
---
.../table/endpoint/hive/HiveServer2Endpoint.java | 69 ++++-
.../table/endpoint/hive/HiveServer2Schemas.java | 71 +++++
.../hive/util/OperationExecutorFactory.java | 266 ++++++++++++++++--
.../endpoint/hive/HiveServer2EndpointITCase.java | 297 +++++++++++++++++----
.../flink/table/gateway/api/SqlGatewayService.java | 13 +
.../gateway/api/utils/MockedSqlGatewayService.java | 9 +
.../gateway/service/SqlGatewayServiceImpl.java | 14 +
.../service/operation/OperationExecutor.java | 8 +
.../gateway/service/SqlGatewayServiceITCase.java | 18 ++
.../table/catalog/ResolvedCatalogBaseTable.java | 2 +
10 files changed, 691 insertions(+), 76 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
index 2786c8996d9..6c6335d07b3 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java
@@ -126,10 +126,13 @@ import static
org.apache.flink.table.endpoint.hive.HiveServer2EndpointVersion.HI
import static
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.getUsedDefaultDatabase;
import static
org.apache.flink.table.endpoint.hive.util.HiveJdbcParameterUtils.validateAndNormalize;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetCatalogsExecutor;
+import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetColumnsExecutor;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetFunctionsExecutor;
+import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetPrimaryKeys;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetSchemasExecutor;
-import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableInfoExecutor;
+import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTableTypesExecutor;
import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTablesExecutor;
+import static
org.apache.flink.table.endpoint.hive.util.OperationExecutorFactory.createGetTypeInfoExecutor;
import static
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFetchOrientation;
import static
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toFlinkTableKinds;
import static
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toOperationHandle;
@@ -424,7 +427,7 @@ public class HiveServer2Endpoint implements
TCLIService.Iface, SqlGatewayEndpoin
try {
SessionHandle sessionHandle =
toSessionHandle(tGetTypeInfoReq.getSessionHandle());
OperationHandle operationHandle =
- service.submitOperation(sessionHandle,
createGetTableInfoExecutor());
+ service.submitOperation(sessionHandle,
createGetTypeInfoExecutor());
resp.setStatus(OK_STATUS);
resp.setOperationHandle(
toTOperationHandle(
@@ -509,12 +512,46 @@ public class HiveServer2Endpoint implements
TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetTableTypesResp GetTableTypes(TGetTableTypesReq
tGetTableTypesReq) throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetTableTypesResp resp = new TGetTableTypesResp();
+ try {
+ SessionHandle sessionHandle =
toSessionHandle(tGetTableTypesReq.getSessionHandle());
+ OperationHandle operationHandle =
+ service.submitOperation(sessionHandle,
createGetTableTypesExecutor());
+
+ resp.setStatus(OK_STATUS);
+ resp.setOperationHandle(
+ toTOperationHandle(sessionHandle, operationHandle,
TOperationType.GET_TABLES));
+ } catch (Throwable t) {
+ LOG.error("Failed to GetTableTypes.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
public TGetColumnsResp GetColumns(TGetColumnsReq tGetColumnsReq) throws
TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetColumnsResp resp = new TGetColumnsResp();
+ try {
+ SessionHandle sessionHandle =
toSessionHandle(tGetColumnsReq.getSessionHandle());
+ OperationHandle operationHandle =
+ service.submitOperation(
+ sessionHandle,
+ createGetColumnsExecutor(
+ service,
+ sessionHandle,
+ tGetColumnsReq.getCatalogName(),
+ tGetColumnsReq.getSchemaName(),
+ tGetColumnsReq.getTableName(),
+ tGetColumnsReq.getColumnName()));
+
+ resp.setStatus(OK_STATUS);
+ resp.setOperationHandle(
+ toTOperationHandle(sessionHandle, operationHandle,
TOperationType.GET_COLUMNS));
+ } catch (Throwable t) {
+ LOG.error("Failed to GetColumns.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
@@ -545,7 +582,29 @@ public class HiveServer2Endpoint implements
TCLIService.Iface, SqlGatewayEndpoin
@Override
public TGetPrimaryKeysResp GetPrimaryKeys(TGetPrimaryKeysReq
tGetPrimaryKeysReq)
throws TException {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
+ TGetPrimaryKeysResp resp = new TGetPrimaryKeysResp();
+ try {
+ SessionHandle sessionHandle =
toSessionHandle(tGetPrimaryKeysReq.getSessionHandle());
+ OperationHandle operationHandle =
+ service.submitOperation(
+ sessionHandle,
+ createGetPrimaryKeys(
+ service,
+ sessionHandle,
+ tGetPrimaryKeysReq.getCatalogName(),
+ tGetPrimaryKeysReq.getSchemaName(),
+ tGetPrimaryKeysReq.getTableName()));
+
+ resp.setStatus(OK_STATUS);
+ resp.setOperationHandle(
+ toTOperationHandle(
+ // hive's implementation use "GET_FUNCTIONS" here
+ sessionHandle, operationHandle,
TOperationType.GET_FUNCTIONS));
+ } catch (Throwable t) {
+ LOG.error("Failed to GetPrimaryKeys.", t);
+ resp.setStatus(toTStatus(t));
+ }
+ return resp;
}
@Override
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
index d1879ef9062..31c76b2d212 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java
@@ -85,6 +85,77 @@ public class HiveServer2Schemas {
.withComment(
"The name which uniquely identifies this
function within its schema"));
+ /** Schema for {@link HiveServer2Endpoint#GetColumns}. */
+ public static final ResolvedSchema GET_COLUMNS_SCHEMA =
+ buildSchema(
+ Column.physical("TABLE_CAT", DataTypes.STRING())
+ .withComment("Catalog name. NULL if not
applicable."),
+ Column.physical("TABLE_SCHEM",
DataTypes.STRING()).withComment("Schema name."),
+ Column.physical("TABLE_NAME",
DataTypes.STRING()).withComment("Table name."),
+ Column.physical("COLUMN_NAME",
DataTypes.STRING()).withComment("Column name."),
+ Column.physical("DATA_TYPE", DataTypes.INT())
+ .withComment("SQL type from java.sql.Types."),
+ Column.physical("TYPE_NAME", DataTypes.STRING())
+ .withComment(
+ "Data source dependent type name, for a
UDT the type name is fully qualified."),
+ Column.physical("COLUMN_SIZE", DataTypes.INT())
+ .withComment(
+ "Column size. For char or date types this
is the maximum number of characters, for numeric or decimal types this is
precision."),
+ Column.physical("BUFFER_LENGTH",
DataTypes.TINYINT()).withComment("Unused."),
+ Column.physical("DECIMAL_DIGITS", DataTypes.INT())
+ .withComment("The number of fractional digits."),
+ Column.physical("NUM_PREC_RADIX", DataTypes.INT())
+ .withComment("Radix (typically either 10 or 2)."),
+ Column.physical("NULLABLE",
DataTypes.INT()).withComment("Is NULL allowed."),
+ Column.physical("REMARKS", DataTypes.STRING())
+ .withComment("Comment describing column (may be
null)."),
+ Column.physical("COLUMN_DEF", DataTypes.STRING())
+ .withComment("Default value (may be null)."),
+ Column.physical("SQL_DATA_TYPE",
DataTypes.INT()).withComment("Unused."),
+ Column.physical("SQL_DATETIME_SUB",
DataTypes.INT()).withComment("Unused."),
+ Column.physical("CHAR_OCTET_LENGTH", DataTypes.INT())
+ .withComment(
+ "For char types the maximum number of
bytes in the column."),
+ Column.physical("ORDINAL_POSITION", DataTypes.INT())
+ .withComment("Index of column in table (starting
at 1)."),
+ Column.physical("IS_NULLABLE", DataTypes.STRING())
+ .withComment(
+ "\"NO\" means column definitely does not
allow NULL values; \"YES\" means the column might allow NULL values. An empty
string means nobody knows."),
+ Column.physical("SCOPE_CATALOG", DataTypes.STRING())
+ .withComment(
+ "Catalog of table that is the scope of a
reference attribute (null if DATA_TYPE isn't REF)."),
+ Column.physical("SCOPE_SCHEMA", DataTypes.STRING())
+ .withComment(
+ "Schema of table that is the scope of a
reference attribute (null if the DATA_TYPE isn't REF)."),
+ Column.physical("SCOPE_TABLE", DataTypes.STRING())
+ .withComment(
+ "Table name that this the scope of a
reference attribute (null if the DATA_TYPE isn't REF)."),
+ Column.physical("SOURCE_DATA_TYPE", DataTypes.SMALLINT())
+ .withComment(
+ "Source type of a distinct type or
user-generated Ref type, SQL type from java.sql.Types (null if DATA_TYPE isn't
DISTINCT or user-generated REF)."),
+ Column.physical("IS_AUTO_INCREMENT", DataTypes.STRING())
+ .withComment("Indicates whether this column is
auto incremented."));
+
+ /** Schema for {@link HiveServer2Endpoint#GetTableTypes}. */
+ public static final ResolvedSchema GET_TABLE_TYPES_SCHEMA =
+ buildSchema(
+ Column.physical("TABLE_TYPE", DataTypes.STRING())
+ .withComment("Table type name."));
+
+ /** Schema for {@link HiveServer2Endpoint#GetPrimaryKeys}. */
+ public static final ResolvedSchema GET_PRIMARY_KEYS_SCHEMA =
+ buildSchema(
+ Column.physical("TABLE_CAT", DataTypes.STRING())
+ .withComment("Table catalog (may be null)."),
+ Column.physical("TABLE_SCHEM", DataTypes.STRING())
+ .withComment("Table schema (may be null)."),
+ Column.physical("TABLE_NAME",
DataTypes.STRING()).withComment("Table name."),
+ Column.physical("COLUMN_NAME",
DataTypes.STRING()).withComment("Column name."),
+ Column.physical("KEY_SEQ", DataTypes.INT())
+ .withComment("Sequence number within primary
key."),
+ Column.physical("PK_NAME", DataTypes.STRING())
+ .withComment("Primary key name (may be null)."));
+
/** Schema for {@link HiveServer2Endpoint#GetTypeInfo}. */
public static final ResolvedSchema GET_TYPE_INFO_SCHEMA =
buildSchema(
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
index d6397700e13..4052ac4dd53 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java
@@ -19,9 +19,13 @@
package org.apache.flink.table.endpoint.hive.util;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -39,27 +43,35 @@ import
org.apache.flink.table.gateway.api.results.FunctionInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.results.TableInfo;
import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.serde2.thrift.Type;
import javax.annotation.Nullable;
import java.sql.DatabaseMetaData;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_COLUMNS_SCHEMA;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_FUNCTIONS_SCHEMA;
+import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_PRIMARY_KEYS_SCHEMA;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLES_SCHEMA;
+import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TABLE_TYPES_SCHEMA;
import static
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_TYPE_INFO_SCHEMA;
import static
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
import static org.apache.hadoop.hive.serde2.thrift.Type.ARRAY_TYPE;
@@ -111,6 +123,37 @@ public class OperationExecutorFactory {
service, sessionHandle, catalogName, schemaName,
tableName, tableKinds);
}
+ public static Callable<ResultSet> createGetColumnsExecutor(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName,
+ @Nullable String tableName,
+ @Nullable String columnsName) {
+ return () ->
+ executeGetColumns(
+ service, sessionHandle, catalogName, schemaName,
tableName, columnsName);
+ }
+
+ public static Callable<ResultSet> createGetTableTypesExecutor() {
+ return () ->
+ buildResultSet(
+ GET_TABLE_TYPES_SCHEMA,
+ Arrays.stream(TableKind.values())
+ .map(kind -> wrap(kind.name()))
+ .collect(Collectors.toList()));
+ }
+
+ public static Callable<ResultSet> createGetPrimaryKeys(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName,
+ @Nullable String tableName) {
+ return () ->
+ executeGetPrimaryKeys(service, sessionHandle, catalogName,
schemaName, tableName);
+ }
+
public static Callable<ResultSet> createGetFunctionsExecutor(
SqlGatewayService service,
SessionHandle sessionHandle,
@@ -122,7 +165,7 @@ public class OperationExecutorFactory {
service, sessionHandle, catalogName, databasePattern,
functionNamePattern);
}
- public static Callable<ResultSet> createGetTableInfoExecutor() {
+ public static Callable<ResultSet> createGetTypeInfoExecutor() {
return () ->
buildResultSet(
GET_TYPE_INFO_SCHEMA,
@@ -175,7 +218,8 @@ public class OperationExecutorFactory {
String specifiedCatalogName =
isNullOrEmpty(catalogName) ?
service.getCurrentCatalog(sessionHandle) : catalogName;
Set<String> databaseNames =
- filter(service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName);
+ filterAndSort(
+ service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName);
return buildResultSet(
GET_SCHEMAS_SCHEMA,
databaseNames.stream()
@@ -190,21 +234,30 @@ public class OperationExecutorFactory {
@Nullable String schemaName,
@Nullable String tableName,
Set<TableKind> tableKinds) {
- Set<TableInfo> tableInfos = new HashSet<>();
+ Set<TableInfo> tableInfos = new LinkedHashSet<>();
+ Set<TableInfo> viewInfos = new LinkedHashSet<>();
+
String specifiedCatalogName =
isNullOrEmpty(catalogName) ?
service.getCurrentCatalog(sessionHandle) : catalogName;
for (String schema :
- filter(service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName)) {
- tableInfos.addAll(
- filter(
+ filterAndSort(
+ service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName)) {
+ for (TableInfo tableInfo :
+ filterAndSort(
service.listTables(
sessionHandle, specifiedCatalogName,
schema, tableKinds),
candidate ->
candidate.getIdentifier().getObjectName(),
- tableName));
+ tableName)) {
+ if (tableInfo.getTableKind().equals(TableKind.TABLE)) {
+ tableInfos.add(tableInfo);
+ } else {
+ viewInfos.add(tableInfo);
+ }
+ }
}
return buildResultSet(
GET_TABLES_SCHEMA,
- tableInfos.stream()
+ Stream.concat(tableInfos.stream(), viewInfos.stream())
.map(
info ->
wrap(
@@ -212,9 +265,9 @@ public class OperationExecutorFactory {
info.getIdentifier().getDatabaseName(),
info.getIdentifier().getObjectName(),
info.getTableKind().name(),
- // It requires to load the
CatalogFunction from the
+ // It requires to load the
CatalogTable from the
// remote server, which is
time wasted.
- "",
+ null,
null,
null,
null,
@@ -223,6 +276,127 @@ public class OperationExecutorFactory {
.collect(Collectors.toList()));
}
+ private static ResultSet executeGetColumns(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName,
+ @Nullable String tableName,
+ @Nullable String columnName) {
+ String specifiedCatalogName =
+ isNullOrEmpty(catalogName) ?
service.getCurrentCatalog(sessionHandle) : catalogName;
+ Set<String> schemaNames =
+ filterAndSort(
+ service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName);
+ Set<TableKind> tableKinds = new
HashSet<>(Arrays.asList(TableKind.values()));
+
+ List<RowData> results = new ArrayList<>();
+ for (String schema : schemaNames) {
+ Set<TableInfo> tableInfos =
+ filterAndSort(
+ service.listTables(
+ sessionHandle, specifiedCatalogName,
schema, tableKinds),
+ candidates ->
candidates.getIdentifier().getObjectName(),
+ tableName);
+
+ for (TableInfo tableInfo : tableInfos) {
+ ResolvedCatalogBaseTable<?> table =
+ service.getTable(sessionHandle,
tableInfo.getIdentifier());
+ List<Column> columns = table.getResolvedSchema().getColumns();
+
+ Set<String> matchedColumnNames =
+ filterAndSort(
+ new
HashSet<>(table.getResolvedSchema().getColumnNames()),
+ columnName);
+ for (int i = 0; i < columns.size(); i++) {
+ Column column = columns.get(i);
+ if (!matchedColumnNames.contains(column.getName())) {
+ continue;
+ }
+ Type hiveColumnType =
+
Type.getType(HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false));
+ LogicalType flinkColumnType =
column.getDataType().getLogicalType();
+ results.add(
+ wrap(
+ specifiedCatalogName,
+
tableInfo.getIdentifier().getDatabaseName(),
+ tableInfo.getIdentifier().getObjectName(),
+ column.getName(),
+ hiveColumnType.toJavaSQLType(),
+ hiveColumnType.getName(),
+ getColumnSize(flinkColumnType),
+ null, // BUFFER_LENGTH, unused
+ getDecimalDigits(flinkColumnType),
+ hiveColumnType.getNumPrecRadix(),
+ flinkColumnType.isNullable()
+ ? DatabaseMetaData.columnNullable
+ : DatabaseMetaData.columnNoNulls,
+ column.getComment().orElse(null),
+ null, // COLUMN_DEF
+ null, // SQL_DATA_TYPE
+ null, // SQL_DATETIME_SUB
+ null, // CHAR_OCTET_LENGTH
+ i + 1,
+ flinkColumnType.isNullable() ? "YES" :
"NO",
+ null, // SCOPE_CATALOG
+ null, // SCOPE_SCHEMA
+ null, // SCOPE_TABLE
+ null, // SOURCE_DATA_TYPE
+ "NO"));
+ }
+ }
+ }
+ return buildResultSet(GET_COLUMNS_SCHEMA, results);
+ }
+
+ private static ResultSet executeGetPrimaryKeys(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName,
+ @Nullable String tableName) {
+ String specifiedCatalogName =
+ isNullOrEmpty(catalogName) ?
service.getCurrentCatalog(sessionHandle) : catalogName;
+ Set<String> schemaNames =
+ filterAndSort(
+ service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName);
+ List<RowData> primaryKeys = new ArrayList<>();
+
+ for (String schema : schemaNames) {
+ Set<TableInfo> tableInfos =
+ filterAndSort(
+ service.listTables(
+ sessionHandle,
+ specifiedCatalogName,
+ schema,
+ new
HashSet<>(Arrays.asList(TableKind.values()))),
+ candidate ->
candidate.getIdentifier().getObjectName(),
+ tableName);
+
+ for (TableInfo tableInfo : tableInfos) {
+ ResolvedCatalogBaseTable<?> table =
+ service.getTable(sessionHandle,
tableInfo.getIdentifier());
+ UniqueConstraint primaryKey =
+ table.getResolvedSchema().getPrimaryKey().orElse(null);
+ if (primaryKey == null) {
+ continue;
+ }
+
+ for (int i = 0; i < primaryKey.getColumns().size(); i++) {
+ primaryKeys.add(
+ wrap(
+ specifiedCatalogName,
+
tableInfo.getIdentifier().getDatabaseName(),
+ tableInfo.getIdentifier().getObjectName(),
+ primaryKey.getColumns().get(i),
+ i + 1,
+ primaryKey.getName()));
+ }
+ }
+ }
+ return buildResultSet(GET_PRIMARY_KEYS_SCHEMA, primaryKeys);
+ }
+
private static ResultSet executeGetFunctions(
SqlGatewayService service,
SessionHandle sessionHandle,
@@ -235,7 +409,7 @@ public class OperationExecutorFactory {
Set<FunctionInfo> candidates = new HashSet<>();
// Add user defined functions
for (String databaseName :
- filter(
+ filterAndSort(
service.listDatabases(sessionHandle,
specifiedCatalogName),
databasePattern)) {
candidates.addAll(
@@ -248,7 +422,7 @@ public class OperationExecutorFactory {
}
// Filter out unmatched functions
Set<FunctionInfo> matchedFunctions =
- filter(
+ filterAndSort(
candidates,
candidate ->
candidate.getIdentifier().getFunctionName(),
functionNamePattern);
@@ -302,18 +476,19 @@ public class OperationExecutorFactory {
return input == null || input.isEmpty();
}
- private static Set<String> filter(Set<String> candidates, @Nullable String
pattern) {
- return filter(candidates, Function.identity(), pattern);
+ private static Set<String> filterAndSort(Set<String> candidates, @Nullable
String pattern) {
+ return filterAndSort(candidates, Function.identity(), pattern);
}
- private static <T> Set<T> filter(
+ private static <T> Set<T> filterAndSort(
Set<T> candidates, Function<T, String> featureGetter, @Nullable
String pattern) {
Pattern compiledPattern = convertNamePattern(pattern);
return candidates.stream()
.filter(
candidate ->
compiledPattern.matcher(featureGetter.apply(candidate)).matches())
- .collect(Collectors.toSet());
+ .sorted(Comparator.comparing(v ->
featureGetter.apply(v).toLowerCase()))
+ .collect(Collectors.toCollection(LinkedHashSet::new));
}
/**
@@ -431,4 +606,63 @@ public class OperationExecutorFactory {
return definition.getClass().getCanonicalName();
}
}
+
+ /**
+ * The column size for this type. For numeric data this is the maximum
precision. For character
+ * data this is the length in characters. For datetime types this is the
length in characters of
+ * the String representation (assuming the maximum allowed precision of
the fractional seconds
+ * component). For binary data this is the length in bytes. Null is
returned for data types
+ * where the column size is not applicable.
+ */
+ private static @Nullable Integer getColumnSize(LogicalType columnType) {
+ switch (columnType.getTypeRoot()) {
+ case TINYINT:
+ return 3;
+ case SMALLINT:
+ return 5;
+ case INTEGER:
+ case DATE:
+ return 10;
+ case BIGINT:
+ return 19;
+ case FLOAT:
+ return 7;
+ case DOUBLE:
+ return 15;
+ case DECIMAL:
+ return ((DecimalType) columnType).getScale();
+ case VARCHAR:
+ case BINARY:
+ return Integer.MAX_VALUE;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return 29;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * The number of fractional digits for this type. Null is returned for
data types where this is
+ * not applicable.
+ */
+ private static @Nullable Integer getDecimalDigits(LogicalType columnType) {
+ switch (columnType.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ return 0;
+ case FLOAT:
+ return 7;
+ case DOUBLE:
+ return 15;
+ case DECIMAL:
+ return ((DecimalType) columnType).getScale();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return 9;
+ default:
+ return null;
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index ab02e770e36..4f474bf4254 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
@@ -44,7 +45,7 @@ import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
-import org.apache.hive.jdbc.JdbcColumn;
+import org.apache.hadoop.hive.serde2.thrift.Type;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCancelOperationReq;
import org.apache.hive.service.rpc.thrift.TCancelOperationResp;
@@ -68,16 +69,14 @@ import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
+import java.sql.Types;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -212,8 +211,8 @@ public class HiveServer2EndpointITCase extends TestLogger {
connection -> connection.getMetaData().getCatalogs(),
ResolvedSchema.of(Column.physical("TABLE_CAT",
DataTypes.STRING())),
Arrays.asList(
- Collections.singletonList("hive"),
- Collections.singletonList("default_catalog")));
+ Collections.singletonList("default_catalog"),
+ Collections.singletonList("hive")));
}
@Test
@@ -222,10 +221,10 @@ public class HiveServer2EndpointITCase extends TestLogger
{
connection ->
connection.getMetaData().getSchemas("default_catalog", null),
getExpectedGetSchemasOperationSchema(),
Arrays.asList(
- Arrays.asList("default_database", "default_catalog"),
+ Arrays.asList("db_diff", "default_catalog"),
Arrays.asList("db_test1", "default_catalog"),
Arrays.asList("db_test2", "default_catalog"),
- Arrays.asList("db_diff", "default_catalog")));
+ Arrays.asList("default_database", "default_catalog")));
}
@Test
@@ -251,16 +250,16 @@ public class HiveServer2EndpointITCase extends TestLogger
{
new String[] {"MANAGED_TABLE",
"VIRTUAL_VIEW"}),
getExpectedGetTablesOperationSchema(),
Arrays.asList(
- Arrays.asList("default_catalog", "db_test1", "tbl_1",
"TABLE", ""),
- Arrays.asList("default_catalog", "db_test1", "tbl_2",
"TABLE", ""),
- Arrays.asList("default_catalog", "db_test1", "tbl_3",
"VIEW", ""),
- Arrays.asList("default_catalog", "db_test1", "tbl_4",
"VIEW", ""),
- Arrays.asList("default_catalog", "db_test2", "tbl_1",
"TABLE", ""),
- Arrays.asList("default_catalog", "db_test2", "diff_1",
"TABLE", ""),
- Arrays.asList("default_catalog", "db_test2", "tbl_2",
"VIEW", ""),
- Arrays.asList("default_catalog", "db_test2", "diff_2",
"VIEW", ""),
- Arrays.asList("default_catalog", "db_diff", "tbl_1",
"TABLE", ""),
- Arrays.asList("default_catalog", "db_diff", "tbl_2",
"VIEW", "")));
+ Arrays.asList("default_catalog", "db_diff", "tbl_1",
"TABLE"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_1",
"TABLE"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_2",
"TABLE"),
+ Arrays.asList("default_catalog", "db_test2", "diff_1",
"TABLE"),
+ Arrays.asList("default_catalog", "db_test2", "tbl_1",
"TABLE"),
+ Arrays.asList("default_catalog", "db_diff", "tbl_2",
"VIEW"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_3",
"VIEW"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_4",
"VIEW"),
+ Arrays.asList("default_catalog", "db_test2", "diff_2",
"VIEW"),
+ Arrays.asList("default_catalog", "db_test2", "tbl_2",
"VIEW")));
}
@Test
@@ -276,9 +275,152 @@ public class HiveServer2EndpointITCase extends TestLogger
{
new String[] {"VIRTUAL_VIEW"}),
getExpectedGetTablesOperationSchema(),
Arrays.asList(
- Arrays.asList("default_catalog", "db_test1", "tbl_3",
"VIEW", ""),
- Arrays.asList("default_catalog", "db_test1", "tbl_4",
"VIEW", ""),
- Arrays.asList("default_catalog", "db_test2", "tbl_2",
"VIEW", "")));
+ Arrays.asList("default_catalog", "db_test1", "tbl_3",
"VIEW"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_4",
"VIEW"),
+ Arrays.asList("default_catalog", "db_test2", "tbl_2",
"VIEW")));
+ }
+
+ @Test
+ public void testGetTableTypes() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getTableTypes(),
+ ResolvedSchema.of(Column.physical("TABLE_TYPE",
DataTypes.STRING())),
+ Arrays.stream(TableKind.values())
+ .map(kind -> Collections.singletonList((Object)
kind.name()))
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ void testGetColumns() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getColumns(null, null,
null, null),
+ getExpectedGetColumnsOperationSchema(),
+ rows ->
+ assertThat(
+ rows.stream()
+ .map(
+ row ->
+ Arrays.asList(
+
row.get(0), // CATALOG NAME
+
row.get(1), // SCHEMA NAME
+
row.get(2), // TABLE NAME
+
row.get(3), // COLUMN NAME
+
row.get(5))) // TYPE NAME
+ .collect(Collectors.toList()))
+ .isEqualTo(
+ Arrays.asList(
+ Arrays.asList(
+ "default_catalog",
+ "db_diff",
+ "tbl_2",
+ "EXPR$0",
+ "INT"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_1",
+ "user",
+ "BIGINT"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_1",
+ "product",
+ "STRING"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_1",
+ "amount",
+ "INT"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_2",
+ "user",
+ "STRING"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_2",
+ "id",
+ "BIGINT"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_2",
+ "timestamp",
+ "TIMESTAMP"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_3",
+ "EXPR$0",
+ "INT"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_4",
+ "EXPR$0",
+ "INT"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test2",
+ "diff_2",
+ "EXPR$0",
+ "INT"),
+ Arrays.asList(
+ "default_catalog",
+ "db_test2",
+ "tbl_2",
+ "EXPR$0",
+ "INT"))));
+ }
+
+ @Test
+ public void testGetColumnsWithPattern() throws Exception {
+ runGetObjectTest(
+ connection ->
+ connection
+ .getMetaData()
+ .getColumns("default_catalog", "db\\_test_",
"tbl\\_1", "user"),
+ getExpectedGetColumnsOperationSchema(),
+ Collections.singletonList(
+ Arrays.asList(
+ "default_catalog",
+ "db_test1",
+ "tbl_1",
+ "user",
+ Types.BIGINT,
+ "BIGINT",
+ String.valueOf(Long.MAX_VALUE).length(),
+ 0, // digits number
+ 10, // radix
+ 0, // nullable
+ 1, // position
+ "NO", // isNullable
+ "NO"))); // isAutoIncrement
+ }
+
+ @Test
+ public void testGetPrimaryKey() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getPrimaryKeys(null,
null, null),
+ getExpectedGetPrimaryKeysOperationSchema(),
+ Arrays.asList(
+ Arrays.asList("default_catalog", "db_test1", "tbl_1",
"user", 1, "pk"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_2",
"user", 1, "pk"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_2",
"id", 2, "pk")));
+ }
+
+ @Test
+ public void testGetPrimaryKeyWithPattern() throws Exception {
+ runGetObjectTest(
+ connection -> connection.getMetaData().getPrimaryKeys(null,
null, "tbl_2"),
+ getExpectedGetPrimaryKeysOperationSchema(),
+ Arrays.asList(
+ Arrays.asList("default_catalog", "db_test1", "tbl_2",
"user", 1, "pk"),
+ Arrays.asList("default_catalog", "db_test1", "tbl_2",
"id", 2, "pk")));
}
@Test
@@ -393,34 +535,42 @@ public class HiveServer2EndpointITCase extends TestLogger
{
private Connection getInitializedConnection() throws Exception {
Connection connection = ENDPOINT_EXTENSION.getConnection();
- Statement statement = connection.createStatement();
- statement.execute("SET table.sql-dialect=default");
- statement.execute("USE CATALOG `default_catalog`");
-
- // default_catalog: db_test1 | db_test2 | db_diff | default
- // db_test1: temporary table tbl_1, table tbl_2, temporary view
tbl_3, view tbl_4
- // db_test2: table tbl_1, table diff_1, view tbl_2, view diff_2
- // db_diff: table tbl_1, view tbl_2
-
- statement.execute("CREATE DATABASE db_test1");
- statement.execute("CREATE DATABASE db_test2");
- statement.execute("CREATE DATABASE db_diff");
-
- statement.execute("CREATE TEMPORARY TABLE db_test1.tbl_1 COMMENT
'temporary table tbl_1'");
- statement.execute("CREATE TABLE db_test1.tbl_2 COMMENT 'table tbl_2'");
- statement.execute(
- "CREATE TEMPORARY VIEW db_test1.tbl_3 COMMENT 'temporary view
tbl_3' AS SELECT 1");
- statement.execute("CREATE VIEW db_test1.tbl_4 COMMENT 'view tbl_4' AS
SELECT 1");
-
- statement.execute("CREATE TABLE db_test2.tbl_1 COMMENT 'table tbl_1'");
- statement.execute("CREATE TABLE db_test2.diff_1 COMMENT 'table
diff_1'");
- statement.execute("CREATE VIEW db_test2.tbl_2 COMMENT 'view tbl_2' AS
SELECT 1");
- statement.execute("CREATE VIEW db_test2.diff_2 COMMENT 'view diff_2'
AS SELECT 1");
-
- statement.execute("CREATE TABLE db_diff.tbl_1 COMMENT 'table tbl_1'");
- statement.execute("CREATE VIEW db_diff.tbl_2 COMMENT 'view tbl_2' AS
SELECT 1");
-
- statement.close();
+ try (Statement statement = connection.createStatement()) {
+ statement.execute("SET table.sql-dialect=default");
+ statement.execute("USE CATALOG `default_catalog`");
+
+ // default_catalog: db_test1 | db_test2 | db_diff | default
+ // db_test1: temporary table tbl_1, table tbl_2, temporary
view tbl_3, view tbl_4
+ // db_test2: table tbl_1, table diff_1, view tbl_2, view diff_2
+ // db_diff: table tbl_1, view tbl_2
+
+ statement.execute("CREATE DATABASE db_test1");
+ statement.execute("CREATE DATABASE db_test2");
+ statement.execute("CREATE DATABASE db_diff");
+
+ statement.execute(
+ "CREATE TEMPORARY TABLE db_test1.tbl_1(\n"
+ + "`user` BIGINT CONSTRAINT `pk` PRIMARY KEY
COMMENT 'user id.',\n"
+ + "`product` STRING NOT NULL,\n"
+ + "`amount` INT) COMMENT 'temporary table
tbl_1'");
+ statement.execute(
+ "CREATE TABLE db_test1.tbl_2(\n"
+ + "`user` STRING COMMENT 'user name.',\n"
+ + "`id` BIGINT COMMENT 'user id.',\n"
+ + "`timestamp` TIMESTAMP,"
+ + "CONSTRAINT `pk` PRIMARY KEY(`user`, `id`) NOT
ENFORCED) COMMENT 'table tbl_2'");
+ statement.execute(
+ "CREATE TEMPORARY VIEW db_test1.tbl_3 COMMENT 'temporary
view tbl_3' AS SELECT 1");
+ statement.execute("CREATE VIEW db_test1.tbl_4 COMMENT 'view tbl_4'
AS SELECT 1");
+
+ statement.execute("CREATE TABLE db_test2.tbl_1 COMMENT 'table
tbl_1'");
+ statement.execute("CREATE TABLE db_test2.diff_1 COMMENT 'table
diff_1'");
+ statement.execute("CREATE VIEW db_test2.tbl_2 COMMENT 'view tbl_2'
AS SELECT 1");
+ statement.execute("CREATE VIEW db_test2.diff_2 COMMENT 'view
diff_2' AS SELECT 1");
+
+ statement.execute("CREATE TABLE db_diff.tbl_1 COMMENT 'table
tbl_1'");
+ statement.execute("CREATE VIEW db_diff.tbl_2 COMMENT 'view tbl_2'
AS SELECT 1");
+ }
return connection;
}
@@ -432,13 +582,13 @@ public class HiveServer2EndpointITCase extends TestLogger
{
runGetObjectTest(
resultSetSupplier,
expectedSchema,
- result -> assertThat(result).isEqualTo(new
HashSet<>(expectedResults)));
+ result -> assertThat(result).isEqualTo(expectedResults));
}
private void runGetObjectTest(
FunctionWithException<Connection, java.sql.ResultSet, Exception>
resultSetSupplier,
ResolvedSchema expectedSchema,
- Consumer<Set<List<Object>>> validator)
+ Consumer<List<List<Object>>> validator)
throws Exception {
try (Connection connection = getInitializedConnection();
java.sql.ResultSet result =
resultSetSupplier.apply(connection)) {
@@ -506,6 +656,43 @@ public class HiveServer2EndpointITCase extends TestLogger {
Column.physical("REF_GENERATION", DataTypes.STRING()));
}
+ private ResolvedSchema getExpectedGetColumnsOperationSchema() {
+ return ResolvedSchema.of(
+ Column.physical("TABLE_CAT", DataTypes.STRING()),
+ Column.physical("TABLE_SCHEM", DataTypes.STRING()),
+ Column.physical("TABLE_NAME", DataTypes.STRING()),
+ Column.physical("COLUMN_NAME", DataTypes.STRING()),
+ Column.physical("DATA_TYPE", DataTypes.INT()),
+ Column.physical("TYPE_NAME", DataTypes.STRING()),
+ Column.physical("COLUMN_SIZE", DataTypes.INT()),
+ Column.physical("BUFFER_LENGTH", DataTypes.TINYINT()),
+ Column.physical("DECIMAL_DIGITS", DataTypes.INT()),
+ Column.physical("NUM_PREC_RADIX", DataTypes.INT()),
+ Column.physical("NULLABLE", DataTypes.INT()),
+ Column.physical("REMARKS", DataTypes.STRING()),
+ Column.physical("COLUMN_DEF", DataTypes.STRING()),
+ Column.physical("SQL_DATA_TYPE", DataTypes.INT()),
+ Column.physical("SQL_DATETIME_SUB", DataTypes.INT()),
+ Column.physical("CHAR_OCTET_LENGTH", DataTypes.INT()),
+ Column.physical("ORDINAL_POSITION", DataTypes.INT()),
+ Column.physical("IS_NULLABLE", DataTypes.STRING()),
+ Column.physical("SCOPE_CATALOG", DataTypes.STRING()),
+ Column.physical("SCOPE_SCHEMA", DataTypes.STRING()),
+ Column.physical("SCOPE_TABLE", DataTypes.STRING()),
+ Column.physical("SOURCE_DATA_TYPE", DataTypes.SMALLINT()),
+ Column.physical("IS_AUTO_INCREMENT", DataTypes.STRING()));
+ }
+
+ private ResolvedSchema getExpectedGetPrimaryKeysOperationSchema() {
+ return ResolvedSchema.of(
+ Column.physical("TABLE_CAT", DataTypes.STRING()),
+ Column.physical("TABLE_SCHEM", DataTypes.STRING()),
+ Column.physical("TABLE_NAME", DataTypes.STRING()),
+ Column.physical("COLUMN_NAME", DataTypes.STRING()),
+ Column.physical("KEY_SEQ", DataTypes.INT()),
+ Column.physical("PK_NAME", DataTypes.STRING()));
+ }
+
private ResolvedSchema getExpectedGetTypeInfoSchema() {
return ResolvedSchema.of(
Column.physical("TYPE_NAME", DataTypes.STRING()),
@@ -537,13 +724,13 @@ public class HiveServer2EndpointITCase extends TestLogger
{
.orElseThrow(() -> new RuntimeException("Can not
get column."));
assertThat(metaData.getColumnName(i)).isEqualTo(column.getName());
int jdbcType =
- JdbcColumn.hiveTypeToSqlType(
- HiveTypeUtil.toHiveTypeInfo(column.getDataType(),
false).getTypeName());
+
Type.getType(HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false))
+ .toJavaSQLType();
assertThat(metaData.getColumnType(i)).isEqualTo(jdbcType);
}
}
- private Set<List<Object>> collectAndCompact(java.sql.ResultSet result, int
columnCount)
+ private List<List<Object>> collectAndCompact(java.sql.ResultSet result,
int columnCount)
throws Exception {
List<List<Object>> actual = new ArrayList<>();
while (result.next()) {
@@ -558,6 +745,6 @@ public class HiveServer2EndpointITCase extends TestLogger {
}
actual.add(row);
}
- return new LinkedHashSet<>(actual);
+ return actual;
}
}
diff --git
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index f12c44250c4..9bdc9c751b7 100644
---
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.gateway.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.FunctionDefinition;
@@ -237,6 +239,17 @@ public interface SqlGatewayService {
Set<TableKind> tableKinds)
throws SqlGatewayException;
+ /**
+ * Return table of the given fully qualified name.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param tableIdentifier fully qualified name of the table.
+ * @return information of the table.
+ */
+ ResolvedCatalogBaseTable<?> getTable(
+ SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
+ throws SqlGatewayException;
+
/**
* List all user defined functions.
*
diff --git
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 8508edbd23a..a9c6bd77cc7 100644
---
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.gateway.api.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.FunctionDefinition;
@@ -170,4 +172,11 @@ public class MockedSqlGatewayService implements
SqlGatewayService {
public GatewayInfo getGatewayInfo() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public ResolvedCatalogBaseTable<?> getTable(
+ SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 8114fc5b79a..4bdbafe5ab7 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.gateway.service;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.functions.FunctionDefinition;
@@ -255,6 +257,18 @@ public class SqlGatewayServiceImpl implements
SqlGatewayService {
}
}
+ @Override
+ public ResolvedCatalogBaseTable<?> getTable(
+ SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
+ throws SqlGatewayException {
+ try {
+ return
getSession(sessionHandle).createExecutor().getTable(tableIdentifier);
+ } catch (Throwable t) {
+ LOG.error("Failed to getTable.", t);
+ throw new SqlGatewayException("Failed to getTable.", t);
+ }
+ }
+
@Override
public Set<FunctionInfo> listUserDefinedFunctions(
SessionHandle sessionHandle, String catalogName, String
databaseName)
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index 87c4661f24d..569e838c3a2 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -28,6 +28,7 @@ import
org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.data.GenericRowData;
@@ -156,6 +157,13 @@ public class OperationExecutor {
}
}
+ public ResolvedCatalogBaseTable<?> getTable(ObjectIdentifier
tableIdentifier) {
+ return getTableEnvironment()
+ .getCatalogManager()
+ .getTableOrError(tableIdentifier)
+ .getResolvedTable();
+ }
+
public Set<FunctionInfo> listUserDefinedFunctions(String catalogName,
String databaseName) {
return sessionContext.getSessionState().functionCatalog
.getUserDefinedFunctions(catalogName, databaseName).stream()
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 6447771a33d..685bd04cb30 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -468,6 +470,22 @@ public class SqlGatewayServiceITCase extends
AbstractTestBase {
"table_func0"))));
}
+ @Test
+ public void testGetTable() {
+ SessionHandle sessionHandle = createInitializedSession();
+ ResolvedCatalogTable actualTable =
+ (ResolvedCatalogTable)
+ service.getTable(sessionHandle,
ObjectIdentifier.of("cat1", "db1", "tbl1"));
+
assertThat(actualTable.getResolvedSchema()).isEqualTo(ResolvedSchema.of());
+ assertThat(actualTable.getOptions())
+ .isEqualTo(Collections.singletonMap("connector", "values"));
+
+ ResolvedCatalogView actualView =
+ (ResolvedCatalogView)
+ service.getTable(sessionHandle,
ObjectIdentifier.of("cat1", "db1", "tbl3"));
+ assertThat(actualView.getOriginalQuery()).isEqualTo("SELECT 1");
+ }
+
//
--------------------------------------------------------------------------------------------
// Concurrent tests
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java
index 27f40a353c4..840374eeaa6 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogBaseTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.catalog;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.TableSchema;
/**
@@ -25,6 +26,7 @@ import org.apache.flink.table.api.TableSchema;
*
* @param <T> {@link CatalogTable} or {@link CatalogView}
*/
+@PublicEvolving
public interface ResolvedCatalogBaseTable<T extends CatalogBaseTable> extends
CatalogBaseTable {
/**