sjwiesman commented on a change in pull request #17264:
URL: https://github.com/apache/flink/pull/17264#discussion_r711096508



##########
File path: 
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractDialect.java
##########
@@ -18,80 +18,261 @@
 
 package org.apache.flink.connector.jdbc.dialect;
 
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
-abstract class AbstractDialect implements JdbcDialect {
+import static java.lang.String.format;
+
+/**
+ * Base class for {@link org.apache.flink.connector.jdbc.dialect.JdbcDialect 
JdbcDialect's} that
+ * implements basic data type validation and the construction of basic {@code 
INSERT}, {@code
+ * UPDATE}, {@code DELETE}, and {@code SELECT} statements.
+ *
+ * <p>Implementors should be careful to check the default SQL statements are 
performant for their
+ * specific dialect and override them if necessary.
+ */
+@PublicEvolving
+public abstract class AbstractDialect implements JdbcDialect {
 
     @Override
-    public void validate(TableSchema schema) throws ValidationException {
-        for (int i = 0; i < schema.getFieldCount(); i++) {
-            DataType dt = schema.getFieldDataType(i).get();
-            String fieldName = schema.getFieldName(i).get();
+    public void validate(DataType dataType) throws ValidationException {
+        if (!(dataType.getLogicalType() instanceof RowType)) {
+            throw new ValidationException("Logical DataType must be a 
RowType");
+        }
 
+        RowType rowType = (RowType) dataType.getLogicalType();
+        for (RowType.RowField field : rowType.getFields()) {
             // TODO: We can't convert VARBINARY(n) data type to
             //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
-            // LegacyTypeInfoDataTypeConverter
-            //  when n is smaller than Integer.MAX_VALUE
-            if (unsupportedTypes().contains(dt.getLogicalType().getTypeRoot())
-                    || (dt.getLogicalType() instanceof VarBinaryType
+            //  LegacyTypeInfoDataTypeConverter when n is smaller
+            //  than Integer.MAX_VALUE
+            if (unsupportedTypes().contains(field.getType().getTypeRoot())
+                    || (field.getType() instanceof VarBinaryType
                             && Integer.MAX_VALUE
-                                    != ((VarBinaryType) 
dt.getLogicalType()).getLength())) {
+                                    != ((VarBinaryType) 
field.getType()).getLength())) {
                 throw new ValidationException(
                         String.format(
                                 "The %s dialect doesn't support type: %s.",
-                                dialectName(), dt.toString()));
+                                dialectName(), field.getType()));
             }
 
-            if (dt.getLogicalType() instanceof DecimalType) {
-                int precision = ((DecimalType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxDecimalPrecision() || precision < 
minDecimalPrecision()) {
+            if (field.getType() instanceof DecimalType) {
+                Range range =
+                        decimalPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports DECIMAL type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((DecimalType) field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
DECIMAL "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minDecimalPrecision(),
-                                    maxDecimalPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
 
-            if (dt.getLogicalType() instanceof TimestampType) {
-                int precision = ((TimestampType) 
dt.getLogicalType()).getPrecision();
-                if (precision > maxTimestampPrecision() || precision < 
minTimestampPrecision()) {
+            if (field.getType() instanceof TimestampType) {
+                Range range =
+                        timestampPrecisionRange()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        String.format(
+                                                                "JdbcDialect 
%s supports TIMESTAMP type but no precision range has been set",
+                                                                
dialectName())));
+                int precision = ((TimestampType) 
field.getType()).getPrecision();
+                if (precision > range.max || precision < range.min) {
                     throw new ValidationException(
                             String.format(
                                     "The precision of field '%s' is out of the 
TIMESTAMP "
                                             + "precision range [%d, %d] 
supported by %s dialect.",
-                                    fieldName,
-                                    minTimestampPrecision(),
-                                    maxTimestampPrecision(),
-                                    dialectName()));
+                                    field.getName(), range.min, range.max, 
dialectName()));
                 }
             }
         }
     }
 
-    public abstract int maxDecimalPrecision();
+    /**
+     * A simple {@code INSERT INTO} statement.
+     *
+     * <pre>{@code
+     * INSERT INTO table_name (column_name [, ...])
+     * VALUES (value [, ...])
+     * }</pre>
+     */
+    @Override
+    public String getInsertIntoStatement(String tableName, String[] 
fieldNames) {
+        String columns =
+                Arrays.stream(fieldNames)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String placeholders =
+                Arrays.stream(fieldNames).map(f -> ":" + 
f).collect(Collectors.joining(", "));
+        return "INSERT INTO "
+                + quoteIdentifier(tableName)
+                + "("
+                + columns
+                + ")"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    /**
+     * A simple single row {@code UPDATE} statement.
+     *
+     * <pre>{@code
+     * UPDATE table_name
+     * SET col = val [, ...]
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getUpdateStatement(
+            String tableName, String[] fieldNames, String[] conditionFields) {
+        String setClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(", "));
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "UPDATE "
+                + quoteIdentifier(tableName)
+                + " SET "
+                + setClause
+                + " WHERE "
+                + conditionClause;
+    }
+
+    /**
+     * A simple single row {@code DELETE} statement.
+     *
+     * <pre>{@code
+     * DELETE FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getDeleteStatement(String tableName, String[] 
conditionFields) {
+        String conditionClause =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + 
conditionClause;
+    }
+
+    /**
+     * A simple {@code SELECT} statement.
+     *
+     * <pre>{@code
+     * SELECT expression [, ...]
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getSelectFromStatement(
+            String tableName, String[] selectFields, String[] conditionFields) 
{
+        String selectExpressions =
+                Arrays.stream(selectFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT "
+                + selectExpressions
+                + " FROM "
+                + quoteIdentifier(tableName)
+                + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : 
"");
+    }
+
+    /**
+     * A simple {@code SELECT} statement that checks for the existence of a 
single row.
+     *
+     * <p>
+     *
+     * <pre>{@code
+     * SELECT 1
+     * FROM table_name
+     * WHERE cond [AND ...]
+     * }</pre>
+     */
+    @Override
+    public String getRowExistsStatement(String tableName, String[] 
conditionFields) {
+        String fieldExpressions =
+                Arrays.stream(conditionFields)
+                        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
+                        .collect(Collectors.joining(" AND "));
+        return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + 
fieldExpressions;
+    }
 
-    public abstract int minDecimalPrecision();
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof JdbcDialect
+                && ((JdbcDialect) obj).dialectName().equals(dialectName());
+    }
 
-    public abstract int maxTimestampPrecision();
+    @Override
+    public int hashCode() {
+        return dialectName().hashCode() >> 31;
+    }
 
-    public abstract int minTimestampPrecision();
+    /**
+     * @return The inclusive range [min,max] of supported precisions for 
{@link TimestampType}
+     *     columns. None if timestamp type is not supported.
+     */
+    public abstract Optional<Range> timestampPrecisionRange();

Review comment:
       sure




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to