fsk119 commented on code in PR #20401:
URL: https://github.com/apache/flink/pull/20401#discussion_r938387937


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -422,7 +426,76 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq 
tGetCatalogsReq) throws TExc
 
     @Override
     public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws 
TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetSchemasResp resp = new TGetSchemasResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetSchemasReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_SCHEMAS,
+                            () -> {
+                                Set<String> schemaNames =
+                                        service
+                                                .listDatabases(
+                                                        sessionHandle,
+                                                        
tGetSchemasReq.getCatalogName())

Review Comment:
   tGetSchemasReq.getCatalogName() may be null. 
   If null, I think we can use the current catalog name.



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java:
##########
@@ -198,4 +198,15 @@ ResultSet fetchResults(
      * @return names of the registered catalogs.
      */
     Set<String> listCatalogs(SessionHandle sessionHandle) throws 
SqlGatewayException;
+
+    /**
+     * Return all available schemas' name matching {@param 
databaseNamePattern} in the given

Review Comment:
   We don't allow users to specify databaseNamePattern here.
   



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -422,7 +426,76 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq 
tGetCatalogsReq) throws TExc
 
     @Override
     public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws 
TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetSchemasResp resp = new TGetSchemasResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetSchemasReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_SCHEMAS,
+                            () -> {
+                                Set<String> schemaNames =
+                                        service
+                                                .listDatabases(
+                                                        sessionHandle,
+                                                        
tGetSchemasReq.getCatalogName())
+                                                .stream()
+                                                .filter(
+                                                        name ->
+                                                                name.matches(
+                                                                        
convertNamePattern(
+                                                                               
 tGetSchemasReq
+                                                                               
         .getSchemaName())))
+                                                .collect(Collectors.toSet());
+
+                                return new ResultSet(
+                                        EOS,
+                                        null,
+                                        GET_SCHEMAS_SCHEMA,
+                                        schemaNames.stream()
+                                                .map(
+                                                        name ->
+                                                                
GenericRowData.of(
+                                                                        
StringData.fromString(name),
+                                                                        
StringData.fromString(
+                                                                               
 tGetSchemasReq
+                                                                               
         .getCatalogName())))
+                                                .collect(Collectors.toList()));
+                            });
+
+            // TODO: Remove this
+            while (!service.getOperationInfo(sessionHandle, operationHandle)
+                    .getStatus()
+                    .isTerminalStatus()) {
+                Thread.sleep(1000);
+            }
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(sessionHandle, operationHandle, 
OperationType.LIST_SCHEMAS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetSchemas.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
+    }
+
+    /**
+     * Convert wildchars and escape sequence of schema pattern from JDBC 
format to
+     * datanucleous/regex. The schema pattern treats empty string also as 
wildchar. implementation
+     * refers to hive's convertSchemaPattern.
+     */

Review Comment:
   ```
   /**
      * Convert SQL 'like' pattern to a Java regular expression.
      *
      * Underscores (_) are converted to '.' and percent signs (%) are 
converted to '.*'.
      *  
      * @param pattern the SQL pattern to convert
      * @return the equivalent Java regular expression of the pattern
      */
   ```



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -422,7 +426,76 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq 
tGetCatalogsReq) throws TExc
 
     @Override
     public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws 
TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetSchemasResp resp = new TGetSchemasResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetSchemasReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_SCHEMAS,
+                            () -> {
+                                Set<String> schemaNames =
+                                        service
+                                                .listDatabases(
+                                                        sessionHandle,
+                                                        
tGetSchemasReq.getCatalogName())
+                                                .stream()
+                                                .filter(
+                                                        name ->
+                                                                name.matches(
+                                                                        
convertNamePattern(
+                                                                               
 tGetSchemasReq
+                                                                               
         .getSchemaName())))
+                                                .collect(Collectors.toSet());
+
+                                return new ResultSet(
+                                        EOS,
+                                        null,
+                                        GET_SCHEMAS_SCHEMA,
+                                        schemaNames.stream()
+                                                .map(
+                                                        name ->
+                                                                
GenericRowData.of(
+                                                                        
StringData.fromString(name),
+                                                                        
StringData.fromString(
+                                                                               
 tGetSchemasReq
+                                                                               
         .getCatalogName())))
+                                                .collect(Collectors.toList()));
+                            });
+
+            // TODO: Remove this
+            while (!service.getOperationInfo(sessionHandle, operationHandle)
+                    .getStatus()
+                    .isTerminalStatus()) {
+                Thread.sleep(1000);
+            }

Review Comment:
   Remove this



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/session/SessionEnvironment.java:
##########
@@ -41,6 +41,7 @@ public class SessionEnvironment {
     private final @Nullable String sessionName;
     private final EndpointVersion version;
     private final Map<String, Catalog> registeredCatalogs;
+

Review Comment:
   remove this change.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -422,7 +426,76 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq 
tGetCatalogsReq) throws TExc
 
     @Override
     public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws 
TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetSchemasResp resp = new TGetSchemasResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetSchemasReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_SCHEMAS,
+                            () -> {
+                                Set<String> schemaNames =
+                                        service
+                                                .listDatabases(
+                                                        sessionHandle,
+                                                        
tGetSchemasReq.getCatalogName())
+                                                .stream()
+                                                .filter(
+                                                        name ->
+                                                                name.matches(
+                                                                        
convertNamePattern(
+                                                                               
 tGetSchemasReq
+                                                                               
         .getSchemaName())))
+                                                .collect(Collectors.toSet());
+
+                                return new ResultSet(
+                                        EOS,
+                                        null,
+                                        GET_SCHEMAS_SCHEMA,
+                                        schemaNames.stream()
+                                                .map(
+                                                        name ->
+                                                                
GenericRowData.of(
+                                                                        
StringData.fromString(name),
+                                                                        
StringData.fromString(

Review Comment:
   It's better we can reuse or extend the `StringRowDataUtils`



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Schemas.java:
##########
@@ -35,4 +36,16 @@ public class HiveServer2Schemas {
                                     .withComment("Catalog name. NULL if not 
applicable.")),
                     Collections.emptyList(),
                     null);
+
+    /** Schema for {@link HiveServer2Endpoint#GetSchemas}. */
+    public static final ResolvedSchema GET_SCHEMAS_SCHEMA =
+            new ResolvedSchema(
+                    Collections.unmodifiableList(

Review Comment:
   I think we can remove Collections.unmodifiedList? It's trivial to wrap this 
again.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java:
##########
@@ -211,6 +213,49 @@ public void testGetCatalogs() throws Exception {
         }
     }
 
+    @Test
+    public void testGetSchemas() throws Exception {
+        try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
+            connection.createStatement().execute("CREATE SCHEMA schema1");

Review Comment:
   reuse the same statement



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -316,6 +317,32 @@ public void testListCatalogs() {
         assertThat(service.listCatalogs(sessionHandle)).contains("cat1", 
"cat2");
     }
 
+    @Test
+    public void testListDatabases() throws Exception {
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .registerCatalog("cat", new 
GenericInMemoryCatalog("cat"))
+                        .build();

Review Comment:
   `.setDefaultCatalog("cat")`



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -104,8 +105,15 @@ public Set<String> listCatalogs() {
         return getTableEnvironment().getCatalogManager().listCatalogs();
     }
 
+
+    public Set<String> listDatabases(String catalogName) {
+        return Collections.unmodifiableSet(
+                new 
HashSet<>(getTableEnvironment().getCatalogManager().listSchemas(catalogName)));
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
+

Review Comment:
   remove this



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -104,8 +105,15 @@ public Set<String> listCatalogs() {
         return getTableEnvironment().getCatalogManager().listCatalogs();
     }
 
+
+    public Set<String> listDatabases(String catalogName) {
+        return Collections.unmodifiableSet(
+                new 
HashSet<>(getTableEnvironment().getCatalogManager().listSchemas(catalogName)));

Review Comment:
   listSchema is different from the listDatabases. `listSchemas` also includes 
the temporary object here but `listDatabases` doesn't.  We should keep the 
meaning and the actual implementation is the same here.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -422,7 +426,76 @@ public TGetCatalogsResp GetCatalogs(TGetCatalogsReq 
tGetCatalogsReq) throws TExc
 
     @Override
     public TGetSchemasResp GetSchemas(TGetSchemasReq tGetSchemasReq) throws 
TException {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
+        TGetSchemasResp resp = new TGetSchemasResp();
+        try {
+            SessionHandle sessionHandle = 
toSessionHandle(tGetSchemasReq.getSessionHandle());
+            OperationHandle operationHandle =
+                    service.submitOperation(
+                            sessionHandle,
+                            OperationType.LIST_SCHEMAS,
+                            () -> {
+                                Set<String> schemaNames =
+                                        service
+                                                .listDatabases(
+                                                        sessionHandle,
+                                                        
tGetSchemasReq.getCatalogName())
+                                                .stream()
+                                                .filter(
+                                                        name ->
+                                                                name.matches(
+                                                                        
convertNamePattern(
+                                                                               
 tGetSchemasReq
+                                                                               
         .getSchemaName())))
+                                                .collect(Collectors.toSet());
+
+                                return new ResultSet(
+                                        EOS,
+                                        null,
+                                        GET_SCHEMAS_SCHEMA,
+                                        schemaNames.stream()
+                                                .map(
+                                                        name ->
+                                                                
GenericRowData.of(
+                                                                        
StringData.fromString(name),
+                                                                        
StringData.fromString(
+                                                                               
 tGetSchemasReq
+                                                                               
         .getCatalogName())))
+                                                .collect(Collectors.toList()));
+                            });
+
+            // TODO: Remove this
+            while (!service.getOperationInfo(sessionHandle, operationHandle)
+                    .getStatus()
+                    .isTerminalStatus()) {
+                Thread.sleep(1000);
+            }
+            resp.setStatus(OK_STATUS);
+            resp.setOperationHandle(
+                    toTOperationHandle(sessionHandle, operationHandle, 
OperationType.LIST_SCHEMAS));
+        } catch (Throwable t) {
+            LOG.error("Failed to GetSchemas.", t);
+            resp.setStatus(toTStatus(t));
+        }
+        return resp;
+    }
+
+    /**
+     * Convert wildchars and escape sequence of schema pattern from JDBC 
format to
+     * datanucleous/regex. The schema pattern treats empty string also as 
wildchar. implementation
+     * refers to hive's convertSchemaPattern.
+     */
+    private String convertNamePattern(String schemaName) {

Review Comment:
   rename schemaName to pattern



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to