This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c161baa5 [flink][bug] CdcRecord#toGenericRow should respect CdcRecord.kind (#1034)
6c161baa5 is described below

commit 6c161baa54323c979baea4dc7b4cb906367c6a35
Author: tsreaper <[email protected]>
AuthorDate: Wed Apr 26 17:50:16 2023 +0800

    [flink][bug] CdcRecord#toGenericRow should respect CdcRecord.kind (#1034)
---
 .../org/apache/paimon/flink/sink/cdc/CdcRecord.java  | 20 ++++++--------------
 .../sink/cdc/CdcRecordKeyAndBucketExtractor.java     |  4 ++--
 .../action/cdc/mysql/MySqlSyncTableActionITCase.java |  4 +---
 .../org/apache/paimon/flink/sink/cdc/TestTable.java  | 10 ++++++++--
 4 files changed, 17 insertions(+), 21 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 265ca72ff..cfdf64817 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -49,18 +49,9 @@ public class CdcRecord implements Serializable {
         this.fields = fields;
     }
 
-    public RowKind kind() {
-        return kind;
-    }
-
-    /** Map key is the field's name, and map value is the field's value. */
-    public Map<String, String> fields() {
-        return fields;
-    }
-
     /**
      * Project {@code fields} to a {@link GenericRow}. The fields of row are specified by the given
-     * {@code dataFields}.
+     * {@code dataFields} and its {@link RowKind} will always be {@link RowKind#INSERT}.
      *
      * <p>NOTE: This method will always return a {@link GenericRow} even if some keys of {@code
      * fields} are not in {@code dataFields}. If you want to make sure all field names of {@code
@@ -69,7 +60,7 @@ public class CdcRecord implements Serializable {
      * @param dataFields {@link DataField}s of the converted {@link GenericRow}.
      * @return the projected {@link GenericRow}.
      */
-    public GenericRow project(List<DataField> dataFields) {
+    public GenericRow projectAsInsert(List<DataField> dataFields) {
         GenericRow genericRow = new GenericRow(dataFields.size());
         for (int i = 0; i < dataFields.size(); i++) {
             DataField dataField = dataFields.get(i);
@@ -81,11 +72,12 @@ public class CdcRecord implements Serializable {
 
     /**
      * Convert {@code fields} to a {@link GenericRow}. The fields of row are specified by the given
-     * {@code dataFields}.
+     * {@code dataFields} and its {@link RowKind} is determined by {@code kind} of this {@link
+     * CdcRecord}.
      *
      * <p>NOTE: This method requires all field names of {@code dataFields} existed in keys of {@code
-     * fields}. If you only want to convert some {@code fields}, use {@link CdcRecord#project}
-     * instead.
+     * fields}. If you only want to convert some {@code fields}, use {@link
+     * CdcRecord#projectAsInsert} instead.
      *
      * @param dataFields {@link DataField}s of the converted {@link GenericRow}.
      * @return if all field names of {@code dataFields} existed in keys of {@code fields} and all
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
index cd19e04d8..9123773a1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java
@@ -74,7 +74,7 @@ public class CdcRecordKeyAndBucketExtractor implements KeyAndBucketExtractor<Cdc
     @Override
     public BinaryRow partition() {
         if (partition == null) {
-            partition = partitionProjection.apply(record.project(partitionFields));
+            partition = partitionProjection.apply(record.projectAsInsert(partitionFields));
         }
         return partition;
     }
@@ -82,7 +82,7 @@ public class CdcRecordKeyAndBucketExtractor implements KeyAndBucketExtractor<Cdc
     @Override
     public int bucket() {
         if (bucketKey == null) {
-            bucketKey = bucketKeyProjection.apply(record.project(bucketKeyFields));
+            bucketKey = bucketKeyProjection.apply(record.projectAsInsert(bucketKeyFields));
         }
         if (bucket == null) {
             bucket =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 99028fa5c..701390950 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -153,6 +153,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v2 BIGINT");
         statement.executeUpdate(
                 "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 70000000000)");
+        statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id = 5");
         statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 30000000000 WHERE _id = 3");
         statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v2 BIGINT");
         statement.executeUpdate(
@@ -172,7 +173,6 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         "+I[1, 2, second, NULL]",
                         "+I[2, 3, three, 30000000000]",
                         "+I[2, 4, four, NULL]",
-                        "+I[1, 5, five, 50]",
                         "+I[1, 6, six, 60]",
                         "+I[2, 7, seven, 70000000000]",
                         "+I[2, 8, eight, 80000000000]");
@@ -208,7 +208,6 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         "+I[1, 2, second, NULL, NULL, NULL, NULL]",
                         "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
                         "+I[2, 4, four, NULL, NULL, NULL, NULL]",
-                        "+I[1, 5, five, 50, NULL, NULL, NULL]",
                         "+I[1, 6, six, 60, NULL, NULL, NULL]",
                         "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
                         "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
@@ -241,7 +240,6 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         "+I[1, 2, second, NULL, NULL, NULL, NULL]",
                         "+I[2, 3, three, 30000000000, NULL, NULL, NULL]",
                         "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]",
-                        "+I[1, 5, five, 50, NULL, NULL, NULL]",
                         "+I[1, 6, six, 60, NULL, NULL, NULL]",
                         "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]",
                         "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]",
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index 650cf3ccf..e808b6171 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -125,12 +125,18 @@ public class TestTable {
                 }
 
                 List<CdcRecord> records = new ArrayList<>();
+                boolean shouldInsert = true;
                 if (expected.containsKey(key)) {
                     records.add(new CdcRecord(RowKind.DELETE, expected.get(key)));
+                    expected.remove(key);
+                    // 20% chance to only delete without insert
+                    shouldInsert = random.nextInt(5) > 0;
+                }
+                if (shouldInsert) {
+                    records.add(new CdcRecord(RowKind.INSERT, fields));
+                    expected.put(key, fields);
                 }
-                records.add(new CdcRecord(RowKind.INSERT, fields));
                 events.add(new TestCdcEvent(tableName, records, Objects.hash(tableName, key)));
-                expected.put(key, fields);
             }
         }
     }

Reply via email to