libenchao commented on code in PR #22233:
URL: https://github.com/apache/flink/pull/22233#discussion_r1148345880


##########
flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkDriverTest.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.apache.flink.table.jdbc.DriverInfo.DRIVER_NAME;
+import static org.apache.flink.table.jdbc.DriverInfo.DRIVER_VERSION_MAJOR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for {@link FlinkDriver}. */
+public class FlinkDriverTest {
+    @Test
+    public void testDriverInfo() {
+        assertEquals(DRIVER_VERSION_MAJOR, 1);
+        assertEquals(DRIVER_NAME, "Flink JDBC Driver");
+    }
+
+    @Test
+    public void testDriverUri() throws Exception {
+        String uri =
+                "jdbc:flink://localhost:8888/catalog_name/database_name?sessionId=123&key1=val1&key2=val2";

Review Comment:
   Could you add more tests for the invalid uri exception messages?
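
   Something along these lines could work as a starting point (just a rough sketch; I'm assuming a factory like `DriverUri.create(String, Properties)` is the entry point, so please adjust the call to whatever the driver actually exposes):

   ```java
   @Test
   public void testInvalidDriverUri() {
       Properties props = new Properties();

       // A URL without the "jdbc:flink:" prefix should be rejected.
       SQLException wrongPrefix =
               assertThrows(
                       SQLException.class,
                       // DriverUri.create is a placeholder for the real factory method.
                       () -> DriverUri.create("jdbc:mysql://localhost:8888/catalog", props));
       assertTrue(wrongPrefix.getMessage().contains("Invalid Flink JDBC URL"));

       // A query parameter that is not in key=val format should be rejected.
       SQLException badParam =
               assertThrows(
                       SQLException.class,
                       () -> DriverUri.create("jdbc:flink://localhost:8888/catalog?key1", props));
       assertTrue(badParam.getMessage().contains("key=val format"));
   }
   ```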



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Splitter;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Parse catalog, table and connection properties from uri for {@link FlinkDriver}. */
+public class DriverUri {
+    private static final String URL_PREFIX = "jdbc:";
+    private static final String URL_START = URL_PREFIX + "flink:";
+
+    private static final Splitter URL_ARG_SPLITTER = Splitter.on('&').omitEmptyStrings();
+    private static final Splitter ARG_VALUE_SPLITTER = Splitter.on('=').limit(2);
+    private final String host;
+    private final int port;
+    private final URI uri;
+
+    private final Properties properties;
+
+    private Optional<String> catalog = Optional.empty();
+    private Optional<String> database = Optional.empty();
+
+    private DriverUri(String url, Properties driverProperties) throws SQLException {
+        this(parseDriverUrl(url), driverProperties);
+    }
+
+    private DriverUri(URI uri, Properties driverProperties) throws SQLException {
+        this.uri = checkNotNull(uri, "uri is null");
+        this.host = uri.getHost();
+        this.port = uri.getPort();
+        this.properties = mergeDynamicProperties(uri, driverProperties);
+
+        initCatalogAndSchema();
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public Optional<String> getCatalog() {
+        return catalog;
+    }
+
+    public Optional<String> getDatabase() {
+        return database;
+    }
+
+    private void initCatalogAndSchema() throws SQLException {
+        String path = uri.getPath();
+        if (StringUtils.isNullOrWhitespaceOnly(uri.getPath()) || path.equals("/")) {
+            return;
+        }
+
+        // remove first slash
+        if (!path.startsWith("/")) {
+            throw new SQLException("Path in uri does not start with a slash: " + uri);
+        }
+        path = path.substring(1);
+
+        List<String> parts = Splitter.on("/").splitToList(path);
+        // remove last item due to a trailing slash
+        if (parts.get(parts.size() - 1).isEmpty()) {
+            parts = parts.subList(0, parts.size() - 1);
+        }
+
+        if (parts.size() > 2) {
+            throw new SQLException("Invalid path segments in URL: " + uri);
+        }
+
+        if (parts.get(0).isEmpty()) {
+            throw new SQLException("Catalog name in URL is empty: " + uri);
+        }
+
+        catalog = Optional.ofNullable(parts.get(0));
+
+        if (parts.size() > 1) {
+            if (parts.get(1).isEmpty()) {
+                throw new SQLException("Database name in URL is empty: " + uri);
+            }
+
+            database = Optional.ofNullable(parts.get(1));
+        }
+    }
+
+    private static Properties mergeDynamicProperties(URI uri, Properties driverProperties)
+            throws SQLException {
+        Map<String, String> urlProperties = parseUriParameters(uri.getQuery());
+        Map<String, String> suppliedProperties = Maps.fromProperties(driverProperties);
+
+        for (String key : urlProperties.keySet()) {
+            if (suppliedProperties.containsKey(key)) {
+                throw new SQLException(
+                        format("Connection property '%s' is both in the URL and an argument", key));
+            }
+        }
+
+        Properties result = new Properties();
+        setMapToProperties(result, urlProperties);
+        setMapToProperties(result, suppliedProperties);
+        return result;
+    }
+
+    private static void setMapToProperties(Properties properties, Map<String, String> values) {
+        for (Map.Entry<String, String> entry : values.entrySet()) {
+            properties.setProperty(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private static Map<String, String> parseUriParameters(String query) throws SQLException {
+        Map<String, String> result = new HashMap<>();
+
+        if (query != null) {
+            Iterable<String> queryArgs = URL_ARG_SPLITTER.split(query);
+            for (String queryArg : queryArgs) {
+                List<String> parts = ARG_VALUE_SPLITTER.splitToList(queryArg);
+                if (parts.size() != 2) {
+                    throw new SQLException(
+                            format(
+                                    "Connection property in uri must be key=val format: '%s'",
+                                    queryArg));
+                }
+                if (result.put(parts.get(0), parts.get(1)) != null) {
+                    throw new SQLException(
+                            format(
+                                    "Connection property '%s' is in URL multiple times",
+                                    parts.get(0)));
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private static URI parseDriverUrl(String url) throws SQLException {
+        if (!url.startsWith(URL_START)) {
+            throw new SQLException("Invalid Flink JDBC URL: " + url);

Review Comment:
   Should we add more detail to the messages to indicate why the URL is invalid? The same applies to the exception messages below.
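
   For example, something like this would already help (just a sketch of the kind of wording I mean):

   ```java
   if (!url.startsWith(URL_START)) {
       throw new SQLException(
               format(
                       "Invalid Flink JDBC URL '%s': the URL must start with '%s'",
                       url, URL_START));
   }
   ```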



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Splitter;

Review Comment:
   Then we'd better bundle the shaded guava into `flink-sql-jdbc-driver`?
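
   Alternatively, the splitting here looks simple enough to do with plain JDK string handling, which would avoid bundling shaded guava at all. A rough, untested sketch of `parseUriParameters` without the guava `Splitter`:

   ```java
   private static Map<String, String> parseUriParameters(String query) throws SQLException {
       Map<String, String> result = new HashMap<>();
       if (query == null) {
           return result;
       }
       for (String queryArg : query.split("&")) {
           // Splitter.on('&').omitEmptyStrings() skips empty segments, so do the same here.
           if (queryArg.isEmpty()) {
               continue;
           }
           // Splitter.on('=').limit(2) only splits on the first '='.
           String[] parts = queryArg.split("=", 2);
           if (parts.length != 2) {
               throw new SQLException(
                       format("Connection property in uri must be key=val format: '%s'", queryArg));
           }
           if (result.put(parts[0], parts[1]) != null) {
               throw new SQLException(
                       format("Connection property '%s' is in URL multiple times", parts[0]));
           }
       }
       return result;
   }
   ```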



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverUri.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.base.Splitter;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Maps;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Parse catalog, table and connection properties from uri for {@link FlinkDriver}. */
+public class DriverUri {
+    private static final String URL_PREFIX = "jdbc:";
+    private static final String URL_START = URL_PREFIX + "flink:";
+
+    private static final Splitter URL_ARG_SPLITTER = Splitter.on('&').omitEmptyStrings();
+    private static final Splitter ARG_VALUE_SPLITTER = Splitter.on('=').limit(2);
+    private final String host;
+    private final int port;
+    private final URI uri;
+
+    private final Properties properties;
+
+    private Optional<String> catalog = Optional.empty();
+    private Optional<String> database = Optional.empty();
+
+    private DriverUri(String url, Properties driverProperties) throws SQLException {
+        this(parseDriverUrl(url), driverProperties);
+    }
+
+    private DriverUri(URI uri, Properties driverProperties) throws SQLException {
+        this.uri = checkNotNull(uri, "uri is null");
+        this.host = uri.getHost();
+        this.port = uri.getPort();
+        this.properties = mergeDynamicProperties(uri, driverProperties);
+
+        initCatalogAndSchema();
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public Optional<String> getCatalog() {
+        return catalog;
+    }
+
+    public Optional<String> getDatabase() {
+        return database;
+    }
+
+    private void initCatalogAndSchema() throws SQLException {
+        String path = uri.getPath();
+        if (StringUtils.isNullOrWhitespaceOnly(uri.getPath()) || path.equals("/")) {
+            return;
+        }
+
+        // remove first slash
+        if (!path.startsWith("/")) {
+            throw new SQLException("Path in uri does not start with a slash: " + uri);
+        }
+        path = path.substring(1);
+
+        List<String> parts = Splitter.on("/").splitToList(path);
+        // remove last item due to a trailing slash
+        if (parts.get(parts.size() - 1).isEmpty()) {
+            parts = parts.subList(0, parts.size() - 1);
+        }
+
+        if (parts.size() > 2) {
+            throw new SQLException("Invalid path segments in URL: " + uri);
+        }
+
+        if (parts.get(0).isEmpty()) {
+            throw new SQLException("Catalog name in URL is empty: " + uri);
+        }
+
+        catalog = Optional.ofNullable(parts.get(0));
+
+        if (parts.size() > 1) {
+            if (parts.get(1).isEmpty()) {
+                throw new SQLException("Database name in URL is empty: " + uri);
+            }
+
+            database = Optional.ofNullable(parts.get(1));
+        }
+    }
+
+    private static Properties mergeDynamicProperties(URI uri, Properties driverProperties)
+            throws SQLException {
+        Map<String, String> urlProperties = parseUriParameters(uri.getQuery());
+        Map<String, String> suppliedProperties = Maps.fromProperties(driverProperties);
+
+        for (String key : urlProperties.keySet()) {
+            if (suppliedProperties.containsKey(key)) {
+                throw new SQLException(
+                        format("Connection property '%s' is both in the URL and an argument", key));
+            }
+        }
+
+        Properties result = new Properties();
+        setMapToProperties(result, urlProperties);
+        setMapToProperties(result, suppliedProperties);
+        return result;
+    }
+
+    private static void setMapToProperties(Properties properties, Map<String, String> values) {

Review Comment:
   How about naming it `add[Map]ToProperties`?
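
   i.e. something like this (or, since the keys and values are always strings here, maybe the two calls could even be replaced with `Properties#putAll`):

   ```java
   // Same behaviour as before, only the name changes.
   private static void addMapToProperties(Properties properties, Map<String, String> values) {
       for (Map.Entry<String, String> entry : values.entrySet()) {
           properties.setProperty(entry.getKey(), entry.getValue());
       }
   }
   ```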



##########
flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/DriverInfo.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc;
+
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URL;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.lang.Integer.parseInt;
+import static org.apache.flink.util.Preconditions.checkNotNull;

Review Comment:
   If you are going to import the `flink-core` dependency, then we will eventually need to bundle `flink-core` into `flink-sql-jdbc-driver`. IMO, we'd better keep the dependencies as simple as possible, what do you think?
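
   For reference, the two flink-core utilities imported here have simple JDK equivalents, so the dependency could probably be avoided (rough sketch):

   ```java
   // Instead of org.apache.flink.util.Preconditions.checkNotNull(value, "message"):
   private static <T> T checkNotNull(T reference, String errorMessage) {
       return java.util.Objects.requireNonNull(reference, errorMessage);
   }

   // Instead of org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly(str):
   private static boolean isNullOrWhitespaceOnly(String str) {
       return str == null || str.trim().isEmpty();
   }
   ```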


