This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit be5e550c66775b0bb4c760d488cab904594850a5 Author: tsreaper <[email protected]> AuthorDate: Wed Apr 26 17:50:16 2023 +0800 [flink][bug] CdcRecord#toGenericRow should respect CdcRecord.kind (#1034) --- .../org/apache/paimon/flink/sink/cdc/CdcRecord.java | 20 ++++++-------------- .../sink/cdc/CdcRecordKeyAndBucketExtractor.java | 4 ++-- .../action/cdc/mysql/MySqlSyncTableActionITCase.java | 4 +--- .../org/apache/paimon/flink/sink/cdc/TestTable.java | 10 ++++++++-- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java index 265ca72ff..cfdf64817 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java @@ -49,18 +49,9 @@ public class CdcRecord implements Serializable { this.fields = fields; } - public RowKind kind() { - return kind; - } - - /** Map key is the field's name, and map value is the field's value. */ - public Map<String, String> fields() { - return fields; - } - /** * Project {@code fields} to a {@link GenericRow}. The fields of row are specified by the given - * {@code dataFields}. + * {@code dataFields} and its {@link RowKind} will always be {@link RowKind#INSERT}. * * <p>NOTE: This method will always return a {@link GenericRow} even if some keys of {@code * fields} are not in {@code dataFields}. If you want to make sure all field names of {@code @@ -69,7 +60,7 @@ public class CdcRecord implements Serializable { * @param dataFields {@link DataField}s of the converted {@link GenericRow}. * @return the projected {@link GenericRow}. */ - public GenericRow project(List<DataField> dataFields) { + public GenericRow projectAsInsert(List<DataField> dataFields) { GenericRow genericRow = new GenericRow(dataFields.size()); for (int i = 0; i < dataFields.size(); i++) { DataField dataField = dataFields.get(i); @@ -81,11 +72,12 @@ public class CdcRecord implements Serializable { /** * Convert {@code fields} to a {@link GenericRow}. The fields of row are specified by the given - * {@code dataFields}. + * {@code dataFields} and its {@link RowKind} is determined by {@code kind} of this {@link + * CdcRecord}. * * <p>NOTE: This method requires all field names of {@code dataFields} existed in keys of {@code - * fields}. If you only want to convert some {@code fields}, use {@link CdcRecord#project} - * instead. + * fields}. If you only want to convert some {@code fields}, use {@link + * CdcRecord#projectAsInsert} instead. * * @param dataFields {@link DataField}s of the converted {@link GenericRow}. * @return if all field names of {@code dataFields} existed in keys of {@code fields} and all diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java index cd19e04d8..9123773a1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractor.java @@ -74,7 +74,7 @@ public class CdcRecordKeyAndBucketExtractor implements KeyAndBucketExtractor<Cdc @Override public BinaryRow partition() { if (partition == null) { - partition = partitionProjection.apply(record.project(partitionFields)); + partition = partitionProjection.apply(record.projectAsInsert(partitionFields)); } return partition; } @@ -82,7 +82,7 @@ public class CdcRecordKeyAndBucketExtractor implements KeyAndBucketExtractor<Cdc @Override public int bucket() { if (bucketKey == null) { - bucketKey = bucketKeyProjection.apply(record.project(bucketKeyFields)); + bucketKey = bucketKeyProjection.apply(record.projectAsInsert(bucketKeyFields)); } if (bucket == null) { bucket = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 99028fa5c..701390950 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -153,6 +153,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v2 BIGINT"); statement.executeUpdate( "INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 70000000000)"); + statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id = 5"); statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 30000000000 WHERE _id = 3"); statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v2 BIGINT"); statement.executeUpdate( @@ -172,7 +173,6 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { "+I[1, 2, second, NULL]", "+I[2, 3, three, 30000000000]", "+I[2, 4, four, NULL]", - "+I[1, 5, five, 50]", "+I[1, 6, six, 60]", "+I[2, 7, seven, 70000000000]", "+I[2, 8, eight, 80000000000]"); @@ -208,7 +208,6 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, NULL, NULL]", - "+I[1, 5, five, 50, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", @@ -241,7 +240,6 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", - "+I[1, 5, five, 50, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java index 650cf3ccf..e808b6171 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java @@ -125,12 +125,18 @@ public class TestTable { } List<CdcRecord> records = new ArrayList<>(); + boolean shouldInsert = true; if (expected.containsKey(key)) { records.add(new CdcRecord(RowKind.DELETE, expected.get(key))); + expected.remove(key); + // 20% chance to only delete without insert + shouldInsert = random.nextInt(5) > 0; + } + if (shouldInsert) { + records.add(new CdcRecord(RowKind.INSERT, fields)); + expected.put(key, fields); } - records.add(new CdcRecord(RowKind.INSERT, fields)); events.add(new TestCdcEvent(tableName, records, Objects.hash(tableName, key))); - expected.put(key, fields); } } }
