fsk119 commented on code in PR #20401:
URL: https://github.com/apache/flink/pull/20401#discussion_r938676333


##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java:
##########
@@ -429,46 +416,20 @@ public TGetSchemasResp GetSchemas(TGetSchemasReq 
tGetSchemasReq) throws TExcepti
         TGetSchemasResp resp = new TGetSchemasResp();
         try {
             SessionHandle sessionHandle = 
toSessionHandle(tGetSchemasReq.getSessionHandle());
+            String catalogName = tGetSchemasReq.getCatalogName();
+            if (catalogName == null || catalogName.equals("")) {
+                catalogName = service.getCurrentCatalog(sessionHandle);
+            }

Review Comment:
   Move this into the executor. It should be executed when acquiring the operation lock.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive.util;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
+import static 
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
+
+/** Factory to create the operation executor. */
+public class OperationExecutorFactory {
+
+    public static Callable<ResultSet> createGetCatalogsExecutor(
+            SqlGatewayService service, SessionHandle sessionHandle) {
+        return () -> executeGetCatalogs(service, sessionHandle);
+    }
+
+    public static Callable<ResultSet> createGetSchemasExecutor(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            String catalogName,
+            String schemaName) {
+        return () -> executeGetSchemas(service, sessionHandle, catalogName, 
schemaName);
+    }
+
+    private static ResultSet executeGetCatalogs(
+            SqlGatewayService service, SessionHandle sessionHandle) {
+        Set<String> catalogNames = service.listCatalogs(sessionHandle);
+        return new ResultSet(
+                EOS,
+                null,
+                GET_CATALOGS_SCHEMA,
+                catalogNames.stream()
+                        .map(OperationExecutorFactory::packData)
+                        .collect(Collectors.toList()));
+    }
+
+    private static ResultSet executeGetSchemas(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            String catalogName,
+            String schemaName) {
+        Set<String> schemaNames =
+                filterStringSetBy(service.listDatabases(sessionHandle, 
catalogName), schemaName);
+
+        return new ResultSet(
+                EOS,
+                null,
+                GET_SCHEMAS_SCHEMA,
+                schemaNames.stream()
+                        .map(name -> packData(name, catalogName))
+                        .collect(Collectors.toList()));
+    }
+
+    // -------------------------------------
+    // useful methods
+    // -------------------------------------
+
+    /**
+     * Covert SQL 'like' pattern to a Java regular expression. Underscores (_) 
are converted to '.'
+     * and percent signs (%) are converted to '.*'. Note: escape characters 
are removed.
+     *
+     * @param pattern the SQL pattern to convert.
+     * @return the equivalent Java regular expression of the pattern.
+     */
+    private static String convertNamePattern(String pattern) {
+        if ((pattern == null) || pattern.isEmpty()) {
+            pattern = "%";
+        }
+        String wStr = ".*";
+        return pattern.replaceAll("([^\\\\])%", "$1" + wStr)
+                .replaceAll("\\\\%", "%")
+                .replaceAll("^%", wStr)
+                .replaceAll("([^\\\\])_", "$1.")
+                .replaceAll("\\\\_", "_")
+                .replaceAll("^_", ".");
+    }
+
+    public static Set<String> filterStringSetBy(Set<String> set, String 
pattern) {
+        return set.stream()
+                .filter(name -> name.matches(convertNamePattern(pattern)))
+                .collect(Collectors.toSet());

Review Comment:
   It's better to reuse the same `Pattern`. Recompiling it on every call wastes time. 
Please see 
   
   
https://stackoverflow.com/questions/2469244/whats-the-difference-between-string-matches-and-matcher-matches



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java:
##########
@@ -216,44 +217,74 @@ public void testGetCatalogs() throws Exception {
     @Test
     public void testGetSchemas() throws Exception {
         try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
-            connection.createStatement().execute("CREATE SCHEMA schema1");
-            connection.createStatement().execute("CREATE SCHEMA schema2");
-            connection.createStatement().execute("CREATE SCHEMA different");
-
+            initDefaultTestConnection(connection);
             // test all
-            java.sql.ResultSet resultAll = 
connection.getMetaData().getSchemas("hive", null);
-            assertSchemaEquals(
-                    ResolvedSchema.of(
-                            Column.physical("TABLE_SCHEMA", 
DataTypes.STRING()),
-                            Column.physical("TABLE_CAT", DataTypes.STRING())),
-                    resultAll.getMetaData());
+            testGetSchemasAll(connection);
+            // test schema pattern parameter
+            testGetSchemasWithPattern(connection);
+        }
+    }
 
-            List<List<String>> actual = new ArrayList<>();
-            while (resultAll.next()) {
-                actual.add(Arrays.asList(resultAll.getString(1), 
resultAll.getString(2)));
-            }
-            assertThat(new HashSet<>(actual))
-                    .isEqualTo(
-                            new HashSet<>(
-                                    Arrays.asList(
-                                            Arrays.asList("default", "hive"),
-                                            Arrays.asList("schema1", "hive"),
-                                            Arrays.asList("schema2", "hive"),
-                                            Arrays.asList("different", 
"hive"))));
+    private void testGetSchemasAll(Connection connection) throws Exception {

Review Comment:
   I think we can reuse most of the code between `testGetSchemasAll` and 
`testGetSchemasWithPattern`.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java:
##########
@@ -216,44 +217,74 @@ public void testGetCatalogs() throws Exception {
     @Test
     public void testGetSchemas() throws Exception {
         try (Connection connection = ENDPOINT_EXTENSION.getConnection()) {
-            connection.createStatement().execute("CREATE SCHEMA schema1");
-            connection.createStatement().execute("CREATE SCHEMA schema2");
-            connection.createStatement().execute("CREATE SCHEMA different");
-
+            initDefaultTestConnection(connection);
             // test all
-            java.sql.ResultSet resultAll = 
connection.getMetaData().getSchemas("hive", null);
-            assertSchemaEquals(
-                    ResolvedSchema.of(
-                            Column.physical("TABLE_SCHEMA", 
DataTypes.STRING()),
-                            Column.physical("TABLE_CAT", DataTypes.STRING())),
-                    resultAll.getMetaData());
+            testGetSchemasAll(connection);
+            // test schema pattern parameter
+            testGetSchemasWithPattern(connection);

Review Comment:
   It's better to split this into two separate tests if they are unrelated.



##########
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.endpoint.hive.util;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_CATALOGS_SCHEMA;
+import static 
org.apache.flink.table.endpoint.hive.HiveServer2Schemas.GET_SCHEMAS_SCHEMA;
+import static 
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS;
+
+/** Factory to create the operation executor. */
+public class OperationExecutorFactory {
+
+    public static Callable<ResultSet> createGetCatalogsExecutor(
+            SqlGatewayService service, SessionHandle sessionHandle) {
+        return () -> executeGetCatalogs(service, sessionHandle);
+    }
+
+    public static Callable<ResultSet> createGetSchemasExecutor(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            String catalogName,
+            String schemaName) {
+        return () -> executeGetSchemas(service, sessionHandle, catalogName, 
schemaName);
+    }
+
+    private static ResultSet executeGetCatalogs(
+            SqlGatewayService service, SessionHandle sessionHandle) {
+        Set<String> catalogNames = service.listCatalogs(sessionHandle);
+        return new ResultSet(
+                EOS,
+                null,
+                GET_CATALOGS_SCHEMA,
+                catalogNames.stream()
+                        .map(OperationExecutorFactory::packData)
+                        .collect(Collectors.toList()));
+    }
+
+    private static ResultSet executeGetSchemas(
+            SqlGatewayService service,
+            SessionHandle sessionHandle,
+            String catalogName,
+            String schemaName) {
+        Set<String> schemaNames =
+                filterStringSetBy(service.listDatabases(sessionHandle, 
catalogName), schemaName);
+
+        return new ResultSet(
+                EOS,
+                null,
+                GET_SCHEMAS_SCHEMA,
+                schemaNames.stream()
+                        .map(name -> packData(name, catalogName))
+                        .collect(Collectors.toList()));
+    }
+
+    // -------------------------------------
+    // useful methods
+    // -------------------------------------
+
+    /**
+     * Covert SQL 'like' pattern to a Java regular expression. Underscores (_) 
are converted to '.'
+     * and percent signs (%) are converted to '.*'. Note: escape characters 
are removed.
+     *
+     * @param pattern the SQL pattern to convert.
+     * @return the equivalent Java regular expression of the pattern.
+     */
+    private static String convertNamePattern(String pattern) {
+        if ((pattern == null) || pattern.isEmpty()) {
+            pattern = "%";
+        }
+        String wStr = ".*";
+        return pattern.replaceAll("([^\\\\])%", "$1" + wStr)
+                .replaceAll("\\\\%", "%")
+                .replaceAll("^%", wStr)
+                .replaceAll("([^\\\\])_", "$1.")
+                .replaceAll("\\\\_", "_")
+                .replaceAll("^_", ".");
+    }
+
+    public static Set<String> filterStringSetBy(Set<String> set, String 
pattern) {
+        return set.stream()
+                .filter(name -> name.matches(convertNamePattern(pattern)))
+                .collect(Collectors.toSet());
+    }
+
+    private static GenericRowData packData(List<Object> data) {

Review Comment:
   It seems `packData(List<Object> data)` and `packData(Object... data)` are 
the same. We can combine them into one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to