This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6c161baa5 [flink][bug] CdcRecord#toGenericRow should respect CdcRecord.kind (#1034)
6c161baa5 is described below
commit 6c161baa54323c979baea4dc7b4cb906367c6a35
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 26 17:50:16 2023 +0800
[flink][bug] CdcRecord#toGenericRow should respect CdcRecord.kind (#1034)
---
.../org/apache/paimon/flink/sink/cdc/CdcRecord.java | 20 ++++++--------------
.../sink/cdc/CdcRecordKeyAndBucketExtractor.java | 4 ++--
.../action/cdc/mysql/MySqlSyncTableActionITCase.java | 4 +---
.../org/apache/paimon/flink/sink/cdc/TestTable.java | 10 ++++++++--
4 files changed, 17 insertions(+), 21 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 265ca72ff..cfdf64817 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -49,18 +49,9 @@ public class CdcRecord implements Serializable {
this.fields = fields;
}
- public RowKind kind() {
- return kind;
- }
-
- /** Map key is the field's name, and map value is the field's value. */
- public Map<String, String> fields() {
- return fields;
- }
-
/**
* Project {@code fields} to a {@link GenericRow}. The fields of row are
specified by the given
- * {@code dataFields}.
+ * {@code dataFields} and its {@link RowKind} will always be {@link
RowKind#INSERT}.
*
* <p>NOTE: This method will always return a {@link GenericRow} even if
some keys of {@code
* fields} are not in {@code dataFields}. If you want to make sure all
field names of {@code
@@ -69,7 +60,7 @@ public class CdcRecord implements Serializable {
* @param dataFields {@link DataField}s of the converted {@link
GenericRow}.
* @return the projected {@link GenericRow}.
*/
- public GenericRow project(List<DataField> dataFields) {
+ public GenericRow projectAsInsert(List<DataField> dataFields) {
GenericRow genericRow = new GenericRow(dataFields.size());
for (int i = 0; i < dataFields.size(); i++) {
DataField dataField = dataFields.get(i);
@@ -81,11 +72,12 @@ public class CdcRecord implements Serializable {
/**
* Convert {@code fields} to a {@link GenericRow}. The fields of row are
specified by the given
- * {@code dataFields}.
+ * {@code dataFields} and its {@link RowKind} is determined by {@code
kind} of this {@link
+ * CdcRecord}.
*
* <p>NOTE: This method requires all field names of {@code dataFields}
existed in keys of {@code
- * fields}. If you only want to convert some {@code fields}, use {@link
CdcRecord#project}
- * instead.
+ * fields}. If you only want to convert some {@code fields}, use {@link
+ * CdcRecord#projectAsInsert} instead.
*
* @param dataFields {@link DataField}s of the converted {@link
GenericRow}.
* @return if all field names of {@code dataFields} existed in keys of
{@code fields} and all
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
index cd19e04d8..9123773a1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
@@ -74,7 +74,7 @@ public class CdcRecordKeyAndBucketExtractor implements
KeyAndBucketExtractor<Cdc
@Override
public BinaryRow partition() {
if (partition == null) {
- partition =
partitionProjection.apply(record.project(partitionFields));
+ partition =
partitionProjection.apply(record.projectAsInsert(partitionFields));
}
return partition;
}
@@ -82,7 +82,7 @@ public class CdcRecordKeyAndBucketExtractor implements
KeyAndBucketExtractor<Cdc
@Override
public int bucket() {
if (bucketKey == null) {
- bucketKey =
bucketKeyProjection.apply(record.project(bucketKeyFields));
+ bucketKey =
bucketKeyProjection.apply(record.projectAsInsert(bucketKeyFields));
}
if (bucket == null) {
bucket =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 99028fa5c..701390950 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -153,6 +153,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN
v2 BIGINT");
statement.executeUpdate(
"INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven',
70000000000)");
+ statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id =
5");
statement.executeUpdate("UPDATE schema_evolution_1 SET v2 =
30000000000 WHERE _id = 3");
statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN
v2 BIGINT");
statement.executeUpdate(
@@ -172,7 +173,6 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"+I[1, 2, second, NULL]",
"+I[2, 3, three, 30000000000]",
"+I[2, 4, four, NULL]",
- "+I[1, 5, five, 50]",
"+I[1, 6, six, 60]",
"+I[2, 7, seven, 70000000000]",
"+I[2, 8, eight, 80000000000]");
@@ -208,7 +208,6 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"+I[1, 2, second, NULL, NULL, NULL, NULL]",
"+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
"+I[2, 4, four, NULL, NULL, NULL, NULL]",
- "+I[1, 5, five, 50, NULL, NULL, NULL]",
"+I[1, 6, six, 60, NULL, NULL, NULL]",
"+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
"+I[2, 8, very long string, 80000000000, NULL, NULL,
NULL]",
@@ -241,7 +240,6 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"+I[1, 2, second, NULL, NULL, NULL, NULL]",
"+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
"+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46,
98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]",
- "+I[1, 5, five, 50, NULL, NULL, NULL]",
"+I[1, 6, six, 60, NULL, NULL, NULL]",
"+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
"+I[2, 8, very long string, 80000000000, NULL, NULL,
NULL]",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index 650cf3ccf..e808b6171 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -125,12 +125,18 @@ public class TestTable {
}
List<CdcRecord> records = new ArrayList<>();
+ boolean shouldInsert = true;
if (expected.containsKey(key)) {
records.add(new CdcRecord(RowKind.DELETE,
expected.get(key)));
+ expected.remove(key);
+ // 20% chance to only delete without insert
+ shouldInsert = random.nextInt(5) > 0;
+ }
+ if (shouldInsert) {
+ records.add(new CdcRecord(RowKind.INSERT, fields));
+ expected.put(key, fields);
}
- records.add(new CdcRecord(RowKind.INSERT, fields));
events.add(new TestCdcEvent(tableName, records,
Objects.hash(tableName, key)));
- expected.put(key, fields);
}
}
}