Airblader commented on a change in pull request #17264:
URL: https://github.com/apache/flink/pull/17264#discussion_r709807987



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -31,8 +31,13 @@
 
 import static java.lang.String.format;
 
-/** Handle the SQL dialect of jdbc driver. */
-@Internal
+/**
+ * Represents a dialect of SQL implemented by a particular JDBC system. 
Dialects should be immutable
+ * and stateless.
+ *
+ * @see JdbcDialectFactory
+ */
+@PublicEvolving
 public interface JdbcDialect extends Serializable {

Review comment:
       Since we're making this a public API now, it would be good to go through all the JavaDocs in here and bring them into shape.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A factory to create different JdbcDialect's. This factory is used with Java's Service Provider

Review comment:
       ```suggestion
     * A factory to create different {@link JdbcDialect JdbcDialects}. This factory is used with Java's Service Provider
    ```

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLDialectFactory.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link MySQLDialect}. */
+public class MySQLDialectFactory implements JdbcDialectFactory {

Review comment:
       Add `@Internal`

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/derby/DerbyDialectFactory.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.derby;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link DerbyDialect}. */
+public class DerbyDialectFactory implements JdbcDialectFactory {

Review comment:
       Add `@Internal`

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,60 +18,65 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 
 import java.util.List;
 
-abstract class AbstractDialect implements JdbcDialect {
+/** Base class that handles common validation of JDBC data types. */
+@PublicEvolving

Review comment:
       Should we really make this public already? If so, what is the contract that we guarantee here (other than `JdbcDialect`'s)? This deserves better documentation, I think.
   
   I'm also not sure the design of this class is great yet. For example, implementers are forced to specify min/max precisions; maybe that should be optional? Also, shouldn't an implementation list supported types rather than unsupported ones?
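To make the design question concrete, here is a minimal stand-alone sketch (hypothetical names `TypeRoot`, `Capabilities`, `DERBY_LIKE`; deliberately not Flink's actual API): the dialect enumerates the type roots it supports as an allow-list, and precision bounds become optional instead of mandatory.

```java
import java.util.EnumSet;
import java.util.Optional;

// Hypothetical sketch, not Flink's actual API: a dialect declares the type
// roots it supports and may optionally constrain decimal precision.
public class DialectCapabilitiesSketch {

    public enum TypeRoot { VARCHAR, INTEGER, DECIMAL, TIMESTAMP, ARRAY }

    public interface Capabilities {
        // Allow-list of supported types instead of a deny-list.
        EnumSet<TypeRoot> supportedTypes();

        // Precision bounds are optional rather than forced on every dialect.
        default Optional<int[]> decimalPrecisionRange() {
            return Optional.empty();
        }
    }

    // Example capabilities for a Derby-like dialect (illustrative values only).
    public static final Capabilities DERBY_LIKE = new Capabilities() {
        @Override
        public EnumSet<TypeRoot> supportedTypes() {
            return EnumSet.of(TypeRoot.VARCHAR, TypeRoot.INTEGER, TypeRoot.DECIMAL);
        }

        @Override
        public Optional<int[]> decimalPrecisionRange() {
            return Optional.of(new int[] {1, 31});
        }
    };

    public static boolean supports(Capabilities c, TypeRoot t) {
        return c.supportedTypes().contains(t);
    }
}
```

With an allow-list, validation reduces to a membership check, and an unsupported type fails explicitly rather than slipping through an incomplete deny-list.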

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -83,27 +86,37 @@ default void validate(TableSchema schema) throws ValidationException {}
     }
 
     /**
-     * Quotes the identifier. This is used to put quotes around the identifier in case the column
-     * name is a reserved keyword, or in case it contains characters that require quotes (e.g.
-     * space). Default using double quotes {@code "} to quote.
+     * Quotes the identifier.
+     *
+     * <p>Used to put quotes around the identifier if the column name is a reserved keyword or
+     * contains characters requiring quotes (e.g., space). Default using double quotes {@code "} to
+     * quote.
+     *
+     * @return the quoted identifier.
      */
     default String quoteIdentifier(String identifier) {
         return "\"" + identifier + "\"";
     }
 
     /**
-     * Get dialect upsert statement, the database has its own upsert syntax, such as Mysql using
-     * DUPLICATE KEY UPDATE, and PostgreSQL using ON CONFLICT... DO UPDATE SET..
+     * Constructs the dialect's upsert statement if supported, such as MySQL's {@code DUPLICATE KEY
+     * UPDATE} or PostgreSQL's {@code ON CONFLICT ... DO UPDATE SET ...}.
+     *
+     * <p>If the dialect does not support native upsert statements, the writer will fall back to
+     * {@code SELECT} + {@code UPDATE}/{@code INSERT}, which may have poor performance.
      *
-     * @return None if dialect does not support upsert statement, the writer will degrade to the use
-     *     of select + update/insert, this performance is poor.
+     * @return The upsert statement if supported, otherwise None.
      */
     default Optional<String> getUpsertStatement(
             String tableName, String[] fieldNames, String[] uniqueKeyFields) {

Review comment:
       Given the performance impact if this is left unimplemented, is it maybe worth not default-implementing it to force implementations to make a conscious decision?
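To illustrate the alternative, a minimal sketch with placeholder names (not Flink's actual interface): with no default implementation, every dialect must state its upsert support explicitly, and `Optional.empty()` becomes a deliberate opt-in to the slower fallback.

```java
import java.util.Optional;

// Hypothetical sketch: the method is abstract, so each dialect must decide.
public class UpsertContractSketch {

    public interface Dialect {
        // No default implementation: a dialect without native upsert must
        // explicitly return Optional.empty(), consciously accepting the
        // slower SELECT + UPDATE/INSERT fallback.
        Optional<String> getUpsertStatement(String table, String[] fields, String[] keys);
    }

    // A dialect that deliberately declares it has no native upsert syntax.
    public static final Dialect NO_NATIVE_UPSERT = (table, fields, keys) -> Optional.empty();

    // A MySQL-style dialect that returns a native upsert template (illustrative only).
    public static final Dialect MYSQL_LIKE = (table, fields, keys) ->
            Optional.of("INSERT INTO " + table + " ... ON DUPLICATE KEY UPDATE ...");
}
```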

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A factory to create different JdbcDialect's. This factory is used with Java's Service Provider
+ * Interfaces (SPI) for discovering.
+ *
+ * <p>Classes that implement this interface can be added to the
+ * "META_INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory" file of a JAR file
+ * in the current classpath to be found.
+ *
+ * @see JdbcDialect
+ */
+@PublicEvolving
+public interface JdbcDialectFactory {
+
+    /**
+     * Check if this dialect instance can handle a certain jdbc url.
+     *
+     * @param url the jdbc url.
+     * @return True if the dialect can be applied on the given jdbc url.
+     */
+    boolean canHandle(String url);

Review comment:
       Maybe we can rename this to `acceptsURL` to be in line with `java.sql.Driver`?
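A small sketch of what the renamed method could look like (hypothetical `MYSQL_LIKE` factory; the prefix check mirrors how JDBC drivers commonly implement `java.sql.Driver#acceptsURL`):

```java
// Hypothetical sketch of the rename: the factory mirrors
// java.sql.Driver#acceptsURL instead of canHandle.
public class AcceptsUrlSketch {

    public interface JdbcDialectFactory {
        boolean acceptsURL(String url);
    }

    // A MySQL-style factory typically just checks the JDBC URL prefix.
    public static final JdbcDialectFactory MYSQL_LIKE =
            url -> url != null && url.startsWith("jdbc:mysql:");
}
```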

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -83,27 +86,37 @@ default void validate(TableSchema schema) throws ValidationException {}
     }
 
     /**
-     * Quotes the identifier. This is used to put quotes around the identifier in case the column
-     * name is a reserved keyword, or in case it contains characters that require quotes (e.g.
-     * space). Default using double quotes {@code "} to quote.
+     * Quotes the identifier.
+     *
+     * <p>Used to put quotes around the identifier if the column name is a reserved keyword or
+     * contains characters requiring quotes (e.g., space). Default using double quotes {@code "} to
+     * quote.
+     *
+     * @return the quoted identifier.
      */
     default String quoteIdentifier(String identifier) {
         return "\"" + identifier + "\"";

Review comment:
       The default implementation seems broken to me because it doesn't escape the identifier. That being said, should we really offer a default implementation for this?
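For illustration, a default that at least escapes embedded quote characters, following the SQL-standard convention of doubling them inside a delimited identifier (a sketch of the concern, not the fix adopted in the PR; dialects using backticks would still override):

```java
// Sketch: escape embedded double quotes by doubling them, per the SQL
// standard convention for delimited ("quoted") identifiers.
public class QuoteIdentifierSketch {

    public static String quoteIdentifier(String identifier) {
        // "a"b" would be a broken identifier; "a""b" is the escaped form.
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }
}
```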

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java
##########
@@ -18,23 +18,25 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import java.util.Arrays;
-import java.util.List;
+import org.apache.flink.annotation.Internal;
+
 import java.util.Optional;
+import java.util.ServiceLoader;
 
 /** Default JDBC dialects. */
+@Internal
 public final class JdbcDialects {
 
-    private static final List<JdbcDialect> DIALECTS =
-            Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect());
-
     /** Fetch the JdbcDialect class corresponding to a given database url. */
     public static Optional<JdbcDialect> get(String url) {
-        for (JdbcDialect dialect : DIALECTS) {
-            if (dialect.canHandle(url)) {
-                return Optional.of(dialect);
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+        for (JdbcDialectFactory factory : ServiceLoader.load(JdbcDialectFactory.class, cl)) {
+            if (factory.canHandle(url)) {

Review comment:
       What if there are multiple implementations that accept the URL? We should probably catch such cases and give an appropriate error.
   
   Comparing with `FactoryUtil` for the Table & SQL stack, it probably also makes sense to throw an error if no implementations could be found (note: I don't mean none that accept the URL, but none at all, hinting at some setup problem).
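A rough sketch of that stricter discovery logic (hypothetical `Factory`/`discover` names, loosely modeled on `FactoryUtil`-style error handling; the real code would iterate the `ServiceLoader` instead of a list): fail fast when nothing was discovered at all, and when more than one factory matches the URL.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of stricter factory discovery: distinguish "no
// factories on the classpath" (setup problem) from "no factory accepts
// this URL", and reject ambiguous matches.
public class StrictDialectLoaderSketch {

    public interface Factory {
        boolean canHandle(String url);
    }

    public static Factory discover(List<Factory> discovered, String url) {
        if (discovered.isEmpty()) {
            throw new IllegalStateException(
                    "No JdbcDialectFactory implementations found on the classpath.");
        }
        List<Factory> matching = new ArrayList<>();
        for (Factory factory : discovered) {
            if (factory.canHandle(url)) {
                matching.add(factory);
            }
        }
        if (matching.isEmpty()) {
            throw new IllegalStateException("No dialect factory accepts URL: " + url);
        }
        if (matching.size() > 1) {
            throw new IllegalStateException("Multiple dialect factories accept URL: " + url);
        }
        return matching.get(0);
    }
}
```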

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresDialectFactory.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.psql;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link PostgresDialect}. */
+public class PostgresDialectFactory implements JdbcDialectFactory {

Review comment:
       Add `@Internal`

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -131,8 +144,11 @@ default String getInsertIntoStatement(String tableName, String[] fieldNames) {
     }
 
     /**
-     * Get update one row statement by condition fields, default not use limit 1, because limit 1 is
-     * a sql dialect.
+     * Constructs the dialect's update statement for a single row with the given condition.
+     *
+     * <p>The default implementation does not use {@code LIMIT 1}, as {@code LIMIT} is dialect-specific.
+     *
+     * @return A single row update statement.
      */
     default String getUpdateStatement(

Review comment:
       The more default implementations I see here, the more I think we should separate the interface from an (abstract) base implementation, which we already have in `AbstractDialect` anyway. Should we consider moving these there?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLDialect.java
##########
@@ -29,7 +30,7 @@
 import java.util.stream.Collectors;
 
 /** JDBC dialect for MySQL. */
-public class MySQLDialect extends AbstractDialect {
+class MySQLDialect extends AbstractDialect {

Review comment:
       nit: `@Internal` to be clear?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -66,13 +63,19 @@
      */
     String getLimitClause(long limit);
 
+    /** @return True if two instances support the same dialect. */
+    boolean equals(Object o);

Review comment:
       Where do we actually need this (same for hash code)?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -80,6 +85,17 @@ public void validate(TableSchema schema) throws ValidationException {
         }
     }
 
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof JdbcDialect
+                && ((JdbcDialect) obj).dialectName().equals(dialectName());
+    }
+
+    @Override
+    public int hashCode() {
+        return dialectName().hashCode() >> 31;
+    }
+
     public abstract int maxDecimalPrecision();

Review comment:
       All of the abstract methods should have proper JavaDocs now that this is public API.
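As a side note on the `hashCode` in this hunk: `dialectName().hashCode() >> 31` evaluates to 0 or -1 for every name, since an arithmetic shift by 31 keeps only the sign bit. A minimal stand-alone sketch of name-based identity (hypothetical class, not Flink's) that keeps the equals/hashCode contract without that collapse:

```java
// Minimal stand-alone sketch (not Flink's class): equality keyed on the
// dialect name, consistent with the equals/hashCode contract.
public class DialectIdentitySketch {

    private final String name;

    public DialectIdentitySketch(String name) {
        this.name = name;
    }

    public String dialectName() {
        return name;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof DialectIdentitySketch
                && ((DialectIdentitySketch) o).dialectName().equals(dialectName());
    }

    @Override
    public int hashCode() {
        // Use the full hash; `hashCode() >> 31` would collapse nearly every
        // value to 0 or -1 and degrade hash-based collections.
        return dialectName().hashCode();
    }
}
```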

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java
##########
@@ -18,23 +18,25 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import java.util.Arrays;
-import java.util.List;
+import org.apache.flink.annotation.Internal;
+
 import java.util.Optional;
+import java.util.ServiceLoader;
 
 /** Default JDBC dialects. */
+@Internal
 public final class JdbcDialects {

Review comment:
       nit: add a private default constructor




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

