This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 23d4c683c9 [cdc] mysql cdc supports table comment changes. (#5258)
23d4c683c9 is described below
commit 23d4c683c9dbe7efc204cbf6a8d183cd09522b1e
Author: Kerwin <[email protected]>
AuthorDate: Tue Mar 18 18:27:05 2025 +0800
[cdc] mysql cdc supports table comment changes. (#5258)
---
docs/content/cdc-ingestion/mysql-cdc.md | 5 +-
.../cdc/format/AbstractJsonRecordParser.java | 18 +-
.../action/cdc/format/AbstractRecordParser.java | 28 +--
.../cdc/format/aliyun/AliyunRecordParser.java | 8 +-
.../action/cdc/format/canal/CanalRecordParser.java | 10 +-
.../format/debezium/DebeziumAvroRecordParser.java | 14 +-
.../format/debezium/DebeziumBsonRecordParser.java | 8 +-
.../format/debezium/DebeziumJsonRecordParser.java | 10 +-
.../mongodb/strategy/Mongo4VersionStrategy.java | 16 +-
.../cdc/mongodb/strategy/MongoVersionStrategy.java | 26 ++-
.../flink/action/cdc/mysql/MySqlRecordParser.java | 33 +--
.../cdc/mysql/format/DebeziumEventUtils.java | 49 ++++-
.../action/cdc/postgres/PostgresRecordParser.java | 23 +-
.../cdc/CdcDynamicTableParsingProcessFunction.java | 25 +--
.../cdc/CdcMultiTableParsingProcessFunction.java | 26 +--
.../flink/sink/cdc/CdcParsingProcessFunction.java | 20 +-
.../apache/paimon/flink/sink/cdc/CdcSchema.java | 187 ++++++++++++++++
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 2 +-
.../apache/paimon/flink/sink/cdc/EventParser.java | 2 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 4 +-
...MultiTableUpdatedDataFieldsProcessFunction.java | 16 +-
.../flink/sink/cdc/NewTableSchemaBuilder.java | 8 +-
.../flink/sink/cdc/RichCdcMultiplexRecord.java | 45 ++--
.../cdc/RichCdcMultiplexRecordEventParser.java | 9 +-
.../paimon/flink/sink/cdc/RichCdcRecord.java | 33 ++-
.../paimon/flink/sink/cdc/RichEventParser.java | 20 +-
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 21 +-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 15 +-
.../flink/action/cdc/SchemaEvolutionTest.java | 243 +++++++++++----------
.../action/cdc/SyncDatabaseActionBaseTest.java | 27 +--
.../debezium/DebeziumBsonRecordParserTest.java | 24 +-
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 3 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 51 +++--
.../flink/sink/cdc/CdcRecordSerializeITCase.java | 19 +-
.../apache/paimon/flink/sink/cdc/TestCdcEvent.java | 17 +-
.../paimon/flink/sink/cdc/TestCdcEventParser.java | 5 +-
.../apache/paimon/flink/sink/cdc/TestTable.java | 21 +-
37 files changed, 639 insertions(+), 452 deletions(-)
diff --git a/docs/content/cdc-ingestion/mysql-cdc.md
b/docs/content/cdc-ingestion/mysql-cdc.md
index 1632ef4c81..01b306470a 100644
--- a/docs/content/cdc-ingestion/mysql-cdc.md
+++ b/docs/content/cdc-ingestion/mysql-cdc.md
@@ -264,8 +264,11 @@ to avoid potential name conflict.
## FAQ
1. Chinese characters in records ingested from MySQL are garbled.
+
* Try to set `env.java.opts: -Dfile.encoding=UTF-8` in `flink-conf.yaml` (Flink version < 1.19) or `config.yaml` (Flink version >= 1.19) (the option was renamed to `env.java.opts.all` in Flink 1.17).
-2. Synchronize MySQL Table comment.
+2. Synchronize MySQL table and column comments.
+
* To synchronize the MySQL create table comment to the Paimon table, configure `--mysql_conf jdbc.properties.useInformationSchema=true`.
+* To synchronize table or column comments changed by MySQL ALTER statements to the Paimon table, configure `--mysql_conf debezium.include.schema.comments=true`.
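For orientation, a minimal sketch (not part of this commit) of what these two `--mysql_conf` flags translate to on the underlying Flink CDC source. The builder calls are the flink-cdc 3.x API; all connection values are placeholders:

```java
import java.util.Properties;

import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

public class CommentSyncSourceSketch {
    public static MySqlSource<String> build() {
        Properties jdbcProps = new Properties();
        jdbcProps.put("useInformationSchema", "true"); // create-table comments

        Properties debeziumProps = new Properties();
        debeziumProps.put("include.schema.comments", "true"); // ALTER-time comments

        return MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("app_db")
                .tableList("app_db.orders")
                .username("paimon")
                .password("******")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .jdbcProperties(jdbcProps)
                .debeziumProperties(debeziumProps)
                .build();
    }
}
```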
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
index 1d775e9044..76289aa355 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
@@ -21,10 +21,10 @@ package org.apache.paimon.flink.action.cdc.format;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TypeUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -76,13 +76,13 @@ public abstract class AbstractJsonRecordParser extends
AbstractRecordParser {
protected abstract String dataField();
// use STRING type by default when we cannot get the original data types (most cases)
- protected void fillDefaultTypes(JsonNode record, RowType.Builder
rowTypeBuilder) {
+ protected void fillDefaultTypes(JsonNode record, CdcSchema.Builder
schemaBuilder) {
record.fieldNames()
- .forEachRemaining(name -> rowTypeBuilder.field(name,
DataTypes.STRING()));
+ .forEachRemaining(name -> schemaBuilder.column(name,
DataTypes.STRING()));
}
- protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
- fillDefaultTypes(record, rowTypeBuilder);
+ protected Map<String, String> extractRowData(JsonNode record,
CdcSchema.Builder schemaBuilder) {
+ fillDefaultTypes(record, schemaBuilder);
Map<String, Object> recordMap =
convertValue(record, new TypeReference<Map<String, Object>>()
{});
Map<String, String> rowData =
@@ -103,7 +103,7 @@ public abstract class AbstractJsonRecordParser extends
AbstractRecordParser {
}
return
Objects.toString(entry.getValue());
}));
- evalComputedColumns(rowData, rowTypeBuilder);
+ evalComputedColumns(rowData, schemaBuilder);
return rowData;
}
@@ -121,9 +121,9 @@ public abstract class AbstractJsonRecordParser extends
AbstractRecordParser {
protected void processRecord(
JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord>
records) {
- RowType.Builder rowTypeBuilder = RowType.builder();
- Map<String, String> rowData = this.extractRowData(jsonNode,
rowTypeBuilder);
- records.add(createRecord(rowKind, rowData,
rowTypeBuilder.build().getFields()));
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+ Map<String, String> rowData = this.extractRowData(jsonNode,
schemaBuilder);
+ records.add(createRecord(rowKind, rowData, schemaBuilder));
}
protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 8b8946a99a..1834444afa 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -22,11 +22,10 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
@@ -71,20 +70,7 @@ public abstract class AbstractRecordParser
}
Optional<RichCdcMultiplexRecord> recordOpt =
extractRecords().stream().findFirst();
- if (!recordOpt.isPresent()) {
- return null;
- }
-
- Schema.Builder builder = Schema.newBuilder();
- recordOpt
- .get()
- .fields()
- .forEach(
- field ->
- builder.column(
- field.name(), field.type(),
field.description()));
- builder.primaryKey(extractPrimaryKeys());
- return builder.build();
+ return
recordOpt.map(RichCdcMultiplexRecord::buildSchema).orElse(null);
} catch (Exception e) {
logInvalidSourceRecord(record);
throw e;
@@ -114,24 +100,24 @@ public abstract class AbstractRecordParser
/** generate values for computed columns. */
protected void evalComputedColumns(
- Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
+ Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
computedColumns.forEach(
computedColumn -> {
rowData.put(
computedColumn.columnName(),
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
- rowTypeBuilder.field(computedColumn.columnName(),
computedColumn.columnType());
+ schemaBuilder.column(computedColumn.columnName(),
computedColumn.columnType());
});
}
/** Handle case sensitivity here. */
protected RichCdcMultiplexRecord createRecord(
- RowKind rowKind, Map<String, String> data, List<DataField>
paimonFields) {
+ RowKind rowKind, Map<String, String> data, CdcSchema.Builder
schemaBuilder) {
+ schemaBuilder.primaryKey(extractPrimaryKeys());
return new RichCdcMultiplexRecord(
getDatabaseName(),
getTableName(),
- paimonFields,
- extractPrimaryKeys(),
+ schemaBuilder.build(),
new CdcRecord(rowKind, data));
}
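Putting the hunk above together, a hedged sketch of how a concrete parser now assembles a record; the database/table names and the "id" primary key are illustrative stand-ins for what `extractPrimaryKeys()` would supply:

```java
import java.util.Map;

import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;

public class ParserFlowSketch {
    // Mirrors AbstractRecordParser#createRecord after this change: columns are
    // declared on a CdcSchema.Builder, primary keys are attached to the same
    // builder, and the built CdcSchema travels with the record.
    public static RichCdcMultiplexRecord toRecord(Map<String, String> data) {
        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
        data.keySet().forEach(name -> schemaBuilder.column(name, DataTypes.STRING()));
        schemaBuilder.primaryKey("id"); // stand-in for extractPrimaryKeys()
        return new RichCdcMultiplexRecord(
                "my_db", "my_table", schemaBuilder.build(), new CdcRecord(RowKind.INSERT, data));
    }
}
```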
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
index 9525093af5..e14e4ab4b7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -22,10 +22,10 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -166,7 +166,7 @@ public class AliyunRecordParser extends
AbstractJsonRecordParser {
}
@Override
- protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
+ protected Map<String, String> extractRowData(JsonNode record,
CdcSchema.Builder schemaBuilder) {
Map<String, Object> recordMap =
JsonSerdeUtil.convertValue(record, new
TypeReference<Map<String, Object>>() {});
@@ -184,14 +184,14 @@ public class AliyunRecordParser extends
AbstractJsonRecordParser {
Tuple3<String, Integer, Integer> typeInfo =
MySqlTypeUtils.getTypeInfo(originalType);
DataType paimonDataType =
MySqlTypeUtils.toDataType(typeInfo.f0, typeInfo.f1,
typeInfo.f2, typeMapping);
- rowTypeBuilder.field(originalName, paimonDataType);
+ schemaBuilder.column(originalName, paimonDataType);
}
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
rowData.put(entry.getKey(), Objects.toString(entry.getValue(),
null));
}
- evalComputedColumns(rowData, rowTypeBuilder);
+ evalComputedColumns(rowData, schemaBuilder);
return rowData;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 170cea4d7f..5864396564 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -22,10 +22,10 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -152,7 +152,7 @@ public class CanalRecordParser extends
AbstractJsonRecordParser {
}
@Override
- protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
+ protected Map<String, String> extractRowData(JsonNode record,
CdcSchema.Builder schemaBuilder) {
LinkedHashMap<String, String> originalFieldTypes =
tryExtractOriginalFieldTypes();
Map<String, Object> recordMap =
JsonSerdeUtil.convertValue(record, new
TypeReference<Map<String, Object>>() {});
@@ -167,20 +167,20 @@ public class CanalRecordParser extends
AbstractJsonRecordParser {
DataType paimonDataType =
MySqlTypeUtils.toDataType(
typeInfo.f0, typeInfo.f1, typeInfo.f2,
typeMapping);
- rowTypeBuilder.field(originalName, paimonDataType);
+ schemaBuilder.column(originalName, paimonDataType);
String fieldValue = Objects.toString(recordMap.get(originalName), null);
String newValue = transformValue(fieldValue, typeInfo.f0, originalType);
rowData.put(originalName, newValue);
}
} else {
- fillDefaultTypes(record, rowTypeBuilder);
+ fillDefaultTypes(record, schemaBuilder);
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
rowData.put(entry.getKey(), Objects.toString(entry.getValue(),
null));
}
}
- evalComputedColumns(rowData, rowTypeBuilder);
+ evalComputedColumns(rowData, schemaBuilder);
return rowData;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
index f89183d6d3..7c3763a604 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
@@ -22,9 +22,9 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -113,9 +113,9 @@ public class DebeziumAvroRecordParser extends
AbstractRecordParser {
private void processRecord(
GenericRecord record, RowKind rowKind,
List<RichCdcMultiplexRecord> records) {
- RowType.Builder rowTypeBuilder = RowType.builder();
- Map<String, String> rowData = this.extractRowData(record,
rowTypeBuilder);
- records.add(createRecord(rowKind, rowData,
rowTypeBuilder.build().getFields()));
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+ Map<String, String> rowData = this.extractRowData(record,
schemaBuilder);
+ records.add(createRecord(rowKind, rowData, schemaBuilder));
}
@Override
@@ -128,7 +128,7 @@ public class DebeziumAvroRecordParser extends
AbstractRecordParser {
}
private Map<String, String> extractRowData(
- GenericRecord record, RowType.Builder rowTypeBuilder) {
+ GenericRecord record, CdcSchema.Builder schemaBuilder) {
Schema payloadSchema = sanitizedSchema(record.getSchema());
LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
@@ -155,10 +155,10 @@ public class DebeziumAvroRecordParser extends
AbstractRecordParser {
record.get(fieldName),
ZoneOffset.UTC);
resultMap.put(fieldName, transformed);
- rowTypeBuilder.field(fieldName, avroToPaimonDataType(schema));
+ schemaBuilder.column(fieldName, avroToPaimonDataType(schema));
}
- evalComputedColumns(resultMap, rowTypeBuilder);
+ evalComputedColumns(resultMap, schemaBuilder);
return resultMap;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
index 397575c8e5..5c13170638 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -22,10 +22,10 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.TypeUtils;
@@ -121,7 +121,7 @@ public class DebeziumBsonRecordParser extends
DebeziumJsonRecordParser {
}
@Override
- protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
+ protected Map<String, String> extractRowData(JsonNode record,
CdcSchema.Builder schemaBuilder) {
// bson record should be a string
Preconditions.checkArgument(
record.isTextual(),
@@ -133,10 +133,10 @@ public class DebeziumBsonRecordParser extends
DebeziumJsonRecordParser {
for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
String fieldName = entry.getKey();
resultMap.put(fieldName,
toJsonString(BsonValueConvertor.convert(entry.getValue())));
- rowTypeBuilder.field(fieldName, DataTypes.STRING());
+ schemaBuilder.column(fieldName, DataTypes.STRING());
}
- evalComputedColumns(resultMap, rowTypeBuilder);
+ evalComputedColumns(resultMap, schemaBuilder);
return resultMap;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 6e06e3adca..19156fb916 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -22,9 +22,9 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
@@ -181,9 +181,9 @@ public class DebeziumJsonRecordParser extends
AbstractJsonRecordParser {
}
@Override
- protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
+ protected Map<String, String> extractRowData(JsonNode record,
CdcSchema.Builder schemaBuilder) {
if (!hasSchema) {
- return super.extractRowData(record, rowTypeBuilder);
+ return super.extractRowData(record, schemaBuilder);
}
Map<String, Object> recordMap =
@@ -205,13 +205,13 @@ public class DebeziumJsonRecordParser extends
AbstractJsonRecordParser {
ZoneOffset.UTC);
resultMap.put(fieldName, transformed);
- rowTypeBuilder.field(
+ schemaBuilder.column(
fieldName,
DebeziumSchemaUtils.toDataType(
debeziumType, className,
parameters.get(fieldName)));
}
- evalComputedColumns(resultMap, rowTypeBuilder);
+ evalComputedColumns(resultMap, schemaBuilder);
return resultMap;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 450eacaefe..5f9538d2fc 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -20,10 +20,9 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -123,16 +122,11 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
*/
private RichCdcMultiplexRecord processRecord(JsonNode fullDocument,
RowKind rowKind)
throws JsonProcessingException {
- RowType.Builder rowTypeBuilder = RowType.builder();
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
Map<String, String> record =
- getExtractRow(fullDocument, rowTypeBuilder, computedColumns,
mongodbConfig);
- List<DataField> fields = rowTypeBuilder.build().getFields();
-
+ getExtractRow(fullDocument, schemaBuilder, computedColumns,
mongodbConfig);
+ schemaBuilder.primaryKey(extractPrimaryKeys());
return new RichCdcMultiplexRecord(
- databaseName,
- collection,
- fields,
- extractPrimaryKeys(),
- new CdcRecord(rowKind, record));
+ databaseName, collection, schemaBuilder.build(), new
CdcRecord(rowKind, record));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index df288a4150..baea947276 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -71,14 +71,14 @@ public interface MongoVersionStrategy {
* Determines the extraction mode and retrieves the row accordingly.
*
* @param jsonNode The JsonNode representing the MongoDB document.
- * @param rowTypeBuilder row type builder.
+ * @param schemaBuilder schema builder.
* @param mongodbConfig Configuration for the MongoDB connection.
* @return A map representing the extracted row.
* @throws JsonProcessingException If there's an error during JSON
processing.
*/
default Map<String, String> getExtractRow(
JsonNode jsonNode,
- RowType.Builder rowTypeBuilder,
+ CdcSchema.Builder schemaBuilder,
List<ComputedColumn> computedColumns,
Configuration mongodbConfig)
throws JsonProcessingException {
@@ -104,9 +104,9 @@ public interface MongoVersionStrategy {
mongodbConfig.get(PARSER_PATH),
mongodbConfig.get(FIELD_NAME),
computedColumns,
- rowTypeBuilder);
+ schemaBuilder);
case DYNAMIC:
- return parseAndTypeJsonRow(document.toString(),
rowTypeBuilder, computedColumns);
+ return parseAndTypeJsonRow(document.toString(), schemaBuilder,
computedColumns);
default:
throw new RuntimeException("Unsupported extraction mode: " +
mode);
}
@@ -114,9 +114,11 @@ public interface MongoVersionStrategy {
/** Parses and types a JSON row based on the given parameters. */
default Map<String, String> parseAndTypeJsonRow(
- String evaluate, RowType.Builder rowTypeBuilder,
List<ComputedColumn> computedColumns) {
+ String evaluate,
+ CdcSchema.Builder schemaBuilder,
+ List<ComputedColumn> computedColumns) {
Map<String, String> parsedRow = JsonSerdeUtil.parseJsonMap(evaluate,
String.class);
- return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
+ return processParsedData(parsedRow, schemaBuilder, computedColumns);
}
/** Parses fields from a JSON record based on the given parameters. */
@@ -125,7 +127,7 @@ public interface MongoVersionStrategy {
String fieldPaths,
String fieldNames,
List<ComputedColumn> computedColumns,
- RowType.Builder rowTypeBuilder) {
+ CdcSchema.Builder schemaBuilder) {
String[] columnNames = fieldNames.split(",");
String[] parseNames = fieldPaths.split(",");
Map<String, String> parsedRow = new HashMap<>();
@@ -135,20 +137,20 @@ public interface MongoVersionStrategy {
parsedRow.put(columnNames[i],
Optional.ofNullable(evaluate).orElse("{}"));
}
- return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
+ return processParsedData(parsedRow, schemaBuilder, computedColumns);
}
/** Processes the parsed data to generate the result map and update field
types. */
static Map<String, String> processParsedData(
Map<String, String> parsedRow,
- RowType.Builder rowTypeBuilder,
+ CdcSchema.Builder schemaBuilder,
List<ComputedColumn> computedColumns) {
int initialCapacity = parsedRow.size() + computedColumns.size();
Map<String, String> resultMap = new HashMap<>(initialCapacity);
parsedRow.forEach(
(column, value) -> {
- rowTypeBuilder.field(column, DataTypes.STRING());
+ schemaBuilder.column(column, DataTypes.STRING());
resultMap.put(column, value);
});
computedColumns.forEach(
@@ -158,7 +160,7 @@ public interface MongoVersionStrategy {
String computedValue =
computedColumn.eval(parsedRow.get(fieldReference));
resultMap.put(columnName, computedValue);
- rowTypeBuilder.field(columnName,
computedColumn.columnType());
+ schemaBuilder.column(columnName,
computedColumn.columnType());
});
return resultMap;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 26579e718f..0fffdf0b98 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -25,11 +25,10 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
@@ -165,17 +164,15 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
Table table = tableChange.getTable();
- List<DataField> fields = extractFields(table);
- List<String> primaryKeys = table.primaryKeyColumnNames();
+ CdcSchema schema = extractSchema(table);
- // TODO : add table comment and column comment when we upgrade flink
cdc to 2.4
return Collections.singletonList(
new RichCdcMultiplexRecord(
- databaseName, currentTable, fields, primaryKeys,
CdcRecord.emptyRecord()));
+ databaseName, currentTable, schema,
CdcRecord.emptyRecord()));
}
- private List<DataField> extractFields(Table table) {
- RowType.Builder rowType = RowType.builder();
+ private CdcSchema extractSchema(Table table) {
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
List<Column> columns = table.columns();
for (Column column : columns) {
@@ -189,12 +186,20 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
// add column comment (supported since the flink cdc 2.4 upgrade)
if (isDebeziumSchemaCommentsEnabled) {
- rowType.field(column.name(), dataType, column.comment());
+ schemaBuilder.column(column.name(), dataType,
column.comment());
} else {
- rowType.field(column.name(), dataType);
+ schemaBuilder.column(column.name(), dataType);
}
}
- return rowType.build().getFields();
+
+ schemaBuilder.primaryKey(table.primaryKeyColumnNames());
+
+ // add table comment (supported since the flink cdc 2.4 upgrade)
+ if (isDebeziumSchemaCommentsEnabled) {
+ schemaBuilder.comment(table.comment());
+ }
+
+ return schemaBuilder.build();
}
private List<RichCdcMultiplexRecord> extractRecords() {
@@ -270,10 +275,6 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String,
String> data) {
return new RichCdcMultiplexRecord(
- databaseName,
- currentTable,
- Collections.emptyList(),
- Collections.emptyList(),
- new CdcRecord(rowKind, data));
+ databaseName, currentTable, null, new CdcRecord(rowKind,
data));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
index c03b050fa2..fff189cfc0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
@@ -19,9 +19,13 @@
package org.apache.paimon.flink.action.cdc.mysql.format;
import io.debezium.document.Array;
+import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
+import io.debezium.relational.Table;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.relational.history.TableChanges.TableChangeType;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import java.io.IOException;
@@ -30,8 +34,6 @@ import java.io.IOException;
public class DebeziumEventUtils {
private static final DocumentReader DOCUMENT_READER =
DocumentReader.defaultReader();
- private static final FlinkJsonTableChangeSerializer
TABLE_CHANGE_SERIALIZER =
- new FlinkJsonTableChangeSerializer();
public static HistoryRecord getHistoryRecord(String historyRecordStr)
throws IOException {
return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
@@ -39,7 +41,46 @@ public class DebeziumEventUtils {
public static TableChanges getTableChanges(String historyRecordStr) throws
IOException {
HistoryRecord historyRecord = getHistoryRecord(historyRecordStr);
- Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
- return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+ Array tableChangesDocument =
+
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+ return deserialize(tableChangesDocument, true);
+ }
+
+ /**
+ * Copied from {@link FlinkJsonTableChangeSerializer#deserialize}, with an extra step to
+ * supplement the table comment. TODO: remove this method once comment handling is added to
+ * {@link FlinkJsonTableChangeSerializer}.
+ */
+ private static TableChanges deserialize(Array array, boolean
useCatalogBeforeSchema) {
+ TableChanges tableChanges = new TableChanges();
+
+ for (Array.Entry entry : array) {
+ Document document = entry.getValue().asDocument();
+ TableChange change =
+ FlinkJsonTableChangeSerializer.fromDocument(document,
useCatalogBeforeSchema);
+
+ if (change.getType() == TableChangeType.CREATE) {
+ // unlike the upstream serializer, supplement the table comment first
+ tableChanges.create(supplementTableComment(document, change.getTable()));
+ } else if (change.getType() == TableChangeType.ALTER) {
+ tableChanges.alter(supplementTableComment(document, change.getTable()));
+ } else if (change.getType() == TableChangeType.DROP) {
+ tableChanges.drop(change.getTable());
+ }
+ }
+
+ return tableChanges;
+ }
+
+ private static Table supplementTableComment(Document document, Table
table) {
+ if (table.comment() != null) {
+ return table;
+ }
+ String comment = document.getString("comment");
+ if (comment != null) {
+ return table.edit().setComment(comment).create();
+ }
+ return table;
}
}
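A hedged usage sketch for the utility above; the history-record JSON is assumed to carry a comment, which it does when `debezium.include.schema.comments=true` is set on the source:

```java
import io.debezium.relational.history.TableChanges;

import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEventUtils;

public class TableCommentSketch {
    public static void printComments(String historyRecordJson) throws Exception {
        TableChanges changes = DebeziumEventUtils.getTableChanges(historyRecordJson);
        for (TableChanges.TableChange change : changes) {
            // after supplementTableComment, CREATE/ALTER entries keep their comment
            System.out.println(change.getTable().id() + " -> " + change.getTable().comment());
        }
    }
}
```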
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 07156823b3..af114cb920 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -24,12 +24,11 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -62,7 +61,6 @@ import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -123,7 +121,7 @@ public class PostgresRecordParser
extractRecords().forEach(out::collect);
}
- private List<DataField> extractFields(DebeziumEvent.Field schema) {
+ private CdcSchema extractSchema(DebeziumEvent.Field schema) {
Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
Preconditions.checkArgument(
!afterFields.isEmpty(),
@@ -131,7 +129,7 @@ public class PostgresRecordParser
+ "Please make sure that `includeSchema` is true "
+ "in the JsonDebeziumDeserializationSchema you
created");
- RowType.Builder rowType = RowType.builder();
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
afterFields.forEach(
(key, value) -> {
DataType dataType = extractFieldType(value);
@@ -139,9 +137,9 @@ public class PostgresRecordParser
dataType.copy(
typeMapping.containsMode(TO_NULLABLE) ||
value.optional());
- rowType.field(key, dataType);
+ schemaBuilder.column(key, dataType);
});
- return rowType.build().getFields();
+ return schemaBuilder.build();
}
/**
@@ -217,13 +215,12 @@ public class PostgresRecordParser
Map<String, String> after = extractRow(root.payload().after());
if (!after.isEmpty()) {
- List<DataField> fields = extractFields(root.schema());
+ CdcSchema schema = extractSchema(root.schema());
records.add(
new RichCdcMultiplexRecord(
databaseName,
currentTable,
- fields,
- Collections.emptyList(),
+ schema,
new CdcRecord(RowKind.INSERT, after)));
}
@@ -365,10 +362,6 @@ public class PostgresRecordParser
protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String,
String> data) {
return new RichCdcMultiplexRecord(
- databaseName,
- currentTable,
- Collections.emptyList(),
- Collections.emptyList(),
- new CdcRecord(rowKind, data));
+ databaseName, currentTable, null, new CdcRecord(rowKind,
data));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 4efcf1207e..d56963731f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.types.DataField;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -34,13 +33,11 @@ import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
/**
- * A {@link ProcessFunction} to parse CDC change event to either a list of {@link DataField}s or
- * {@link CdcRecord} and send them to different side outputs according to table name. This process
- * function will capture newly added tables when syncing entire database and in cases where the
- * newly added tables are including by attesting table filters.
+ * A {@link ProcessFunction} to parse a CDC change event into either a {@link CdcSchema} or a
+ * {@link CdcRecord} and send it to different side outputs according to table name. This process
+ * function captures newly added tables when syncing an entire database, as long as the new
+ * tables are included by the configured table filters.
*
* <p>This {@link ProcessFunction} can handle records for different tables at
the same time.
*
@@ -54,12 +51,10 @@ public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T,
public static final OutputTag<CdcMultiplexRecord> DYNAMIC_OUTPUT_TAG =
new OutputTag<>("paimon-dynamic-table",
TypeInformation.of(CdcMultiplexRecord.class));
- public static final OutputTag<Tuple2<Identifier, List<DataField>>>
- DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG =
- new OutputTag<>(
- "paimon-dynamic-table-schema-change",
- TypeInformation.of(
- new TypeHint<Tuple2<Identifier,
List<DataField>>>() {}));
+ public static final OutputTag<Tuple2<Identifier, CdcSchema>>
DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG =
+ new OutputTag<>(
+ "paimon-dynamic-table-schema-change",
+ TypeInformation.of(new TypeHint<Tuple2<Identifier,
CdcSchema>>() {}));
private final EventParser.Factory<T> parserFactory;
private final String database;
@@ -117,8 +112,8 @@ public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T,
}
});
- List<DataField> schemaChange = parser.parseSchemaChange();
- if (!schemaChange.isEmpty()) {
+ CdcSchema schemaChange = parser.parseSchemaChange();
+ if (schemaChange != null) {
context.output(
DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG,
Tuple2.of(Identifier.create(database, tableName),
schemaChange));
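Downstream, the new tag is read as a side output. A minimal wiring sketch, assuming paimon's `SingleOutputStreamOperatorUtils` helper (as used in `CdcSinkBuilder` below) and a `parsed` operator produced by this function:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;

public class SchemaChangeSideOutputSketch {
    // `parsed` is assumed to be the operator created by applying
    // CdcDynamicTableParsingProcessFunction to the raw CDC stream.
    public static DataStream<Tuple2<Identifier, CdcSchema>> schemaChanges(
            SingleOutputStreamOperator<Void> parsed) {
        return SingleOutputStreamOperatorUtils.getSideOutput(
                parsed,
                CdcDynamicTableParsingProcessFunction.DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG);
    }
}
```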
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index 4c5e0600bb..899ca54465 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -18,23 +18,19 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.types.DataField;
-
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
- * A {@link ProcessFunction} to parse CDC change event to either a list of {@link DataField}s or
- * {@link CdcRecord} and send them to different side outputs according to table name.
+ * A {@link ProcessFunction} to parse a CDC change event into either a {@link CdcSchema} or a
+ * {@link CdcRecord} and send it to different side outputs according to table name.
*
* <p>This {@link ProcessFunction} can handle records for different tables at
the same time.
*
@@ -45,7 +41,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
private final EventParser.Factory<T> parserFactory;
private transient EventParser<T> parser;
- private transient Map<String, OutputTag<List<DataField>>>
updatedDataFieldsOutputTags;
+ private transient Map<String, OutputTag<CdcSchema>> schemaChangeOutputTags;
private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;
public CdcMultiTableParsingProcessFunction(EventParser.Factory<T>
parserFactory) {
@@ -64,7 +60,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
*/
public void open(Configuration parameters) throws Exception {
parser = parserFactory.create();
- updatedDataFieldsOutputTags = new HashMap<>();
+ schemaChangeOutputTags = new HashMap<>();
recordOutputTags = new HashMap<>();
}
@@ -72,22 +68,22 @@ public class CdcMultiTableParsingProcessFunction<T> extends
ProcessFunction<T, V
public void processElement(T raw, Context context, Collector<Void>
collector) throws Exception {
parser.setRawEvent(raw);
String tableName = parser.parseTableName();
- List<DataField> schemaChange = parser.parseSchemaChange();
- if (!schemaChange.isEmpty()) {
+ CdcSchema schemaChange = parser.parseSchemaChange();
+ if (schemaChange != null) {
context.output(getUpdatedDataFieldsOutputTag(tableName),
schemaChange);
}
parser.parseRecords()
.forEach(record ->
context.output(getRecordOutputTag(tableName), record));
}
- private OutputTag<List<DataField>> getUpdatedDataFieldsOutputTag(String
tableName) {
- return updatedDataFieldsOutputTags.computeIfAbsent(
- tableName,
CdcMultiTableParsingProcessFunction::createUpdatedDataFieldsOutputTag);
+ private OutputTag<CdcSchema> getUpdatedDataFieldsOutputTag(String
tableName) {
+ return schemaChangeOutputTags.computeIfAbsent(
+ tableName, CdcMultiTableParsingProcessFunction::createSchemaChangeOutputTag);
}
- public static OutputTag<List<DataField>> createUpdatedDataFieldsOutputTag(String tableName) {
+ public static OutputTag<CdcSchema> createSchemaChangeOutputTag(String tableName) {
return new OutputTag<>(
- "new-data-field-list-" + tableName, new
ListTypeInfo<>(DataField.class));
+ "table-schema-change-" + tableName,
TypeInformation.of(CdcSchema.class));
}
private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index eec228f3c0..9e267eda24 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -18,20 +18,16 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.types.DataField;
-
import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
-import java.util.List;
-
/**
- * A {@link ProcessFunction} to parse CDC change event to either a list of {@link DataField}s or
- * {@link CdcRecord} and send them to different downstreams.
+ * A {@link ProcessFunction} to parse a CDC change event into either a {@link CdcSchema} or a
+ * {@link CdcRecord} and send it to different downstreams.
*
* <p>This {@link ProcessFunction} can only handle records for a single
constant table. To handle
* records for different tables, see {@link
CdcMultiTableParsingProcessFunction}.
@@ -40,8 +36,8 @@ import java.util.List;
*/
public class CdcParsingProcessFunction<T> extends ProcessFunction<T,
CdcRecord> {
- public static final OutputTag<List<DataField>>
NEW_DATA_FIELD_LIST_OUTPUT_TAG =
- new OutputTag<>("new-data-field-list", new
ListTypeInfo<>(DataField.class));
+ public static final OutputTag<CdcSchema> SCHEMA_CHANGE_OUTPUT_TAG =
+ new OutputTag<>("table-schema-change",
TypeInformation.of(CdcSchema.class));
private final EventParser.Factory<T> parserFactory;
@@ -69,9 +65,9 @@ public class CdcParsingProcessFunction<T> extends
ProcessFunction<T, CdcRecord>
public void processElement(T raw, Context context, Collector<CdcRecord>
collector)
throws Exception {
parser.setRawEvent(raw);
- List<DataField> schemaChange = parser.parseSchemaChange();
- if (!schemaChange.isEmpty()) {
- context.output(NEW_DATA_FIELD_LIST_OUTPUT_TAG, schemaChange);
+ CdcSchema schemaChange = parser.parseSchemaChange();
+ if (schemaChange != null) {
+ context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
}
parser.parseRecords().forEach(collector::collect);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
new file mode 100644
index 0000000000..b5199d3a0b
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.ReassignFieldId;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** A schema change message from the CDC source. */
+public class CdcSchema implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<DataField> fields;
+
+ private final List<String> primaryKeys;
+
+ @Nullable private final String comment;
+
+ public CdcSchema(List<DataField> fields, List<String> primaryKeys,
@Nullable String comment) {
+ this.fields = fields;
+ this.primaryKeys = primaryKeys;
+ this.comment = comment;
+ }
+
+ public List<DataField> fields() {
+ return fields;
+ }
+
+ public List<String> primaryKeys() {
+ return primaryKeys;
+ }
+
+ public @Nullable String comment() {
+ return comment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CdcSchema that = (CdcSchema) o;
+ return Objects.equals(fields, that.fields)
+ && Objects.equals(primaryKeys, that.primaryKeys)
+ && Objects.equals(comment, that.comment);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fields, primaryKeys, comment);
+ }
+
+ @Override
+ public String toString() {
+ return "Schema{"
+ + "fields="
+ + fields
+ + ", primaryKeys="
+ + primaryKeys
+ + ", comment="
+ + comment
+ + '}';
+ }
+
+ /** Builder for configuring and creating instances of {@link CdcSchema}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /** A builder for constructing an immutable but still unresolved {@link
CdcSchema}. */
+ public static final class Builder {
+
+ private final List<DataField> columns = new ArrayList<>();
+
+ private List<String> primaryKeys = new ArrayList<>();
+
+ @Nullable private String comment;
+
+ private final AtomicInteger highestFieldId = new AtomicInteger(-1);
+
+ public int getHighestFieldId() {
+ return highestFieldId.get();
+ }
+
+ /**
+ * Declares a column that is appended to this schema.
+ *
+ * @param dataField data field
+ */
+ public Builder column(DataField dataField) {
+ Preconditions.checkNotNull(dataField, "Data field must not be
null.");
+ Preconditions.checkNotNull(dataField.name(), "Column name must not
be null.");
+ Preconditions.checkNotNull(dataField.type(), "Data type must not
be null.");
+ columns.add(dataField);
+ return this;
+ }
+
+ /**
+ * Declares a column that is appended to this schema.
+ *
+ * @param columnName column name
+ * @param dataType data type of the column
+ */
+ public Builder column(String columnName, DataType dataType) {
+ return column(columnName, dataType, null);
+ }
+
+ /**
+ * Declares a column that is appended to this schema.
+ *
+ * @param columnName column name
+ * @param dataType data type of the column
+ * @param description description of the column
+ */
+ public Builder column(String columnName, DataType dataType, @Nullable
String description) {
+ Preconditions.checkNotNull(columnName, "Column name must not be
null.");
+ Preconditions.checkNotNull(dataType, "Data type must not be
null.");
+
+ int id = highestFieldId.incrementAndGet();
+ DataType reassignDataType = ReassignFieldId.reassign(dataType,
highestFieldId);
+ columns.add(new DataField(id, columnName, reassignDataType,
description));
+ return this;
+ }
+
+ /**
+ * Declares a primary key constraint for a set of given columns. A primary key uniquely
+ * identifies a row in a table. None of the columns in a primary key can be nullable.
+ *
+ * @param columnNames columns that form a unique primary key
+ */
+ public Builder primaryKey(String... columnNames) {
+ return primaryKey(Arrays.asList(columnNames));
+ }
+
+ /**
+ * Declares a primary key constraint for a set of given columns. A primary key uniquely
+ * identifies a row in a table. None of the columns in a primary key can be nullable.
+ *
+ * @param columnNames columns that form a unique primary key
+ */
+ public Builder primaryKey(List<String> columnNames) {
+ this.primaryKeys = new ArrayList<>(columnNames);
+ return this;
+ }
+
+ /** Declares table comment. */
+ public Builder comment(@Nullable String comment) {
+ this.comment = comment;
+ return this;
+ }
+
+ /** Returns an instance of an unresolved {@link CdcSchema}. */
+ public CdcSchema build() {
+ return new CdcSchema(columns, primaryKeys, comment);
+ }
+ }
+}
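A short usage example of the new builder; the field names and comment are illustrative only:

```java
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.types.DataTypes;

public class CdcSchemaExample {
    public static CdcSchema userTableSchema() {
        return CdcSchema.newBuilder()
                .column("id", DataTypes.INT().notNull(), "primary key")
                .column("name", DataTypes.STRING())
                .primaryKey("id")
                .comment("user table synced from MySQL")
                .build();
    }
}
```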
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 7edde41f5b..7b24ff8138 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -112,7 +112,7 @@ public class CdcSinkBuilder<T> {
DataStream<Void> schemaChangeProcessFunction =
SingleOutputStreamOperatorUtils.getSideOutput(
- parsed,
CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG)
+ parsed,
CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
.process(
new UpdatedDataFieldsProcessFunction(
new SchemaManager(dataTable.fileIO(),
dataTable.location()),
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index c0e4810128..a7ce1c2cf5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -47,7 +47,7 @@ public interface EventParser<T> {
*
* @return the schema change, or null if there is no schema change
*/
- List<DataField> parseSchemaChange();
+ CdcSchema parseSchemaChange();
/**
* Parse records from event.
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index b15a17a679..196b9e4350 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -222,8 +222,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
DataStream<Void> schemaChangeProcessFunction =
SingleOutputStreamOperatorUtils.getSideOutput(
parsed,
- CdcMultiTableParsingProcessFunction
- .createUpdatedDataFieldsOutputTag(table.name()))
+ CdcMultiTableParsingProcessFunction.createSchemaChangeOutputTag(
+ table.name()))
.process(
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(),
table.location()),
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 71f71b2412..cc8118f6ca 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -34,19 +33,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
- * A {@link ProcessFunction} to handle schema changes. New schema is represented by a list of {@link
- * DataField}s.
+ * A {@link ProcessFunction} to handle schema changes. The new schema is represented by a {@link
+ * CdcSchema}.
*
* <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link
ProcessFunction} must
* be 1.
*/
public class MultiTableUpdatedDataFieldsProcessFunction
- extends UpdatedDataFieldsProcessFunctionBase<Tuple2<Identifier,
List<DataField>>, Void> {
+ extends UpdatedDataFieldsProcessFunctionBase<Tuple2<Identifier,
CdcSchema>, Void> {
private static final Logger LOG =
LoggerFactory.getLogger(MultiTableUpdatedDataFieldsProcessFunction.class);
@@ -60,11 +58,9 @@ public class MultiTableUpdatedDataFieldsProcessFunction
    @Override
    public void processElement(
-            Tuple2<Identifier, List<DataField>> updatedDataFields,
-            Context context,
-            Collector<Void> collector)
+            Tuple2<Identifier, CdcSchema> updatedSchema, Context context, Collector<Void> collector)
            throws Exception {
-        Identifier tableId = updatedDataFields.f0;
+        Identifier tableId = updatedSchema.f0;
        SchemaManager schemaManager =
                schemaManagers.computeIfAbsent(
                        tableId,
@@ -82,7 +78,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
            LOG.error("Failed to get schema manager for table " + tableId);
        } else {
            for (SchemaChange schemaChange :
-                    extractSchemaChanges(schemaManager, updatedDataFields.f1)) {
+                    extractSchemaChanges(schemaManager, updatedSchema.f1)) {
                applySchemaChange(schemaManager, schemaChange, tableId);
            }
        }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index ce1bcc47db..0d4a74ac2f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -59,13 +59,7 @@ public class NewTableSchemaBuilder implements Serializable {
}
public Optional<Schema> build(RichCdcMultiplexRecord record) {
- Schema sourceSchema =
- new Schema(
- record.fields(),
- Collections.emptyList(),
- record.primaryKeys(),
- Collections.emptyMap(),
- null);
+ Schema sourceSchema = record.buildSchema();
List<String> specifiedPartitionKeys = new ArrayList<>();
        List<String> partitionKeyMultipleList = partitionKeyMultiple.get(record.tableName());
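The removed five-argument Schema construction now lives in RichCdcMultiplexRecord.buildSchema() (see its diff below): partition keys and options stay empty, while fields, primary keys, and the table comment come from the CdcSchema. A rough sketch of that mapping, with illustrative column names:

    // Sketch: how a CdcSchema maps onto a Paimon Schema in buildSchema().
    CdcSchema cdcSchema =
            CdcSchema.newBuilder()
                    .column("id", DataTypes.INT().notNull(), "primary")
                    .column("v1", DataTypes.VARCHAR(10), "v1")
                    .primaryKey("id")
                    .comment("demo")
                    .build();
    Schema schema =
            new Schema(
                    cdcSchema.fields(),
                    Collections.emptyList(), // partition keys are decided elsewhere
                    cdcSchema.primaryKeys(),
                    Collections.emptyMap(), // options are decided elsewhere
                    cdcSchema.comment());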
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
index 1a01f0273d..5417cd9561 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -18,13 +18,12 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.types.DataField;
+import org.apache.paimon.schema.Schema;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
import java.util.Objects;
/** Compared to {@link CdcMultiplexRecord}, this contains schema information. */
@@ -34,25 +33,17 @@ public class RichCdcMultiplexRecord implements Serializable {
    @Nullable private final String databaseName;
    @Nullable private final String tableName;
-    private final List<DataField> fields;
-    private final List<String> primaryKeys;
+    private final CdcSchema cdcSchema;
    private final CdcRecord cdcRecord;
    public RichCdcMultiplexRecord(
            @Nullable String databaseName,
            @Nullable String tableName,
-            List<DataField> fields,
-            List<String> primaryKeys,
+            @Nullable CdcSchema cdcSchema,
            CdcRecord cdcRecord) {
        this.databaseName = databaseName;
        this.tableName = tableName;
-        // This class can not be deserialized by kryoSerializer,
-        // Throw an exception message `com.esotericsoftware.kryo.KryoException:
-        // java.lang.UnsupportedOperationException` ,
-        // because fields and primaryKeys is an
-        // unmodifiableList. So we need to ensure that List is a modifiable list.
-        this.fields = new ArrayList<>(fields);
-        this.primaryKeys = new ArrayList<>(primaryKeys);
+        this.cdcSchema = cdcSchema == null ? CdcSchema.newBuilder().build() : cdcSchema;
        this.cdcRecord = cdcRecord;
    }
@@ -66,21 +57,26 @@ public class RichCdcMultiplexRecord implements Serializable {
        return tableName;
    }
-    public List<DataField> fields() {
-        return fields;
+    public CdcSchema cdcSchema() {
+        return cdcSchema;
    }
-    public List<String> primaryKeys() {
-        return primaryKeys;
+    public Schema buildSchema() {
+        return new Schema(
+                cdcSchema.fields(),
+                Collections.emptyList(),
+                cdcSchema.primaryKeys(),
+                Collections.emptyMap(),
+                cdcSchema.comment());
    }
    public RichCdcRecord toRichCdcRecord() {
-        return new RichCdcRecord(cdcRecord, fields);
+        return new RichCdcRecord(cdcRecord, cdcSchema);
    }
    @Override
    public int hashCode() {
-        return Objects.hash(databaseName, tableName, fields, primaryKeys, cdcRecord);
+        return Objects.hash(databaseName, tableName, cdcSchema, cdcRecord);
    }
    @Override
@Override
@@ -94,8 +90,7 @@ public class RichCdcMultiplexRecord implements Serializable {
RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
return Objects.equals(databaseName, that.databaseName)
&& Objects.equals(tableName, that.tableName)
- && Objects.equals(fields, that.fields)
- && Objects.equals(primaryKeys, that.primaryKeys)
+ && Objects.equals(cdcSchema, that.cdcSchema)
&& Objects.equals(cdcRecord, that.cdcRecord);
}
@@ -106,10 +101,8 @@ public class RichCdcMultiplexRecord implements Serializable {
                + databaseName
                + ", tableName="
                + tableName
-                + ", fields="
-                + fields
-                + ", primaryKeys="
-                + primaryKeys
+                + ", cdcSchema="
+                + cdcSchema
                + ", cdcRecord="
                + cdcRecord
                + '}';
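A short usage sketch of the reworked record (values are illustrative). Note that passing null for the schema is legal and is normalized to an empty CdcSchema by the constructor, which keeps the record Kryo-serializable without the old defensive list copies:

    Map<String, String> data = new HashMap<>();
    data.put("id", "1");
    CdcRecord payload = new CdcRecord(RowKind.INSERT, data);
    CdcSchema schema =
            CdcSchema.newBuilder().column("id", DataTypes.BIGINT()).primaryKey("id").build();

    RichCdcMultiplexRecord record = new RichCdcMultiplexRecord("db", "tbl", schema, payload);
    // record.cdcSchema() never returns null, even for
    // new RichCdcMultiplexRecord("db", "tbl", null, payload).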
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 47367c4234..4ae4da6706 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,10 +111,8 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
    }
    @Override
-    public List<DataField> parseSchemaChange() {
-        return shouldSynchronizeCurrentTable
-                ? currentParser.parseSchemaChange()
-                : Collections.emptyList();
+    public CdcSchema parseSchemaChange() {
+        return shouldSynchronizeCurrentTable ? currentParser.parseSchemaChange() : null;
    }
@Override
@@ -205,7 +202,7 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
private boolean shouldCreateCurrentTable() {
return shouldSynchronizeCurrentTable
- && !record.fields().isEmpty()
+ && !record.cdcSchema().fields().isEmpty()
&& createdTables.add(parseTableName());
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 04b86fea56..73b156f0fd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -19,19 +19,15 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
/** A change message contains schema and data. */
@Experimental
@@ -40,11 +36,11 @@ public class RichCdcRecord implements Serializable {
private static final long serialVersionUID = 1L;
private final CdcRecord cdcRecord;
- private final List<DataField> fields;
+ private final CdcSchema cdcSchema;
- public RichCdcRecord(CdcRecord cdcRecord, List<DataField> fields) {
+ public RichCdcRecord(CdcRecord cdcRecord, CdcSchema cdcSchema) {
this.cdcRecord = cdcRecord;
- this.fields = fields;
+ this.cdcSchema = cdcSchema;
}
public boolean hasPayload() {
@@ -55,8 +51,8 @@ public class RichCdcRecord implements Serializable {
return cdcRecord.kind();
}
- public List<DataField> fields() {
- return fields;
+ public CdcSchema cdcSchema() {
+ return cdcSchema;
}
public CdcRecord toCdcRecord() {
@@ -72,34 +68,32 @@ public class RichCdcRecord implements Serializable {
return false;
}
RichCdcRecord that = (RichCdcRecord) o;
-        return cdcRecord == that.cdcRecord && Objects.equals(fields, that.fields);
+        return cdcRecord == that.cdcRecord && Objects.equals(cdcSchema, that.cdcSchema);
}
@Override
public int hashCode() {
- return Objects.hash(cdcRecord, fields);
+ return Objects.hash(cdcRecord, cdcSchema);
}
@Override
public String toString() {
- return "{" + "cdcRecord=" + cdcRecord + ", fields=" + fields + '}';
+ return "{" + "cdcRecord=" + cdcRecord + ", cdcSchema=" + cdcSchema +
'}';
}
public static Builder builder(RowKind kind) {
- return new Builder(kind, new AtomicInteger(-1));
+ return new Builder(kind);
}
/** Builder for {@link RichCdcRecord}. */
public static class Builder {
private final RowKind kind;
- private final AtomicInteger fieldId;
- private final List<DataField> fields = new ArrayList<>();
+ private final CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
private final Map<String, String> data = new HashMap<>();
- public Builder(RowKind kind, AtomicInteger fieldId) {
+ public Builder(RowKind kind) {
this.kind = kind;
- this.fieldId = fieldId;
}
public Builder field(String name, DataType type, String value) {
@@ -108,13 +102,14 @@ public class RichCdcRecord implements Serializable {
        public Builder field(
                String name, DataType type, String value, @Nullable String description) {
-            fields.add(new DataField(fieldId.incrementAndGet(), name, type, description));
+            schemaBuilder.column(name, type, description);
            data.put(name, value);
            return this;
        }
        public RichCdcRecord build() {
-            return new RichCdcRecord(new CdcRecord(kind, data), fields);
+
+            return new RichCdcRecord(new CdcRecord(kind, data), schemaBuilder.build());
        }
}
}
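Field IDs are now assigned inside CdcSchema.Builder instead of by the removed AtomicInteger, so builder call sites are unchanged. A small usage sketch with illustrative values:

    RichCdcRecord record =
            RichCdcRecord.builder(RowKind.INSERT)
                    .field("id", DataTypes.BIGINT(), "1")
                    .field("name", DataTypes.VARCHAR(32), "paimon", "user name")
                    .build();
    CdcSchema schema = record.cdcSchema(); // the schema now travels with the payload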
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
index 01a002214e..ba6ecbf4af 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.types.DataField;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -32,15 +31,19 @@ public class RichEventParser implements EventParser<RichCdcRecord> {
    private final LinkedHashMap<String, DataField> previousDataFields = new LinkedHashMap<>();
+ private String previousComment;
+
@Override
public void setRawEvent(RichCdcRecord rawEvent) {
this.record = rawEvent;
}
    @Override
-    public List<DataField> parseSchemaChange() {
-        List<DataField> change = new ArrayList<>();
-        record.fields()
+    public CdcSchema parseSchemaChange() {
+        CdcSchema.Builder change = CdcSchema.newBuilder();
+        CdcSchema recordedSchema = record.cdcSchema();
+        recordedSchema
+                .fields()
                .forEach(
                        dataField -> {
                            DataField previous = previousDataFields.get(dataField.name());
@@ -49,10 +52,15 @@ public class RichEventParser implements EventParser<RichCdcRecord> {
                            // so the comparison should not include the ID.
                            if (!DataField.dataFieldEqualsIgnoreId(previous, dataField)) {
                                previousDataFields.put(dataField.name(), dataField);
-                                change.add(dataField);
+                                change.column(dataField);
                            }
                        });
-        return change;
+
+        if (recordedSchema.comment() != null && !recordedSchema.comment().equals(previousComment)) {
+            previousComment = recordedSchema.comment();
+            change.comment(recordedSchema.comment());
+        }
+        return change.build();
    }
@Override
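The parser stays incremental: a field is emitted only when it differs from the last value seen under the same name, and the table comment is emitted once per change. Roughly, for two consecutive events on one parser instance (an illustrative sketch):

    RichEventParser parser = new RichEventParser();

    parser.setRawEvent(
            RichCdcRecord.builder(RowKind.INSERT)
                    .field("id", DataTypes.BIGINT(), "1")
                    .build());
    CdcSchema first = parser.parseSchemaChange(); // contains "id"

    parser.setRawEvent(
            RichCdcRecord.builder(RowKind.INSERT)
                    .field("id", DataTypes.BIGINT(), "2")
                    .field("age", DataTypes.INT(), "30")
                    .build());
    CdcSchema second = parser.parseSchemaChange(); // contains only "age"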
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 93e22f1e62..363d747d3b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -38,14 +38,14 @@ import java.util.Set;
import java.util.stream.Collectors;
 /**
- * A {@link ProcessFunction} to handle schema changes. New schema is represented by a list of {@link
- * DataField}s.
+ * A {@link ProcessFunction} to handle schema changes. New schema is represented by a {@link
+ * CdcSchema}.
  *
  * <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link ProcessFunction} must
  * be 1.
  */
 public class UpdatedDataFieldsProcessFunction
-        extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void> {
+        extends UpdatedDataFieldsProcessFunctionBase<CdcSchema, Void> {
private final SchemaManager schemaManager;
@@ -65,20 +65,23 @@ public class UpdatedDataFieldsProcessFunction
    }
    @Override
-    public void processElement(
-            List<DataField> updatedDataFields, Context context, Collector<Void> collector)
+    public void processElement(CdcSchema updatedSchema, Context context, Collector<Void> collector)
            throws Exception {
        List<DataField> actualUpdatedDataFields =
-                updatedDataFields.stream()
+                updatedSchema.fields().stream()
                        .filter(
                                dataField ->
                                        !latestDataFieldContain(new FieldIdentifier(dataField)))
                        .collect(Collectors.toList());
-        if (CollectionUtils.isEmpty(actualUpdatedDataFields)) {
+        if (CollectionUtils.isEmpty(actualUpdatedDataFields) && updatedSchema.comment() == null) {
            return;
        }
-        for (SchemaChange schemaChange :
-                extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
+        CdcSchema actualUpdatedSchema =
+                new CdcSchema(
+                        actualUpdatedDataFields,
+                        updatedSchema.primaryKeys(),
+                        updatedSchema.comment());
+        for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
            applySchemaChange(schemaManager, schemaChange, identifier);
        }
        /*
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index e544848036..657b025bf5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -148,6 +148,8 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
} else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
catalog.alterTable(identifier, schemaChange, false);
+ } else if (schemaChange instanceof SchemaChange.UpdateComment) {
+ catalog.alterTable(identifier, schemaChange, false);
} else {
throw new UnsupportedOperationException(
"Unsupported schema change class "
@@ -219,8 +221,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
protected List<SchemaChange> extractSchemaChanges(
- SchemaManager schemaManager, List<DataField> updatedDataFields) {
- RowType oldRowType = schemaManager.latest().get().logicalRowType();
+ SchemaManager schemaManager, CdcSchema updatedSchema) {
+ TableSchema oldTableSchema = schemaManager.latest().get();
+ RowType oldRowType = oldTableSchema.logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
for (DataField oldField : oldRowType.getFields()) {
oldFields.put(oldField.name(), oldField);
@@ -232,7 +235,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE);
List<SchemaChange> result = new ArrayList<>();
- for (DataField newField : updatedDataFields) {
+ for (DataField newField : updatedSchema.fields()) {
String newFieldName =
StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
if (oldFields.containsKey(newFieldName)) {
DataField oldField = oldFields.get(newFieldName);
@@ -268,6 +271,12 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                                newFieldName, newField.type(), newField.description(), null));
}
}
+
+ if (updatedSchema.comment() != null
+ && !updatedSchema.comment().equals(oldTableSchema.comment())) {
+ // update table comment
+ result.add(SchemaChange.updateComment(updatedSchema.comment()));
+ }
return result;
}
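With this, a schema whose comment differs from the latest table schema yields one extra change alongside any column changes. Inside a subclass, the comment-only case would look roughly like this (a sketch; schemaManager is assumed to hold a table whose current comment differs):

    CdcSchema updatedSchema =
            CdcSchema.newBuilder()
                    .comment("table_comment_new") // no column changes, comment only
                    .build();
    List<SchemaChange> changes = extractSchemaChanges(schemaManager, updatedSchema);
    // changes contains SchemaChange.updateComment("table_comment_new"), which
    // applySchemaChange routes through catalog.alterTable(...).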
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
index 8d071150e0..8167441fb6 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -35,7 +36,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
@@ -53,123 +53,128 @@ import java.util.List;
/** Used to test schema evolution related logic. */
public class SchemaEvolutionTest extends TableTestBase {
-    private static List<List<DataField>> prepareData() {
-        List<DataField> upField1 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new IntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField2 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField3 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description 2."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField4 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description."),
-                        new DataField(3, "col_3_1", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField5 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2_1", new BigIntType(), "test description 2."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        return Arrays.asList(upField1, upField2, upField3, upField4, upField5);
+    private static List<CdcSchema> prepareData() {
+        CdcSchema upSchema1 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test description.")
+                        .column("col_1", new IntType(), "test description.")
+                        .column("col_2", new IntType(), "test description.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema2 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2", new IntType(), "test description.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema3 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2", new IntType(), "test description 2.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema4 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2", new IntType(), "test description.")
+                        .column("col_3_1", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema5 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2_1", new BigIntType(), "test description 2.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        return Arrays.asList(upSchema1, upSchema2, upSchema3, upSchema4, upSchema5);
    }
private FileStoreTable table;
@@ -199,7 +204,7 @@ public class SchemaEvolutionTest extends TableTestBase {
@Test
public void testSchemaEvolution() throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStream<List<DataField>> upDataFieldStream = env.fromCollection(prepareData());
+        DataStream<CdcSchema> upDataFieldStream = env.fromCollection(prepareData());
        Options options = new Options();
        options.set("warehouse", tempPath.toString());
        final CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(options);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
index 6ebfdb7550..7d8d83ca2e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -72,25 +71,13 @@ public class SyncDatabaseActionBaseTest {
rawData.put("field", "value");
CdcRecord cdcData = new CdcRecord(RowKind.INSERT, rawData);
-        whiteAnyDbCdcRecord =
-                new RichCdcMultiplexRecord(
-                        ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        blackAnyDbCdcRecord =
-                new RichCdcMultiplexRecord(
-                        ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        whiteCdcRecord =
-                new RichCdcMultiplexRecord(
-                        WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        blackCdcRecord =
-                new RichCdcMultiplexRecord(
-                        BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-
-        whiteDbBlackTblCdcRecord =
-                new RichCdcMultiplexRecord(
-                        WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        blackDbWhiteTblCdcRecord =
-                new RichCdcMultiplexRecord(
-                        BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+        whiteAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, WHITE_TBL, null, cdcData);
+        blackAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, BLACK_TBL, null, cdcData);
+        whiteCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, WHITE_TBL, null, cdcData);
+        blackCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, null, cdcData);
+
+        whiteDbBlackTblCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, BLACK_TBL, null, cdcData);
+        blackDbWhiteTblCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, null, cdcData);
    }
@Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
index 78d3bcc3de..17277d5d7d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
@@ -23,10 +23,10 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.StringUtils;
@@ -40,8 +40,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.net.URL;
import java.nio.file.Files;
@@ -56,21 +54,20 @@ import java.util.Map;
/** Test for DebeziumBsonRecordParser. */
public class DebeziumBsonRecordParserTest {
-    private static final Logger log = LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class);
-    private static List<CdcSourceRecord> insertList = new ArrayList<>();
-    private static List<CdcSourceRecord> updateList = new ArrayList<>();
-    private static List<CdcSourceRecord> deleteList = new ArrayList<>();
+    private static final List<CdcSourceRecord> insertList = new ArrayList<>();
+    private static final List<CdcSourceRecord> updateList = new ArrayList<>();
+    private static final List<CdcSourceRecord> deleteList = new ArrayList<>();
-    private static ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
-    private static ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
+    private static final ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
+    private static final ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
-    private static Map<String, String> keyEvent = new HashMap<>();
+    private static final Map<String, String> keyEvent = new HashMap<>();
    private static KafkaDeserializationSchema<CdcSourceRecord> kafkaDeserializationSchema = null;
-    private static Map<String, String> beforeEvent = new HashMap<>();
+    private static final Map<String, String> beforeEvent = new HashMap<>();
-    private static Map<String, String> afterEvent = new HashMap<>();
+    private static final Map<String, String> afterEvent = new HashMap<>();
@BeforeAll
public static void beforeAll() throws Exception {
@@ -241,7 +238,8 @@ public class DebeziumBsonRecordParserTest {
            JsonNode bsonTextNode =
                    new TextNode(JsonSerdeUtil.writeValueAsString(bsonRecord.getValue()));
-            Map<String, String> resultMap = parser.extractRowData(bsonTextNode, RowType.builder());
+            Map<String, String> resultMap =
+                    parser.extractRowData(bsonTextNode, CdcSchema.newBuilder());
            ObjectNode expectNode = (ObjectNode) jsonRecord.getValue();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 60aa70c34b..767400d475 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -585,8 +585,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                                IllegalArgumentException.class,
                                "Cannot synchronize record when database name or table name is unknown. "
                                        + "Invalid record is:\n"
-                                        + "{databaseName=null, tableName=null, fields=[`k` STRING, `v0` STRING, `v1` STRING], "
-                                        + "primaryKeys=[], cdcRecord=+I {v0=five, k=5, v1=50}}"));
+                                        + "{databaseName=null, tableName=null, cdcSchema=Schema{fields=[`k` STRING, `v0` STRING, `v1` STRING], primaryKeys=[], comment=null}, cdcRecord=+I {v0=five, k=5, v1=50}}"));
    }
@Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index aa7d0199bc..58df6ac36e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -21,15 +21,16 @@ package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
-import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
@@ -91,21 +92,26 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                        .build();
        runActionWithDefaultEnv(action);
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
-                        + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},"
-                        + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .comment("")
+                        .column("pt", DataTypes.INT().notNull(), "primary")
+                        .column("_id", DataTypes.INT().notNull(), "_id")
+                        .column("v1", DataTypes.VARCHAR(10), "v1")
+                        .build();
+        checkTableSchema(expectedSchema);
        try (Statement statement = getStatement()) {
            testSchemaEvolutionImpl(statement);
        }
    }
-    private void checkTableSchema(String excepted) throws Exception {
-
+    private void checkTableSchema(Schema expected) throws Exception {
        FileStoreTable table = getFileStoreTable();
-        assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields())).isEqualTo(excepted);
+        TableSchema schema = table.schema();
+        assertThat(schema.fields()).isEqualTo(expected.fields());
+        assertThat(schema.comment()).isEqualTo(expected.comment());
    }
    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
@@ -267,11 +273,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                        .build();
        runActionWithDefaultEnv(action);
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
-                        + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},"
-                        + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},"
-                        + "{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .comment("")
+                        .column("_id", DataTypes.INT().notNull(), "primary")
+                        .column("v1", DataTypes.VARCHAR(10), "v1")
+                        .column("v2", DataTypes.INT(), "v2")
+                        .column("v3", DataTypes.VARCHAR(10), "v3")
+                        .build();
+        checkTableSchema(expectedSchema);
        try (Statement statement = getStatement()) {
            testSchemaEvolutionMultipleImpl(statement);
@@ -387,6 +397,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
        FileStoreTable table = getFileStoreTable();
        assertThat(table.comment()).hasValue("schema_evolution_comment");
        statement.executeUpdate("USE " + DATABASE_NAME);
+        // alter table comment
+        statement.executeUpdate("ALTER TABLE schema_evolution_comment COMMENT 'table_comment_new'");
+
        statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (1, 'one')");
RowType rowType =
@@ -414,10 +427,14 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
        expected = Arrays.asList("+I[1, one, NULL]", "+I[2, two, NULL]", "+I[3, three, 30]");
        waitForResult(expected, table, rowType, primaryKeys);
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
-                        + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(20)\",\"description\":\"v1-new\"},"
-                        + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"}]");
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .comment("table_comment_new")
+                        .column("_id", DataTypes.INT().notNull(), "primary")
+                        .column("v1", DataTypes.VARCHAR(20), "v1-new")
+                        .column("v2", DataTypes.INT(), "v2")
+                        .build();
+        checkTableSchema(expectedSchema);
    }
@Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
index b202ca53c9..57ca081d5c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
@@ -40,7 +40,6 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -56,20 +55,19 @@ public class CdcRecordSerializeITCase {
public void testCdcRecordKryoSerialize() throws Exception {
KryoSerializer<RichCdcMultiplexRecord> kr =
createFlinkKryoSerializer(RichCdcMultiplexRecord.class);
- RowType.Builder rowType = RowType.builder();
- rowType.field("id", new BigIntType());
- rowType.field("name", new VarCharType());
- rowType.field("pt", new VarCharType());
- // this is an unmodifiable list.
- List<DataField> fields = rowType.build().getFields();
- List<String> primaryKeys = Collections.singletonList("id");
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+ schemaBuilder.column("id", new BigIntType());
+ schemaBuilder.column("name", new VarCharType());
+ schemaBuilder.column("pt", new VarCharType());
+ schemaBuilder.primaryKey("id");
+ CdcSchema schema = schemaBuilder.build();
Map<String, String> recordData = new HashMap<>();
recordData.put("id", "1");
recordData.put("name", "HunterXHunter");
recordData.put("pt", "2024-06-28");
CdcRecord cdcRecord = new CdcRecord(RowKind.INSERT, recordData);
        RichCdcMultiplexRecord serializeRecord =
-                new RichCdcMultiplexRecord("default", "T", fields, primaryKeys, cdcRecord);
+                new RichCdcMultiplexRecord("default", "T", schema, cdcRecord);
TestOutputView outputView = new TestOutputView();
kr.serialize(serializeRecord, outputView);
@@ -77,8 +75,7 @@ public class CdcRecordSerializeITCase {
assertThat(deserializeRecord.toRichCdcRecord().toCdcRecord()).isEqualTo(cdcRecord);
assertThat(deserializeRecord.databaseName()).isEqualTo("default");
assertThat(deserializeRecord.tableName()).isEqualTo("T");
- assertThat(deserializeRecord.primaryKeys()).isEqualTo(primaryKeys);
- assertThat(deserializeRecord.fields()).isEqualTo(fields);
+ assertThat(deserializeRecord.cdcSchema()).isEqualTo(schema);
}
@Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index 01d766edf1..5b964ebe6b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -18,8 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.types.DataField;
-
import java.io.Serializable;
import java.util.List;
@@ -29,20 +27,20 @@ public class TestCdcEvent implements Serializable {
private static final long serialVersionUID = 1L;
private final String tableName;
- private final List<DataField> updatedDataFields;
+ private final CdcSchema cdcSchema;
private final List<CdcRecord> records;
private final int keyHash;
- public TestCdcEvent(String tableName, List<DataField> updatedDataFields) {
+ public TestCdcEvent(String tableName, CdcSchema updatedSchema) {
this.tableName = tableName;
- this.updatedDataFields = updatedDataFields;
+ this.cdcSchema = updatedSchema;
this.records = null;
this.keyHash = 0;
}
    public TestCdcEvent(String tableName, List<CdcRecord> records, int keyHash) {
this.tableName = tableName;
- this.updatedDataFields = null;
+ this.cdcSchema = null;
this.records = records;
this.keyHash = keyHash;
}
@@ -51,8 +49,8 @@ public class TestCdcEvent implements Serializable {
return tableName;
}
- public List<DataField> updatedDataFields() {
- return updatedDataFields;
+ public CdcSchema cdcSchema() {
+ return cdcSchema;
}
public List<CdcRecord> records() {
@@ -67,7 +65,6 @@ public class TestCdcEvent implements Serializable {
@Override
public String toString() {
        return String.format(
-                "{tableName = %s, updatedDataFields = %s, records = %s}",
-                tableName, updatedDataFields, records);
+                "{tableName = %s, schema = %s, records = %s}", tableName, cdcSchema, records);
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 204f8536d9..269907168e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.ObjectUtils;
import java.util.Collections;
@@ -40,8 +39,8 @@ public class TestCdcEventParser implements EventParser<TestCdcEvent> {
    }
    @Override
-    public List<DataField> parseSchemaChange() {
-        return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
+    public CdcSchema parseSchemaChange() {
+        return raw.cdcSchema();
    }
@Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index 6a38c1c265..2601e4fc3b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -112,7 +112,7 @@ public class TestTable {
fieldNames.add(newName);
isBigInt.add(false);
}
-            events.add(new TestCdcEvent(tableName, currentDataFieldList(fieldNames, isBigInt)));
+            events.add(new TestCdcEvent(tableName, currentSchema(fieldNames, isBigInt)));
} else {
Map<String, String> data = new HashMap<>();
int key = random.nextInt(numKeys);
@@ -158,23 +158,22 @@ public class TestTable {
}
}
-    private List<DataField> currentDataFieldList(List<String> fieldNames, List<Boolean> isBigInt) {
-        List<DataField> fields = new ArrayList<>();
+    private CdcSchema currentSchema(List<String> fieldNames, List<Boolean> isBigInt) {
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
        // pt
-        fields.add(initialRowType.getFields().get(0));
+        DataField ptField = initialRowType.getFields().get(0);
+        schemaBuilder.column(ptField.name(), ptField.type(), ptField.description());
        // k
-        fields.add(initialRowType.getFields().get(1));
+        DataField pkField = initialRowType.getFields().get(1);
+        schemaBuilder.column(pkField.name(), pkField.type(), pkField.description());
        for (int i = 0; i < fieldNames.size(); i++) {
-            fields.add(
-                    new DataField(
-                            2 + i,
-                            fieldNames.get(i),
-                            isBigInt.get(i) ? DataTypes.BIGINT() : DataTypes.INT()));
+            schemaBuilder.column(
+                    fieldNames.get(i), isBigInt.get(i) ? DataTypes.BIGINT() : DataTypes.INT());
        }
-        return fields;
+        return schemaBuilder.build();
    }
public RowType initialRowType() {