This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c4fd960b [cdc] Fix ambiguous naming in CdcRecord. (#4450)
7c4fd960b is described below

commit 7c4fd960be1a680a3a05a5359590eb1f7d73cd98
Author: Fantasy-Jay <[email protected]>
AuthorDate: Tue Nov 5 14:23:25 2024 +0800

    [cdc] Fix ambiguous naming in CdcRecord. (#4450)
---
 .../apache/paimon/flink/sink/cdc/CdcRecord.java    |  25 +-
 .../paimon/flink/sink/cdc/CdcRecordUtils.java      |  10 +-
 .../paimon/flink/sink/cdc/RichCdcRecord.java       |   8 +-
 .../cdc/CdcMultiplexRecordChannelComputerTest.java |  10 +-
 .../sink/cdc/CdcRecordChannelComputerTest.java     |  10 +-
 .../cdc/CdcRecordKeyAndBucketExtractorTest.java    |  56 ++---
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  | 256 ++++++++++-----------
 .../sink/cdc/CdcRecordStoreWriteOperatorTest.java  |  88 +++----
 .../apache/paimon/flink/sink/cdc/TestTable.java    |  18 +-
 9 files changed, 229 insertions(+), 252 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 9adca753d..b23d0d6f0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -35,11 +35,12 @@ public class CdcRecord implements Serializable {
 
     private RowKind kind;
 
-    private final Map<String, String> fields;
+    // field name -> value
+    private final Map<String, String> data;
 
-    public CdcRecord(RowKind kind, Map<String, String> fields) {
+    public CdcRecord(RowKind kind, Map<String, String> data) {
         this.kind = kind;
-        this.fields = fields;
+        this.data = data;
     }
 
     public static CdcRecord emptyRecord() {
@@ -50,16 +51,16 @@ public class CdcRecord implements Serializable {
         return kind;
     }
 
-    public Map<String, String> fields() {
-        return fields;
+    public Map<String, String> data() {
+        return data;
     }
 
     public CdcRecord fieldNameLowerCase() {
-        Map<String, String> newFields = new HashMap<>();
-        for (Map.Entry<String, String> entry : fields.entrySet()) {
-            newFields.put(entry.getKey().toLowerCase(), entry.getValue());
+        Map<String, String> newData = new HashMap<>();
+        for (Map.Entry<String, String> entry : data.entrySet()) {
+            newData.put(entry.getKey().toLowerCase(), entry.getValue());
         }
-        return new CdcRecord(kind, newFields);
+        return new CdcRecord(kind, newData);
     }
 
     @Override
@@ -69,16 +70,16 @@ public class CdcRecord implements Serializable {
         }
 
         CdcRecord that = (CdcRecord) o;
-        return Objects.equals(kind, that.kind) && Objects.equals(fields, that.fields);
+        return Objects.equals(kind, that.kind) && Objects.equals(data, that.data);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(kind, fields);
+        return Objects.hash(kind, data);
     }
 
     @Override
     public String toString() {
-        return kind.shortString() + " " + fields;
+        return kind.shortString() + " " + data;
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index 0d192dd53..91979a2c9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -54,7 +54,7 @@ public class CdcRecordUtils {
         GenericRow genericRow = new GenericRow(dataFields.size());
         for (int i = 0; i < dataFields.size(); i++) {
             DataField dataField = dataFields.get(i);
-            String fieldValue = record.fields().get(dataField.name());
+            String fieldValue = record.data().get(dataField.name());
             if (fieldValue != null) {
                 genericRow.setField(
                         i, TypeUtils.castFromCdcValueString(fieldValue, dataField.type()));
@@ -83,7 +83,7 @@ public class CdcRecordUtils {
         List<String> fieldNames =
                 dataFields.stream().map(DataField::name).collect(Collectors.toList());
 
-        for (Map.Entry<String, String> field : record.fields().entrySet()) {
+        for (Map.Entry<String, String> field : record.data().entrySet()) {
             String key = field.getKey();
             String value = field.getValue();
 
@@ -117,14 +117,14 @@ public class CdcRecordUtils {
     }
 
     public static CdcRecord fromGenericRow(GenericRow row, List<String> fieldNames) {
-        Map<String, String> fields = new HashMap<>();
+        Map<String, String> data = new HashMap<>();
         for (int i = 0; i < row.getFieldCount(); i++) {
             Object field = row.getField(i);
             if (field != null) {
-                fields.put(fieldNames.get(i), field.toString());
+                data.put(fieldNames.get(i), field.toString());
             }
         }
 
-        return new CdcRecord(row.getRowKind(), fields);
+        return new CdcRecord(row.getRowKind(), data);
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 7fc0c3ff7..04b86fea5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -48,7 +48,7 @@ public class RichCdcRecord implements Serializable {
     }
 
     public boolean hasPayload() {
-        return !cdcRecord.fields().isEmpty();
+        return !cdcRecord.data().isEmpty();
     }
 
     public RowKind kind() {
@@ -95,7 +95,7 @@ public class RichCdcRecord implements Serializable {
         private final RowKind kind;
         private final AtomicInteger fieldId;
         private final List<DataField> fields = new ArrayList<>();
-        private final Map<String, String> fieldValues = new HashMap<>();
+        private final Map<String, String> data = new HashMap<>();
 
         public Builder(RowKind kind, AtomicInteger fieldId) {
             this.kind = kind;
@@ -109,12 +109,12 @@ public class RichCdcRecord implements Serializable {
         public Builder field(
                 String name, DataType type, String value, @Nullable String description) {
             fields.add(new DataField(fieldId.incrementAndGet(), name, type, description));
-            fieldValues.put(name, value);
+            data.put(name, value);
             return this;
         }
 
         public RichCdcRecord build() {
-            return new RichCdcRecord(new CdcRecord(kind, fieldValues), fields);
+            return new RichCdcRecord(new CdcRecord(kind, data), fields);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index ce0d484f4..867cbdbae 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -163,9 +163,9 @@ public class CdcMultiplexRecordChannelComputerTest {
 
         // assert that insert and delete records are routed into same channel
 
-        for (Map<String, String> fields : input) {
-            CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
-            CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
+        for (Map<String, String> data : input) {
+            CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
+            CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);
 
             assertThat(
                             channelComputer.channel(
@@ -184,8 +184,8 @@ public class CdcMultiplexRecordChannelComputerTest {
         // assert that channel >= 0
         int numTests = random.nextInt(10) + 1;
         for (int test = 0; test < numTests; test++) {
-            Map<String, String> fields = 
input.get(random.nextInt(input.size()));
-            CdcRecord record = new CdcRecord(RowKind.INSERT, fields);
+            Map<String, String> data = input.get(random.nextInt(input.size()));
+            CdcRecord record = new CdcRecord(RowKind.INSERT, data);
 
             int numBuckets = random.nextInt(numChannels * 4) + 1;
             for (int i = 0; i < numBuckets; i++) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
index 9a19013e2..8271ad187 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
@@ -128,9 +128,9 @@ public class CdcRecordChannelComputerTest {
 
         // assert that channel(record) and channel(partition, bucket) gives 
the same result
 
-        for (Map<String, String> fields : input) {
-            CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
-            CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
+        for (Map<String, String> data : input) {
+            CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
+            CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);
 
             extractor.setRecord(random.nextBoolean() ? insertRecord : 
deleteRecord);
             BinaryRow partition = extractor.partition();
@@ -151,8 +151,8 @@ public class CdcRecordChannelComputerTest {
                 bucketsPerChannel.put(i, 0);
             }
 
-            Map<String, String> fields = 
input.get(random.nextInt(input.size()));
-            extractor.setRecord(new CdcRecord(RowKind.INSERT, fields));
+            Map<String, String> data = input.get(random.nextInt(input.size()));
+            extractor.setRecord(new CdcRecord(RowKind.INSERT, data));
             BinaryRow partition = extractor.partition();
 
             int numBuckets = random.nextInt(numChannels * 4) + 1;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
index 8384b7155..802a3ea9d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
@@ -87,19 +87,19 @@ public class CdcRecordKeyAndBucketExtractorTest {
                             StringData.fromString(v2));
             expected.setRecord(rowData);
 
-            Map<String, String> fields = new HashMap<>();
-            fields.put("pt1", pt1);
-            fields.put("pt2", String.valueOf(pt2));
-            fields.put("k1", String.valueOf(k1));
-            fields.put("v1", String.valueOf(v1));
-            fields.put("k2", k2);
-            fields.put("v2", v2);
-
-            actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+            Map<String, String> data = new HashMap<>();
+            data.put("pt1", pt1);
+            data.put("pt2", String.valueOf(pt2));
+            data.put("k1", String.valueOf(k1));
+            data.put("v1", String.valueOf(v1));
+            data.put("k2", k2);
+            data.put("v2", v2);
+
+            actual.setRecord(new CdcRecord(RowKind.INSERT, data));
             assertThat(actual.partition()).isEqualTo(expected.partition());
             assertThat(actual.bucket()).isEqualTo(expected.bucket());
 
-            actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+            actual.setRecord(new CdcRecord(RowKind.DELETE, data));
             assertThat(actual.partition()).isEqualTo(expected.partition());
             assertThat(actual.bucket()).isEqualTo(expected.bucket());
         }
@@ -122,19 +122,19 @@ public class CdcRecordKeyAndBucketExtractorTest {
                         null, null, k1, v1, StringData.fromString(k2), 
StringData.fromString(v2));
         expected.setRecord(rowData);
 
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pt1", null);
-        fields.put("pt2", null);
-        fields.put("k1", String.valueOf(k1));
-        fields.put("v1", String.valueOf(v1));
-        fields.put("k2", k2);
-        fields.put("v2", v2);
+        Map<String, String> data = new HashMap<>();
+        data.put("pt1", null);
+        data.put("pt2", null);
+        data.put("k1", String.valueOf(k1));
+        data.put("v1", String.valueOf(v1));
+        data.put("k2", k2);
+        data.put("v2", v2);
 
-        actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+        actual.setRecord(new CdcRecord(RowKind.INSERT, data));
         assertThat(actual.partition()).isEqualTo(expected.partition());
         assertThat(actual.bucket()).isEqualTo(expected.bucket());
 
-        actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+        actual.setRecord(new CdcRecord(RowKind.DELETE, data));
         assertThat(actual.partition()).isEqualTo(expected.partition());
         assertThat(actual.bucket()).isEqualTo(expected.bucket());
     }
@@ -161,19 +161,19 @@ public class CdcRecordKeyAndBucketExtractorTest {
                         StringData.fromString(v2));
         expected.setRecord(rowData);
 
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pt1", "");
-        fields.put("pt2", null);
-        fields.put("k1", String.valueOf(k1));
-        fields.put("v1", String.valueOf(v1));
-        fields.put("k2", k2);
-        fields.put("v2", v2);
+        Map<String, String> data = new HashMap<>();
+        data.put("pt1", "");
+        data.put("pt2", null);
+        data.put("k1", String.valueOf(k1));
+        data.put("v1", String.valueOf(v1));
+        data.put("k2", k2);
+        data.put("v2", v2);
 
-        actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+        actual.setRecord(new CdcRecord(RowKind.INSERT, data));
         assertThat(actual.partition()).isEqualTo(expected.partition());
         assertThat(actual.bucket()).isEqualTo(expected.bucket());
 
-        actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+        actual.setRecord(new CdcRecord(RowKind.DELETE, data));
         assertThat(actual.partition()).isEqualTo(expected.partition());
         assertThat(actual.bucket()).isEqualTo(expected.bucket());
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 2a1bb4004..8c78ab853 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -172,16 +172,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         t.start();
 
         // check that records should be processed after table is created
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "1");
-        fields.put("v", "10");
+        Map<String, String> data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "1");
+        data.put("v", "10");
 
         CdcMultiplexRecord expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         CdcMultiplexRecord actual = runner.poll(1);
 
@@ -192,15 +190,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         assertThat(actual).isEqualTo(expected);
 
         // after table is created, record should be processed immediately
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "3");
-        fields.put("v", "30");
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "3");
+        data.put("v", "30");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
@@ -227,16 +223,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         t.start();
 
         // check that records should be processed after table is created
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "1");
-        fields.put("v", "10");
+        Map<String, String> data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "1");
+        data.put("v", "10");
 
         CdcMultiplexRecord expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         CdcMultiplexRecord actual = runner.poll(1);
 
@@ -254,15 +248,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         assertThat(operator.writes().size()).isEqualTo(1);
 
         // after table is created, record should be processed immediately
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "3");
-        fields.put("v", "30");
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "3");
+        data.put("v", "30");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
@@ -302,44 +294,38 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         // check that records with compatible schema can be processed 
immediately
 
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "1");
-        fields.put("v", "10");
+        Map<String, String> data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "1");
+        data.put("v", "10");
 
         CdcMultiplexRecord expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         CdcMultiplexRecord actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "2");
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "2");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        // check that records with new fields should be processed after schema 
is updated
+        // check that records with new data should be processed after schema 
is updated
 
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "3");
-        fields.put("v", "30");
-        fields.put("v2", "300");
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "3");
+        data.put("v", "30");
+        data.put("v2", "300");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -383,34 +369,30 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         // check that records with compatible schema can be processed 
immediately
 
-        Map<String, String> fields = new HashMap<>();
-        fields.put("k", "1");
-        fields.put("v1", "10");
-        fields.put("v2", "0.625");
-        fields.put("v3", "one");
-        fields.put("v4", "b_one");
+        Map<String, String> data = new HashMap<>();
+        data.put("k", "1");
+        data.put("v1", "10");
+        data.put("v2", "0.625");
+        data.put("v3", "one");
+        data.put("v4", "b_one");
         CdcMultiplexRecord expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         CdcMultiplexRecord actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        // check that records with new fields should be processed after schema 
is updated
+        // check that records with new data should be processed after schema 
is updated
 
         // int -> bigint
 
-        fields = new HashMap<>();
-        fields.put("k", "2");
-        fields.put("v1", "12345678987654321");
-        fields.put("v2", "0.25");
+        data = new HashMap<>();
+        data.put("k", "2");
+        data.put("v1", "12345678987654321");
+        data.put("v2", "0.25");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -422,15 +404,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         // float -> double
 
-        fields = new HashMap<>();
-        fields.put("k", "3");
-        fields.put("v1", "100");
-        fields.put("v2", "1.0000000000009095");
+        data = new HashMap<>();
+        data.put("k", "3");
+        data.put("v1", "100");
+        data.put("v2", "1.0000000000009095");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -441,15 +421,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         // varchar(5) -> varchar(10)
 
-        fields = new HashMap<>();
-        fields.put("k", "4");
-        fields.put("v1", "40");
-        fields.put("v3", "long four");
+        data = new HashMap<>();
+        data.put("k", "4");
+        data.put("v1", "40");
+        data.put("v3", "long four");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -460,15 +438,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         // varbinary(5) -> varbinary(10)
 
-        fields = new HashMap<>();
-        fields.put("k", "5");
-        fields.put("v1", "50");
-        fields.put("v4", "long five~");
+        data = new HashMap<>();
+        data.put("k", "5");
+        data.put("v1", "50");
+        data.put("v4", "long five~");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
-                        databaseName,
-                        tableId.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        databaseName, tableId.getObjectName(), new 
CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -499,53 +475,53 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         // check that records with compatible schema from different tables
         //     can be processed immediately
 
-        Map<String, String> fields;
+        Map<String, String> data;
 
         // first table record
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "1");
-        fields.put("v", "10");
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "1");
+        data.put("v", "10");
 
         CdcMultiplexRecord expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         firstTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         CdcMultiplexRecord actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
         // second table record
-        fields = new HashMap<>();
-        fields.put("k", "1");
-        fields.put("v1", "10");
-        fields.put("v2", "0.625");
-        fields.put("v3", "one");
-        fields.put("v4", "b_one");
+        data = new HashMap<>();
+        data.put("k", "1");
+        data.put("v1", "10");
+        data.put("v2", "0.625");
+        data.put("v3", "one");
+        data.put("v4", "b_one");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         secondTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        // check that records with new fields should be processed after schema 
is updated
+        // check that records with new data should be processed after schema 
is updated
 
         // int -> bigint
         SchemaManager schemaManager;
         // first table
-        fields = new HashMap<>();
-        fields.put("pt", "1");
-        fields.put("k", "123456789876543211");
-        fields.put("v", "varchar");
+        data = new HashMap<>();
+        data.put("pt", "1");
+        data.put("k", "123456789876543211");
+        data.put("v", "varchar");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         firstTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -556,15 +532,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         assertThat(actual).isEqualTo(expected);
 
         // second table
-        fields = new HashMap<>();
-        fields.put("k", "2");
-        fields.put("v1", "12345678987654321");
-        fields.put("v2", "0.25");
+        data = new HashMap<>();
+        data.put("k", "2");
+        data.put("v1", "12345678987654321");
+        data.put("v2", "0.25");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         secondTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -577,15 +553,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         // below are schema changes only from the second table
         // float -> double
 
-        fields = new HashMap<>();
-        fields.put("k", "3");
-        fields.put("v1", "100");
-        fields.put("v2", "1.0000000000009095");
+        data = new HashMap<>();
+        data.put("k", "3");
+        data.put("v1", "100");
+        data.put("v2", "1.0000000000009095");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         secondTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -597,15 +573,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         // varchar(5) -> varchar(10)
 
-        fields = new HashMap<>();
-        fields.put("k", "4");
-        fields.put("v1", "40");
-        fields.put("v3", "long four");
+        data = new HashMap<>();
+        data.put("k", "4");
+        data.put("v1", "40");
+        data.put("v3", "long four");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         secondTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -617,15 +593,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
         // varbinary(5) -> varbinary(10)
 
-        fields = new HashMap<>();
-        fields.put("k", "5");
-        fields.put("v1", "50");
-        fields.put("v4", "long five~");
+        data = new HashMap<>();
+        data.put("k", "5");
+        data.put("v1", "50");
+        data.put("v4", "long five~");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         secondTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -651,33 +627,33 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         t.start();
 
         // write records to two tables thus two FileStoreWrite will be created
-        Map<String, String> fields;
+        Map<String, String> data;
 
         // first table record
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "1");
-        fields.put("v", "10");
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "1");
+        data.put("v", "10");
 
         CdcMultiplexRecord expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         firstTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
 
         // second table record
-        fields = new HashMap<>();
-        fields.put("k", "1");
-        fields.put("v1", "10");
-        fields.put("v2", "0.625");
-        fields.put("v3", "one");
-        fields.put("v4", "b_one");
+        data = new HashMap<>();
+        data.put("k", "1");
+        data.put("v1", "10");
+        data.put("v2", "0.625");
+        data.put("v3", "one");
+        data.put("v4", "b_one");
         expected =
                 CdcMultiplexRecord.fromCdcRecord(
                         databaseName,
                         secondTable.getObjectName(),
-                        new CdcRecord(RowKind.INSERT, fields));
+                        new CdcRecord(RowKind.INSERT, data));
         runner.offer(expected);
 
         // get and check compactExecutor from two FileStoreWrite
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 9af7eabda..f3693fe40 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -106,31 +106,31 @@ public class CdcRecordStoreWriteOperatorTest {
 
         // check that records with compatible schema can be processed immediately
 
-        Map<String, String> fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "1");
-        fields.put("v", "10");
-        CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+        Map<String, String> data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "1");
+        data.put("v", "10");
+        CdcRecord expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         CdcRecord actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "2");
-        expected = new CdcRecord(RowKind.INSERT, fields);
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "2");
+        expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        // check that records with new fields should be processed after schema is updated
+        // check that records with new data should be processed after schema is updated
 
-        fields = new HashMap<>();
-        fields.put("pt", "0");
-        fields.put("k", "3");
-        fields.put("v", "30");
-        fields.put("v2", "300");
-        expected = new CdcRecord(RowKind.INSERT, fields);
+        data = new HashMap<>();
+        data.put("pt", "0");
+        data.put("k", "3");
+        data.put("v", "30");
+        data.put("v2", "300");
+        expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -172,26 +172,26 @@ public class CdcRecordStoreWriteOperatorTest {
 
         // check that records with compatible schema can be processed immediately
 
-        Map<String, String> fields = new HashMap<>();
-        fields.put("k", "1");
-        fields.put("v1", "10");
-        fields.put("v2", "0.625");
-        fields.put("v3", "one");
-        fields.put("v4", "b_one");
-        CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+        Map<String, String> data = new HashMap<>();
+        data.put("k", "1");
+        data.put("v1", "10");
+        data.put("v2", "0.625");
+        data.put("v3", "one");
+        data.put("v4", "b_one");
+        CdcRecord expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         CdcRecord actual = runner.take();
         assertThat(actual).isEqualTo(expected);
 
-        // check that records with new fields should be processed after schema is updated
+        // check that records with new data should be processed after schema is updated
 
         // int -> bigint
 
-        fields = new HashMap<>();
-        fields.put("k", "2");
-        fields.put("v1", "12345678987654321");
-        fields.put("v2", "0.25");
-        expected = new CdcRecord(RowKind.INSERT, fields);
+        data = new HashMap<>();
+        data.put("k", "2");
+        data.put("v1", "12345678987654321");
+        data.put("v2", "0.25");
+        expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -203,11 +203,11 @@ public class CdcRecordStoreWriteOperatorTest {
 
         // float -> double
 
-        fields = new HashMap<>();
-        fields.put("k", "3");
-        fields.put("v1", "100");
-        fields.put("v2", "1.0000000000009095");
-        expected = new CdcRecord(RowKind.INSERT, fields);
+        data = new HashMap<>();
+        data.put("k", "3");
+        data.put("v1", "100");
+        data.put("v2", "1.0000000000009095");
+        expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -218,11 +218,11 @@ public class CdcRecordStoreWriteOperatorTest {
 
         // varchar(5) -> varchar(10)
 
-        fields = new HashMap<>();
-        fields.put("k", "4");
-        fields.put("v1", "40");
-        fields.put("v3", "long four");
-        expected = new CdcRecord(RowKind.INSERT, fields);
+        data = new HashMap<>();
+        data.put("k", "4");
+        data.put("v1", "40");
+        data.put("v3", "long four");
+        expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
@@ -233,11 +233,11 @@ public class CdcRecordStoreWriteOperatorTest {
 
         // varbinary(5) -> varbinary(10)
 
-        fields = new HashMap<>();
-        fields.put("k", "5");
-        fields.put("v1", "50");
-        fields.put("v4", "long five~");
-        expected = new CdcRecord(RowKind.INSERT, fields);
+        data = new HashMap<>();
+        data.put("k", "5");
+        data.put("v1", "50");
+        data.put("v4", "long five~");
+        expected = new CdcRecord(RowKind.INSERT, data);
         runner.offer(expected);
         actual = runner.poll(1);
         assertThat(actual).isNull();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index 525a05096..6a38c1c26 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -114,18 +114,18 @@ public class TestTable {
                 }
                 events.add(new TestCdcEvent(tableName, currentDataFieldList(fieldNames, isBigInt)));
             } else {
-                Map<String, String> fields = new HashMap<>();
+                Map<String, String> data = new HashMap<>();
                 int key = random.nextInt(numKeys);
-                fields.put("k", String.valueOf(key));
+                data.put("k", String.valueOf(key));
                 int pt = key % numPartitions;
-                fields.put("pt", String.valueOf(pt));
+                data.put("pt", String.valueOf(pt));
 
                 for (int j = 0; j < fieldNames.size(); j++) {
                     String fieldName = fieldNames.get(j);
                     if (isBigInt.get(j)) {
-                        fields.put(fieldName, String.valueOf(random.nextLong()));
+                        data.put(fieldName, String.valueOf(random.nextLong()));
                     } else {
-                        fields.put(fieldName, String.valueOf(random.nextInt()));
+                        data.put(fieldName, String.valueOf(random.nextInt()));
                     }
                 }
 
@@ -140,8 +140,8 @@ public class TestTable {
                         shouldInsert = random.nextInt(5) > 0;
                     }
                     if (shouldInsert) {
-                        records.add(new CdcRecord(RowKind.INSERT, fields));
-                        expected.put(key, fields);
+                        records.add(new CdcRecord(RowKind.INSERT, data));
+                        expected.put(key, data);
                     }
                 }
                 // Generate test data for append table
@@ -149,8 +149,8 @@ public class TestTable {
                     if (expected.containsKey(key)) {
                         records.add(new CdcRecord(RowKind.DELETE, expected.get(key)));
                     } else {
-                        records.add(new CdcRecord(RowKind.INSERT, fields));
-                        expected.put(key, fields);
+                        records.add(new CdcRecord(RowKind.INSERT, data));
+                        expected.put(key, data);
                     }
                 }
                 events.add(new TestCdcEvent(tableName, records, Objects.hash(tableName, key)));


Reply via email to