Airblader commented on a change in pull request #17264:
URL: https://github.com/apache/flink/pull/17264#discussion_r709807987
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -31,8 +31,13 @@
import static java.lang.String.format;
-/** Handle the SQL dialect of jdbc driver. */
-@Internal
+/**
+ * Represents a dialect of SQL implemented by a particular JDBC system. Dialects should be immutable
+ * and stateless.
+ *
+ * @see JdbcDialectFactory
+ */
+@PublicEvolving
public interface JdbcDialect extends Serializable {
Review comment:
Since we're making this a public API now, it would be good to go through all the JavaDocs in here and bring them into shape.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A factory to create different JdbcDialect's. This factory is used with Java's Service Provider
Review comment:
```suggestion
 * A factory to create different {@link JdbcDialect JdbcDialects}. This factory is used with Java's Service Provider
```
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLDialectFactory.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link MySQLDialect}. */
+public class MySQLDialectFactory implements JdbcDialectFactory {
Review comment:
Add `@Internal`
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/derby/DerbyDialectFactory.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.derby;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link DerbyDialect}. */
+public class DerbyDialectFactory implements JdbcDialectFactory {
Review comment:
Add `@Internal`
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,60 +18,65 @@
package org.apache.flink.connector.jdbc.dialect;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import java.util.List;
-abstract class AbstractDialect implements JdbcDialect {
+/** Base class that handles common validation of JDBC data types. */
+@PublicEvolving
Review comment:
Should we really make this public already? If so, what is the contract that we guarantee here (other than `JdbcDialect`'s)? This deserves better documentation, I think.
I'm also not sure the design of this class is great yet. For example, implementers are forced to specify min/max precisions; maybe that should be optional? Also, shouldn't an implementation list supported types rather than unsupported ones?
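To make the allow-list idea concrete, here is a minimal sketch (not the PR's code) of validating against the types a dialect declares it *supports* rather than a deny-list. `TypeRoot` is a stand-in for Flink's `LogicalTypeRoot`; all names are invented for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Sketch only: an explicit allow-list of supported types instead of a
// deny-list of unsupported ones. TypeRoot stands in for LogicalTypeRoot.
public class AllowListValidation {
    enum TypeRoot { INTEGER, VARCHAR, TIMESTAMP, ARRAY, MAP }

    // A hypothetical dialect declares exactly what it can handle.
    static final Set<TypeRoot> SUPPORTED =
            EnumSet.of(TypeRoot.INTEGER, TypeRoot.VARCHAR, TypeRoot.TIMESTAMP);

    static void validate(TypeRoot type) {
        if (!SUPPORTED.contains(type)) {
            throw new IllegalArgumentException("Dialect does not support type: " + type);
        }
    }
}
```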
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -83,27 +86,37 @@ default void validate(TableSchema schema) throws ValidationException {}
}
/**
- * Quotes the identifier. This is used to put quotes around the identifier in case the column
- * name is a reserved keyword, or in case it contains characters that require quotes (e.g.
- * space). Default using double quotes {@code "} to quote.
+ * Quotes the identifier.
+ *
+ * <p>Used to put quotes around the identifier if the column name is a reserved keyword or
+ * contains characters requiring quotes (e.g., space). Default using double quotes {@code "} to
+ * quote.
+ *
+ * @return the quoted identifier.
*/
default String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
}
/**
- * Get dialect upsert statement, the database has its own upsert syntax, such as Mysql using
- * DUPLICATE KEY UPDATE, and PostgreSQL using ON CONFLICT... DO UPDATE SET..
+ * Constructs the dialects upsert statement if supported; such as MySQL's {@code DUPLICATE KEY
+ * UPDATE}, or PostgreSQLs {@code ON CONFLICT... DO UPDATE SET..}.
+ *
+ * <p>If the dialect does not support native upsert statements, the writer will fallback to
+ * {@code SELECT} + {@code Update}/{@code INSERT} which may have poor performance.
 *
- * @return None if dialect does not support upsert statement, the writer will degrade to the use
- * of select + update/insert, this performance is poor.
+ * @return The upsert statement if supported, otherwise None.
*/
default Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
Review comment:
Given the performance impact if this is left unimplemented, is it maybe
worth not default-implementing it to force implementations to make a conscious
decision?
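Dropping the default would force each dialect to make that conscious decision. A rough sketch of what the two cases might then look like (class and method shapes are invented for illustration; the real interface has more methods):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.stream.Collectors;

// Illustrative only: two dialects if getUpsertStatement had no default.
public class UpsertSketch {
    interface Dialect {
        Optional<String> getUpsertStatement(String table, String[] fields, String[] keys);
    }

    // A MySQL-style dialect opts in with ON DUPLICATE KEY UPDATE.
    static class MySqlLike implements Dialect {
        public Optional<String> getUpsertStatement(String table, String[] fields, String[] keys) {
            String cols = String.join(", ", fields);
            String marks = String.join(", ", Collections.nCopies(fields.length, "?"));
            String updates = Arrays.stream(fields)
                    .map(f -> f + "=VALUES(" + f + ")")
                    .collect(Collectors.joining(", "));
            return Optional.of("INSERT INTO " + table + "(" + cols + ") VALUES (" + marks
                    + ") ON DUPLICATE KEY UPDATE " + updates);
        }
    }

    // A dialect without native upsert now has to say so explicitly
    // instead of silently inheriting a default.
    static class DerbyLike implements Dialect {
        public Optional<String> getUpsertStatement(String table, String[] fields, String[] keys) {
            return Optional.empty();
        }
    }
}
```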
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A factory to create different JdbcDialect's. This factory is used with Java's Service Provider
+ * Interfaces (SPI) for discovering.
+ *
+ * <p>Classes that implement this interface can be added to the
+ * "META_INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory" file of a JAR file
+ * in the current classpath to be found.
+ *
+ * @see JdbcDialect
+ */
+@PublicEvolving
+public interface JdbcDialectFactory {
+
+ /**
+ * Check if this dialect instance can handle a certain jdbc url.
+ *
+ * @param url the jdbc url.
+ * @return True if the dialect can be applied on the given jdbc url.
+ */
+ boolean canHandle(String url);
Review comment:
Maybe we can rename this to `acceptsURL` to be in line with
`java.sql.Driver`?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -83,27 +86,37 @@ default void validate(TableSchema schema) throws ValidationException {}
}
/**
- * Quotes the identifier. This is used to put quotes around the identifier in case the column
- * name is a reserved keyword, or in case it contains characters that require quotes (e.g.
- * space). Default using double quotes {@code "} to quote.
+ * Quotes the identifier.
+ *
+ * <p>Used to put quotes around the identifier if the column name is a reserved keyword or
+ * contains characters requiring quotes (e.g., space). Default using double quotes {@code "} to
+ * quote.
+ *
+ * @return the quoted identifier.
*/
default String quoteIdentifier(String identifier) {
return "\"" + identifier + "\"";
Review comment:
The default implementation seems broken to me because it doesn't escape
the identifier. That being said, should we really offer a default
implementation for this?
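For context, an escaping variant is a small change: doubling any embedded double quote is the ANSI SQL convention. A sketch (not the PR's code):

```java
// Sketch of an escaping quoteIdentifier: embedded double quotes are
// doubled, per the ANSI SQL convention for quoted identifiers.
public class IdentifierQuoting {
    static String quoteIdentifier(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }
}
```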
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java
##########
@@ -18,23 +18,25 @@
package org.apache.flink.connector.jdbc.dialect;
-import java.util.Arrays;
-import java.util.List;
+import org.apache.flink.annotation.Internal;
+
import java.util.Optional;
+import java.util.ServiceLoader;
/** Default JDBC dialects. */
+@Internal
public final class JdbcDialects {
- private static final List<JdbcDialect> DIALECTS =
- Arrays.asList(new DerbyDialect(), new MySQLDialect(), new PostgresDialect());
-
/** Fetch the JdbcDialect class corresponding to a given database url. */
public static Optional<JdbcDialect> get(String url) {
- for (JdbcDialect dialect : DIALECTS) {
- if (dialect.canHandle(url)) {
- return Optional.of(dialect);
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+ for (JdbcDialectFactory factory : ServiceLoader.load(JdbcDialectFactory.class, cl)) {
+ if (factory.canHandle(url)) {
Review comment:
What if there are multiple implementations that accept the URL? We
should probably catch such cases and give an appropriate error.
Comparing with `FactoryUtil` for the Table & SQL stack, it probably also
makes sense to throw an error if no implementations could be found (note: I
don't mean none that accept the URL, but none at all, hinting to some setup
problem).
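A sketch of a lookup that fails loudly in both cases, in the spirit of `FactoryUtil`. The real code iterates a `ServiceLoader`; here a plain list stands in for discovery, and `Factory` is a cut-down invented interface:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: error out when zero factories are discovered
// (setup problem) and when more than one accepts the URL (ambiguity).
public class DialectLookup {
    interface Factory {
        boolean canHandle(String url);
    }

    static Factory find(List<Factory> discovered, String url) {
        if (discovered.isEmpty()) {
            // No factories at all usually points at a classpath/SPI setup problem.
            throw new IllegalStateException("No JdbcDialectFactory found on the classpath.");
        }
        List<Factory> matching = new ArrayList<>();
        for (Factory factory : discovered) {
            if (factory.canHandle(url)) {
                matching.add(factory);
            }
        }
        if (matching.isEmpty()) {
            throw new IllegalStateException("No dialect factory accepts URL: " + url);
        }
        if (matching.size() > 1) {
            throw new IllegalStateException(
                    "Multiple dialect factories match URL, refusing to pick one: " + url);
        }
        return matching.get(0);
    }
}
```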
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/psql/PostgresDialectFactory.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.psql;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link PostgresDialect}. */
+public class PostgresDialectFactory implements JdbcDialectFactory {
Review comment:
Add `@Internal`
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -131,8 +144,11 @@ default String getInsertIntoStatement(String tableName, String[] fieldNames) {
}
/**
- * Get update one row statement by condition fields, default not use limit 1, because limit 1 is
- * a sql dialect.
+ * Constructs the dialects update statement for a single row with the given condition.
+ *
+ * <p>The default implementation does not use {@code LIMIT 1} as limit is dialect specific.
+ *
+ * @return A single row update statement.
*/
default String getUpdateStatement(
Review comment:
The more default implementations I see here, the more I think maybe we
should separate the interface and some (abstract) base implementation. Which we
already have in `AbstractDialect` anyway. Should we consider moving these there?
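One possible shape of that split, as an illustrative sketch only (names invented): a minimal interface plus an abstract base that absorbs the former default implementations.

```java
import java.util.Optional;

// Sketch of splitting the interface from a base implementation that
// carries the former interface defaults. Names are illustrative.
public class InterfaceSplit {
    interface Dialect {
        String dialectName();
        String quoteIdentifier(String identifier);
        Optional<String> getUpsertStatement(String table, String[] fields, String[] keys);
    }

    abstract static class BaseDialect implements Dialect {
        // Former interface default: ANSI-style double quoting.
        public String quoteIdentifier(String identifier) {
            return "\"" + identifier.replace("\"", "\"\"") + "\"";
        }
    }
}
```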
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLDialect.java
##########
@@ -29,7 +30,7 @@
import java.util.stream.Collectors;
/** JDBC dialect for MySQL. */
-public class MySQLDialect extends AbstractDialect {
+class MySQLDialect extends AbstractDialect {
Review comment:
nit: `@Internal` to be clear?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -66,13 +63,19 @@
*/
String getLimitClause(long limit);
+ /** @return True if two instances support the same dialect. */
+ boolean equals(Object o);
Review comment:
Where do we actually need this (same for hash code)?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -80,6 +85,17 @@ public void validate(TableSchema schema) throws ValidationException {
}
}
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof JdbcDialect
+ && ((JdbcDialect) obj).dialectName().equals(dialectName());
+ }
+
+ @Override
+ public int hashCode() {
+ return dialectName().hashCode() >> 31;
+ }
+
public abstract int maxDecimalPrecision();
Review comment:
All of the abstract methods should have proper JavaDocs now that this is
public API.
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java
##########
@@ -18,23 +18,25 @@
package org.apache.flink.connector.jdbc.dialect;
-import java.util.Arrays;
-import java.util.List;
+import org.apache.flink.annotation.Internal;
+
import java.util.Optional;
+import java.util.ServiceLoader;
/** Default JDBC dialects. */
+@Internal
public final class JdbcDialects {
Review comment:
nit: add a private default constructor
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]