This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new de458dcf9 [flink] MySql CDC can now deal with multiple changes in one
ALTER TABLE statement (#912)
de458dcf9 is described below
commit de458dcf99a31b98ace66fc0b95c6ea75f0fed39
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 17 19:20:26 2023 +0800
[flink] MySql CDC can now deal with multiple changes in one ALTER TABLE
statement (#912)
---
.../flink/action/cdc/mysql/MySqlActionUtils.java | 5 +-
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 71 +++++++------
.../paimon/flink/action/cdc/mysql/MySqlSchema.java | 28 ++---
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 16 ---
.../cdc/CdcMultiTableParsingProcessFunction.java | 29 +++---
.../flink/sink/cdc/CdcParsingProcessFunction.java | 21 ++--
.../apache/paimon/flink/sink/cdc/EventParser.java | 9 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 6 +-
.../sink/cdc/FlinkCdcSyncTableSinkBuilder.java | 4 +-
....java => UpdatedDataFieldsProcessFunction.java} | 116 +++++++++++++++++----
.../cdc/mysql/MySqlSyncTableActionITCase.java | 93 +++++++++++++++++
.../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java | 2 +-
.../apache/paimon/flink/sink/cdc/TestCdcEvent.java | 18 ++--
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 12 +--
.../apache/paimon/flink/sink/cdc/TestTable.java | 30 ++++--
.../src/test/resources/mysql/setup.sql | 9 +-
16 files changed, 324 insertions(+), 145 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 11aca6b06..bbbaaeb26 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
-import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.DataType;
@@ -72,7 +72,8 @@ class MySqlActionUtils {
return false;
}
DataType type = tableSchema.fields().get(idx).type();
- if (!SchemaChangeProcessFunction.canConvert(entry.getValue(),
type)) {
+ if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue(),
type)
+ != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT)
{
return false;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 4855eda4d..83f98b0e3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -20,10 +20,11 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.schema.SchemaChange;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
@@ -38,12 +39,10 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.Optional;
/**
* {@link EventParser} for MySQL Debezium JSON.
@@ -55,13 +54,8 @@ import java.util.regex.Pattern;
public class MySqlDebeziumJsonEventParser implements EventParser<String> {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
- private static final String SCHEMA_CHANGE_REGEX =
-
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP|MODIFY)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s\\(]+))?\\s*(\\((.*?)\\))?.*";
private final ObjectMapper objectMapper = new ObjectMapper();
- private final Pattern schemaChangePattern =
- Pattern.compile(SCHEMA_CHANGE_REGEX, Pattern.CASE_INSENSITIVE);
-
private final ZoneId serverTimeZone;
private JsonNode payload;
@@ -88,7 +82,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
+ "in the
JsonDebeziumDeserializationSchema you created");
payload = root.get("payload");
- if (!isSchemaChange()) {
+ if (!isUpdatedDataFields()) {
updateFieldTypes(schema);
}
} catch (Exception e) {
@@ -125,45 +119,54 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
}
@Override
- public boolean isSchemaChange() {
+ public boolean isUpdatedDataFields() {
return payload.get("op") == null;
}
@Override
- public List<SchemaChange> getSchemaChanges() {
+ public Optional<List<DataField>> getUpdatedDataFields() {
JsonNode historyRecord = payload.get("historyRecord");
if (historyRecord == null) {
- return Collections.emptyList();
+ return Optional.empty();
}
- JsonNode ddlNode;
+ JsonNode columns;
try {
- ddlNode = objectMapper.readTree(historyRecord.asText()).get("ddl");
+ String historyRecordString = historyRecord.asText();
+ JsonNode tableChanges =
objectMapper.readTree(historyRecordString).get("tableChanges");
+ if (tableChanges.size() != 1) {
+ throw new IllegalArgumentException(
+ "Invalid historyRecord, because tableChanges should
contain exactly 1 item.\n"
+ + historyRecordString);
+ }
+ columns = tableChanges.get(0).get("table").get("columns");
} catch (Exception e) {
- LOG.debug("Failed to parse history record for schema changes", e);
- return Collections.emptyList();
+ LOG.info("Failed to parse history record for schema changes", e);
+ return Optional.empty();
}
- if (ddlNode == null) {
- return Collections.emptyList();
+ if (columns == null) {
+ return Optional.empty();
}
- String ddl = ddlNode.asText();
- Matcher matcher = schemaChangePattern.matcher(ddl);
- if (matcher.find()) {
- String op = matcher.group(1);
- String column = matcher.group(3);
- String type = matcher.group(5);
- String len = matcher.group(7);
- if ("add".equalsIgnoreCase(op)) {
- return Collections.singletonList(
- SchemaChange.addColumn(column,
MySqlTypeUtils.toDataType(type, len)));
- } else if ("modify".equalsIgnoreCase(op)) {
- return Collections.singletonList(
- SchemaChange.updateColumnType(
- column, MySqlTypeUtils.toDataType(type, len)));
+ List<DataField> result = new ArrayList<>();
+ for (int i = 0; i < columns.size(); i++) {
+ JsonNode column = columns.get(i);
+ JsonNode length = column.get("length");
+ JsonNode scale = column.get("scale");
+ DataType type =
+ MySqlTypeUtils.toDataType(
+ column.get("typeName").asText(),
+ length == null ? null : length.asInt(),
+ scale == null ? null : scale.asInt());
+ if (column.get("optional").asBoolean()) {
+ type = type.nullable();
+ } else {
+ type = type.notNull();
}
+
+ result.add(new DataField(i, column.get("name").asText(), type));
}
- return Collections.emptyList();
+ return Optional.of(result);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index fc06a1efb..a9b5b6a44 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action.cdc.mysql;
-import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.types.DataType;
import java.sql.DatabaseMetaData;
@@ -86,19 +86,19 @@ public class MySqlSchema {
DataType newType = entry.getValue();
if (fields.containsKey(fieldName)) {
DataType oldType = fields.get(fieldName);
- if (SchemaChangeProcessFunction.canConvert(oldType, newType)) {
- fields.put(fieldName, newType);
- } else if (SchemaChangeProcessFunction.canConvert(newType,
oldType)) {
- // nothing to do
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Column %s have different types in table
%s.%s and table %s.%s",
- fieldName,
- databaseName,
- tableName,
- other.databaseName,
- other.tableName));
+ switch (UpdatedDataFieldsProcessFunction.canConvert(oldType,
newType)) {
+ case CONVERT:
+ fields.put(fieldName, newType);
+ break;
+ case EXCEPTION:
+ throw new IllegalArgumentException(
+ String.format(
+ "Column %s have different types in
table %s.%s and table %s.%s",
+ fieldName,
+ databaseName,
+ tableName,
+ other.databaseName,
+ other.tableName));
}
} else {
fields.put(fieldName, newType);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index d508852af..1c1f813db 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -25,9 +25,6 @@ import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Converts from MySQL type to {@link DataType}.
*
@@ -106,19 +103,6 @@ public class MySqlTypeUtils {
// The base length of a timestamp is 19, for example "2023-03-23 17:20:00".
private static final int JDBC_TIMESTAMP_BASE_LENGTH = 19;
- public static DataType toDataType(String type, String params) {
- List<Integer> paramList = new ArrayList<>();
- if (params != null) {
- for (String s : params.split(",")) {
- paramList.add(Integer.parseInt(s.trim()));
- }
- }
- return toDataType(
- type,
- paramList.size() > 0 ? paramList.get(0) : null,
- paramList.size() > 1 ? paramList.get(1) : null);
- }
-
public static DataType toDataType(
String type, @Nullable Integer length, @Nullable Integer scale) {
switch (type.toUpperCase()) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index 36e61b443..815513ac2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -18,20 +18,22 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
- * A {@link ProcessFunction} to parse CDC change event to either {@link
SchemaChange} or {@link
- * CdcRecord} and send them to different side outputs according to table name.
+ * A {@link ProcessFunction} to parse CDC change event to either a list of
{@link DataField}s or
+ * {@link CdcRecord} and send them to different side outputs according to
table name.
*
* <p>This {@link ProcessFunction} can handle records for different tables at
the same time.
*
@@ -42,7 +44,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
private final EventParser.Factory<T> parserFactory;
private transient EventParser<T> parser;
- private transient Map<String, OutputTag<SchemaChange>>
schemaChangeOutputTags;
+ private transient Map<String, OutputTag<List<DataField>>>
updatedDataFieldsOutputTags;
private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;
public CdcMultiTableParsingProcessFunction(EventParser.Factory<T>
parserFactory) {
@@ -52,7 +54,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
@Override
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
- schemaChangeOutputTags = new HashMap<>();
+ updatedDataFieldsOutputTags = new HashMap<>();
recordOutputTags = new HashMap<>();
}
@@ -61,10 +63,9 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
parser.setRawEvent(raw);
String tableName = parser.tableName();
- if (parser.isSchemaChange()) {
- for (SchemaChange schemaChange : parser.getSchemaChanges()) {
- context.output(getSchemaChangeOutputTag(tableName),
schemaChange);
- }
+ if (parser.isUpdatedDataFields()) {
+ parser.getUpdatedDataFields()
+ .ifPresent(t ->
context.output(getUpdatedDataFieldsOutputTag(tableName), t));
} else {
for (CdcRecord record : parser.getRecords()) {
context.output(getRecordOutputTag(tableName), record);
@@ -72,14 +73,14 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
}
}
- private OutputTag<SchemaChange> getSchemaChangeOutputTag(String tableName)
{
- return schemaChangeOutputTags.computeIfAbsent(
- tableName,
CdcMultiTableParsingProcessFunction::createSchemaChangeOutputTag);
+ private OutputTag<List<DataField>> getUpdatedDataFieldsOutputTag(String
tableName) {
+ return updatedDataFieldsOutputTags.computeIfAbsent(
+ tableName,
CdcMultiTableParsingProcessFunction::createUpdatedDataFieldsOutputTag);
}
- public static OutputTag<SchemaChange> createSchemaChangeOutputTag(String
tableName) {
+ public static OutputTag<List<DataField>>
createUpdatedDataFieldsOutputTag(String tableName) {
return new OutputTag<>(
- "schema-change-" + tableName,
TypeInformation.of(SchemaChange.class));
+ "new-data-field-list-" + tableName, new
ListTypeInfo<>(DataField.class));
}
private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index ba3f97cad..b40c160dc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -18,17 +18,19 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
+import java.util.List;
+
/**
- * A {@link ProcessFunction} to parse CDC change event to either {@link
SchemaChange} or {@link
- * CdcRecord} and send them to different downstreams.
+ * A {@link ProcessFunction} to parse CDC change event to either a list of
{@link DataField}s or
+ * {@link CdcRecord} and send them to different downstreams.
*
* <p>This {@link ProcessFunction} can only handle records for a single
constant table. To handle
* records for different tables, see {@link
CdcMultiTableParsingProcessFunction}.
@@ -37,8 +39,8 @@ import org.apache.flink.util.OutputTag;
*/
public class CdcParsingProcessFunction<T> extends ProcessFunction<T,
CdcRecord> {
- public static final OutputTag<SchemaChange> SCHEMA_CHANGE_OUTPUT_TAG =
- new OutputTag<>("schema-change",
TypeInformation.of(SchemaChange.class));
+ public static final OutputTag<List<DataField>>
NEW_DATA_FIELD_LIST_OUTPUT_TAG =
+ new OutputTag<>("new-data-field-list", new
ListTypeInfo<>(DataField.class));
private final EventParser.Factory<T> parserFactory;
@@ -57,10 +59,9 @@ public class CdcParsingProcessFunction<T> extends
ProcessFunction<T, CdcRecord>
public void processElement(T raw, Context context, Collector<CdcRecord>
collector)
throws Exception {
parser.setRawEvent(raw);
- if (parser.isSchemaChange()) {
- for (SchemaChange schemaChange : parser.getSchemaChanges()) {
- context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
- }
+ if (parser.isUpdatedDataFields()) {
+ parser.getUpdatedDataFields()
+ .ifPresent(t ->
context.output(NEW_DATA_FIELD_LIST_OUTPUT_TAG, t));
} else {
for (CdcRecord record : parser.getRecords()) {
collector.collect(record);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index e4f192d1b..161eb5d1d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -18,13 +18,14 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
import java.io.Serializable;
import java.util.List;
+import java.util.Optional;
/**
- * Parse a CDC change event to a list of {@link SchemaChange} or {@link
CdcRecord}.
+ * Parse a CDC change event to a list of {@link DataField}s or {@link
CdcRecord}.
*
* @param <T> CDC change event type
*/
@@ -34,9 +35,9 @@ public interface EventParser<T> {
String tableName();
- boolean isSchemaChange();
+ boolean isUpdatedDataFields();
- List<SchemaChange> getSchemaChanges();
+ Optional<List<DataField>> getUpdatedDataFields();
List<CdcRecord> getRecords();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index e48f929bc..b62992d08 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -94,10 +94,10 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
DataStream<Void> schemaChangeProcessFunction =
SingleOutputStreamOperatorUtils.getSideOutput(
parsed,
-
CdcMultiTableParsingProcessFunction.createSchemaChangeOutputTag(
- table.name()))
+ CdcMultiTableParsingProcessFunction
+
.createUpdatedDataFieldsOutputTag(table.name()))
.process(
- new SchemaChangeProcessFunction(
+ new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(),
table.location())));
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
index 7c770b045..6847f5405 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
@@ -84,9 +84,9 @@ public class FlinkCdcSyncTableSinkBuilder<T> {
DataStream<Void> schemaChangeProcessFunction =
SingleOutputStreamOperatorUtils.getSideOutput(
- parsed,
CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
+ parsed,
CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG)
.process(
- new SchemaChangeProcessFunction(
+ new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(),
table.location())));
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
similarity index 55%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
rename to
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 4c94444f0..cce1ee7e1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -22,9 +22,11 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -32,29 +34,77 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
/**
- * A {@link ProcessFunction} to handle {@link SchemaChange}.
+ * A {@link ProcessFunction} to handle schema changes. New schema is
represented by a list of {@link
+ * DataField}s.
*
* <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link
ProcessFunction} must
* be 1.
*/
-public class SchemaChangeProcessFunction extends ProcessFunction<SchemaChange,
Void> {
+public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataField>, Void> {
- private static final Logger LOG =
LoggerFactory.getLogger(SchemaChangeProcessFunction.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(UpdatedDataFieldsProcessFunction.class);
private final SchemaManager schemaManager;
- public SchemaChangeProcessFunction(SchemaManager schemaManager) {
+ public UpdatedDataFieldsProcessFunction(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
@Override
public void processElement(
- SchemaChange schemaChange, Context context, Collector<Void>
collector)
+ List<DataField> updatedDataFields, Context context,
Collector<Void> collector)
throws Exception {
+ for (SchemaChange schemaChange :
extractSchemaChanges(updatedDataFields)) {
+ applySchemaChange(schemaChange);
+ }
+ }
+
+ private List<SchemaChange> extractSchemaChanges(List<DataField>
updatedDataFields) {
+ RowType oldRowType = schemaManager.latest().get().logicalRowType();
+ Map<String, DataField> oldFields = new HashMap<>();
+ for (DataField oldField : oldRowType.getFields()) {
+ oldFields.put(oldField.name(), oldField);
+ }
+
+ List<SchemaChange> result = new ArrayList<>();
+ for (DataField newField : updatedDataFields) {
+ if (oldFields.containsKey(newField.name())) {
+ DataField oldField = oldFields.get(newField.name());
+ // we compare by ignoring nullable, because partition keys and
primary keys might be
+ // nullable in source database, but they can't be null in
Paimon
+ if (oldField.type().equalsIgnoreNullable(newField.type())) {
+ if (!Objects.equals(oldField.description(),
newField.description())) {
+ result.add(
+ SchemaChange.updateColumnComment(
+ new String[] {newField.name()},
newField.description()));
+ }
+ } else {
+ result.add(SchemaChange.updateColumnType(newField.name(),
newField.type()));
+ if (newField.description() != null) {
+ result.add(
+ SchemaChange.updateColumnComment(
+ new String[] {newField.name()},
newField.description()));
+ }
+ }
+ } else {
+ result.add(
+ SchemaChange.addColumn(
+ newField.name(), newField.type(),
newField.description(), null));
+ }
+ }
+ return result;
+ }
+
+ private void applySchemaChange(SchemaChange schemaChange) throws Exception
{
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
schemaManager.commitChanges(schemaChange);
@@ -88,18 +138,18 @@ public class SchemaChangeProcessFunction extends
ProcessFunction<SchemaChange, V
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
- if (canConvert(oldType, newType)) {
- schemaManager.commitChanges(schemaChange);
- } else {
- throw new UnsupportedOperationException(
- String.format(
- "Cannot convert field %s from type %s to %s",
- updateColumnType.fieldName(), oldType,
newType));
+ switch (canConvert(oldType, newType)) {
+ case CONVERT:
+ schemaManager.commitChanges(schemaChange);
+ break;
+ case EXCEPTION:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot convert field %s from type %s to
%s",
+ updateColumnType.fieldName(), oldType,
newType));
}
} else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
schemaManager.commitChanges(schemaChange);
- } else if (schemaChange instanceof
SchemaChange.UpdateColumnNullability) {
- schemaManager.commitChanges(schemaChange);
} else {
throw new UnsupportedOperationException(
"Unsupported schema change class "
@@ -122,31 +172,57 @@ public class SchemaChangeProcessFunction extends
ProcessFunction<SchemaChange, V
private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
- public static boolean canConvert(DataType oldType, DataType newType) {
+ public static ConvertAction canConvert(DataType oldType, DataType newType)
{
int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
if (oldIdx >= 0 && newIdx >= 0) {
- return DataTypeChecks.getLength(oldType) <=
DataTypeChecks.getLength(newType);
+ return DataTypeChecks.getLength(oldType) <=
DataTypeChecks.getLength(newType)
+ ? ConvertAction.CONVERT
+ : ConvertAction.IGNORE;
}
oldIdx = BINARY_TYPES.indexOf(oldType.getTypeRoot());
newIdx = BINARY_TYPES.indexOf(newType.getTypeRoot());
if (oldIdx >= 0 && newIdx >= 0) {
- return DataTypeChecks.getLength(oldType) <=
DataTypeChecks.getLength(newType);
+ return DataTypeChecks.getLength(oldType) <=
DataTypeChecks.getLength(newType)
+ ? ConvertAction.CONVERT
+ : ConvertAction.IGNORE;
}
oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
newIdx = INTEGER_TYPES.indexOf(newType.getTypeRoot());
if (oldIdx >= 0 && newIdx >= 0) {
- return oldIdx <= newIdx;
+ return oldIdx <= newIdx ? ConvertAction.CONVERT :
ConvertAction.IGNORE;
}
oldIdx = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
newIdx = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
if (oldIdx >= 0 && newIdx >= 0) {
- return oldIdx <= newIdx;
+ return oldIdx <= newIdx ? ConvertAction.CONVERT :
ConvertAction.IGNORE;
}
- return false;
+ return ConvertAction.EXCEPTION;
+ }
+
+ /**
+ * Return type of {@link
UpdatedDataFieldsProcessFunction#canConvert(DataType, DataType)}. This
+ * enum indicates the action to perform.
+ */
+ public enum ConvertAction {
+
+ /** {@code oldType} can be converted to {@code newType}. */
+ CONVERT,
+
+ /**
+ * {@code oldType} and {@code newType} belong to the same type
family, but old type has
+ * higher precision than new type. Ignore this convert request.
+ */
+ IGNORE,
+
+ /**
+ * {@code oldType} and {@code newType} belong to different type
families. Throw an exception
+ * indicating that this convert request cannot be handled.
+ */
+ EXCEPTION
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index ed0e9fba7..44f6a5f83 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -243,6 +243,99 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
waitForResult(expected, table, rowType, primaryKeys);
}
+ @Test
+ @Timeout(60)
+ public void testMultipleSchemaEvolutions() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "schema_evolution_multiple");
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ MySqlSyncTableAction action =
+ new MySqlSyncTableAction(
+ mySqlConfig,
+ warehouse,
+ database,
+ tableName,
+ Collections.emptyList(),
+ Collections.singletonList("_id"),
+ new HashMap<>());
+ action.build(env);
+ JobClient client = env.executeAsync();
+
+ while (true) {
+ JobStatus status = client.getJobStatus().get();
+ if (status == JobStatus.RUNNING) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ try (Connection conn =
+ DriverManager.getConnection(
+ MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+ MYSQL_CONTAINER.getUsername(),
+ MYSQL_CONTAINER.getPassword())) {
+ try (Statement statement = conn.createStatement()) {
+ testSchemaEvolutionMultipleImpl(statement);
+ }
+ }
+ }
+
+ private void testSchemaEvolutionMultipleImpl(Statement statement) throws
Exception {
+ FileStoreTable table = getFileStoreTable();
+ statement.executeUpdate("USE paimon_sync_table");
+
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_multiple VALUES (1, 'one', 10,
'string_1')");
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ DataTypes.INT(),
+ DataTypes.VARCHAR(10)
+ },
+ new String[] {"_id", "v1", "v2", "v3"});
+ List<String> primaryKeys = Collections.singletonList("_id");
+ List<String> expected = Collections.singletonList("+I[1, one, 10,
string_1]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ statement.executeUpdate(
+ "ALTER TABLE schema_evolution_multiple "
+ + "ADD v4 INT, "
+ + "MODIFY COLUMN v1 VARCHAR(20), "
+ // I'd love to change COMMENT to DEFAULT
+ // however debezium parser seems to have a bug here
+ + "ADD COLUMN (v5 DOUBLE, v6 DECIMAL(5, 3), `$% ^,&
*(` VARCHAR(10) COMMENT 'Hi, v700 DOUBLE \\', v701 INT a test'), "
+ + "MODIFY v2 BIGINT");
+ statement.executeUpdate(
+ "INSERT INTO schema_evolution_multiple VALUES "
+ + "(2, 'long_string_two', 2000000000000, 'string_2',
20, 20.5, 20.002, 'test_2')");
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(20),
+ DataTypes.BIGINT(),
+ DataTypes.VARCHAR(10),
+ DataTypes.INT(),
+ DataTypes.DOUBLE(),
+ DataTypes.DECIMAL(5, 3),
+ DataTypes.VARCHAR(10)
+ },
+ new String[] {"_id", "v1", "v2", "v3", "v4", "v5",
"v6", "$% ^,& *("});
+ expected =
+ Arrays.asList(
+ "+I[1, one, 10, string_1, NULL, NULL, NULL, NULL]",
+ "+I[2, long_string_two, 2000000000000, string_2, 20,
20.5, 20.002, test_2]");
+ waitForResult(expected, table, rowType, primaryKeys);
+ }
+
@Test
@Timeout(30)
public void testAllTypes() throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 5eab8d1e5..6d8e93daf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -69,7 +69,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends
AbstractTestBase {
int numTables = random.nextInt(3) + 1;
boolean enableFailure = random.nextBoolean();
- int maxEvents = 1500;
+ int maxEvents = 1000;
int maxSchemaChanges = 10;
int maxPartitions = 3;
int maxKeys = 150;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index f7a933951..01d766edf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
import java.io.Serializable;
import java.util.List;
@@ -29,20 +29,20 @@ public class TestCdcEvent implements Serializable {
private static final long serialVersionUID = 1L;
private final String tableName;
- private final SchemaChange schemaChange;
+ private final List<DataField> updatedDataFields;
private final List<CdcRecord> records;
private final int keyHash;
- public TestCdcEvent(String tableName, SchemaChange schemaChange) {
+ public TestCdcEvent(String tableName, List<DataField> updatedDataFields) {
this.tableName = tableName;
- this.schemaChange = schemaChange;
+ this.updatedDataFields = updatedDataFields;
this.records = null;
this.keyHash = 0;
}
public TestCdcEvent(String tableName, List<CdcRecord> records, int
keyHash) {
this.tableName = tableName;
- this.schemaChange = null;
+ this.updatedDataFields = null;
this.records = records;
this.keyHash = keyHash;
}
@@ -51,8 +51,8 @@ public class TestCdcEvent implements Serializable {
return tableName;
}
- public SchemaChange schemaChange() {
- return schemaChange;
+ public List<DataField> updatedDataFields() {
+ return updatedDataFields;
}
public List<CdcRecord> records() {
@@ -67,7 +67,7 @@ public class TestCdcEvent implements Serializable {
@Override
public String toString() {
return String.format(
- "{tableName = %s, schemChange = %s, records = %s}",
- tableName, schemaChange, records);
+ "{tableName = %s, updatedDataFields = %s, records = %s}",
+ tableName, updatedDataFields, records);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 9772d3292..ab03ea1f5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,10 +18,10 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
-import java.util.Collections;
import java.util.List;
+import java.util.Optional;
/** Testing {@link EventParser} for {@link TestCdcEvent}. */
public class TestCdcEventParser implements EventParser<TestCdcEvent> {
@@ -39,13 +39,13 @@ public class TestCdcEventParser implements
EventParser<TestCdcEvent> {
}
@Override
- public boolean isSchemaChange() {
- return raw.schemaChange() != null;
+ public boolean isUpdatedDataFields() {
+ return raw.updatedDataFields() != null;
}
@Override
- public List<SchemaChange> getSchemaChanges() {
- return Collections.singletonList(raw.schemaChange());
+ public Optional<List<DataField>> getUpdatedDataFields() {
+ return Optional.ofNullable(raw.updatedDataFields());
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index f5be3dca3..650cf3ccf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -19,8 +19,8 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
@@ -102,19 +102,12 @@ public class TestTable {
if (random.nextBoolean()) {
int idx = random.nextInt(fieldNames.size());
isBigInt.set(idx, true);
- events.add(
- new TestCdcEvent(
- tableName,
- SchemaChange.updateColumnType(
- fieldNames.get(idx),
DataTypes.BIGINT())));
} else {
String newName = "v" + fieldNames.size();
fieldNames.add(newName);
isBigInt.add(false);
- events.add(
- new TestCdcEvent(
- tableName, SchemaChange.addColumn(newName,
DataTypes.INT())));
}
+ events.add(new TestCdcEvent(tableName,
currentDataFieldList(fieldNames, isBigInt)));
} else {
Map<String, String> fields = new HashMap<>();
int key = random.nextInt(numKeys);
@@ -142,6 +135,25 @@ public class TestTable {
}
}
+    /**
+     * Builds the table's current field list: the first two fields of the initial row type
+     * ("pt" and "k"), followed by one field per value column in {@code fieldNames}.
+     *
+     * @param fieldNames names of the value columns, in order
+     * @param isBigInt for each value column, whether it has been switched to BIGINT (else INT)
+     * @return a new mutable list whose field ids equal each field's position in the list
+     */
+    private List<DataField> currentDataFieldList(List<String> fieldNames, List<Boolean> isBigInt) {
+        int leadingCount = 2; // "pt" and "k"
+        List<DataField> result = new ArrayList<>(leadingCount + fieldNames.size());
+        result.addAll(initialRowType.getFields().subList(0, leadingCount));
+
+        int id = leadingCount;
+        for (String name : fieldNames) {
+            boolean bigInt = isBigInt.get(id - leadingCount);
+            result.add(new DataField(id, name, bigInt ? DataTypes.BIGINT() : DataTypes.INT()));
+            id++;
+        }
+        return result;
+    }
+
public RowType initialRowType() {
return initialRowType;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 902dd9d4c..f2e125c89 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -41,7 +41,14 @@ CREATE TABLE schema_evolution_2 (
PRIMARY KEY (_id)
);
--- add comment lines for the convenience of reading
+-- Source table for the multi-change schema-evolution test: a single ALTER TABLE
+-- statement adds columns, widens v1 to VARCHAR(20) and changes v2 to BIGINT.
+CREATE TABLE schema_evolution_multiple (
+ _id INT,
+ v1 VARCHAR(10),
+ v2 INT,
+ v3 VARCHAR(10),
+ PRIMARY KEY (_id)
+);
+
CREATE TABLE all_types_table (
_id INT,
-- TINYINT