This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d2204b3d3 [INLONG-7076][Sort] Add multi table sink for MySQL (#7079)
d2204b3d3 is described below
commit d2204b3d3d6b061acdbfb1e1560a007100f3b7e2
Author: chyueyi <[email protected]>
AuthorDate: Fri Dec 30 10:47:46 2022 +0800
[INLONG-7076][Sort] Add multi table sink for MySQL (#7079)
---
.../jdbc/converter/AbstractJdbcRowConverter.java | 21 ++-
.../jdbc/converter/mysql/MySQLRowConverter.java | 39 +++++
.../inlong/sort/jdbc/dialect/MySQLDialect.java | 164 +++++++++++++++++++++
.../internal/JdbcMultiBatchingOutputFormat.java | 52 ++++++-
.../inlong/sort/jdbc/table/JdbcDialects.java | 2 +-
5 files changed, 270 insertions(+), 8 deletions(-)
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
index 52e2eb29f..9715a4636 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/AbstractJdbcRowConverter.java
@@ -25,11 +25,12 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.utils.TypeConversions;
import java.io.Serializable;
@@ -151,6 +152,7 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay()
/ 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val -> val instanceof LocalDateTime
? TimestampData.fromLocalDateTime((LocalDateTime) val)
: TimestampData.fromTimestamp((Timestamp) val);
@@ -179,14 +181,17 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
protected JdbcSerializationConverter wrapIntoNullableExternalConverter(
JdbcSerializationConverter jdbcSerializationConverter, LogicalType
type) {
- final int sqlType =
- JdbcTypeUtil.typeInformationToSqlType(
- TypeConversions.fromDataTypeToLegacyInfo(
- TypeConversions.fromLogicalToDataType(type)));
return (val, index, statement) -> {
if (val == null
|| val.isNullAt(index)
|| LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
+ // typeInformationToSqlType doesn't support
TIMESTAMP_WITH_LOCAL_TIME_ZONE (java.sql.Types.TIMESTAMP_WITH_TIMEZONE = 2014)
+ int sqlType = 2014;
+ if (type.getTypeRoot() !=
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+ sqlType = JdbcTypeUtil.typeInformationToSqlType(
+ TypeConversions.fromDataTypeToLegacyInfo(
+
TypeConversions.fromLogicalToDataType(type)));
+ }
statement.setNull(index, sqlType);
} else {
jdbcSerializationConverter.serialize(val, index, statement);
@@ -232,6 +237,10 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
final int timestampPrecision = ((TimestampType)
type).getPrecision();
return (val, index, statement) -> statement.setTimestamp(
index, val.getTimestamp(index,
timestampPrecision).toTimestamp());
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int localTimestampPrecision = ((LocalZonedTimestampType)
type).getPrecision();
+ return (val, index, statement) -> statement.setTimestamp(
+ index, val.getTimestamp(index,
localTimestampPrecision).toTimestamp());
case DECIMAL:
final int decimalPrecision = ((DecimalType)
type).getPrecision();
final int decimalScale = ((DecimalType) type).getScale();
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/mysql/MySQLRowConverter.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/mysql/MySQLRowConverter.java
new file mode 100644
index 000000000..6fb9a4b12
--- /dev/null
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/mysql/MySQLRowConverter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.converter.mysql;
+
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.AbstractJdbcRowConverter;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and
Flink internal objects for MySQL.
+ */
+public class MySQLRowConverter extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ public MySQLRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public String converterName() {
+ return "MySQL";
+ }
+
+}
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java
new file mode 100644
index 000000000..0c3e77083
--- /dev/null
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.inlong.sort.jdbc.converter.mysql.MySQLRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.internal.JdbcMultiBatchingComm;
+import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * JDBC dialect for MySQL.
+ * Copied from flink-jdbc,
+ * with setQueryPrimaryKeySql modified.
+ */
+public class MySQLDialect extends AbstractJdbcDialect {
+
+ private static final long serialVersionUID = 1L;
+ private static final String QUERY_PRIMARY_KEY_SQL =
+ "select group_concat(COLUMN_NAME SEPARATOR ',') AS pkColumn,
TABLE_NAME AS tableName\n" +
+ "from information_schema.COLUMNS\n" +
+ "where TABLE_SCHEMA = ?\n" +
+ "and TABLE_NAME = ?\n" +
+ "and COLUMN_KEY = 'PRI'\n" +
+ "AND length(COLUMN_NAME) > 0";
+
+ // Define MAX/MIN precision of TIMESTAMP type according to MySQL docs:
+ // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
+ private static final int MAX_TIMESTAMP_PRECISION = 6;
+ private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+ // Define MAX/MIN precision of DECIMAL type according to MySQL docs:
+ // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
+ private static final int MAX_DECIMAL_PRECISION = 65;
+ private static final int MIN_DECIMAL_PRECISION = 1;
+
+ @Override
+ public boolean canHandle(String url) {
+ return url.startsWith("jdbc:mysql:");
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new MySQLRowConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("com.mysql.jdbc.Driver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return "`" + identifier + "`";
+ }
+
+ /**
+ * MySQL upsert queries use ON DUPLICATE KEY UPDATE.
+ *
+ * <p>NOTE: It requires MySQL's primary key to be consistent with pkFields.
+ *
+ * <p>We don't use REPLACE INTO because, if there are other fields, we can keep
their previous values.
+ */
+ @Override
+ public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ String updateClause =
+ Arrays.stream(fieldNames)
+ .map(f -> quoteIdentifier(f) + "=VALUES(" +
quoteIdentifier(f) + ")")
+ .collect(Collectors.joining(", "));
+ return Optional.of(
+ getInsertIntoStatement(tableName, fieldNames)
+ + " ON DUPLICATE KEY UPDATE "
+ + updateClause);
+ }
+
+ @Override
+ public String dialectName() {
+ return "MySQL";
+ }
+
+ @Override
+ public int maxDecimalPrecision() {
+ return MAX_DECIMAL_PRECISION;
+ }
+
+ @Override
+ public int minDecimalPrecision() {
+ return MIN_DECIMAL_PRECISION;
+ }
+
+ @Override
+ public int maxTimestampPrecision() {
+ return MAX_TIMESTAMP_PRECISION;
+ }
+
+ @Override
+ public int minTimestampPrecision() {
+ return MIN_TIMESTAMP_PRECISION;
+ }
+
+ @Override
+ public List<LogicalTypeRoot> unsupportedTypes() {
+ // The data types used in Mysql are list at:
+ // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
+
+ // TODO: We can't convert BINARY data type to
+ // PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+ // LegacyTypeInfoDataTypeConverter.
+ return Arrays.asList(
+ LogicalTypeRoot.BINARY,
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+ LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+ LogicalTypeRoot.INTERVAL_DAY_TIME,
+ LogicalTypeRoot.ARRAY,
+ LogicalTypeRoot.MULTISET,
+ LogicalTypeRoot.MAP,
+ LogicalTypeRoot.ROW,
+ LogicalTypeRoot.DISTINCT_TYPE,
+ LogicalTypeRoot.STRUCTURED_TYPE,
+ LogicalTypeRoot.NULL,
+ LogicalTypeRoot.RAW,
+ LogicalTypeRoot.SYMBOL,
+ LogicalTypeRoot.UNRESOLVED);
+ }
+
+ @Override
+ public PreparedStatement setQueryPrimaryKeySql(Connection conn, String
tableIdentifier) throws SQLException {
+ PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL);
+ st.setString(1,
JdbcMultiBatchingComm.getDatabaseNameFromIdentifier(tableIdentifier));
+ st.setString(2,
JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier));
+ return st;
+ }
+}
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index bb1c40e26..1176ca6f4 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -58,6 +58,16 @@ import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -116,6 +126,17 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
private Long rowSize = 0L;
private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
+ private static final DateTimeFormatter
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+ private static final DateTimeFormatter SQL_TIME_FORMAT;
+
+ static {
+ SQL_TIME_FORMAT = (new
DateTimeFormatterBuilder()).appendPattern("HH:mm:ss")
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9,
true).toFormatter();
+ SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT =
+ (new
DateTimeFormatterBuilder()).append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral('T')
+
.append(SQL_TIME_FORMAT).appendPattern("'Z'").toFormatter();
+ }
+
public JdbcMultiBatchingOutputFormat(
@Nonnull JdbcConnectionProvider connectionProvider,
@Nonnull JdbcExecutionOptions executionOptions,
@@ -399,7 +420,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
record.setField(i, Double.valueOf(fieldValue));
break;
case TIME_WITHOUT_TIME_ZONE:
- case TIMESTAMP_WITHOUT_TIME_ZONE:
case INTERVAL_DAY_TIME:
TimestampData timestampData =
TimestampData.fromEpochMillis(Long.valueOf(fieldValue));
record.setField(i, timestampData);
@@ -407,6 +427,36 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
case BINARY:
record.setField(i,
Arrays.toString(fieldValue.getBytes(StandardCharsets.UTF_8)));
break;
+
+ // support mysql
+ case INTEGER:
+ record.setField(i, Integer.valueOf(fieldValue));
+ break;
+ case SMALLINT:
+ record.setField(i, Short.valueOf(fieldValue));
+ break;
+ case TINYINT:
+ record.setField(i, Byte.valueOf(fieldValue));
+ break;
+ case FLOAT:
+ record.setField(i, Float.valueOf(fieldValue));
+ break;
+ case DATE:
+ record.setField(i, (int)
LocalDate.parse(fieldValue).toEpochDay());
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ TemporalAccessor parsedTimestampWithLocalZone =
+
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(fieldValue);
+ LocalTime localTime =
parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
+ LocalDate localDate =
parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
+ record.setField(i,
TimestampData.fromInstant(LocalDateTime.of(localDate, localTime)
+ .toInstant(ZoneOffset.UTC)));
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ fieldValue = fieldValue.replace("T", " ");
+ TimestampData timestamp =
TimestampData.fromTimestamp(Timestamp.valueOf(fieldValue));
+ record.setField(i, timestamp);
+ break;
default:
record.setField(i, StringData.fromString(fieldValue));
}
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
index 217db31ea..ec04cc60f 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDialects.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.jdbc.table;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.MySQLDialect;
+import org.apache.inlong.sort.jdbc.dialect.MySQLDialect;
import org.apache.inlong.sort.jdbc.dialect.OracleDialect;
import org.apache.inlong.sort.jdbc.dialect.SqlServerDialect;
import org.apache.inlong.sort.jdbc.dialect.TDSQLPostgresDialect;