fsk119 commented on code in PR #20438:
URL: https://github.com/apache/flink/pull/20438#discussion_r939540914
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -617,4 +623,29 @@ private static List<String> enroll(Throwable ex,
StackTraceElement[] trace, int
}
return details;
}
+
+ public static Set<TableKind> mapToFlinkTableKinds(List<String> tableTypes)
Review Comment:
If the parameter is nullable, please add the `@Nullable` annotation so that callers know.
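A minimal sketch of what an annotated nullable parameter could look like. The nested `Nullable` annotation is a local stand-in for `javax.annotation.Nullable` so the snippet compiles without extra jars, and `countTypes` is a hypothetical helper, not a method from this PR.

```java
import java.lang.annotation.Documented;
import java.util.List;

/** Sketch only: shows a parameter documented as nullable and handled accordingly. */
final class NullableParameterSketch {

    /** Local stand-in for javax.annotation.Nullable (an assumption for this sketch). */
    @Documented
    @interface Nullable {}

    /** Hypothetical helper: the annotation tells callers that null is an accepted input. */
    static int countTypes(@Nullable List<String> tableTypes) {
        // Treat a missing list the same way as an empty one.
        return tableTypes == null ? 0 : tableTypes.size();
    }
}
```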
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java:
##########
@@ -51,6 +56,18 @@ public static Callable<ResultSet> createGetSchemasExecutor(
return () -> executeGetSchemas(service, sessionHandle, catalogName,
schemaName);
}
+ public static Callable<ResultSet> createGetTablesExecutor(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName,
+ @Nullable String tableName,
+ Set<TableKind> tableKinds) {
Review Comment:
add `@Nullable`
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -124,6 +130,57 @@ public Set<String> listDatabases(String catalogName) {
.listDatabases());
}
+ public Set<TableInfo> listTables(
+ String catalogName, String databaseName, Set<TableKind>
tableKinds) {
+ if (tableKinds.contains(TableKind.TABLE) &&
tableKinds.contains(TableKind.VIEW)) {
Review Comment:
We should check whether the argument `tableKinds` contains only `TABLE`, only
`VIEW`, or both `TABLE` and `VIEW`.
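One possible reading of this suggestion, sketched with stand-in types: handle each of the three cases explicitly and reject anything else. `TableKind` below is a local enum mirroring the two constants used in the diff, and `describe` is a hypothetical helper, not code from the PR.

```java
import java.util.Set;

/** Sketch only: TableKind is a local stand-in for Flink's CatalogBaseTable.TableKind. */
final class TableKindCheckSketch {

    enum TableKind { TABLE, VIEW }

    /** Hypothetical helper that names the branch a caller such as listTables would take. */
    static String describe(Set<TableKind> tableKinds) {
        boolean tables = tableKinds.contains(TableKind.TABLE);
        boolean views = tableKinds.contains(TableKind.VIEW);
        if (tables && views) {
            return "tables and views";
        } else if (tables) {
            return "tables only";
        } else if (views) {
            return "views only";
        }
        // Neither kind was requested, so there is nothing sensible to list.
        throw new IllegalArgumentException("tableKinds must contain TABLE, VIEW, or both.");
    }
}
```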
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java:
##########
@@ -47,4 +47,23 @@ public class HiveServer2Schemas {
.withComment("Catalog name. NULL if not
applicable")),
Collections.emptyList(),
null);
+
+ /** Schema for {@link HiveServer2Endpoint#GetTables}. */
+ public static final ResolvedSchema GET_TABLES_SCHEMA =
+ new ResolvedSchema(
+ Collections.unmodifiableList(
+ Arrays.asList(
+ Column.physical("TABLE_CAT",
DataTypes.STRING())
+ .withComment("Catalog name. NULL
if not applicable."),
+ Column.physical("TABLE_SCHEMA",
DataTypes.STRING())
+ .withComment("Schema name. NULL if
not applicable."),
+ Column.physical("TABLE_NAME",
DataTypes.STRING())
+ .withComment("Table name. NULL if
not applicable."),
+ Column.physical("TABLE_TYPE",
DataTypes.STRING())
+ .withComment("Table type, e.g.
TABLE, VIEW."),
+ Column.physical("REMARKS",
DataTypes.STRING())
+ .withComment(
+ "Table description.
Currently Empty considering performance."))),
Review Comment:
Don't modify the Hive Operation Schema comment. We should not expose internal
implementation details to users.
BTW, it seems some columns are missing from the schema.
##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/TableInfo.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Objects;
+
+/** Information of the {@code Table}. */
+@PublicEvolving
+public class TableInfo {
+ private final boolean isTemporary;
Review Comment:
I think we can expose this field when needed. Currently, HiveServer2
Endpoint doesn't require this.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -617,4 +623,29 @@ private static List<String> enroll(Throwable ex,
StackTraceElement[] trace, int
}
return details;
}
+
+ public static Set<TableKind> mapToFlinkTableKinds(List<String> tableTypes)
+ throws UnsupportedOperationException {
Review Comment:
We don't declare `UnsupportedOperationException` in the method's `throws`
clause, because `UnsupportedOperationException` is a `RuntimeException`, that
is, an unchecked exception. As the Javadoc of `RuntimeException` states:
```
Unchecked exceptions do <em>not</em> need to be
declared in a method or constructor's {@code throws} clause if they
can be thrown by the execution of the method or constructor and
propagate outside the method or constructor boundary.
```
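A small sketch of the point: a method may throw `UnsupportedOperationException` without listing it in a `throws` clause, and the code still compiles. The class and method names are hypothetical; only the exception message is taken from the diff.

```java
import java.util.List;

/** Sketch only: demonstrates that unchecked exceptions need no throws clause. */
final class UncheckedThrowsSketch {

    /** No throws clause here: UnsupportedOperationException extends RuntimeException. */
    static int countSupported(List<String> tableTypes) {
        if (tableTypes.contains("MATERIALIZED_VIEW")) {
            throw new UnsupportedOperationException(
                    "Table type 'MATERIALIZED_VIEW' not supported currently.");
        }
        return tableTypes.size();
    }
}
```

If the behavior is worth documenting, a Javadoc `@throws` tag does that without changing the signature.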
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -617,4 +623,29 @@ private static List<String> enroll(Throwable ex,
StackTraceElement[] trace, int
}
return details;
}
+
+ public static Set<TableKind> mapToFlinkTableKinds(List<String> tableTypes)
+ throws UnsupportedOperationException {
+ Set<TableKind> tableKinds = new HashSet<>();
+
+ if (tableTypes == null || tableTypes.isEmpty()) {
+ tableKinds.add(TableKind.TABLE);
+ tableKinds.add(TableKind.VIEW);
+ return tableKinds;
+ }
+
+ if (tableTypes.contains(TableType.MATERIALIZED_VIEW.name())) {
+ throw new UnsupportedOperationException(
+ "Table type 'MATERIALIZED_VIEW' not supported currently.");
+ }
+
+ if (tableTypes.contains(TableType.MANAGED_TABLE.name())
+ || tableTypes.contains(TableType.EXTERNAL_TABLE.name())) {
+ tableKinds.add(TableKind.TABLE);
+ }
+ if (tableTypes.contains(TableType.VIRTUAL_VIEW.name())) {
+ tableKinds.add(TableKind.VIEW);
+ }
Review Comment:
I think this is not very clear, because the reader can't see a straightforward
mapping between Flink table kinds and Hive table types. If Hive extends its
`TableType`, we may miss the mapping.
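One way to make the mapping explicit, as a sketch under stated assumptions: map each Hive type to a Flink kind in a single `switch`, so that a type without a mapping fails loudly instead of being silently ignored. Both enums are local stand-ins (the Hive one lists only the four constants that appear in the diff), and the method names are hypothetical.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Sketch only: local stand-ins for Flink's TableKind and Hive's TableType. */
final class TableTypeMappingSketch {

    enum TableKind { TABLE, VIEW }

    enum HiveTableType { MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW, MATERIALIZED_VIEW }

    /** The whole Hive-to-Flink mapping lives in this one switch. */
    static TableKind toFlinkTableKind(HiveTableType type) {
        switch (type) {
            case MANAGED_TABLE:
            case EXTERNAL_TABLE:
                return TableKind.TABLE;
            case VIRTUAL_VIEW:
                return TableKind.VIEW;
            default:
                // Reached for MATERIALIZED_VIEW and for any constant added later.
                throw new UnsupportedOperationException(
                        String.format("Table type '%s' is not supported currently.", type));
        }
    }

    /** Converts type names; HiveTableType.valueOf rejects unknown names. */
    static Set<TableKind> toFlinkTableKinds(List<String> tableTypes) {
        Set<TableKind> kinds = EnumSet.noneOf(TableKind.class);
        for (String name : tableTypes) {
            kinds.add(toFlinkTableKind(HiveTableType.valueOf(name)));
        }
        return kinds;
    }
}
```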
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java:
##########
@@ -77,7 +94,10 @@ private static ResultSet executeGetSchemas(
? service.getCurrentCatalog(sessionHandle)
: catalogName;
Set<String> databaseNames =
- filter(service.listDatabases(sessionHandle,
specifiedCatalogName), schemaName);
+ filter(
+ service.listDatabases(sessionHandle,
specifiedCatalogName),
+ Function.identity(),
Review Comment:
I think it's better to add a method
```
private static Set<String> filter(Set<String> candidates, @Nullable String pattern) {
    return filter(candidates, Function.identity(), pattern);
}
```
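For context, a self-contained sketch of how such an overload could sit next to the general form. The body of the three-argument `filter` is an assumption: it treats the pattern as a Java regular expression and lets a `null` pattern match everything, which may differ from the matching rules of the real helper.

```java
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Sketch only: the matching logic here is assumed, not taken from the PR. */
final class FilterOverloadSketch {

    /** General form: derive a name from each candidate and keep the ones that match. */
    static <T> Set<T> filter(Set<T> candidates, Function<T, String> nameOf, String pattern) {
        return candidates.stream()
                // A null pattern means "no filtering" in this sketch.
                .filter(c -> pattern == null || nameOf.apply(c).matches(pattern))
                .collect(Collectors.toSet());
    }

    /** Convenience overload for plain strings, so callers can omit Function.identity(). */
    static Set<String> filter(Set<String> candidates, String pattern) {
        return filter(candidates, Function.identity(), pattern);
    }
}
```

With the overload in place, a call site that filters database names keeps its original two-argument shape.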
##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/TableInfo.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Objects;
+
+/** Information of the {@code Table}. */
+@PublicEvolving
+public class TableInfo {
Review Comment:
It's better to add a `toString` method to the POJO, which makes it friendlier for users.
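A rough sketch of such a POJO with `equals`, `hashCode`, and `toString`. The two `String` fields are stand-ins for the `ObjectIdentifier` and `TableKind` types imported in the diff; the actual field names and output format of `TableInfo` are assumptions.

```java
import java.util.Objects;

/** Sketch only: String fields stand in for ObjectIdentifier and TableKind. */
final class TableInfoSketch {

    private final String identifier;
    private final String tableKind;

    TableInfoSketch(String identifier, String tableKind) {
        this.identifier = identifier;
        this.tableKind = tableKind;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TableInfoSketch)) {
            return false;
        }
        TableInfoSketch that = (TableInfoSketch) o;
        return Objects.equals(identifier, that.identifier)
                && Objects.equals(tableKind, that.tableKind);
    }

    @Override
    public int hashCode() {
        return Objects.hash(identifier, tableKind);
    }

    /** Readable form, which helps in logs and in assertion failure messages. */
    @Override
    public String toString() {
        return "TableInfo{identifier=" + identifier + ", tableKind=" + tableKind + "}";
    }
}
```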
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -617,4 +623,29 @@ private static List<String> enroll(Throwable ex,
StackTraceElement[] trace, int
}
return details;
}
+
+ public static Set<TableKind> mapToFlinkTableKinds(List<String> tableTypes)
+ throws UnsupportedOperationException {
+ Set<TableKind> tableKinds = new HashSet<>();
+
+ if (tableTypes == null || tableTypes.isEmpty()) {
+ tableKinds.add(TableKind.TABLE);
+ tableKinds.add(TableKind.VIEW);
+ return tableKinds;
Review Comment:
It's better to use
```
tableKinds.addAll(Arrays.asList(TableKind.values()));
```
Then, if more kinds are added in the future, we don't need to modify this code.
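A minimal sketch of the suggestion in context. `TableKind` is a local stand-in enum with the two constants used in the diff, and `allKindsIfUnspecified` is a hypothetical name for the early-return branch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: TableKind is a local stand-in for Flink's CatalogBaseTable.TableKind. */
final class AllTableKindsSketch {

    enum TableKind { TABLE, VIEW }

    /** Returns every kind when no table types are given, otherwise an empty set. */
    static Set<TableKind> allKindsIfUnspecified(List<String> tableTypes) {
        Set<TableKind> tableKinds = new HashSet<>();
        if (tableTypes == null || tableTypes.isEmpty()) {
            // values() picks up constants added to the enum later.
            tableKinds.addAll(Arrays.asList(TableKind.values()));
        }
        return tableKinds;
    }
}
```

`EnumSet.allOf(TableKind.class)` would be an equivalent way to collect every constant.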
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java:
##########
@@ -87,14 +107,60 @@ private static ResultSet executeGetSchemas(
.collect(Collectors.toList()));
}
+ private static ResultSet executeGetTables(
+ SqlGatewayService service,
+ SessionHandle sessionHandle,
+ @Nullable String catalogName,
+ @Nullable String schemaName,
+ @Nullable String tableName,
+ Set<TableKind> tableKinds) {
Review Comment:
ditto
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -347,17 +364,72 @@ public void testListDatabases() throws Exception {
}
@Test
- public void testGetCurrentCatalog() {
+ public void testListTables() throws Exception {
SessionEnvironment environment =
SessionEnvironment.newBuilder()
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.registerCatalog("cat1", new
GenericInMemoryCatalog("cat1"))
.registerCatalog("cat2", new
GenericInMemoryCatalog("cat2"))
- .setDefaultCatalog("cat2")
.build();
SessionHandle sessionHandle = service.openSession(environment);
+ Configuration configuration =
+ Configuration.fromMap(service.getSessionConfig(sessionHandle));
- assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("cat2");
+ // catalogs: cat1 | cat2
+ // cat1: db1 | db2
+ // db1: temporary table tb1, table tb2, temporary view tb3,
view tb4
+ // db2: table tb1, view tb2
+ // cat2 db0
+ // db0: table tb0
+ service.executeStatement(sessionHandle, "CREATE DATABASE cat1.db1",
-1, configuration);
+ service.executeStatement(
+ sessionHandle,
+ "CREATE TEMPORARY TABLE cat1.db1.tb1 WITH('connector' =
'values')",
+ -1,
+ configuration);
+ service.executeStatement(
+ sessionHandle,
+ "CREATE TABLE cat1.db1.tb2 WITH('connector' = 'values')",
+ -1,
+ configuration);
+ service.executeStatement(
+ sessionHandle, "CREATE TEMPORARY VIEW cat1.db1.tb3 AS SELECT
1", -1, configuration);
+ service.executeStatement(
+ sessionHandle, "CREATE VIEW cat1.db1.tb4 AS SELECT 1", -1,
configuration);
+
+ service.executeStatement(sessionHandle, "CREATE DATABASE cat1.db2",
-1, configuration);
+ service.executeStatement(sessionHandle, "CREATE TABLE cat1.db2.tb1",
-1, configuration);
Review Comment:
People prefer `tbl` over `tb` as the abbreviation for `Table`.
##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java:
##########
@@ -617,4 +623,29 @@ private static List<String> enroll(Throwable ex,
StackTraceElement[] trace, int
}
return details;
}
+
+ public static Set<TableKind> mapToFlinkTableKinds(List<String> tableTypes)
Review Comment:
Rename to `toFlinkTableKinds`. Please align the method name with the other
methods in the current class.
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -347,17 +364,72 @@ public void testListDatabases() throws Exception {
}
@Test
- public void testGetCurrentCatalog() {
+ public void testListTables() throws Exception {
SessionEnvironment environment =
SessionEnvironment.newBuilder()
.setSessionEndpointVersion(MockedEndpointVersion.V1)
.registerCatalog("cat1", new
GenericInMemoryCatalog("cat1"))
.registerCatalog("cat2", new
GenericInMemoryCatalog("cat2"))
- .setDefaultCatalog("cat2")
.build();
SessionHandle sessionHandle = service.openSession(environment);
+ Configuration configuration =
+ Configuration.fromMap(service.getSessionConfig(sessionHandle));
- assertThat(service.getCurrentCatalog(sessionHandle)).isEqualTo("cat2");
+ // catalogs: cat1 | cat2
+ // cat1: db1 | db2
+ // db1: temporary table tb1, table tb2, temporary view tb3,
view tb4
+ // db2: table tb1, view tb2
+ // cat2 db0
+ // db0: table tb0
+ service.executeStatement(sessionHandle, "CREATE DATABASE cat1.db1",
-1, configuration);
+ service.executeStatement(
+ sessionHandle,
+ "CREATE TEMPORARY TABLE cat1.db1.tb1 WITH('connector' =
'values')",
+ -1,
+ configuration);
Review Comment:
Be careful: `SqlGatewayService` submits statements in async mode, so the
execution order may differ from the submission order.
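A generic illustration of one way to keep ordering deterministic: wait for each asynchronous submission to finish before issuing the next. This sketch uses only `java.util.concurrent`. It does not use the `SqlGatewayService` API, and `runInOrder` is a hypothetical helper, so the way a test would actually await an operation is left open.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch only: plain JDK concurrency, not the gateway's own submission API. */
final class OrderedSubmissionSketch {

    /** Runs each statement asynchronously but awaits it before submitting the next. */
    static List<String> runInOrder(List<String> statements) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> executed = Collections.synchronizedList(new ArrayList<>());
        try {
            for (String statement : statements) {
                CompletableFuture<Void> done =
                        CompletableFuture.runAsync(() -> executed.add(statement), pool);
                // Without this join, later statements could run before earlier ones.
                done.join();
            }
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(executed);
    }
}
```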