This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d3d7a36d4 [INLONG-7397][Sort] Fix MySql connector output two data with
the same UPDATE operation (#7398)
d3d7a36d4 is described below
commit d3d7a36d4cf85a6d51eea858a902c576b1eb9d34
Author: Schnapps <[email protected]>
AuthorDate: Wed Feb 22 18:53:58 2023 +0800
[INLONG-7397][Sort] Fix MySql connector output two data with the same
UPDATE operation (#7398)
---
.../cdc/mysql/table/MySqlReadableMetadata.java | 47 ++++++++++++++++------
1 file changed, 35 insertions(+), 12 deletions(-)
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 23b80f2df..c5f339911 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -172,7 +172,7 @@ public enum MySqlReadableMetadata {
.mysqlType(getMysqlType(tableSchema))
.build();
DebeziumJson debeziumJson =
DebeziumJson.builder().after(field).source(source)
-
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
+
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(data))
.tableChange(tableSchema).build();
try {
@@ -247,7 +247,7 @@ public enum MySqlReadableMetadata {
@Override
public Object read(SourceRecord record) {
- return StringData.fromString(getCanalOpType(record));
+ return StringData.fromString(getOpType(record));
}
}),
@@ -453,7 +453,7 @@ public enum MySqlReadableMetadata {
.data(dataList).database(databaseName)
.sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
.mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
-
.type(getCanalOpType(record)).sqlType(getSqlType(tableSchema)).build();
+
.type(getCanalOpType(rowData)).sqlType(getSqlType(tableSchema)).build();
try {
return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
@@ -473,7 +473,7 @@ public enum MySqlReadableMetadata {
this.converter = converter;
}
- private static String getCanalOpType(SourceRecord record) {
+ private static String getOpType(SourceRecord record) {
String opType;
final Envelope.Operation op = Envelope.operationFor(record);
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
@@ -486,15 +486,38 @@ public enum MySqlReadableMetadata {
return opType;
}
- private static String getDebeziumOpType(SourceRecord record) {
+ private static String getCanalOpType(GenericRowData record) {
String opType;
- final Envelope.Operation op = Envelope.operationFor(record);
- if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
- opType = "c";
- } else if (op == Envelope.Operation.DELETE) {
- opType = "d";
- } else {
- opType = "u";
+ switch (record.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "DELETE";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "INSERT";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states
in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
+ }
+ return opType;
+ }
+
+ private static String getDebeziumOpType(GenericRowData record) {
+ String opType;
+ switch (record.getRowKind()) {
+ case DELETE:
+ case UPDATE_BEFORE:
+ opType = "d";
+ break;
+ case INSERT:
+ case UPDATE_AFTER:
+ opType = "c";
+ break;
+ default:
+ throw new IllegalStateException("the record only have states
in DELETE, "
+ + "UPDATE_BEFORE, INSERT and UPDATE_AFTER");
}
return opType;
}