Airblader commented on a change in pull request #17264:
URL: https://github.com/apache/flink/pull/17264#discussion_r710784215



##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,80 +18,261 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
-abstract class AbstractDialect implements JdbcDialect {
+import static java.lang.String.format;
+
+/**
+ * Base class for {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect 
JdbcDialect's} that

Review comment:
       nit: why the fully-qualified name for the link here?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -83,102 +76,56 @@ default void validate(TableSchema schema) throws 
ValidationException {}
     }
 
     /**
-     * Quotes the identifier. This is used to put quotes around the identifier 
in case the column
-     * name is a reserved keyword, or in case it contains characters that 
require quotes (e.g.
-     * space). Default using double quotes {@code "} to quote.
+     * Quotes the identifier.
+     *
+     * <p>Used to put quotes around the identifier if the column name is a 
reserved keyword or
+     * contains characters requiring quotes (e.g., space).
+     *
+     * @return the quoted identifier.
      */
-    default String quoteIdentifier(String identifier) {
-        return "\"" + identifier + "\"";
-    }
+    String quoteIdentifier(String identifier);
 
     /**
-     * Get dialect upsert statement, the database has its own upsert syntax, 
such as Mysql using
-     * DUPLICATE KEY UPDATE, and PostgreSQL using ON CONFLICT... DO UPDATE 
SET..
+     * Constructs the dialects upsert statement if supported; such as MySQL's 
{@code DUPLICATE KEY
+     * UPDATE}, or PostgreSQLs {@code ON CONFLICT... DO UPDATE SET..}.
      *
-     * @return None if dialect does not support upsert statement, the writer 
will degrade to the use
-     *     of select + update/insert, this performance is poor.
+     * <p>If the dialect does not support native upsert statements, the writer 
will fallback to
+     * {@code SELECT} + {@code Update}/{@code INSERT} which may have poor 
performance.
+     *
+     * @return The upsert statement if supported, otherwise None.
      */
-    default Optional<String> getUpsertStatement(
-            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
-        return Optional.empty();
-    }
+    Optional<String> getUpsertStatement(

Review comment:
       Maybe for all the methods taking some string arrays we could still add a 
sentence to the JavaDoc explaining a bit better what an implementation has to 
do, i.e. use prepared statements where the placeholder name is the same as the 
field name?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,80 +18,261 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
-abstract class AbstractDialect implements JdbcDialect {
+import static java.lang.String.format;
+
+/**
+ * Base class for {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect 
JdbcDialect's} that
+ * implements basic data type validation and the construction of basic {@code 
INSERT}, {@code
+ * UPDATE}, {@code DELETE}, and {@code SELECT} statements.
+ *
+ * <p>Implementors should be careful to check the default SQL statements are 
performant for their
+ * specific dialect and override them if necessary.
+ */
+@PublicEvolving
+public abstract class AbstractDialect implements JdbcDialect {
 
     @Override
-    public void validate(TableSchema schema) throws ValidationException {
-        for (int i = 0; i < schema.getFieldCount(); i++) {
-            DataType dt = schema.getFieldDataType(i).get();
-            String fieldName = schema.getFieldName(i).get();
+    public void validate(DataType dataType) throws ValidationException {
+        if (!(dataType.getLogicalType() instanceof RowType)) {
+            throw new ValidationException("Logical DataType must be a 
RowType");
+        }
 
+        RowType rowType = (RowType) dataType.getLogicalType();
+        for (RowType.RowField field : rowType.getFields()) {
             // TODO: We can't convert VARBINARY(n) data type to
             //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
-            // LegacyTypeInfoDataTypeConverter
-            //  when n is smaller than Integer.MAX_VALUE
-            if (unsupportedTypes().contains(dt.getLogicalType().getTypeRoot())
-                    || (dt.getLogicalType() instanceof VarBinaryType
+            //  LegacyTypeInfoDataTypeConverter when n is smaller
+            //  than Integer.MAX_VALUE
+            if (unsupportedTypes().contains(field.getType().getTypeRoot())
+                    || (field.getType() instanceof VarBinaryType
                             && Integer.MAX_VALUE
-                                    != ((VarBinaryType) 
dt.getLogicalType()).getLength())) {
+                                    != ((VarBinaryType) 
field.getType()).getLength())) {
                 throw new ValidationException(
                         String.format(
                                 "The %s dialect doesn't support type: %s.",
-                                dialectName(), dt.toString()));
+                                dialectName(), field.getType()));
             }
 
-            if (dt.getLogicalType() instanceof DecimalType) {
-                int precision = ((DecimalType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxDecimalPrecision() || precision < 
minDecimalPrecision()) {
+            if (field.getType() instanceof DecimalType) {
+                Range range =
+                        decimalPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports DECIMAL type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((DecimalType) field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
DECIMAL "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minDecimalPrecision(),
-                                    maxDecimalPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
 
-            if (dt.getLogicalType() instanceof TimestampType) {
-                int precision = ((TimestampType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxTimestampPrecision() || precision < 
minTimestampPrecision()) {
+            if (field.getType() instanceof TimestampType) {
+                Range range =
+                        timestampPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports TIMESTAMP type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((TimestampType) 
field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
TIMESTAMP "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minTimestampPrecision(),
-                                    maxTimestampPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
         }
     }
 
-    public abstract int maxDecimalPrecision();
+    /**
+     * A simple {@code INSERT INTO} statement.
+     *
+     * <pre>{@code
+     * INSERT INTO table_name (column_name [, ...])
+     * VALUES (value [, ...])
+     * }</pre>
+     */
+    @Override
+    public String getInsertIntoStatement(String tableName, String[] 
fieldNames) {
+        String columns =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String placeholders =
+                Arrays.stream(fieldNames).map(f -> ":" + 
f).collect(Collectors.joining(", "));
+        return "INSERT INTO "
+                + quoteIdentifier(tableName)
+                + "("
+                + columns
+                + ")"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    /**
+     * A simple single row {@code UPDATE} statement.
+     *
+     * <pre>{@code
+     * UPDATE table_name
+     * SET col = val [, ...]
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getUpdateStatement(
+            String tableName, String[] fieldNames, String[] conditionFields) {
+        String setClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(", "));
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "UPDATE "
+                + quoteIdentifier(tableName)
+                + " SET "
+                + setClause
+                + " WHERE "
+                + conditionClause;
+    }
+
+    /**
+     * A simple single row {@code DELETE} statement.
+     *
+     * <pre>{@code
+     * DELETE FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getDeleteStatement(String tableName, String[] 
conditionFields) {
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + 
conditionClause;
+    }
+
+    /**
+     * A simple {@code SELECT} statement.
+     *
+     * <pre>{@code
+     * SELECT expression [, ...]
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getSelectFromStatement(
+            String tableName, String[] selectFields, String[] conditionFields) 
{
+        String selectExpressions =
+                Arrays.stream(selectFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT "
+                + selectExpressions
+                + " FROM "
+                + quoteIdentifier(tableName)
+                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : 
"");
+    }
+
+    /**
+     * A simple {@code SELECT} statement that checks for the existence of a 
single row.
+     *
+     * <p>
+     *
+     * <pre>{@code
+     * SELECT 1
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getRowExistsStatement(String tableName, String[] 
conditionFields) {
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + 
fieldExpressions;
+    }
 
-    public abstract int minDecimalPrecision();
+    @Override
+    public boolean equals(Object obj) {

Review comment:
       Since we removed equals / hashCode from the interface, I'm still 
wondering why we need equality on the dialect name. Is this used anywhere?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,80 +18,261 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
-abstract class AbstractDialect implements JdbcDialect {
+import static java.lang.String.format;
+
+/**
+ * Base class for {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect 
JdbcDialect's} that

Review comment:
       Maybe I'm missing something here, but you chose the apostrophe here even 
after the revision. I'm not a native speaker, but to me the apostrophe looks 
wrong since it's not possessive?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -83,102 +76,56 @@ default void validate(TableSchema schema) throws 
ValidationException {}
     }
 
     /**
-     * Quotes the identifier. This is used to put quotes around the identifier 
in case the column
-     * name is a reserved keyword, or in case it contains characters that 
require quotes (e.g.
-     * space). Default using double quotes {@code "} to quote.
+     * Quotes the identifier.
+     *
+     * <p>Used to put quotes around the identifier if the column name is a 
reserved keyword or
+     * contains characters requiring quotes (e.g., space).
+     *
+     * @return the quoted identifier.
      */
-    default String quoteIdentifier(String identifier) {
-        return "\"" + identifier + "\"";
-    }
+    String quoteIdentifier(String identifier);
 
     /**
-     * Get dialect upsert statement, the database has its own upsert syntax, 
such as Mysql using
-     * DUPLICATE KEY UPDATE, and PostgreSQL using ON CONFLICT... DO UPDATE 
SET..
+     * Constructs the dialects upsert statement if supported; such as MySQL's 
{@code DUPLICATE KEY
+     * UPDATE}, or PostgreSQLs {@code ON CONFLICT... DO UPDATE SET..}.

Review comment:
       ```suggestion
        * UPDATE}, or PostgreSQL's {@code ON CONFLICT... DO UPDATE SET..}.
   ```

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,80 +18,261 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
-abstract class AbstractDialect implements JdbcDialect {
+import static java.lang.String.format;
+
+/**
+ * Base class for {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect 
JdbcDialect's} that
+ * implements basic data type validation and the construction of basic {@code 
INSERT}, {@code
+ * UPDATE}, {@code DELETE}, and {@code SELECT} statements.
+ *
+ * <p>Implementors should be careful to check the default SQL statements are 
performant for their
+ * specific dialect and override them if necessary.
+ */
+@PublicEvolving
+public abstract class AbstractDialect implements JdbcDialect {
 
     @Override
-    public void validate(TableSchema schema) throws ValidationException {
-        for (int i = 0; i < schema.getFieldCount(); i++) {
-            DataType dt = schema.getFieldDataType(i).get();
-            String fieldName = schema.getFieldName(i).get();
+    public void validate(DataType dataType) throws ValidationException {
+        if (!(dataType.getLogicalType() instanceof RowType)) {
+            throw new ValidationException("Logical DataType must be a 
RowType");
+        }
 
+        RowType rowType = (RowType) dataType.getLogicalType();
+        for (RowType.RowField field : rowType.getFields()) {
             // TODO: We can't convert VARBINARY(n) data type to
             //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
-            // LegacyTypeInfoDataTypeConverter
-            //  when n is smaller than Integer.MAX_VALUE
-            if (unsupportedTypes().contains(dt.getLogicalType().getTypeRoot())
-                    || (dt.getLogicalType() instanceof VarBinaryType
+            //  LegacyTypeInfoDataTypeConverter when n is smaller
+            //  than Integer.MAX_VALUE
+            if (unsupportedTypes().contains(field.getType().getTypeRoot())
+                    || (field.getType() instanceof VarBinaryType
                             && Integer.MAX_VALUE
-                                    != ((VarBinaryType) 
dt.getLogicalType()).getLength())) {
+                                    != ((VarBinaryType) 
field.getType()).getLength())) {
                 throw new ValidationException(
                         String.format(
                                 "The %s dialect doesn't support type: %s.",
-                                dialectName(), dt.toString()));
+                                dialectName(), field.getType()));
             }
 
-            if (dt.getLogicalType() instanceof DecimalType) {
-                int precision = ((DecimalType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxDecimalPrecision() || precision < 
minDecimalPrecision()) {
+            if (field.getType() instanceof DecimalType) {
+                Range range =
+                        decimalPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports DECIMAL type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((DecimalType) field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
DECIMAL "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minDecimalPrecision(),
-                                    maxDecimalPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
 
-            if (dt.getLogicalType() instanceof TimestampType) {
-                int precision = ((TimestampType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxTimestampPrecision() || precision < 
minTimestampPrecision()) {
+            if (field.getType() instanceof TimestampType) {
+                Range range =
+                        timestampPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports TIMESTAMP type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((TimestampType) 
field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
TIMESTAMP "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minTimestampPrecision(),
-                                    maxTimestampPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
         }
     }
 
-    public abstract int maxDecimalPrecision();
+    /**
+     * A simple {@code INSERT INTO} statement.
+     *
+     * <pre>{@code
+     * INSERT INTO table_name (column_name [, ...])
+     * VALUES (value [, ...])
+     * }</pre>
+     */
+    @Override
+    public String getInsertIntoStatement(String tableName, String[] 
fieldNames) {
+        String columns =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String placeholders =
+                Arrays.stream(fieldNames).map(f -> ":" + 
f).collect(Collectors.joining(", "));
+        return "INSERT INTO "
+                + quoteIdentifier(tableName)
+                + "("
+                + columns
+                + ")"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    /**
+     * A simple single row {@code UPDATE} statement.
+     *
+     * <pre>{@code
+     * UPDATE table_name
+     * SET col = val [, ...]
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getUpdateStatement(
+            String tableName, String[] fieldNames, String[] conditionFields) {
+        String setClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(", "));
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "UPDATE "
+                + quoteIdentifier(tableName)
+                + " SET "
+                + setClause
+                + " WHERE "
+                + conditionClause;
+    }
+
+    /**
+     * A simple single row {@code DELETE} statement.
+     *
+     * <pre>{@code
+     * DELETE FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getDeleteStatement(String tableName, String[] 
conditionFields) {
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + 
conditionClause;
+    }
+
+    /**
+     * A simple {@code SELECT} statement.
+     *
+     * <pre>{@code
+     * SELECT expression [, ...]
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getSelectFromStatement(
+            String tableName, String[] selectFields, String[] conditionFields) 
{
+        String selectExpressions =
+                Arrays.stream(selectFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT "
+                + selectExpressions
+                + " FROM "
+                + quoteIdentifier(tableName)
+                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : 
"");
+    }
+
+    /**
+     * A simple {@code SELECT} statement that checks for the existence of a 
single row.
+     *
+     * <p>
+     *
+     * <pre>{@code
+     * SELECT 1
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getRowExistsStatement(String tableName, String[] 
conditionFields) {
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + 
fieldExpressions;
+    }
 
-    public abstract int minDecimalPrecision();
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof JdbcDialect
+                && ((JdbcDialect) obj).dialectName().equals(dialectName());
+    }
 
-    public abstract int maxTimestampPrecision();
+    @Override
+    public int hashCode() {
+        return dialectName().hashCode() >> 31;
+    }
 
-    public abstract int minTimestampPrecision();
+    /**
+     * @return The inclusive range [min,max] of supported precisions for 
{@link TimestampType}
+     *     columns. None if timestamp type is not supported.
+     */
+    public abstract Optional<Range> timestampPrecisionRange();

Review comment:
       Default-implement these to return `Optional.empty()`?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectFactory.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * A factory to create different {@link JdbcDialect JdbcDialect's}. This 
factory is used with Java's

Review comment:
       Pending the reply on the comment above, here's another apostrophe that 
seems wrong to me.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -83,102 +76,56 @@ default void validate(TableSchema schema) throws 
ValidationException {}
     }
 
     /**
-     * Quotes the identifier. This is used to put quotes around the identifier 
in case the column
-     * name is a reserved keyword, or in case it contains characters that 
require quotes (e.g.
-     * space). Default using double quotes {@code "} to quote.
+     * Quotes the identifier.
+     *
+     * <p>Used to put quotes around the identifier if the column name is a 
reserved keyword or
+     * contains characters requiring quotes (e.g., space).
+     *
+     * @return the quoted identifier.
      */
-    default String quoteIdentifier(String identifier) {
-        return "\"" + identifier + "\"";
-    }
+    String quoteIdentifier(String identifier);
 
     /**
-     * Get dialect upsert statement, the database has its own upsert syntax, 
such as Mysql using
-     * DUPLICATE KEY UPDATE, and PostgreSQL using ON CONFLICT... DO UPDATE 
SET..
+     * Constructs the dialects upsert statement if supported; such as MySQL's 
{@code DUPLICATE KEY
+     * UPDATE}, or PostgreSQLs {@code ON CONFLICT... DO UPDATE SET..}.
      *
-     * @return None if dialect does not support upsert statement, the writer 
will degrade to the use
-     *     of select + update/insert, this performance is poor.
+     * <p>If the dialect does not support native upsert statements, the writer 
will fallback to
+     * {@code SELECT} + {@code Update}/{@code INSERT} which may have poor 
performance.

Review comment:
       ```suggestion
        * {@code SELECT} + {@code UPDATE}/{@code INSERT} which may have poor 
performance.
   ```

##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,80 +18,261 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
-abstract class AbstractDialect implements JdbcDialect {
+import static java.lang.String.format;
+
+/**
+ * Base class for {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect 
JdbcDialect's} that
+ * implements basic data type validation and the construction of basic {@code 
INSERT}, {@code
+ * UPDATE}, {@code DELETE}, and {@code SELECT} statements.
+ *
+ * <p>Implementors should be careful to check the default SQL statements are 
performant for their
+ * specific dialect and override them if necessary.
+ */
+@PublicEvolving
+public abstract class AbstractDialect implements JdbcDialect {
 
     @Override
-    public void validate(TableSchema schema) throws ValidationException {
-        for (int i = 0; i < schema.getFieldCount(); i++) {
-            DataType dt = schema.getFieldDataType(i).get();
-            String fieldName = schema.getFieldName(i).get();
+    public void validate(DataType dataType) throws ValidationException {
+        if (!(dataType.getLogicalType() instanceof RowType)) {
+            throw new ValidationException("Logical DataType must be a 
RowType");
+        }
 
+        RowType rowType = (RowType) dataType.getLogicalType();
+        for (RowType.RowField field : rowType.getFields()) {
             // TODO: We can't convert VARBINARY(n) data type to
             //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
-            // LegacyTypeInfoDataTypeConverter
-            //  when n is smaller than Integer.MAX_VALUE
-            if (unsupportedTypes().contains(dt.getLogicalType().getTypeRoot())
-                    || (dt.getLogicalType() instanceof VarBinaryType
+            //  LegacyTypeInfoDataTypeConverter when n is smaller
+            //  than Integer.MAX_VALUE
+            if (unsupportedTypes().contains(field.getType().getTypeRoot())
+                    || (field.getType() instanceof VarBinaryType
                             && Integer.MAX_VALUE
-                                    != ((VarBinaryType) 
dt.getLogicalType()).getLength())) {
+                                    != ((VarBinaryType) 
field.getType()).getLength())) {
                 throw new ValidationException(
                         String.format(
                                 "The %s dialect doesn't support type: %s.",
-                                dialectName(), dt.toString()));
+                                dialectName(), field.getType()));
             }
 
-            if (dt.getLogicalType() instanceof DecimalType) {
-                int precision = ((DecimalType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxDecimalPrecision() || precision < 
minDecimalPrecision()) {
+            if (field.getType() instanceof DecimalType) {
+                Range range =
+                        decimalPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports DECIMAL type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((DecimalType) field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
DECIMAL "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minDecimalPrecision(),
-                                    maxDecimalPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
 
-            if (dt.getLogicalType() instanceof TimestampType) {
-                int precision = ((TimestampType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxTimestampPrecision() || precision < 
minTimestampPrecision()) {
+            if (field.getType() instanceof TimestampType) {
+                Range range =
+                        timestampPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports TIMESTAMP type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((TimestampType) 
field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
TIMESTAMP "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minTimestampPrecision(),
-                                    maxTimestampPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
         }
     }
 
-    public abstract int maxDecimalPrecision();
+    /**
+     * A simple {@code INSERT INTO} statement.
+     *
+     * <pre>{@code
+     * INSERT INTO table_name (column_name [, ...])
+     * VALUES (value [, ...])
+     * }</pre>
+     */
+    @Override
+    public String getInsertIntoStatement(String tableName, String[] 
fieldNames) {
+        String columns =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String placeholders =
+                Arrays.stream(fieldNames).map(f -> ":" + 
f).collect(Collectors.joining(", "));
+        return "INSERT INTO "
+                + quoteIdentifier(tableName)
+                + "("
+                + columns
+                + ")"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    /**
+     * A simple single row {@code UPDATE} statement.
+     *
+     * <pre>{@code
+     * UPDATE table_name
+     * SET col = val [, ...]
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getUpdateStatement(
+            String tableName, String[] fieldNames, String[] conditionFields) {
+        String setClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(", "));
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "UPDATE "
+                + quoteIdentifier(tableName)
+                + " SET "
+                + setClause
+                + " WHERE "
+                + conditionClause;
+    }
+
+    /**
+     * A simple single row {@code DELETE} statement.
+     *
+     * <pre>{@code
+     * DELETE FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getDeleteStatement(String tableName, String[] 
conditionFields) {
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + 
conditionClause;
+    }
+
+    /**
+     * A simple {@code SELECT} statement.
+     *
+     * <pre>{@code
+     * SELECT expression [, ...]
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getSelectFromStatement(
+            String tableName, String[] selectFields, String[] conditionFields) 
{
+        String selectExpressions =
+                Arrays.stream(selectFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT "
+                + selectExpressions
+                + " FROM "
+                + quoteIdentifier(tableName)
+                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : 
"");
+    }
+
+    /**
+     * A simple {@code SELECT} statement that checks for the existence of a 
single row.
+     *
+     * <p>
+     *
+     * <pre>{@code
+     * SELECT 1
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getRowExistsStatement(String tableName, String[] 
conditionFields) {
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + 
fieldExpressions;
+    }
 
-    public abstract int minDecimalPrecision();
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof JdbcDialect
+                && ((JdbcDialect) obj).dialectName().equals(dialectName());
+    }
 
-    public abstract int maxTimestampPrecision();
+    @Override
+    public int hashCode() {
+        return dialectName().hashCode() >> 31;
+    }
 
-    public abstract int minTimestampPrecision();
+    /**
+     * @return The inclusive range [min,max] of supported precisions for 
{@link TimestampType}
+     *     columns. None if timestamp type is not supported.
+     */
+    public abstract Optional<Range> timestampPrecisionRange();
+
+    /**
+     * @return The inclusive range [min,max] of supported precisions for 
{@link DecimalType}
+     *     columns. None if decimal type is not supported.
+     */
+    public abstract Optional<Range> decimalPrecisionRange();
 
     /**
      * Defines the unsupported types for the dialect.
      *
      * @return a list of logical type roots.
      */
     public abstract List<LogicalTypeRoot> unsupportedTypes();

Review comment:
       I still think listing unsupported types is the wrong design. We should 
change this to list supported types instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to