This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2e3a2d27e [FLINK-31435] Introduce MySqlCtasAction to consume CDC
events from MySQL
2e3a2d27e is described below
commit 2e3a2d27e549b31f81514b3219e9ecb653683e50
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 3 16:06:37 2023 +0800
[FLINK-31435] Introduce MySqlCtasAction to consume CDC events from MySQL
This closes #768.
---
.../java/org/apache/paimon/utils/TypeUtils.java | 20 +-
.../org/apache/paimon/data/DataFormatTestUtil.java | 9 +-
.../org/apache/paimon/casting/CastExecutors.java | 32 +-
paimon-flink/paimon-flink-common/pom.xml | 15 +
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 244 ++++++++
.../action/cdc/mysql/MySqlSyncTableAction.java | 430 ++++++++++++++
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 215 +++++++
.../flink/sink/cdc/CdcBucketStreamPartitioner.java | 1 -
.../flink/sink/cdc/CdcParsingProcessFunction.java | 2 -
.../apache/paimon/flink/sink}/cdc/CdcRecord.java | 2 +-
.../apache/paimon/flink/sink}/cdc/EventParser.java | 2 +-
.../apache/paimon/flink/sink/cdc/FlinkCdcSink.java | 1 -
.../paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java | 2 -
.../sink/cdc/SchemaAwareStoreWriteOperator.java | 32 +-
.../sink/cdc/SchemaChangeProcessFunction.java | 15 +-
.../org/apache/paimon/flink/FileStoreITCase.java | 2 +-
.../paimon/flink/action/ActionITCaseBase.java | 2 +-
.../flink/action/cdc/mysql/MySqlContainer.java | 180 ++++++
.../cdc/mysql/MySqlSyncTableActionITCase.java | 633 +++++++++++++++++++++
.../flink/action/cdc/mysql/MySqlVersion.java | 38 +-
.../paimon/flink/sink/cdc/FlinkCdcSinkITCase.java | 1 -
.../cdc/SchemaAwareStoreWriteOperatorTest.java | 47 +-
.../apache/paimon/flink/sink/cdc/TestCdcEvent.java | 1 -
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 2 -
.../flink/sink/cdc/TestCdcSourceFunction.java | 1 -
.../apache/paimon/flink/util/AbstractTestBase.java | 4 -
.../src/test/resources/mysql/my.cnf | 65 +++
.../src/test/resources/mysql/setup.sql | 156 +++++
28 files changed, 2071 insertions(+), 83 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 71fb9378b..b4ed5bded 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
@@ -64,14 +65,27 @@ public class TypeUtils {
switch (type.getTypeRoot()) {
case CHAR:
case VARCHAR:
+ int stringLength = DataTypeChecks.getLength(type);
+ if (s.length() > stringLength) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Length of type %s is %d, but casting
result has a length of %d",
+ type, stringLength, s.length()));
+ }
return str;
case BOOLEAN:
return toBoolean(str);
case BINARY:
case VARBINARY:
- // this implementation does not match the new behavior of
StringToBinaryCastRule,
- // change this if needed
- return s.getBytes();
+ int binaryLength = DataTypeChecks.getLength(type);
+ byte[] bytes = s.getBytes();
+ if (bytes.length > binaryLength) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Length of type %s is %d, but casting
result has a length of %d",
+ type, binaryLength, bytes.length));
+ }
+ return bytes;
case DECIMAL:
DecimalType decimalType = (DecimalType) type;
return Decimal.fromBigDecimal(
diff --git
a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
index 42c4af6dc..11a08583e 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/DataFormatTestUtil.java
@@ -21,6 +21,8 @@ import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;
+import java.util.Arrays;
+
import static org.assertj.core.api.Assertions.assertThat;
/** Utils for testing data formats. */
@@ -38,7 +40,12 @@ public class DataFormatTestUtil {
} else {
InternalRow.FieldGetter fieldGetter =
InternalRow.createFieldGetter(type.getTypeAt(i), i);
- build.append(fieldGetter.getFieldOrNull(row));
+ Object field = fieldGetter.getFieldOrNull(row);
+ if (field instanceof byte[]) {
+ build.append(Arrays.toString((byte[]) field));
+ } else {
+ build.append(field);
+ }
}
}
return build.toString();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
index 22915adcc..a0810f1bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java
@@ -21,13 +21,10 @@ package org.apache.paimon.casting;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.types.BinaryType;
-import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarBinaryType;
-import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.DecimalUtils;
import org.apache.paimon.utils.StringUtils;
@@ -134,7 +131,7 @@ public class CastExecutors {
case VARCHAR:
if (outputType.getTypeRoot() == CHAR ||
outputType.getTypeRoot() == VARCHAR) {
final boolean targetCharType = outputType.getTypeRoot() ==
CHAR;
- final int targetLength = getStringLength(outputType);
+ final int targetLength =
DataTypeChecks.getLength(outputType);
return value -> {
BinaryString result;
String strVal = value.toString();
@@ -158,7 +155,7 @@ public class CastExecutors {
return result;
};
} else if (outputType.getTypeRoot() == VARBINARY) {
- final int targetLength = getBinaryLength(outputType);
+ final int targetLength =
DataTypeChecks.getLength(outputType);
return value -> {
byte[] byteArrayTerm = ((BinaryString)
value).toBytes();
if (byteArrayTerm.length <= targetLength) {
@@ -170,9 +167,10 @@ public class CastExecutors {
}
return null;
case BINARY:
+ case VARBINARY:
if (outputType.getTypeRoot() == BINARY ||
outputType.getTypeRoot() == VARBINARY) {
boolean targetBinaryType = outputType.getTypeRoot() ==
BINARY;
- final int targetLength = getBinaryLength(outputType);
+ final int targetLength =
DataTypeChecks.getLength(outputType);
return value -> {
byte[] bytes = (byte[]) value;
if (((byte[]) value).length == targetLength) {
@@ -238,24 +236,4 @@ public class CastExecutors {
public static CastExecutor<?, ?> identityCastExecutor() {
return IDENTITY_CAST_EXECUTOR;
}
-
- private static int getStringLength(DataType dataType) {
- if (dataType instanceof CharType) {
- return ((CharType) dataType).getLength();
- } else if (dataType instanceof VarCharType) {
- return ((VarCharType) dataType).getLength();
- }
-
- throw new IllegalArgumentException(String.format("Unsupported type
%s", dataType));
- }
-
- private static int getBinaryLength(DataType dataType) {
- if (dataType instanceof VarBinaryType) {
- return ((VarBinaryType) dataType).getLength();
- } else if (dataType instanceof BinaryType) {
- return ((BinaryType) dataType).getLength();
- }
-
- throw new IllegalArgumentException(String.format("Unsupported type
%s", dataType));
- }
}
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index 7a174dc63..90a4260d7 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -35,6 +35,7 @@ under the License.
<properties>
<flink.version>1.17.0</flink.version>
+ <flink.cdc.version>2.3.0</flink.cdc.version>
<frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
</properties>
@@ -67,6 +68,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>flink-connector-mysql-cdc</artifactId>
+ <version>${flink.cdc.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>com.ververica</groupId>
<artifactId>frocksdbjni</artifactId>
@@ -241,6 +249,13 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
new file mode 100644
index 000000000..daf28fe75
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.Preconditions;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * {@link EventParser} for MySQL Debezium JSON.
+ *
+ * <p>Some implementation is referenced from <a
+ *
href="https://github.com/apache/doris-flink-connector/blob/master/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java">apache
+ * / doris-flink-connector</a>.
+ */
+public class MySqlDebeziumJsonEventParser implements EventParser<String> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
+ private static final String SCHEMA_CHANGE_REGEX =
+
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP|MODIFY)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s\\(]+))?\\s*(\\((.*?)\\))?.*";
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final Pattern schemaChangePattern =
+ Pattern.compile(SCHEMA_CHANGE_REGEX, Pattern.CASE_INSENSITIVE);
+
+ private final ZoneId serverTimeZone;
+
+ private JsonNode payload;
+ private Map<String, String> mySqlFieldTypes;
+ private Map<String, String> fieldClassNames;
+
+ /** Creates a parser that uses the JVM default time zone ({@link ZoneId#systemDefault()}). */
+ public MySqlDebeziumJsonEventParser() {
+ this(ZoneId.systemDefault());
+ }
+
+ /**
+ * Creates a parser with an explicit time zone.
+ *
+ * @param serverTimeZone zone applied when temporal values are rendered as local date-time
+ * strings while rows are extracted
+ */
+ public MySqlDebeziumJsonEventParser(ZoneId serverTimeZone) {
+ this.serverTimeZone = serverTimeZone;
+ }
+
+ /**
+ * Parses one raw Debezium JSON event and makes it the current event of this parser.
+ *
+ * <p>The event must contain a top-level {@code schema} node; its {@code payload} node is
+ * stored for the subsequent {@code isSchemaChange}/{@code getSchemaChanges}/{@code getRecords}
+ * calls. For data-change events the per-column type tables are rebuilt from the schema node.
+ *
+ * @param rawEvent JSON text of the event
+ * @throws RuntimeException wrapping any failure, including malformed JSON and a missing
+ * {@code schema} node
+ */
+ @Override
+ public void setRawEvent(String rawEvent) {
+ try {
+ JsonNode root = objectMapper.readValue(rawEvent, JsonNode.class);
+ JsonNode schema =
+ Preconditions.checkNotNull(
+ root.get("schema"),
+ "MySqlDebeziumJsonEventParser only supports
debezium JSON with schema. "
+ + "Please make sure that `includeSchema`
is true "
+ + "in the
JsonDebeziumDeserializationSchema you created");
+ payload = root.get("payload");
+
+ // Schema-change events are skipped here, so the type tables of the last data-change
+ // event stay in place.
+ if (!isSchemaChange()) {
+ updateFieldTypes(schema);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+    /**
+     * Rebuilds the column lookup tables from the {@code schema} node of a data-change event.
+     *
+     * <p>Only the {@code before} and {@code after} entries of the schema are inspected. For each
+     * column listed under them, the {@code type} is recorded in {@code mySqlFieldTypes} and, when
+     * present, the {@code name} is recorded in {@code fieldClassNames}.
+     *
+     * @param schema the {@code schema} node of the current event
+     */
+    private void updateFieldTypes(JsonNode schema) {
+        mySqlFieldTypes = new HashMap<>();
+        fieldClassNames = new HashMap<>();
+
+        JsonNode envelopeFields = schema.get("fields");
+        for (int outer = 0; outer < envelopeFields.size(); outer++) {
+            JsonNode envelopeField = envelopeFields.get(outer);
+            String envelopeName = envelopeField.get("field").asText();
+            boolean describesRow = "before".equals(envelopeName) || "after".equals(envelopeName);
+            if (!describesRow) {
+                continue;
+            }
+
+            JsonNode columns = envelopeField.get("fields");
+            for (int inner = 0; inner < columns.size(); inner++) {
+                JsonNode column = columns.get(inner);
+                String columnName = column.get("field").asText();
+                String columnType = column.get("type").asText();
+                mySqlFieldTypes.put(columnName, columnType);
+
+                // "name" is optional in the schema; only some columns carry it.
+                JsonNode semanticName = column.get("name");
+                if (semanticName != null) {
+                    fieldClassNames.put(columnName, semanticName.asText());
+                }
+            }
+        }
+    }
+
+ @Override
+ public boolean isSchemaChange() {
+ return payload.get("op") == null;
+ }
+
+ /**
+ * Derives Paimon schema changes from the DDL carried by the current schema-change event.
+ *
+ * <p>The DDL text is read from the {@code ddl} entry of the JSON document stored as text in
+ * {@code payload.historyRecord}. It is matched against {@code SCHEMA_CHANGE_REGEX}; only the
+ * first match is considered. {@code ADD} yields an add-column change and {@code MODIFY} yields
+ * an update-column-type change. {@code DROP} is matched by the regex but produces no change.
+ *
+ * @return a single-element list for a recognised ADD/MODIFY statement, otherwise an empty list
+ * (also when the history record is absent, unparsable, or has no {@code ddl} entry)
+ */
+ @Override
+ public List<SchemaChange> getSchemaChanges() {
+ JsonNode historyRecord = payload.get("historyRecord");
+ if (historyRecord == null) {
+ return Collections.emptyList();
+ }
+
+ JsonNode ddlNode;
+ try {
+ ddlNode = objectMapper.readTree(historyRecord.asText()).get("ddl");
+ } catch (Exception e) {
+ // Best effort: an unparsable history record is logged and treated as "no change".
+ LOG.debug("Failed to parse history record for schema changes", e);
+ return Collections.emptyList();
+ }
+ if (ddlNode == null) {
+ return Collections.emptyList();
+ }
+ String ddl = ddlNode.asText();
+
+ Matcher matcher = schemaChangePattern.matcher(ddl);
+ if (matcher.find()) {
+ // Regex groups: 1 = ADD|DROP|MODIFY, 3 = token after the optional COLUMN keyword,
+ // 5 = next token (the type name), 7 = text inside the parentheses that follow.
+ // Groups 5 and 7 are optional and may be null.
+ // NOTE(review): statements such as "ADD INDEX ..." would presumably also match, with
+ // "INDEX" captured as the column name - confirm how MySqlTypeUtils.toDataType reacts.
+ String op = matcher.group(1);
+ String column = matcher.group(3);
+ String type = matcher.group(5);
+ String len = matcher.group(7);
+ if ("add".equalsIgnoreCase(op)) {
+ return Collections.singletonList(
+ SchemaChange.addColumn(column,
MySqlTypeUtils.toDataType(type, len)));
+ } else if ("modify".equalsIgnoreCase(op)) {
+ return Collections.singletonList(
+ SchemaChange.updateColumnType(
+ column, MySqlTypeUtils.toDataType(type, len)));
+ }
+ }
+ return Collections.emptyList();
+ }
+
+    /**
+     * Converts the current data-change event into CDC records.
+     *
+     * <p>A non-empty {@code before} image becomes a {@link RowKind#DELETE} record and a non-empty
+     * {@code after} image becomes a {@link RowKind#INSERT} record, in that order.
+     *
+     * @return zero, one or two records
+     */
+    @Override
+    public List<CdcRecord> getRecords() {
+        List<CdcRecord> result = new ArrayList<>();
+
+        Map<String, String> oldRow = extractRow(payload.get("before"));
+        if (!oldRow.isEmpty()) {
+            result.add(new CdcRecord(RowKind.DELETE, oldRow));
+        }
+
+        Map<String, String> newRow = extractRow(payload.get("after"));
+        if (!newRow.isEmpty()) {
+            result.add(new CdcRecord(RowKind.INSERT, newRow));
+        }
+
+        return result;
+    }
+
+    /**
+     * Converts a {@code before}/{@code after} node into a map from column name to string value,
+     * rewriting Debezium-encoded values into their textual form.
+     *
+     * <ul>
+     *   <li>{@code bytes} without a semantic name (binary, varbinary, blob): Base64-decoded.
+     *   <li>{@code bytes} named {@code org.apache.kafka.connect.data.Decimal}: validated as a
+     *       decimal literal and kept unchanged.
+     *   <li>{@code io.debezium.time.Date}: days since epoch rendered as an ISO date.
+     *   <li>{@code io.debezium.time.Timestamp} (MySQL DATETIME): epoch milliseconds rendered in
+     *       UTC. Debezium encodes the wall-clock value as if it were UTC, so applying the server
+     *       time zone here would shift the value.
+     *   <li>{@code io.debezium.time.ZonedTimestamp} (MySQL TIMESTAMP): ISO instant rendered in the
+     *       configured server time zone.
+     * </ul>
+     *
+     * @param recordRow the row node, possibly JSON null or absent
+     * @return the converted row; an empty map when {@code recordRow} converts to {@code null}
+     * @throws IllegalArgumentException if a decimal column does not hold a numeric literal
+     */
+    private Map<String, String> extractRow(JsonNode recordRow) {
+        Map<String, String> recordMap =
+                objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {});
+        if (recordMap == null) {
+            return new HashMap<>();
+        }
+
+        ZoneId utc = ZoneId.of("UTC");
+        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
+            String fieldName = field.getKey();
+            String mySqlType = field.getValue();
+
+            // Covers both "column not in this row" and "column is NULL": nothing to convert.
+            String oldValue = recordMap.get(fieldName);
+            if (oldValue == null) {
+                continue;
+            }
+
+            String className = fieldClassNames.get(fieldName);
+            String newValue = oldValue;
+
+            if ("bytes".equals(mySqlType) && className == null) {
+                // MySQL binary, varbinary, blob
+                // NOTE(review): decoding uses the platform default charset, so arbitrary binary
+                // content may not survive the round trip through String - confirm with the
+                // code that converts the string back to bytes before changing it.
+                newValue = new String(Base64.getDecoder().decode(oldValue));
+            } else if ("bytes".equals(mySqlType)
+                    && "org.apache.kafka.connect.data.Decimal".equals(className)) {
+                // MySQL numeric, fixed, decimal: validate only, the literal is kept as is
+                try {
+                    new BigDecimal(oldValue);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException(
+                            "Invalid big decimal value "
+                                    + oldValue
+                                    + ". Make sure that in the `customConverterConfigs` "
+                                    + "of the JsonDebeziumDeserializationSchema you created, set '"
+                                    + JsonConverterConfig.DECIMAL_FORMAT_CONFIG
+                                    + "' to 'numeric'",
+                            e);
+                }
+            } else if ("io.debezium.time.Date".equals(className)) {
+                // MySQL date
+                newValue = LocalDate.ofEpochDay(Integer.parseInt(oldValue)).toString();
+            } else if ("io.debezium.time.Timestamp".equals(className)) {
+                // MySQL datetime: a wall-clock value without zone, encoded relative to UTC
+                newValue =
+                        Instant.ofEpochMilli(Long.parseLong(oldValue))
+                                .atZone(utc)
+                                .toLocalDateTime()
+                                .toString()
+                                .replace('T', ' ');
+            } else if ("io.debezium.time.ZonedTimestamp".equals(className)) {
+                // MySQL timestamp: an absolute instant, shown in the server time zone
+                newValue =
+                        Instant.parse(oldValue)
+                                .atZone(serverTimeZone)
+                                .toLocalDateTime()
+                                .toString()
+                                .replace('T', ' ');
+            }
+
+            recordMap.put(fieldName, newValue);
+        }
+
+        return recordMap;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
new file mode 100644
index 000000000..9a7710840
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.action.Action;
+import org.apache.paimon.flink.sink.cdc.EventParser;
+import org.apache.paimon.flink.sink.cdc.FlinkCdcSinkBuilder;
+import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
+import com.ververica.cdc.connectors.mysql.table.JdbcUrlUtils;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An {@link Action} which synchronizes one or multiple MySQL tables into one
Paimon table.
+ *
+ * <p>You should specify MySQL source table in {@code mySqlConfig}. See <a
+ *
href="https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#connector-options">document
+ * of flink-cdc-connectors</a> for detailed keys and values.
+ *
+ * <p>If the specified Paimon table does not exist, this action will
automatically create the table.
+ * Its schema will be derived from all specified MySQL tables. If the Paimon
table already exists,
+ * its schema will be compared against the schema of all specified MySQL
tables.
+ *
+ * <p>This action supports a limited number of schema changes. Unsupported
schema changes will be
+ * ignored. Currently supported schema changes include:
+ *
+ * <ul>
+ * <li>Adding columns.
+ * <li>Altering column types. More specifically,
+ * <ul>
+ * <li>altering from a string type (char, varchar, text) to another
string type with longer
+ * length,
+ * <li>altering from a binary type (binary, varbinary, blob) to
another binary type with
+ * longer length,
+ * <li>altering from an integer type (tinyint, smallint, int, bigint)
to another integer
+ * type with wider range,
+ * <li>altering from a floating-point type (float, double) to another
floating-point type
+ * with wider range,
+ * </ul>
+ * are supported. Other type changes will cause exceptions.
+ * </ul>
+ */
+public class MySqlSyncTableAction implements Action {
+
+ private final Map<String, String> mySqlConfig;
+ private final String warehouse;
+ private final String database;
+ private final String table;
+ private final List<String> partitionKeys;
+ private final List<String> primaryKeys;
+ private final Map<String, String> paimonConfig;
+
+ /**
+ * Creates the action.
+ *
+ * @param mySqlConfig MySQL CDC source options, looked up by the keys of {@code
+ * MySqlSourceOptions} plus JDBC- and Debezium-prefixed pass-through properties
+ * @param warehouse value set as the catalog's warehouse option
+ * @param database name of the Paimon database; created if it does not exist
+ * @param table name of the Paimon table
+ * @param partitionKeys partition keys applied when the Paimon table is created; may be empty
+ * @param primaryKeys primary keys applied when the Paimon table is created; when empty, the
+ * keys of the MySQL tables are used
+ * @param paimonConfig options of the created Paimon table; also read for the sink parallelism
+ */
+ MySqlSyncTableAction(
+ Map<String, String> mySqlConfig,
+ String warehouse,
+ String database,
+ String table,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ Map<String, String> paimonConfig) {
+ this.mySqlConfig = mySqlConfig;
+ this.warehouse = warehouse;
+ this.database = database;
+ this.table = table;
+ this.partitionKeys = partitionKeys;
+ this.primaryKeys = primaryKeys;
+ this.paimonConfig = paimonConfig;
+ }
+
+    /**
+     * Builds the synchronization job on the default execution environment and executes it.
+     *
+     * @throws Exception if building the topology or executing the job fails
+     */
+    @Override
+    public void run() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        build(env);
+        String jobName = String.format("MySQL CTAS: %s.%s", database, table);
+        env.execute(jobName);
+    }
+
+ /**
+ * Adds the MySQL-to-Paimon synchronization topology to {@code env} without executing it.
+ *
+ * <p>Steps: build the CDC source, read and merge the schemas of all matching MySQL tables,
+ * create the Paimon database if needed, then either validate the existing Paimon table against
+ * the merged schema or create the table from it, and finally wire source, event parser and
+ * sink together.
+ *
+ * @param env environment the source and sink are registered on
+ * @throws RuntimeException if no MySQL table matches the configured names
+ * @throws IllegalArgumentException if the existing Paimon table is not compatible with the
+ * merged MySQL schema
+ * @throws Exception on other failures, for example while reading MySQL metadata
+ */
+ public void build(StreamExecutionEnvironment env) throws Exception {
+ MySqlSource<String> source = buildSource();
+ MySqlSchema mySqlSchema =
+ getMySqlSchemaList().stream()
+ .reduce(MySqlSchema::merge)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "No table satisfies the given
database name and table name"));
+
+ Catalog catalog =
+ CatalogFactory.createCatalog(
+ CatalogContext.create(
+ new Options().set(CatalogOptions.WAREHOUSE,
warehouse)));
+ catalog.createDatabase(database, true);
+
+ // "table" on the next line is still the String field; the local variable declared right
+ // after it shadows the field for the rest of the method.
+ Identifier identifier = new Identifier(database, table);
+ FileStoreTable table;
+ try {
+ table = (FileStoreTable) catalog.getTable(identifier);
+ if (!schemaCompatible(table.schema(), mySqlSchema)) {
+ throw new IllegalArgumentException(
+ "Paimon schema and MySQL schema are not compatible.\n"
+ + "Paimon fields are: "
+ + table.schema().fields()
+ + ".\nMySQL fields are: "
+ + mySqlSchema.fields);
+ }
+ } catch (Catalog.TableNotExistException e) {
+ // First run: derive the Paimon table from the merged MySQL schema.
+ Schema schema = buildSchema(mySqlSchema);
+ catalog.createTable(identifier, schema, false);
+ table = (FileStoreTable) catalog.getTable(identifier);
+ }
+
+ // The parser renders temporal values in the configured server time zone when one is given,
+ // otherwise it falls back to its no-argument constructor.
+ EventParser.Factory<String> parserFactory;
+ String serverTimeZone = mySqlConfig.get("server-time-zone");
+ if (serverTimeZone != null) {
+ parserFactory = () -> new
MySqlDebeziumJsonEventParser(ZoneId.of(serverTimeZone));
+ } else {
+ parserFactory = MySqlDebeziumJsonEventParser::new;
+ }
+
+ FlinkCdcSinkBuilder<String> sinkBuilder =
+ new FlinkCdcSinkBuilder<String>()
+ .withInput(
+ env.fromSource(
+ source,
WatermarkStrategy.noWatermarks(), "MySQL Source"))
+ .withParserFactory(parserFactory)
+ .withTable(table);
+ String sinkParallelism =
paimonConfig.get(FlinkConnectorOptions.SINK_PARALLELISM.key());
+ if (sinkParallelism != null) {
+ sinkBuilder.withParallelism(Integer.parseInt(sinkParallelism));
+ }
+ sinkBuilder.build();
+ }
+
+ /**
+ * Builds the MySQL CDC source from {@code mySqlConfig}.
+ *
+ * <p>Host, port, user, password, database and table are always read; the remaining options
+ * are applied only when present. Keys starting with the JDBC or Debezium prefix are forwarded
+ * with the prefix stripped. Records are emitted as Debezium JSON including the schema, with
+ * decimals in numeric format, and schema-change events are included.
+ *
+ * @return the configured source emitting JSON strings
+ */
+ private MySqlSource<String> buildSource() {
+ MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
+
+ String databaseName =
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key());
+ String tableName =
mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key());
+ // NOTE(review): a missing port option reaches Integer.parseInt as null and fails with a
+ // NumberFormatException - confirm that callers validate required options first.
+ sourceBuilder
+ .hostname(mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key()))
+
.port(Integer.parseInt(mySqlConfig.get(MySqlSourceOptions.PORT.key())))
+ .username(mySqlConfig.get(MySqlSourceOptions.USERNAME.key()))
+ .password(mySqlConfig.get(MySqlSourceOptions.PASSWORD.key()))
+ .databaseList(databaseName)
+ .tableList(databaseName + "." + tableName);
+
+ // Optional settings: each is applied only when the key is present.
+ // NOTE(review): the two Duration options below are parsed with Duration.parse, which
+ // accepts ISO-8601 text such as "PT30S"; the connector documentation shows values like
+ // "30s" - confirm which format users are expected to pass.
+
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_ID.key()))
+ .ifPresent(sourceBuilder::serverId);
+
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE.key()))
+ .ifPresent(sourceBuilder::serverTimeZone);
+
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key()))
+ .ifPresent(size ->
sourceBuilder.fetchSize(Integer.parseInt(size)));
+
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_TIMEOUT.key()))
+ .ifPresent(timeout ->
sourceBuilder.connectTimeout(Duration.parse(timeout)));
+
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECT_MAX_RETRIES.key()))
+ .ifPresent(retries ->
sourceBuilder.connectMaxRetries(Integer.parseInt(retries)));
+
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.CONNECTION_POOL_SIZE.key()))
+ .ifPresent(size ->
sourceBuilder.connectionPoolSize(Integer.parseInt(size)));
+
Optional.ofNullable(mySqlConfig.get(MySqlSourceOptions.HEARTBEAT_INTERVAL.key()))
+ .ifPresent(interval ->
sourceBuilder.heartbeatInterval(Duration.parse(interval)));
+
+ // Startup mode: an absent or unrecognised value leaves the builder's default untouched.
+ String startupMode =
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_MODE.key());
+ // see
+ //
https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java#L196
+ if ("initial".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(StartupOptions.initial());
+ } else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(StartupOptions.earliest());
+ } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(StartupOptions.latest());
+ } else if ("specific-offset".equalsIgnoreCase(startupMode)) {
+ BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
+ String file =
+
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE.key());
+ String pos =
mySqlConfig.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS.key());
+ // File and position are only applied together.
+ if (file != null && pos != null) {
+ offsetBuilder.setBinlogFilePosition(file, Long.parseLong(pos));
+ }
+ Optional.ofNullable(
+ mySqlConfig.get(
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET.key()))
+ .ifPresent(offsetBuilder::setGtidSet);
+ Optional.ofNullable(
+ mySqlConfig.get(
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS
+ .key()))
+ .ifPresent(
+ skipEvents ->
offsetBuilder.setSkipEvents(Long.parseLong(skipEvents)));
+ Optional.ofNullable(
+ mySqlConfig.get(
+
MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS
+ .key()))
+ .ifPresent(skipRows ->
offsetBuilder.setSkipRows(Long.parseLong(skipRows)));
+
sourceBuilder.startupOptions(StartupOptions.specificOffset(offsetBuilder.build()));
+ } else if ("timestamp".equalsIgnoreCase(startupMode)) {
+ sourceBuilder.startupOptions(
+ StartupOptions.timestamp(
+ Long.parseLong(
+ mySqlConfig.get(
+
MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS
+ .key()))));
+ }
+
+ // Forward prefixed pass-through properties with the prefix removed.
+ Properties jdbcProperties = new Properties();
+ Properties debeziumProperties = new Properties();
+ for (Map.Entry<String, String> entry : mySqlConfig.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(JdbcUrlUtils.PROPERTIES_PREFIX)) {
+
jdbcProperties.put(key.substring(JdbcUrlUtils.PROPERTIES_PREFIX.length()),
value);
+ } else if
(key.startsWith(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX)) {
+ debeziumProperties.put(
+
key.substring(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX.length()), value);
+ }
+ }
+ sourceBuilder.jdbcProperties(jdbcProperties);
+ sourceBuilder.debeziumProperties(debeziumProperties);
+
+ // MySqlDebeziumJsonEventParser needs the schema in each event and decimals as numeric
+ // literals; both are configured here.
+ Map<String, Object> customConverterConfigs = new HashMap<>();
+ customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,
"numeric");
+ JsonDebeziumDeserializationSchema schema =
+ new JsonDebeziumDeserializationSchema(true,
customConverterConfigs);
+ return
sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
+ }
+
+    /**
+     * Collects the schema of every MySQL table whose database and table names fully match the
+     * configured database-name and table-name regular expressions.
+     *
+     * @return one {@code MySqlSchema} per matching table, possibly empty
+     * @throws Exception if the JDBC connection or a metadata query fails
+     */
+    private List<MySqlSchema> getMySqlSchemaList() throws Exception {
+        Pattern databasePattern =
+                Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME.key()));
+        Pattern tablePattern =
+                Pattern.compile(mySqlConfig.get(MySqlSourceOptions.TABLE_NAME.key()));
+        List<MySqlSchema> matchedSchemas = new ArrayList<>();
+
+        String jdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/",
+                        mySqlConfig.get(MySqlSourceOptions.HOSTNAME.key()),
+                        mySqlConfig.get(MySqlSourceOptions.PORT.key()));
+        String user = mySqlConfig.get(MySqlSourceOptions.USERNAME.key());
+        String password = mySqlConfig.get(MySqlSourceOptions.PASSWORD.key());
+
+        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            // MySQL databases are listed as JDBC catalogs.
+            try (ResultSet catalogs = metaData.getCatalogs()) {
+                while (catalogs.next()) {
+                    String databaseName = catalogs.getString("TABLE_CAT");
+                    if (!databasePattern.matcher(databaseName).matches()) {
+                        continue;
+                    }
+                    try (ResultSet tables = metaData.getTables(databaseName, null, "%", null)) {
+                        while (tables.next()) {
+                            String tableName = tables.getString("TABLE_NAME");
+                            if (tablePattern.matcher(tableName).matches()) {
+                                matchedSchemas.add(
+                                        new MySqlSchema(metaData, databaseName, tableName));
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return matchedSchemas;
+    }
+
+    /**
+     * Checks that every MySQL column exists in the Paimon table and that its type can be
+     * converted to the Paimon column's type.
+     *
+     * @param tableSchema schema of the existing Paimon table
+     * @param mySqlSchema merged schema of the MySQL tables
+     * @return {@code false} as soon as a MySQL column is missing or not convertible, otherwise
+     *     {@code true}
+     */
+    private boolean schemaCompatible(TableSchema tableSchema, MySqlSchema mySqlSchema) {
+        for (Map.Entry<String, DataType> mySqlField : mySqlSchema.fields.entrySet()) {
+            int position = tableSchema.fieldNames().indexOf(mySqlField.getKey());
+            if (position < 0) {
+                return false;
+            }
+
+            DataType paimonType = tableSchema.fields().get(position).type();
+            boolean convertible =
+                    SchemaChangeProcessFunction.canConvert(mySqlField.getValue(), paimonType);
+            if (!convertible) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+ /**
+ * Builds the schema of the Paimon table to create from the merged MySQL schema.
+ *
+ * <p>Columns are added in the iteration order of the MySQL fields and {@code paimonConfig}
+ * becomes the table options. Explicitly specified primary keys win over the keys read from
+ * MySQL; partition keys are applied only when specified.
+ *
+ * @param mySqlSchema merged schema of the MySQL tables
+ * @return the schema for the new Paimon table
+ * @throws IllegalArgumentException if a specified primary key is not a MySQL column, or if no
+ * primary key is specified and none is available from the MySQL schema
+ */
+ private Schema buildSchema(MySqlSchema mySqlSchema) {
+ Schema.Builder builder = Schema.newBuilder();
+ builder.options(paimonConfig);
+
+ for (Map.Entry<String, DataType> entry :
mySqlSchema.fields.entrySet()) {
+ builder.column(entry.getKey(), entry.getValue());
+ }
+
+ if (primaryKeys.size() > 0) {
+ // User-specified keys: each must name an existing MySQL column.
+ for (String key : primaryKeys) {
+ if (!mySqlSchema.fields.containsKey(key)) {
+ throw new IllegalArgumentException(
+ "Specified primary key " + key + " does not exist
in MySQL tables");
+ }
+ }
+ builder.primaryKey(primaryKeys);
+ } else if (mySqlSchema.primaryKeys.size() > 0) {
+ builder.primaryKey(mySqlSchema.primaryKeys);
+ } else {
+ throw new IllegalArgumentException(
+ "Primary keys are not specified. "
+ + "Also, can't infer primary keys from MySQL table
schemas because "
+ + "MySQL tables have no primary keys or have
different primary keys.");
+ }
+
+ // NOTE(review): partition keys are not checked against the MySQL columns here, unlike
+ // primary keys - confirm whether the schema builder validates them.
+ if (partitionKeys.size() > 0) {
+ builder.partitionKeys(partitionKeys);
+ }
+
+ return builder.build();
+ }
+
+ private static class MySqlSchema {
+
+ private final String databaseName;
+ private final String tableName;
+
+ private final Map<String, DataType> fields;
+ private final List<String> primaryKeys;
+
+        /**
+         * Reads the column types and the primary key of one MySQL table from JDBC metadata.
+         *
+         * <p>The database name is passed as the JDBC <em>catalog</em>, matching how the enclosing
+         * class enumerates databases ({@code getCatalogs()}) and tables ({@code
+         * getTables(databaseName, null, ...)}). Passing it as the schema pattern with a null
+         * catalog does not restrict the lookup to that database, so same-named tables of other
+         * databases could leak into the result.
+         *
+         * @param metaData metadata of an open MySQL connection
+         * @param databaseName database (JDBC catalog) that owns the table
+         * @param tableName table to inspect; the JDBC column lookup treats it as a name pattern
+         * @throws Exception if a metadata query fails or a column type cannot be converted
+         */
+        private MySqlSchema(DatabaseMetaData metaData, String databaseName, String tableName)
+                throws Exception {
+            this.databaseName = databaseName;
+            this.tableName = tableName;
+
+            // LinkedHashMap keeps the column order reported by the driver.
+            fields = new LinkedHashMap<>();
+            try (ResultSet rs = metaData.getColumns(databaseName, null, tableName, null)) {
+                while (rs.next()) {
+                    String fieldName = rs.getString("COLUMN_NAME");
+                    String fieldType = rs.getString("TYPE_NAME");
+                    // getInt returns 0 for SQL NULL, so wasNull() is needed to tell them apart.
+                    Integer precision = rs.getInt("COLUMN_SIZE");
+                    if (rs.wasNull()) {
+                        precision = null;
+                    }
+                    Integer scale = rs.getInt("DECIMAL_DIGITS");
+                    if (rs.wasNull()) {
+                        scale = null;
+                    }
+                    fields.put(fieldName, MySqlTypeUtils.toDataType(fieldType, precision, scale));
+                }
+            }
+
+            primaryKeys = new ArrayList<>();
+            try (ResultSet rs = metaData.getPrimaryKeys(databaseName, null, tableName)) {
+                while (rs.next()) {
+                    String fieldName = rs.getString("COLUMN_NAME");
+                    primaryKeys.add(fieldName);
+                }
+            }
+        }
+
+ /**
+ * Merges the columns and primary key of {@code other} into this schema, in place.
+ *
+ * <p>Columns only present in {@code other} are added. For a column present in both, the type
+ * that the other one can be converted to is kept. The primary key is cleared when the two
+ * schemas do not declare the same key list.
+ *
+ * @param other schema of another MySQL table
+ * @return this schema, modified
+ * @throws IllegalArgumentException if a shared column has types that cannot be converted in
+ * either direction
+ */
+ private MySqlSchema merge(MySqlSchema other) {
+ for (Map.Entry<String, DataType> entry : other.fields.entrySet()) {
+ String fieldName = entry.getKey();
+ DataType newType = entry.getValue();
+ if (fields.containsKey(fieldName)) {
+ DataType oldType = fields.get(fieldName);
+ if (SchemaChangeProcessFunction.canConvert(oldType,
newType)) {
+ // The other table's type can hold this table's values: adopt it.
+ fields.put(fieldName, newType);
+ } else if (SchemaChangeProcessFunction.canConvert(newType,
oldType)) {
+ // nothing to do
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Column %s have different types in
table %s.%s and table %s.%s",
+ fieldName,
+ databaseName,
+ tableName,
+ other.databaseName,
+ other.tableName));
+ }
+ } else {
+ fields.put(fieldName, newType);
+ }
+ }
+ // An empty key list signals "no common primary key" to buildSchema.
+ if (!primaryKeys.equals(other.primaryKeys)) {
+ primaryKeys.clear();
+ }
+ return this;
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
new file mode 100644
index 000000000..d508852af
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts from MySQL type to {@link DataType}.
+ *
+ * <p>Mostly referenced from <a
+ * href="https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTypeUtils.java">ververica
+ * / flink-cdc-connectors</a>.
+ */
+public class MySqlTypeUtils {
+
+    // ------ MySQL Type ------
+    // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
+    private static final String BIT = "BIT";
+    private static final String TINYINT = "TINYINT";
+    private static final String TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String TINYINT_UNSIGNED_ZEROFILL = "TINYINT UNSIGNED ZEROFILL";
+    private static final String SMALLINT = "SMALLINT";
+    private static final String SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String SMALLINT_UNSIGNED_ZEROFILL = "SMALLINT UNSIGNED ZEROFILL";
+    private static final String MEDIUMINT = "MEDIUMINT";
+    private static final String MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MEDIUMINT_UNSIGNED_ZEROFILL = "MEDIUMINT UNSIGNED ZEROFILL";
+    private static final String INT = "INT";
+    private static final String INT_UNSIGNED = "INT UNSIGNED";
+    private static final String INT_UNSIGNED_ZEROFILL = "INT UNSIGNED ZEROFILL";
+    private static final String BIGINT = "BIGINT";
+    private static final String SERIAL = "SERIAL";
+    private static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String BIGINT_UNSIGNED_ZEROFILL = "BIGINT UNSIGNED ZEROFILL";
+    private static final String REAL = "REAL";
+    private static final String REAL_UNSIGNED = "REAL UNSIGNED";
+    private static final String REAL_UNSIGNED_ZEROFILL = "REAL UNSIGNED ZEROFILL";
+    private static final String FLOAT = "FLOAT";
+    private static final String FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String FLOAT_UNSIGNED_ZEROFILL = "FLOAT UNSIGNED ZEROFILL";
+    private static final String DOUBLE = "DOUBLE";
+    private static final String DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+    private static final String DOUBLE_UNSIGNED_ZEROFILL = "DOUBLE UNSIGNED ZEROFILL";
+    private static final String DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String DOUBLE_PRECISION_UNSIGNED = "DOUBLE PRECISION UNSIGNED";
+    private static final String DOUBLE_PRECISION_UNSIGNED_ZEROFILL =
+            "DOUBLE PRECISION UNSIGNED ZEROFILL";
+    private static final String NUMERIC = "NUMERIC";
+    private static final String NUMERIC_UNSIGNED = "NUMERIC UNSIGNED";
+    private static final String NUMERIC_UNSIGNED_ZEROFILL = "NUMERIC UNSIGNED ZEROFILL";
+    private static final String FIXED = "FIXED";
+    private static final String FIXED_UNSIGNED = "FIXED UNSIGNED";
+    private static final String FIXED_UNSIGNED_ZEROFILL = "FIXED UNSIGNED ZEROFILL";
+    private static final String DECIMAL = "DECIMAL";
+    private static final String DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String DECIMAL_UNSIGNED_ZEROFILL = "DECIMAL UNSIGNED ZEROFILL";
+    private static final String CHAR = "CHAR";
+    private static final String VARCHAR = "VARCHAR";
+    private static final String TINYTEXT = "TINYTEXT";
+    private static final String MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String TEXT = "TEXT";
+    private static final String LONGTEXT = "LONGTEXT";
+    private static final String DATE = "DATE";
+    private static final String TIME = "TIME";
+    private static final String DATETIME = "DATETIME";
+    private static final String TIMESTAMP = "TIMESTAMP";
+    private static final String YEAR = "YEAR";
+    private static final String BINARY = "BINARY";
+    private static final String VARBINARY = "VARBINARY";
+    private static final String TINYBLOB = "TINYBLOB";
+    private static final String MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String BLOB = "BLOB";
+    private static final String LONGBLOB = "LONGBLOB";
+    private static final String JSON = "JSON";
+    private static final String SET = "SET";
+    private static final String ENUM = "ENUM";
+    private static final String GEOMETRY = "GEOMETRY";
+    private static final String UNKNOWN = "UNKNOWN";
+
+    // This length is from JDBC.
+    // It returns the number of characters when converting this timestamp to string.
+    // The base length of a timestamp is 19, for example "2023-03-23 17:20:00".
+    private static final int JDBC_TIMESTAMP_BASE_LENGTH = 19;
+
+    /**
+     * Converts a MySQL type name plus its textual parameter list to a {@link DataType}.
+     *
+     * <p>The first parameter is passed on as the length and the second one as the scale; any
+     * further parameters are ignored.
+     *
+     * @param type MySQL type name, matched case-insensitively
+     * @param params comma-separated integer parameters, for example {@code "10, 2"}; {@code null}
+     *     or a blank string means the type has no parameters
+     * @return the corresponding {@link DataType}
+     * @throws NumberFormatException if a parameter is not a valid integer
+     * @throws UnsupportedOperationException if the MySQL type is not supported
+     */
+    public static DataType toDataType(String type, @Nullable String params) {
+        List<Integer> paramList = new ArrayList<>();
+        // "".split(",") yields a single empty string, which Integer.parseInt rejects,
+        // so a blank parameter list is treated the same as a missing one.
+        if (params != null && !params.trim().isEmpty()) {
+            for (String s : params.split(",")) {
+                paramList.add(Integer.parseInt(s.trim()));
+            }
+        }
+        return toDataType(
+                type,
+                !paramList.isEmpty() ? paramList.get(0) : null,
+                paramList.size() > 1 ? paramList.get(1) : null);
+    }
+
+    /**
+     * Converts a MySQL type name with optional length and scale to a {@link DataType}.
+     *
+     * @param type MySQL type name, matched case-insensitively
+     * @param length length (or precision) of the type; required for CHAR, VARCHAR, BINARY and
+     *     VARBINARY
+     * @param scale scale of decimal types; a missing or negative scale is treated as 0
+     * @return the corresponding {@link DataType}
+     * @throws NullPointerException if {@code length} is required by the type but is {@code null}
+     * @throws UnsupportedOperationException if the MySQL type is not supported
+     */
+    public static DataType toDataType(
+            String type, @Nullable Integer length, @Nullable Integer scale) {
+        // Upper-case with a fixed locale: with the default locale the result depends on the
+        // JVM settings (e.g. "tinyint" becomes "TİNYİNT" under a Turkish locale) and would
+        // not match any of the constants below.
+        switch (type.toUpperCase(java.util.Locale.ROOT)) {
+            case BIT:
+                return DataTypes.BOOLEAN();
+            case TINYINT:
+                // MySQL has no boolean type; it uses tinyint(1) to represent booleans.
+                // Users should not use tinyint(1) to store numbers. Although the JDBC url
+                // parameter tinyInt1isBit=false can change the returned value, it is not a
+                // general solution.
+                // Btw: mybatis and mysql-connector-java map tinyint(1) to boolean by default.
+                return length != null && length == 1 ? DataTypes.BOOLEAN() : DataTypes.TINYINT();
+            case TINYINT_UNSIGNED:
+            case TINYINT_UNSIGNED_ZEROFILL:
+            case SMALLINT:
+                return DataTypes.SMALLINT();
+            case SMALLINT_UNSIGNED:
+            case SMALLINT_UNSIGNED_ZEROFILL:
+            case INT:
+            case MEDIUMINT:
+                return DataTypes.INT();
+            case INT_UNSIGNED:
+            case INT_UNSIGNED_ZEROFILL:
+            case MEDIUMINT_UNSIGNED:
+            case MEDIUMINT_UNSIGNED_ZEROFILL:
+            case BIGINT:
+                return DataTypes.BIGINT();
+            case BIGINT_UNSIGNED:
+            case BIGINT_UNSIGNED_ZEROFILL:
+            case SERIAL:
+                return DataTypes.DECIMAL(20, 0);
+            case FLOAT:
+            case FLOAT_UNSIGNED:
+            case FLOAT_UNSIGNED_ZEROFILL:
+                return DataTypes.FLOAT();
+            case REAL:
+            case REAL_UNSIGNED:
+            case REAL_UNSIGNED_ZEROFILL:
+            case DOUBLE:
+            case DOUBLE_UNSIGNED:
+            case DOUBLE_UNSIGNED_ZEROFILL:
+            case DOUBLE_PRECISION:
+            case DOUBLE_PRECISION_UNSIGNED:
+            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
+                return DataTypes.DOUBLE();
+            case NUMERIC:
+            case NUMERIC_UNSIGNED:
+            case NUMERIC_UNSIGNED_ZEROFILL:
+            case FIXED:
+            case FIXED_UNSIGNED:
+            case FIXED_UNSIGNED_ZEROFILL:
+            case DECIMAL:
+            case DECIMAL_UNSIGNED:
+            case DECIMAL_UNSIGNED_ZEROFILL:
+                // Decimals without a precision, or with a precision above 38, are kept as
+                // strings.
+                return length != null && length <= 38
+                        ? DataTypes.DECIMAL(length, scale != null && scale >= 0 ? scale : 0)
+                        : DataTypes.STRING();
+            case DATE:
+                return DataTypes.DATE();
+            case DATETIME:
+            case TIMESTAMP:
+                if (length == null) {
+                    return DataTypes.TIMESTAMP();
+                } else if (length >= JDBC_TIMESTAMP_BASE_LENGTH) {
+                    // The length is a JDBC display length rather than a precision.
+                    if (length > JDBC_TIMESTAMP_BASE_LENGTH + 1) {
+                        // Timestamp with a fraction of seconds.
+                        // For example "2023-03-23 17:20:00.01".
+                        // The decimal point will occupy 1 character.
+                        return DataTypes.TIMESTAMP(length - JDBC_TIMESTAMP_BASE_LENGTH - 1);
+                    } else {
+                        return DataTypes.TIMESTAMP(0);
+                    }
+                } else if (length >= 0 && length <= TimestampType.MAX_PRECISION) {
+                    // The length is already a fractional-second precision.
+                    return DataTypes.TIMESTAMP(length);
+                } else {
+                    return DataTypes.TIMESTAMP();
+                }
+            case CHAR:
+                return DataTypes.CHAR(Preconditions.checkNotNull(length));
+            case VARCHAR:
+                return DataTypes.VARCHAR(Preconditions.checkNotNull(length));
+            case TEXT:
+                return DataTypes.STRING();
+            case BINARY:
+                return DataTypes.BINARY(Preconditions.checkNotNull(length));
+            case VARBINARY:
+                return DataTypes.VARBINARY(Preconditions.checkNotNull(length));
+            case BLOB:
+                return DataTypes.BYTES();
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Don't support MySQL type '%s' yet.", type));
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
index 3b4699c84..69b64740f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcBucketStreamPartitioner.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index 223b709ec..357c9996d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -18,8 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
-import org.apache.paimon.cdc.EventParser;
import org.apache.paimon.schema.SchemaChange;
import org.apache.flink.api.common.typeinfo.TypeInformation;
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
similarity index 97%
rename from paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 4e80002af..e6692c072 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/CdcRecord.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.cdc;
+package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.types.RowKind;
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
similarity index 97%
copy from paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
copy to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index 46c744699..9e575a338 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.cdc;
+package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.schema.SchemaChange;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 7981fec6e..24ddd9e56 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
index 43e940b3b..acee7b911 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkBuilder.java
@@ -18,8 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
-import org.apache.paimon.cdc.EventParser;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.operation.Lock;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
index 6988f2782..c637d2f73 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperator.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.AbstractStoreWriteOperator;
import org.apache.paimon.flink.sink.LogSinkFunction;
@@ -104,23 +103,38 @@ public class SchemaAwareStoreWriteOperator extends
AbstractStoreWriteOperator<Cd
private Map<String, Object> tryConvert(Map<String, String> fields) {
Map<String, Object> converted = new HashMap<>();
TableSchema schema = table.schema();
+
for (Map.Entry<String, String> field : fields.entrySet()) {
String key = field.getKey();
String value = field.getValue();
+
int idx = schema.fieldNames().indexOf(key);
if (idx < 0) {
+ LOG.info("Field " + key + " not found. Waiting for schema
update.");
return null;
}
- DataType type = schema.fields().get(idx).type();
- // TODO TypeUtils.castFromString cannot deal with complex types
like arrays and maps.
- // Change type of CdcRecord#field if needed.
- try {
- converted.put(key, TypeUtils.castFromString(value, type));
- } catch (Exception e) {
- LOG.debug("Failed to convert value " + value + " to type " +
type, e);
- return null;
+
+ if (value == null) {
+ converted.put(key, null);
+ } else {
+ DataType type = schema.fields().get(idx).type();
+ // TODO TypeUtils.castFromString cannot deal with complex
types like arrays and
+ // maps. Change type of CdcRecord#field if needed.
+ try {
+ converted.put(key, TypeUtils.castFromString(value, type));
+ } catch (Exception e) {
+ LOG.info(
+ "Failed to convert value "
+ + value
+ + " to type "
+ + type
+ + ". Waiting for schema update.",
+ e);
+ return null;
+ }
}
}
+
return converted;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
index 85a9f080e..d8ad51d38 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
@@ -22,6 +22,7 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.Preconditions;
@@ -86,7 +87,7 @@ public class SchemaChangeProcessFunction extends
ProcessFunction<SchemaChange, V
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
- if (checkTypeConversion(oldType, newType)) {
+ if (canConvert(oldType, newType)) {
schemaManager.commitChanges(schemaChange);
} else {
throw new UnsupportedOperationException(
@@ -109,6 +110,8 @@ public class SchemaChangeProcessFunction extends
ProcessFunction<SchemaChange, V
private static final List<DataTypeRoot> STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
+ private static final List<DataTypeRoot> BINARY_TYPES =
+ Arrays.asList(DataTypeRoot.BINARY, DataTypeRoot.VARBINARY);
private static final List<DataTypeRoot> INTEGER_TYPES =
Arrays.asList(
DataTypeRoot.TINYINT,
@@ -118,11 +121,17 @@ public class SchemaChangeProcessFunction extends
ProcessFunction<SchemaChange, V
private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
- private boolean checkTypeConversion(DataType oldType, DataType newType) {
+ public static boolean canConvert(DataType oldType, DataType newType) {
int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
if (oldIdx >= 0 && newIdx >= 0) {
- return true;
+ return DataTypeChecks.getLength(oldType) <=
DataTypeChecks.getLength(newType);
+ }
+
+ oldIdx = BINARY_TYPES.indexOf(oldType.getTypeRoot());
+ newIdx = BINARY_TYPES.indexOf(newType.getTypeRoot());
+ if (oldIdx >= 0 && newIdx >= 0) {
+ return DataTypeChecks.getLength(oldType) <=
DataTypeChecks.getLength(newType);
}
oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 8b5fba73a..1e17967e0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -92,7 +92,7 @@ public class FileStoreITCase extends AbstractTestBase {
new RowType(
Arrays.asList(
new RowType.RowField("v", new IntType()),
- new RowType.RowField("p", new VarCharType()),
+ new RowType.RowField("p", new VarCharType(10)),
// rename key
new RowType.RowField("_k", new IntType())));
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index ee1d010a3..74b8bc374 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -47,7 +47,7 @@ import java.util.Map;
import java.util.UUID;
/** {@link Action} test base. */
-public class ActionITCaseBase extends AbstractTestBase {
+public abstract class ActionITCaseBase extends AbstractTestBase {
protected String warehouse;
protected String database;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
new file mode 100644
index 000000000..023f1d4e7
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlContainer.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Docker container for MySQL. The difference between this class and {@link
+ * org.testcontainers.containers.MySQLContainer} is that TC MySQLContainer has problems when
+ * overriding mysql conf file, i.e. my.cnf.
+ *
+ * <p>Copied from <a
+ * href="https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlContainer.java">ververica
+ * / flink-cdc-connectors</a>.
+ */
+@SuppressWarnings("rawtypes")
+public class MySqlContainer extends JdbcDatabaseContainer {
+
+    public static final String IMAGE = "mysql";
+    public static final Integer MYSQL_PORT = 3306;
+
+    private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+    private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
+    private static final String MYSQL_ROOT_USER = "root";
+
+    private String databaseName = "test";
+    private String username = "test";
+    private String password = "test";
+
+    /**
+     * Creates a container for the given MySQL version and exposes the MySQL port.
+     *
+     * @param version MySQL version whose version string is used as the docker image tag
+     */
+    public MySqlContainer(MySqlVersion version) {
+        super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
+        addExposedPort(MYSQL_PORT);
+    }
+
+    /** Returns the mapped MySQL port as the only port to check for liveness. */
+    @Override
+    protected Set<Integer> getLivenessCheckPorts() {
+        // Do not pass the port to the HashSet constructor: the Integer would be unboxed and
+        // select HashSet(int initialCapacity), producing an empty set.
+        Set<Integer> ports = new HashSet<>();
+        ports.add(getMappedPort(MYSQL_PORT));
+        return ports;
+    }
+
+    /**
+     * Sets up volumes, environment variables and startup attempts before the container starts.
+     *
+     * @throws ContainerLaunchException if the password is empty and the user is not root
+     */
+    @Override
+    protected void configure() {
+        optionallyMapResourceParameterAsVolume(
+                MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
+
+        // The setup SQL is only mapped when it was configured via withSetupSQL.
+        if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+            optionallyMapResourceParameterAsVolume(
+                    SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
+        }
+
+        addEnv("MYSQL_DATABASE", databaseName);
+        addEnv("MYSQL_USER", username);
+        if (password != null && !password.isEmpty()) {
+            addEnv("MYSQL_PASSWORD", password);
+            addEnv("MYSQL_ROOT_PASSWORD", password);
+        } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
+            addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
+        } else {
+            throw new ContainerLaunchException(
+                    "Empty password can be used only with the root user");
+        }
+        setStartupAttempts(3);
+    }
+
+    /**
+     * Returns the class name of the MySQL JDBC driver, preferring the Connector/J 8 driver and
+     * falling back to the legacy driver class name when the former is not on the classpath.
+     */
+    @Override
+    public String getDriverClassName() {
+        try {
+            Class.forName("com.mysql.cj.jdbc.Driver");
+            return "com.mysql.cj.jdbc.Driver";
+        } catch (ClassNotFoundException e) {
+            // Deliberately ignored: the new driver is absent, so use the legacy class name.
+            return "com.mysql.jdbc.Driver";
+        }
+    }
+
+    /**
+     * Builds the JDBC url for the given database of this container.
+     *
+     * @param databaseName name of the database to connect to
+     * @return JDBC url including the additional url parameters of this container
+     */
+    public String getJdbcUrl(String databaseName) {
+        String additionalUrlParams = constructUrlParameters("?", "&");
+        return "jdbc:mysql://"
+                + getHost()
+                + ":"
+                + getDatabasePort()
+                + "/"
+                + databaseName
+                + additionalUrlParams;
+    }
+
+    /** Returns the JDBC url for the database configured on this container. */
+    @Override
+    public String getJdbcUrl() {
+        return getJdbcUrl(databaseName);
+    }
+
+    /** Returns the host port that is mapped to the MySQL port of the container. */
+    public int getDatabasePort() {
+        return getMappedPort(MYSQL_PORT);
+    }
+
+    /**
+     * Builds the connection url and makes sure that {@code useSSL} and {@code
+     * allowPublicKeyRetrieval} are set, without overriding values that are already present.
+     */
+    @Override
+    protected String constructUrlForConnection(String queryString) {
+        String url = super.constructUrlForConnection(queryString);
+
+        if (!url.contains("useSSL=")) {
+            String separator = url.contains("?") ? "&" : "?";
+            url = url + separator + "useSSL=false";
+        }
+
+        if (!url.contains("allowPublicKeyRetrieval=")) {
+            // Choose the separator the same way as above instead of assuming that the url
+            // already has a query string.
+            String separator = url.contains("?") ? "&" : "?";
+            url = url + separator + "allowPublicKeyRetrieval=true";
+        }
+
+        return url;
+    }
+
+    @Override
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    @Override
+    public String getUsername() {
+        return username;
+    }
+
+    @Override
+    public String getPassword() {
+        return password;
+    }
+
+    @Override
+    protected String getTestQueryString() {
+        return "SELECT 1";
+    }
+
+    /**
+     * Registers the resource to use as my.cnf configuration override.
+     *
+     * @param s resource path of the configuration override
+     * @return this container
+     */
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withConfigurationOverride(String s) {
+        parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
+        return this;
+    }
+
+    /**
+     * Registers the resource that is mapped into {@code /docker-entrypoint-initdb.d/}.
+     *
+     * @param sqlPath resource path of the setup SQL
+     * @return this container
+     */
+    @SuppressWarnings("unchecked")
+    public MySqlContainer withSetupSQL(String sqlPath) {
+        parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withDatabaseName(final String databaseName) {
+        this.databaseName = databaseName;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withUsername(final String username) {
+        this.username = username;
+        return this;
+    }
+
+    @Override
+    public MySqlContainer withPassword(final String password) {
+        this.password = password;
+        return this;
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
new file mode 100644
index 000000000..9769a1672
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** IT cases for {@link MySqlSyncTableAction}. */
+public class MySqlSyncTableActionITCase extends ActionITCaseBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MySqlSyncTableActionITCase.class);
+
+ private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer(MySqlVersion.V5_7);
+ private static final String USER = "paimonuser";
+ private static final String PASSWORD = "paimonpw";
+ private static final String DATABASE_NAME = "paimon_test";
+
+ /** Starts the shared MySQL container before the tests of this class run. */
+ @BeforeAll
+ public static void startContainers() {
+     LOG.info("Starting containers...");
+     Stream<MySqlContainer> containersToStart = Stream.of(MYSQL_CONTAINER);
+     // join() blocks until the asynchronous start-up has completed.
+     Startables.deepStart(containersToStart).join();
+     LOG.info("Containers are started.");
+ }
+
+ @AfterAll
+ public static void stopContainers() { // stops the shared MySQL container after all tests of this class
+ LOG.info("Stopping containers...");
+ MYSQL_CONTAINER.stop();
+ LOG.info("Containers are stopped.");
+ }
+
+ /**
+  * Creates (without starting) the MySQL container used by this test class.
+  *
+  * <p>The container is configured with {@code mysql/my.cnf}, {@code mysql/setup.sql}, the test
+  * credentials and database name, and forwards its log output to this class' logger.
+  *
+  * @param version MySQL version passed to the container constructor
+  * @return the configured container
+  */
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+     MySqlContainer configured =
+             new MySqlContainer(version)
+                     .withConfigurationOverride("mysql/my.cnf")
+                     .withSetupSQL("mysql/setup.sql")
+                     .withUsername(USER)
+                     .withPassword(PASSWORD)
+                     .withDatabaseName(DATABASE_NAME);
+     Slf4jLogConsumer logForwarder = new Slf4jLogConsumer(LOG);
+     // The cast is kept from the original chain, where it was applied to this call's result.
+     return (MySqlContainer) configured.withLogConsumer(logForwarder);
+ }
+
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolution() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "schema_evolution_\\d+");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ Map<String, String> paimonConfig = new HashMap<>();
+ paimonConfig.put("bucket", String.valueOf(random.nextInt(3) + 1));
+ paimonConfig.put("sink.parallelism", String.valueOf(random.nextInt(3)
+ 1));
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "_id"),
+ paimonConfig);
+ action.build(env);
+ JobClient client = env.executeAsync();
+
+ while (true) {
+ JobStatus status = client.getJobStatus().get();
+ if (status == JobStatus.RUNNING) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ String.format(
+ "jdbc:mysql://%s:%s/",
+ MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort()),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = conn.createStatement()) {
+ testSchemaEvolutionImpl(statement);
+ }
+ }
+ }
+
+ private void testSchemaEvolutionImpl(Statement statement) throws Exception // runs SQL on the two source tables and waits for the Paimon table to reflect each step
{
+ FileStoreTable table = getFileStoreTable();
+ statement.executeUpdate("USE paimon_test"); // same value as DATABASE_NAME
+
+ statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1,
'one')");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4,
'four')");
+ RowType rowType = // step 1: initial schema; rows of both source tables are expected in the one Paimon table
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10)
+ },
+ new String[] {"pt", "_id", "v1"});
+ List<String> primaryKeys = Arrays.asList("pt", "_id");
+ List<String> expected = Arrays.asList("+I[1, 1, one]", "+I[1, 2,
two]", "+I[2, 4, four]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2
INT");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30),
(1, 5, 'five', 50)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2
INT");
+ statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6,
'six', 60)");
+ statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'second'
WHERE _id = 2");
+ rowType = // step 2: nullable INT column v2 added; rows written earlier read NULL for it
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ DataTypes.INT()
+ },
+ new String[] {"pt", "_id", "v1", "v2"});
+ expected =
+ Arrays.asList(
+ "+I[1, 1, one, NULL]",
+ "+I[1, 2, second, NULL]",
+ "+I[2, 3, three, 30]",
+ "+I[2, 4, four, NULL]",
+ "+I[1, 5, five, 50]",
+ "+I[1, 6, six, 60]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN
v2 BIGINT");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven',
70000000000)");
+ statement.executeUpdate("UPDATE schema_evolution_1 SET v2 =
30000000000 WHERE _id = 3");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN
v2 BIGINT");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight',
80000000000)");
+ rowType = // step 3: v2 widened from INT to BIGINT; new values exceed the INT range
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ DataTypes.BIGINT()
+ },
+ new String[] {"pt", "_id", "v1", "v2"});
+ expected =
+ Arrays.asList(
+ "+I[1, 1, one, NULL]",
+ "+I[1, 2, second, NULL]",
+ "+I[2, 3, three, 30000000000]",
+ "+I[2, 4, four, NULL]",
+ "+I[1, 5, five, 50]",
+ "+I[1, 6, six, 60]",
+ "+I[2, 7, seven, 70000000000]",
+ "+I[2, 8, eight, 80000000000]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v3
NUMERIC(8, 3)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v4
VARBINARY(10)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v5
FLOAT");
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN
v1 VARCHAR(20)");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_1 VALUES (1, 9, 'nine',
90000000000, 99999.999, 'nine.bin', 9.9)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v3
NUMERIC(8, 3)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4
VARBINARY(10)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5
FLOAT");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN
v1 VARCHAR(20)");
+ statement.executeUpdate(
+ "UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE
_id = 8");
+ rowType = // step 4: columns v3, v4, v5 added and v1 widened from VARCHAR(10) to VARCHAR(20)
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(20),
+ DataTypes.BIGINT(),
+ DataTypes.DECIMAL(8, 3),
+ DataTypes.VARBINARY(10),
+ DataTypes.FLOAT()
+ },
+ new String[] {"pt", "_id", "v1", "v2", "v3", "v4",
"v5"});
+ expected =
+ Arrays.asList(
+ "+I[1, 1, one, NULL, NULL, NULL, NULL]",
+ "+I[1, 2, second, NULL, NULL, NULL, NULL]",
+ "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
+ "+I[2, 4, four, NULL, NULL, NULL, NULL]",
+ "+I[1, 5, five, 50, NULL, NULL, NULL]",
+ "+I[1, 6, six, 60, NULL, NULL, NULL]",
+ "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
+ "+I[2, 8, very long string, 80000000000, NULL, NULL,
NULL]",
+ "+I[1, 9, nine, 90000000000, 99999.999, [110, 105,
110, 101, 46, 98, 105, 110], 9.9]");
+ waitForResult(expected, table, rowType, primaryKeys); // the byte list in row 9 is the ASCII encoding of 'nine.bin'
+
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN
v4 VARBINARY(20)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN
v5 DOUBLE");
+ statement.executeUpdate(
+ "UPDATE schema_evolution_1 SET v4 = 'nine.bin.long', v5 =
9.00000000009 WHERE _id = 9");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN
v4 VARBINARY(20)");
+ statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN
v5 DOUBLE");
+ statement.executeUpdate(
+ "UPDATE schema_evolution_2 SET v4 = 'four.bin.long', v5 =
4.00000000004 WHERE _id = 4");
+ rowType = // step 5: v4 widened to VARBINARY(20) and v5 widened from FLOAT to DOUBLE
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(20),
+ DataTypes.BIGINT(),
+ DataTypes.DECIMAL(8, 3),
+ DataTypes.VARBINARY(20),
+ DataTypes.DOUBLE()
+ },
+ new String[] {"pt", "_id", "v1", "v2", "v3", "v4",
"v5"});
+ expected =
+ Arrays.asList(
+ "+I[1, 1, one, NULL, NULL, NULL, NULL]",
+ "+I[1, 2, second, NULL, NULL, NULL, NULL]",
+ "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
+ "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46,
98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]",
+ "+I[1, 5, five, 50, NULL, NULL, NULL]",
+ "+I[1, 6, six, 60, NULL, NULL, NULL]",
+ "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
+ "+I[2, 8, very long string, 80000000000, NULL, NULL,
NULL]",
+ "+I[1, 9, nine, 90000000000, 99999.999, [110, 105,
110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]");
+ waitForResult(expected, table, rowType, primaryKeys); // byte lists are the ASCII encodings of 'four.bin.long' and 'nine.bin.long'
+ }
+
+ @Test
+ @Timeout(30)
+ public void testAllTypes() throws Exception { // syncs all_types_table and checks the resulting Paimon schema and rows
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "all_types_table");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new HashMap<>());
+ action.build(env);
+ env.executeAsync(); // the returned client is not kept; the outcome is observed by polling the table below
+
+ RowType rowType = // expected Paimon type per MySQL column; the trailing comment on each entry names the column
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), // _id
+ DataTypes.BOOLEAN(), // _boolean
+ DataTypes.TINYINT(), // _tinyint
+ DataTypes.SMALLINT(), // _tinyint_unsigned
+ DataTypes.SMALLINT(), // _tinyint_unsigned_zerofill
+ DataTypes.SMALLINT(), // _smallint
+ DataTypes.INT(), // _smallint_unsigned
+ DataTypes.INT(), // _smallint_unsigned_zerofill
+ DataTypes.INT(), // _mediumint
+ DataTypes.BIGINT(), // _mediumint_unsigned
+ DataTypes.BIGINT(), // _mediumint_unsigned_zerofill
+ DataTypes.INT(), // _int
+ DataTypes.BIGINT(), // _int_unsigned
+ DataTypes.BIGINT(), // _int_unsigned_zerofill
+ DataTypes.BIGINT(), // _bigint
+ DataTypes.DECIMAL(20, 0), // _bigint_unsigned
+ DataTypes.DECIMAL(20, 0), //
_bigint_unsigned_zerofill
+ DataTypes.DECIMAL(20, 0), // _serial
+ DataTypes.FLOAT(), // _float
+ DataTypes.FLOAT(), // _float_unsigned
+ DataTypes.FLOAT(), // _float_unsigned_zerofill
+ DataTypes.DOUBLE(), // _real
+ DataTypes.DOUBLE(), // _real_unsigned
+ DataTypes.DOUBLE(), // _real_unsigned_zerofill
+ DataTypes.DOUBLE(), // _double
+ DataTypes.DOUBLE(), // _double_unsigned
+ DataTypes.DOUBLE(), // _double_unsigned_zerofill
+ DataTypes.DOUBLE(), // _double_precision
+ DataTypes.DOUBLE(), // _double_precision_unsigned
+ DataTypes.DOUBLE(), //
_double_precision_unsigned_zerofill
+ DataTypes.DECIMAL(8, 3), // _numeric
+ DataTypes.DECIMAL(8, 3), // _numeric_unsigned
+ DataTypes.DECIMAL(8, 3), //
_numeric_unsigned_zerofill
+ DataTypes.STRING(), // _fixed
+ DataTypes.STRING(), // _fixed_unsigned
+ DataTypes.STRING(), // _fixed_unsigned_zerofill
+ DataTypes.DECIMAL(8, 0), // _decimal
+ DataTypes.DECIMAL(8, 0), // _decimal_unsigned
+ DataTypes.DECIMAL(8, 0), //
_decimal_unsigned_zerofill
+ DataTypes.DATE(), // _date
+ DataTypes.TIMESTAMP(0), // _datetime
+ DataTypes.TIMESTAMP(6), // _timestamp
+ DataTypes.CHAR(10), // _char
+ DataTypes.VARCHAR(20), // _varchar
+ DataTypes.STRING(), // _text
+ DataTypes.BINARY(10), // _bin
+ DataTypes.VARBINARY(20), // _varbin
+ DataTypes.BYTES() // _blob
+ },
+ new String[] {
+ "_id",
+ "_boolean",
+ "_tinyint",
+ "_tinyint_unsigned",
+ "_tinyint_unsigned_zerofill",
+ "_smallint",
+ "_smallint_unsigned",
+ "_smallint_unsigned_zerofill",
+ "_mediumint",
+ "_mediumint_unsigned",
+ "_mediumint_unsigned_zerofill",
+ "_int",
+ "_int_unsigned",
+ "_int_unsigned_zerofill",
+ "_bigint",
+ "_bigint_unsigned",
+ "_bigint_unsigned_zerofill",
+ "_serial",
+ "_float",
+ "_float_unsigned",
+ "_float_unsigned_zerofill",
+ "_real",
+ "_real_unsigned",
+ "_real_unsigned_zerofill",
+ "_double",
+ "_double_unsigned",
+ "_double_unsigned_zerofill",
+ "_double_precision",
+ "_double_precision_unsigned",
+ "_double_precision_unsigned_zerofill",
+ "_numeric",
+ "_numeric_unsigned",
+ "_numeric_unsigned_zerofill",
+ "_fixed",
+ "_fixed_unsigned",
+ "_fixed_unsigned_zerofill",
+ "_decimal",
+ "_decimal_unsigned",
+ "_decimal_unsigned_zerofill",
+ "_date",
+ "_datetime",
+ "_timestamp",
+ "_char",
+ "_varchar",
+ "_text",
+ "_bin",
+ "_varbin",
+ "_blob"
+ });
+ FileStoreTable table = getFileStoreTable();
+ List<String> expected = // row 1 has every column set; row 2 has only _id and _serial (18th value) non-null
+ Arrays.asList(
+ "+I["
+ + "1, true, 1, 2, 3, "
+ + "1000, 2000, 3000, "
+ + "100000, 200000, 300000, "
+ + "1000000, 2000000, 3000000, "
+ + "10000000000, 20000000000, 30000000000,
40000000000, "
+ + "1.5, 2.5, 3.5, "
+ + "1.000001, 2.000002, 3.000003, "
+ + "1.000011, 2.000022, 3.000033, "
+ + "1.000111, 2.000222, 3.000333, "
+ + "12345.110, 12345.220, 12345.330, "
+ + "1.2345678987654322E32,
1.2345678987654322E32, 1.2345678987654322E32, "
+ + "11111, 22222, 33333, "
+ + "19439, 2023-03-23T14:30:05,
2023-03-23T15:00:10.123456, "
+ + "Paimon, Apache Paimon, Apache Paimon MySQL
Test Data, "
+ + "[98, 121, 116, 101, 115, 0, 0, 0, 0, 0], "
+ + "[109, 111, 114, 101, 32, 98, 121, 116, 101,
115], "
+ + "[118, 101, 114, 121, 32, 108, 111, 110,
103, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97]"
+ + "]",
+ "+I["
+ + "2, NULL, NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, 50000000000, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL, "
+ + "NULL, NULL, NULL"
+ + "]");
+ waitForResult(expected, table, rowType,
Collections.singletonList("_id"));
+ }
+
+ /**
+  * Building the action must be rejected when the MySQL tables matched by the table-name pattern
+  * declare different types for the same column.
+  */
+ @Test
+ public void testIncompatibleMySqlTable() {
+     Map<String, String> sourceOptions = getBasicMySqlConfig();
+     sourceOptions.put("database-name", DATABASE_NAME);
+     sourceOptions.put("table-name", "incompatible_field_\\d+");
+
+     StreamExecutionEnvironment environment =
+             StreamExecutionEnvironment.getExecutionEnvironment();
+     MySqlSyncTableAction syncAction =
+             new MySqlSyncTableAction(
+                     sourceOptions,
+                     warehouse,
+                     database,
+                     tableName,
+                     Collections.emptyList(),
+                     Collections.singletonList("_id"),
+                     new HashMap<>());
+
+     String expectedMessage =
+             "Column v1 have different types in table "
+                     + DATABASE_NAME
+                     + ".incompatible_field_1 and table "
+                     + DATABASE_NAME
+                     + ".incompatible_field_2";
+     IllegalArgumentException thrown =
+             assertThrows(
+                     IllegalArgumentException.class,
+                     () -> syncAction.build(environment),
+                     "Expecting IllegalArgumentException");
+     assertThat(thrown).hasMessage(expectedMessage);
+ }
+
+ @Test
+ public void testIncompatiblePaimonTable() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "incompatible_pk_\\d+");
+
+ createFileStoreTable(
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.DOUBLE()},
+ new String[] {"a", "b", "c"}),
+ Collections.emptyList(),
+ Collections.singletonList("a"),
+ new HashMap<>());
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyList(),
+ Collections.singletonList("a"),
+ new HashMap<>());
+
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> action.build(env),
+ "Expecting IllegalArgumentException");
+ assertThat(e).hasMessageContaining("Paimon schema and MySQL schema are
not compatible.");
+ }
+
+ @Test
+ public void testInvalidPrimaryKey() {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "schema_evolution_\\d+");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyList(),
+ Collections.singletonList("pk"),
+ new HashMap<>());
+
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> action.build(env),
+ "Expecting IllegalArgumentException");
+ assertThat(e).hasMessage("Specified primary key pk does not exist in
MySQL tables");
+ }
+
+ /**
+  * Building the action must be rejected when no primary keys are given and none can be
+  * inferred from the matched MySQL tables.
+  */
+ @Test
+ public void testNoPrimaryKey() {
+     Map<String, String> sourceOptions = getBasicMySqlConfig();
+     sourceOptions.put("database-name", DATABASE_NAME);
+     sourceOptions.put("table-name", "incompatible_pk_\\d+");
+
+     StreamExecutionEnvironment environment =
+             StreamExecutionEnvironment.getExecutionEnvironment();
+     MySqlSyncTableAction syncAction =
+             new MySqlSyncTableAction(
+                     sourceOptions,
+                     warehouse,
+                     database,
+                     tableName,
+                     Collections.emptyList(),
+                     Collections.emptyList(),
+                     new HashMap<>());
+
+     String expectedMessage =
+             "Primary keys are not specified. "
+                     + "Also, can't infer primary keys from MySQL table schemas because "
+                     + "MySQL tables have no primary keys or have different primary keys.";
+     IllegalArgumentException thrown =
+             assertThrows(
+                     IllegalArgumentException.class,
+                     () -> syncAction.build(environment),
+                     "Expecting IllegalArgumentException");
+     assertThat(thrown).hasMessage(expectedMessage);
+ }
+
+ /**
+  * Polls {@code table} once per second until its schema and its content match the expectation.
+  *
+  * <p>The primary keys are asserted once up front. The method has no timeout of its own; it
+  * returns only when both conditions are met.
+  *
+  * @param expected expected rows in string form, in any order
+  * @param table table to observe; refreshed with its latest schema while waiting
+  * @param rowType expected field names and types, by position
+  * @param primaryKeys expected primary keys of the table
+  * @throws Exception if reading the table fails or the waiting thread is interrupted
+  */
+ private void waitForResult(
+         List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
+         throws Exception {
+     assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
+
+     // Phase 1: wait until the number of positionally matching fields equals the expected count.
+     for (; ; ) {
+         List<DataField> currentFields = table.schema().fields();
+         int matching = 0;
+         for (int pos = 0; pos < currentFields.size(); pos++) {
+             DataField current = currentFields.get(pos);
+             if (current.name().equals(rowType.getFieldNames().get(pos))
+                     && current.type().equals(rowType.getFieldTypes().get(pos))) {
+                 matching++;
+             }
+         }
+         if (matching == rowType.getFieldCount()) {
+             break;
+         }
+         table = table.copyWithLatestSchema();
+         Thread.sleep(1000);
+     }
+
+     // Phase 2: wait until the rows, compared order-insensitively, equal the expected rows.
+     List<String> wanted = new ArrayList<>(expected);
+     Collections.sort(wanted);
+     boolean dataMatches = false;
+     while (!dataMatches) {
+         TableScan.Plan plan = table.newScan().plan();
+         List<String> rows =
+                 getResult(
+                         table.newRead(),
+                         plan == null ? Collections.emptyList() : plan.splits(),
+                         rowType);
+         List<String> actual = new ArrayList<>(rows);
+         Collections.sort(actual);
+         dataMatches = wanted.equals(actual);
+         if (!dataMatches) {
+             Thread.sleep(1000);
+         }
+     }
+ }
+
+ private Map<String, String> getBasicMySqlConfig() {
+ Map<String, String> config = new HashMap<>();
+ config.put("hostname", MYSQL_CONTAINER.getHost());
+ config.put("port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+ config.put("username", USER);
+ config.put("password", PASSWORD);
+ config.put("server-time-zone", ZoneId.of("+00:00").toString());
+ return config;
+ }
+
+ /**
+  * Looks up the table {@code database.tableName} through a catalog created for
+  * {@code warehouse}.
+  *
+  * @return the table, cast to {@link FileStoreTable}
+  * @throws Exception if the catalog lookup fails
+  */
+ private FileStoreTable getFileStoreTable() throws Exception {
+     CatalogContext context = CatalogContext.create(new Path(warehouse));
+     Catalog catalog = CatalogFactory.createCatalog(context);
+     return (FileStoreTable) catalog.getTable(Identifier.create(database, tableName));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
similarity index 54%
rename from paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
index 46c744699..087f48677 100644
--- a/paimon-core/src/main/java/org/apache/paimon/cdc/EventParser.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlVersion.java
@@ -16,31 +16,33 @@
* limitations under the License.
*/
-package org.apache.paimon.cdc;
-
-import org.apache.paimon.schema.SchemaChange;
-
-import java.io.Serializable;
-import java.util.List;
+package org.apache.paimon.flink.action.cdc.mysql;
/**
- * Parse a CDC change event to a list of {@link SchemaChange} or {@link
CdcRecord}.
+ * MySql version enum.
*
- * @param <T> CDC change event type
+ * <p>Copied from <a
+ *
href="https://github.com/ververica/flink-cdc-connectors/blob/master/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/testutils/MySqlVersion.java">ververica
+ * / flink-cdc-connectors</a>.
*/
-public interface EventParser<T> {
-
- void setRawEvent(T rawEvent);
+public enum MySqlVersion {
+ V5_5("5.5"),
+ V5_6("5.6"),
+ V5_7("5.7"),
+ V8_0("8.0");
- boolean isSchemaChange();
+ private String version;
- List<SchemaChange> getSchemaChanges();
-
- List<CdcRecord> getRecords();
+ MySqlVersion(String version) {
+ this.version = version;
+ }
- /** Factory to create an {@link EventParser}. */
- interface Factory<T> extends Serializable {
+ /** Returns the version string this constant was created with. */
+ public String getVersion() {
+     return this.version;
+ }
- EventParser<T> create();
+ /** Renders this constant as {@code MySqlVersion{version='<version>'}}. */
+ @Override
+ public String toString() {
+     return "MySqlVersion{version='" + this.version + "'}";
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
index b5645404f..2abeb87e4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSinkITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
index 50cc9a95c..38ec56338 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/SchemaAwareStoreWriteOperatorTest.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
@@ -150,8 +149,14 @@ public class SchemaAwareStoreWriteOperatorTest {
public void testUpdateColumnType() throws Exception {
RowType rowType =
RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.FLOAT()},
- new String[] {"k", "v1", "v2"});
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.FLOAT(),
+ DataTypes.VARCHAR(5),
+ DataTypes.VARBINARY(5)
+ },
+ new String[] {"k", "v1", "v2", "v3", "v4"});
FileStoreTable table =
createFileStoreTable(
@@ -170,6 +175,8 @@ public class SchemaAwareStoreWriteOperatorTest {
fields.put("k", "1");
fields.put("v1", "10");
fields.put("v2", "0.625");
+ fields.put("v3", "one");
+ fields.put("v4", "b_one");
CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
runner.offer(expected);
CdcRecord actual = runner.take();
@@ -177,6 +184,8 @@ public class SchemaAwareStoreWriteOperatorTest {
// check that records with new fields should be processed after schema
is updated
+ // int -> bigint
+
fields = new HashMap<>();
fields.put("k", "2");
fields.put("v1", "12345678987654321");
@@ -191,6 +200,8 @@ public class SchemaAwareStoreWriteOperatorTest {
actual = runner.take();
assertThat(actual).isEqualTo(expected);
+ // float -> double
+
fields = new HashMap<>();
fields.put("k", "3");
fields.put("v1", "100");
@@ -204,6 +215,36 @@ public class SchemaAwareStoreWriteOperatorTest {
actual = runner.take();
assertThat(actual).isEqualTo(expected);
+ // varchar(5) -> varchar(10)
+
+ fields = new HashMap<>();
+ fields.put("k", "4");
+ fields.put("v1", "40");
+ fields.put("v3", "long four");
+ expected = new CdcRecord(RowKind.INSERT, fields);
+ runner.offer(expected);
+ actual = runner.poll(1);
+ assertThat(actual).isNull();
+
+ schemaManager.commitChanges(SchemaChange.updateColumnType("v3",
DataTypes.VARCHAR(10)));
+ actual = runner.take();
+ assertThat(actual).isEqualTo(expected);
+
+ // varbinary(5) -> varbinary(10)
+
+ fields = new HashMap<>();
+ fields.put("k", "5");
+ fields.put("v1", "50");
+ fields.put("v4", "long five~");
+ expected = new CdcRecord(RowKind.INSERT, fields);
+ runner.offer(expected);
+ actual = runner.poll(1);
+ assertThat(actual).isNull();
+
+ schemaManager.commitChanges(SchemaChange.updateColumnType("v4",
DataTypes.VARBINARY(10)));
+ actual = runner.take();
+ assertThat(actual).isEqualTo(expected);
+
runner.stop();
t.join();
harness.close();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index f65e3002a..aedcc0362 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.schema.SchemaChange;
import java.io.Serializable;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 75d11afa1..bb544d24c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,8 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
-import org.apache.paimon.cdc.EventParser;
import org.apache.paimon.schema.SchemaChange;
import java.util.Collections;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
index fc952af49..889c8216d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcSourceFunction.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.cdc.CdcRecord;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableFunction;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
index 410b1a7fe..8e63c8b7e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/AbstractTestBase.java
@@ -28,8 +28,6 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -40,8 +38,6 @@ import java.util.UUID;
@ExtendWith({TestLoggerExtension.class})
public class AbstractTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractTestBase.class);
-
private static final int DEFAULT_PARALLELISM = 8;
@RegisterExtension
diff --git a/paimon-flink/paimon-flink-common/src/test/resources/mysql/my.cnf
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/my.cnf
new file mode 100644
index 000000000..9c9a8747b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/my.cnf
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+
+# If this option is off
+# MySQL will set current timestamp to any timestamp field with NULL value
+# which is bad for our tests
+explicit_defaults_for_timestamp = ON
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
new file mode 100644
index 000000000..3cc6c9e51
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -0,0 +1,156 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly restrict the replication user to the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'paimonuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'paimonuser'@'%'; -- NOTE(review): 'paimonuser' is not created anywhere in this script; presumably the test container provisions it - confirm
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw'; -- second account, created here with a fixed test password
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+USE paimon_test; -- every table below is created in this database
+
+CREATE TABLE schema_evolution_1 ( -- same columns and key as schema_evolution_2. NOTE(review): presumably a source table for the schema-evolution sync tests - confirm
+ pt INT, -- not part of the primary key
+ _id INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (_id) -- single-column key
+);
+
+CREATE TABLE schema_evolution_2 ( -- same columns and key as schema_evolution_1. NOTE(review): presumably the second source table for the schema-evolution sync tests - confirm
+ pt INT, -- not part of the primary key
+ _id INT,
+ v1 VARCHAR(10),
+ PRIMARY KEY (_id) -- single-column key
+);
+
+CREATE TABLE all_types_table ( -- one column per MySQL type variant, keyed on _id. Most numeric types appear as plain / UNSIGNED / UNSIGNED ZEROFILL
+ _id INT,
+ _boolean TINYINT(1), -- seeded with `true` by the INSERT that follows
+ _tinyint TINYINT,
+ _tinyint_unsigned TINYINT(2) UNSIGNED,
+ _tinyint_unsigned_zerofill TINYINT(2) UNSIGNED ZEROFILL,
+ _smallint SMALLINT,
+ _smallint_unsigned SMALLINT UNSIGNED,
+ _smallint_unsigned_zerofill SMALLINT(4) UNSIGNED ZEROFILL,
+ _mediumint MEDIUMINT,
+ _mediumint_unsigned MEDIUMINT UNSIGNED,
+ _mediumint_unsigned_zerofill MEDIUMINT(8) UNSIGNED ZEROFILL,
+ _int INT,
+ _int_unsigned INT UNSIGNED,
+ _int_unsigned_zerofill INT(8) UNSIGNED ZEROFILL,
+ _bigint BIGINT,
+ _bigint_unsigned BIGINT UNSIGNED,
+ _bigint_unsigned_zerofill BIGINT(16) UNSIGNED ZEROFILL,
+ _serial SERIAL, -- MySQL alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, so it can never hold NULL
+ _float FLOAT,
+ _float_unsigned FLOAT UNSIGNED,
+ _float_unsigned_zerofill FLOAT(4) UNSIGNED ZEROFILL,
+ _real REAL,
+ _real_unsigned REAL UNSIGNED,
+ _real_unsigned_zerofill REAL(10, 7) UNSIGNED ZEROFILL,
+ _double DOUBLE,
+ _double_unsigned DOUBLE UNSIGNED,
+ _double_unsigned_zerofill DOUBLE(10, 7) UNSIGNED ZEROFILL,
+ _double_precision DOUBLE PRECISION,
+ _double_precision_unsigned DOUBLE PRECISION UNSIGNED,
+ _double_precision_unsigned_zerofill DOUBLE PRECISION(10, 7) UNSIGNED ZEROFILL,
+ _numeric NUMERIC(8, 3),
+ _numeric_unsigned NUMERIC(8, 3) UNSIGNED,
+ _numeric_unsigned_zerofill NUMERIC(8, 3) UNSIGNED ZEROFILL,
+ _fixed FIXED(40, 3),
+ _fixed_unsigned FIXED(40, 3) UNSIGNED,
+ _fixed_unsigned_zerofill FIXED(40, 3) UNSIGNED ZEROFILL,
+ _decimal DECIMAL(8), -- no scale given, so MySQL uses scale 0
+ _decimal_unsigned DECIMAL(8) UNSIGNED,
+ _decimal_unsigned_zerofill DECIMAL(8) UNSIGNED ZEROFILL,
+ _date DATE,
+ _datetime DATETIME,
+ _timestamp TIMESTAMP(6) DEFAULT NULL, -- microsecond precision, with the NULL default spelled out explicitly
+ _char CHAR(10),
+ _varchar VARCHAR(20),
+ _text TEXT,
+ _bin BINARY(10),
+ _varbin VARBINARY(20),
+ _blob BLOB,
+ PRIMARY KEY (_id)
+);
+
+INSERT INTO all_types_table VALUES ( -- positional insert: values follow the column order of CREATE TABLE all_types_table
+ 1, -- _id
+ true, 1, 2, 3, -- _boolean and the three _tinyint columns
+ 1000, 2000, 3000, -- the three _smallint columns
+ 100000, 200000, 300000, -- the three _mediumint columns
+ 1000000, 2000000, 3000000, -- the three _int columns
+ 10000000000, 20000000000, 30000000000, 40000000000, -- the three _bigint columns, then _serial
+ 1.5, 2.5, 3.5, -- the three _float columns
+ 1.000001, 2.000002, 3.000003, -- the three _real columns
+ 1.000011, 2.000022, 3.000033, -- the three _double columns
+ 1.000111, 2.000222, 3.000333, -- the three _double_precision columns
+ 12345.11, 12345.22, 12345.33, -- the three _numeric columns, declared NUMERIC(8, 3)
+ 123456789876543212345678987654321.11, 123456789876543212345678987654321.22, 123456789876543212345678987654321.33, -- the three _fixed columns, declared FIXED(40, 3)
+ 11111, 22222, 33333, -- the three _decimal columns
+ '2023-03-23', '2023-03-23 14:30:05', '2023-03-23 15:00:10.123456', -- _date, _datetime, _timestamp
+ 'Paimon', 'Apache Paimon', 'Apache Paimon MySQL Test Data', -- _char, _varchar, _text
+ 'bytes', 'more bytes', 'very long bytes test data' -- _bin, _varbin, _blob
+), ( -- second row: every column is NULL except _id and _serial
+ 2,
+ NULL, NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL, 50000000000, -- SERIAL is never NULL
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL,
+ NULL, NULL, NULL
+);
+
+CREATE TABLE incompatible_field_1 ( -- differs from incompatible_field_2 only in the type of v1. NOTE(review): presumably used to provoke a field-type conflict in the sync tests - confirm
+ _id INT,
+ v1 DATETIME, -- incompatible_field_2 declares this column as INT
+ PRIMARY KEY (_id)
+);
+
+CREATE TABLE incompatible_field_2 ( -- differs from incompatible_field_1 only in the type of v1. NOTE(review): presumably used to provoke a field-type conflict in the sync tests - confirm
+ _id INT,
+ v1 INT, -- incompatible_field_1 declares this column as DATETIME
+ PRIMARY KEY (_id)
+);
+
+CREATE TABLE incompatible_pk_1 ( -- same columns as incompatible_pk_2 but a different primary key. NOTE(review): presumably used to provoke a primary-key conflict in the sync tests - confirm
+ a INT,
+ b BIGINT,
+ c VARCHAR(20),
+ PRIMARY KEY (a, b) -- composite key; incompatible_pk_2 keys on (a) alone
+);
+
+CREATE TABLE incompatible_pk_2 ( -- same columns as incompatible_pk_1 but a different primary key. NOTE(review): presumably used to provoke a primary-key conflict in the sync tests - confirm
+ a INT,
+ b BIGINT,
+ c VARCHAR(20),
+ PRIMARY KEY (a) -- single-column key; incompatible_pk_1 keys on (a, b)
+);