This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d2204b3d3 [INLONG-7076][Sort] Add multi table sink for MySQL (#7079)
d2204b3d3 is described below

commit d2204b3d3d6b061acdbfb1e1560a007100f3b7e2
Author: chyueyi <[email protected]>
AuthorDate: Fri Dec 30 10:47:46 2022 +0800

    [INLONG-7076][Sort] Add multi table sink for MySQL (#7079)
---
 .../jdbc/converter/AbstractJdbcRowConverter.java   |  21 ++-
 .../jdbc/converter/mysql/MySQLRowConverter.java    |  39 +++++
 .../inlong/sort/jdbc/dialect/MySQLDialect.java     | 164 +++++++++++++++++++++
 .../internal/JdbcMultiBatchingOutputFormat.java    |  52 ++++++-
 .../inlong/sort/jdbc/table/JdbcDialects.java       |   2 +-
 5 files changed, 270 insertions(+), 8 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
index 52e2eb29f..9715a4636 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
@@ -25,11 +25,12 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.utils.TypeConversions;
 
 import java.io.Serializable;
@@ -151,6 +152,7 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
                 return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() 
/ 1_000_000L);
             case TIMESTAMP_WITH_TIME_ZONE:
             case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return val -> val instanceof LocalDateTime
                         ? TimestampData.fromLocalDateTime((LocalDateTime) val)
                         : TimestampData.fromTimestamp((Timestamp) val);
@@ -179,14 +181,17 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
 
     protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
             JdbcSerializationConverter jdbcSerializationConverter, LogicalType 
type) {
-        final int sqlType =
-                JdbcTypeUtil.typeInformationToSqlType(
-                        TypeConversions.fromDataTypeToLegacyInfo(
-                                TypeConversions.fromLogicalToDataType(type)));
         return (val, index, statement) -> {
             if (val == null
                     || val.isNullAt(index)
                     || LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
+                // typeInformationToSqlType doesn't support 
TIMESTAMP_WITH_LOCAL_TIME_ZONE, so default to 2014 (java.sql.Types.TIMESTAMP_WITH_TIMEZONE)
+                int sqlType = 2014;
+                if (type.getTypeRoot() != 
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+                    sqlType = JdbcTypeUtil.typeInformationToSqlType(
+                            TypeConversions.fromDataTypeToLegacyInfo(
+                                    
TypeConversions.fromLogicalToDataType(type)));
+                }
                 statement.setNull(index, sqlType);
             } else {
                 jdbcSerializationConverter.serialize(val, index, statement);
@@ -232,6 +237,10 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
                 final int timestampPrecision = ((TimestampType) 
type).getPrecision();
                 return (val, index, statement) -> statement.setTimestamp(
                         index, val.getTimestamp(index, 
timestampPrecision).toTimestamp());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int localTimestampPrecision = ((LocalZonedTimestampType) 
type).getPrecision();
+                return (val, index, statement) -> statement.setTimestamp(
+                        index, val.getTimestamp(index, 
localTimestampPrecision).toTimestamp());
             case DECIMAL:
                 final int decimalPrecision = ((DecimalType) 
type).getPrecision();
                 final int decimalScale = ((DecimalType) type).getScale();
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/mysql/MySQLRowConverter.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/mysql/MySQLRowConverter.java
new file mode 100644
index 000000000..6fb9a4b12
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/mysql/MySQLRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.converter.mysql;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink 
internal objects for MySQL.
+ */
+public class MySQLRowConverter extends AbstractJdbcRowConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    public MySQLRowConverter(RowType rowType) {
+        super(rowType);
+    }
+
+    @Override
+    public String converterName() {
+        return "MySQL";
+    }
+
+}
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java
new file mode 100644
index 000000000..0c3e77083
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.inlong.sort.jdbc.converter.mysql.MySQLRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.internal.JdbcMultiBatchingComm;
+import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * JDBC dialect for MySQL.
+ * Copied from flink-jdbc,
+ * with setQueryPrimaryKeySql modified.
+ */
+public class MySQLDialect extends AbstractJdbcDialect {
+
+    private static final long serialVersionUID = 1L;
+    private static final String QUERY_PRIMARY_KEY_SQL =
+            "select  group_concat(COLUMN_NAME SEPARATOR ',') AS pkColumn, 
TABLE_NAME AS tableName\n" +
+                    "from information_schema.COLUMNS\n" +
+                    "where TABLE_SCHEMA = ?\n" +
+                    "and TABLE_NAME = ?\n" +
+                    "and COLUMN_KEY = 'PRI'\n" +
+                    "AND length(COLUMN_NAME) > 0";
+
+    // Define MAX/MIN precision of TIMESTAMP type according to MySQL docs:
+    // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
+    private static final int MAX_TIMESTAMP_PRECISION = 6;
+    private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+    // Define MAX/MIN precision of DECIMAL type according to MySQL docs:
+    // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
+    private static final int MAX_DECIMAL_PRECISION = 65;
+    private static final int MIN_DECIMAL_PRECISION = 1;
+
+    @Override
+    public boolean canHandle(String url) {
+        return url.startsWith("jdbc:mysql:");
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter(RowType rowType) {
+        return new MySQLRowConverter(rowType);
+    }
+
+    @Override
+    public String getLimitClause(long limit) {
+        return "LIMIT " + limit;
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("com.mysql.jdbc.Driver");
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
+    /**
+     * The MySQL upsert query uses ON DUPLICATE KEY UPDATE.
+     *
+     * <p>NOTE: It requires MySQL's primary key to be consistent with pkFields.
+     *
+     * <p>We don't use REPLACE INTO, so that if there are other fields, we can keep 
their previous values.
+     */
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        String updateClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> quoteIdentifier(f) + "=VALUES(" + 
quoteIdentifier(f) + ")")
+                        .collect(Collectors.joining(", "));
+        return Optional.of(
+                getInsertIntoStatement(tableName, fieldNames)
+                        + " ON DUPLICATE KEY UPDATE "
+                        + updateClause);
+    }
+
+    @Override
+    public String dialectName() {
+        return "MySQL";
+    }
+
+    @Override
+    public int maxDecimalPrecision() {
+        return MAX_DECIMAL_PRECISION;
+    }
+
+    @Override
+    public int minDecimalPrecision() {
+        return MIN_DECIMAL_PRECISION;
+    }
+
+    @Override
+    public int maxTimestampPrecision() {
+        return MAX_TIMESTAMP_PRECISION;
+    }
+
+    @Override
+    public int minTimestampPrecision() {
+        return MIN_TIMESTAMP_PRECISION;
+    }
+
+    @Override
+    public List<LogicalTypeRoot> unsupportedTypes() {
+        // The data types used in MySQL are listed at:
+        // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
+
+        // TODO: We can't convert BINARY data type to
+        // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+        // LegacyTypeInfoDataTypeConverter.
+        return Arrays.asList(
+                LogicalTypeRoot.BINARY,
+                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+                LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+                LogicalTypeRoot.INTERVAL_DAY_TIME,
+                LogicalTypeRoot.ARRAY,
+                LogicalTypeRoot.MULTISET,
+                LogicalTypeRoot.MAP,
+                LogicalTypeRoot.ROW,
+                LogicalTypeRoot.DISTINCT_TYPE,
+                LogicalTypeRoot.STRUCTURED_TYPE,
+                LogicalTypeRoot.NULL,
+                LogicalTypeRoot.RAW,
+                LogicalTypeRoot.SYMBOL,
+                LogicalTypeRoot.UNRESOLVED);
+    }
+
+    @Override
+    public PreparedStatement setQueryPrimaryKeySql(Connection conn, String 
tableIdentifier) throws SQLException {
+        PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL);
+        st.setString(1, 
JdbcMultiBatchingComm.getDatabaseNameFromIdentifier(tableIdentifier));
+        st.setString(2, 
JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier));
+        return st;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index bb1c40e26..1176ca6f4 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -58,6 +58,16 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -116,6 +126,17 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
     private Long rowSize = 0L;
     private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
 
+    private static final DateTimeFormatter 
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+    private static final DateTimeFormatter SQL_TIME_FORMAT;
+
+    static {
+        SQL_TIME_FORMAT = (new 
DateTimeFormatterBuilder()).appendPattern("HH:mm:ss")
+                .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, 
true).toFormatter();
+        SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT =
+                (new 
DateTimeFormatterBuilder()).append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral('T')
+                        
.append(SQL_TIME_FORMAT).appendPattern("'Z'").toFormatter();
+    }
+
     public JdbcMultiBatchingOutputFormat(
             @Nonnull JdbcConnectionProvider connectionProvider,
             @Nonnull JdbcExecutionOptions executionOptions,
@@ -399,7 +420,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
                         record.setField(i, Double.valueOf(fieldValue));
                         break;
                     case TIME_WITHOUT_TIME_ZONE:
-                    case TIMESTAMP_WITHOUT_TIME_ZONE:
                     case INTERVAL_DAY_TIME:
                         TimestampData timestampData = 
TimestampData.fromEpochMillis(Long.valueOf(fieldValue));
                         record.setField(i, timestampData);
@@ -407,6 +427,36 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
                     case BINARY:
                         record.setField(i, 
Arrays.toString(fieldValue.getBytes(StandardCharsets.UTF_8)));
                         break;
+
+                    // additional field types supported for MySQL
+                    case INTEGER:
+                        record.setField(i, Integer.valueOf(fieldValue));
+                        break;
+                    case SMALLINT:
+                        record.setField(i, Short.valueOf(fieldValue));
+                        break;
+                    case TINYINT:
+                        record.setField(i, Byte.valueOf(fieldValue));
+                        break;
+                    case FLOAT:
+                        record.setField(i, Float.valueOf(fieldValue));
+                        break;
+                    case DATE:
+                        record.setField(i, (int) 
LocalDate.parse(fieldValue).toEpochDay());
+                        break;
+                    case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                        TemporalAccessor parsedTimestampWithLocalZone =
+                                
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(fieldValue);
+                        LocalTime localTime = 
parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
+                        LocalDate localDate = 
parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
+                        record.setField(i, 
TimestampData.fromInstant(LocalDateTime.of(localDate, localTime)
+                                .toInstant(ZoneOffset.UTC)));
+                        break;
+                    case TIMESTAMP_WITHOUT_TIME_ZONE:
+                        fieldValue = fieldValue.replace("T", " ");
+                        TimestampData timestamp = 
TimestampData.fromTimestamp(Timestamp.valueOf(fieldValue));
+                        record.setField(i, timestamp);
+                        break;
                     default:
                         record.setField(i, StringData.fromString(fieldValue));
                 }
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
index 217db31ea..ec04cc60f 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.jdbc.table;
 
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+import org.apache.inlong.sort.jdbc.dialect.MySQLDialect;
 import org.apache.inlong.sort.jdbc.dialect.OracleDialect;
 import org.apache.inlong.sort.jdbc.dialect.SqlServerDialect;
 import org.apache.inlong.sort.jdbc.dialect.TDSQLPostgresDialect;

Reply via email to