This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7c4fd960b [cdc] Fix ambiguous naming in CdcRecord. (#4450)
7c4fd960b is described below
commit 7c4fd960be1a680a3a05a5359590eb1f7d73cd98
Author: Fantasy-Jay <[email protected]>
AuthorDate: Tue Nov 5 14:23:25 2024 +0800
[cdc] Fix ambiguous naming in CdcRecord. (#4450)
---
.../apache/paimon/flink/sink/cdc/CdcRecord.java | 25 +-
.../paimon/flink/sink/cdc/CdcRecordUtils.java | 10 +-
.../paimon/flink/sink/cdc/RichCdcRecord.java | 8 +-
.../cdc/CdcMultiplexRecordChannelComputerTest.java | 10 +-
.../sink/cdc/CdcRecordChannelComputerTest.java | 10 +-
.../cdc/CdcRecordKeyAndBucketExtractorTest.java | 56 ++---
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 256 ++++++++++-----------
.../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 88 +++----
.../apache/paimon/flink/sink/cdc/TestTable.java | 18 +-
9 files changed, 229 insertions(+), 252 deletions(-)
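
The rename below changes names only, not behavior: the map carried by CdcRecord holds field-name -> value pairs, but it was called "fields", which read ambiguously next to the List<DataField> schema fields kept by RichCdcRecord.Builder. A minimal before/after sketch of a call site (hypothetical usage, not taken from this commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.flink.sink.cdc.CdcRecord;
    import org.apache.paimon.types.RowKind;

    // Build a record whose payload maps field names to string values.
    Map<String, String> data = new HashMap<>();
    data.put("k", "1");
    data.put("v", "10");
    CdcRecord record = new CdcRecord(RowKind.INSERT, data);

    // Before this commit: record.fields().get("v")
    // After this commit:  record.data().get("v")
    String value = record.data().get("v"); // "10"
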
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 9adca753d..b23d0d6f0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -35,11 +35,12 @@ public class CdcRecord implements Serializable {
private RowKind kind;
- private final Map<String, String> fields;
+ // field name -> value
+ private final Map<String, String> data;
- public CdcRecord(RowKind kind, Map<String, String> fields) {
+ public CdcRecord(RowKind kind, Map<String, String> data) {
this.kind = kind;
- this.fields = fields;
+ this.data = data;
}
public static CdcRecord emptyRecord() {
@@ -50,16 +51,16 @@ public class CdcRecord implements Serializable {
return kind;
}
- public Map<String, String> fields() {
- return fields;
+ public Map<String, String> data() {
+ return data;
}
public CdcRecord fieldNameLowerCase() {
- Map<String, String> newFields = new HashMap<>();
- for (Map.Entry<String, String> entry : fields.entrySet()) {
- newFields.put(entry.getKey().toLowerCase(), entry.getValue());
+ Map<String, String> newData = new HashMap<>();
+ for (Map.Entry<String, String> entry : data.entrySet()) {
+ newData.put(entry.getKey().toLowerCase(), entry.getValue());
}
- return new CdcRecord(kind, newFields);
+ return new CdcRecord(kind, newData);
}
@Override
@@ -69,16 +70,16 @@ public class CdcRecord implements Serializable {
}
CdcRecord that = (CdcRecord) o;
- return Objects.equals(kind, that.kind) && Objects.equals(fields, that.fields);
+ return Objects.equals(kind, that.kind) && Objects.equals(data, that.data);
}
@Override
public int hashCode() {
- return Objects.hash(kind, fields);
+ return Objects.hash(kind, data);
}
@Override
public String toString() {
- return kind.shortString() + " " + fields;
+ return kind.shortString() + " " + data;
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
index 0d192dd53..91979a2c9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java
@@ -54,7 +54,7 @@ public class CdcRecordUtils {
GenericRow genericRow = new GenericRow(dataFields.size());
for (int i = 0; i < dataFields.size(); i++) {
DataField dataField = dataFields.get(i);
- String fieldValue = record.fields().get(dataField.name());
+ String fieldValue = record.data().get(dataField.name());
if (fieldValue != null) {
genericRow.setField(
i, TypeUtils.castFromCdcValueString(fieldValue, dataField.type()));
@@ -83,7 +83,7 @@ public class CdcRecordUtils {
List<String> fieldNames =
dataFields.stream().map(DataField::name).collect(Collectors.toList());
- for (Map.Entry<String, String> field : record.fields().entrySet()) {
+ for (Map.Entry<String, String> field : record.data().entrySet()) {
String key = field.getKey();
String value = field.getValue();
@@ -117,14 +117,14 @@ public class CdcRecordUtils {
}
public static CdcRecord fromGenericRow(GenericRow row, List<String> fieldNames) {
- Map<String, String> fields = new HashMap<>();
+ Map<String, String> data = new HashMap<>();
for (int i = 0; i < row.getFieldCount(); i++) {
Object field = row.getField(i);
if (field != null) {
- fields.put(fieldNames.get(i), field.toString());
+ data.put(fieldNames.get(i), field.toString());
}
}
- return new CdcRecord(row.getRowKind(), fields);
+ return new CdcRecord(row.getRowKind(), data);
}
}
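
With the accessor renamed, the CdcRecordUtils round trip reads more clearly: the conversion above consumes record.data() when building a GenericRow, and fromGenericRow rebuilds the map from a GenericRow. A short sketch against the signatures visible in this diff (hypothetical values):

    import java.util.Arrays;

    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.flink.sink.cdc.CdcRecord;
    import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;

    // Fill a two-column row, then recover the name -> value map via data().
    GenericRow row = new GenericRow(2);
    row.setField(0, 1);  // "k"
    row.setField(1, 10); // "v"
    CdcRecord record = CdcRecordUtils.fromGenericRow(row, Arrays.asList("k", "v"));
    record.data().forEach((name, value) -> System.out.println(name + " = " + value));
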
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 7fc0c3ff7..04b86fea5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -48,7 +48,7 @@ public class RichCdcRecord implements Serializable {
}
public boolean hasPayload() {
- return !cdcRecord.fields().isEmpty();
+ return !cdcRecord.data().isEmpty();
}
public RowKind kind() {
@@ -95,7 +95,7 @@ public class RichCdcRecord implements Serializable {
private final RowKind kind;
private final AtomicInteger fieldId;
private final List<DataField> fields = new ArrayList<>();
- private final Map<String, String> fieldValues = new HashMap<>();
+ private final Map<String, String> data = new HashMap<>();
public Builder(RowKind kind, AtomicInteger fieldId) {
this.kind = kind;
@@ -109,12 +109,12 @@ public class RichCdcRecord implements Serializable {
public Builder field(
String name, DataType type, String value, @Nullable String description) {
fields.add(new DataField(fieldId.incrementAndGet(), name, type, description));
- fieldValues.put(name, value);
+ data.put(name, value);
return this;
}
public RichCdcRecord build() {
- return new RichCdcRecord(new CdcRecord(kind, fieldValues), fields);
+ return new RichCdcRecord(new CdcRecord(kind, data), fields);
}
}
}
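
In RichCdcRecord.Builder the rename separates the two collections the class keeps: fields remains the List<DataField> schema, while the value map is now data. A hedged usage sketch based on the constructor and field(...) signature shown above (assuming the Builder is instantiable as written; values are hypothetical):

    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.paimon.flink.sink.cdc.RichCdcRecord;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowKind;

    // Each field(...) call appends a DataField to the schema list and a
    // name -> value entry to the data map; build() wires both into the record.
    RichCdcRecord record =
            new RichCdcRecord.Builder(RowKind.INSERT, new AtomicInteger(0))
                    .field("k", DataTypes.INT(), "1", null)
                    .field("v", DataTypes.STRING(), "10", null)
                    .build();
    // The wrapped CdcRecord's data() map is now {k=1, v=10}.
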
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index ce0d484f4..867cbdbae 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -163,9 +163,9 @@ public class CdcMultiplexRecordChannelComputerTest {
// assert that insert and delete records are routed into same channel
- for (Map<String, String> fields : input) {
- CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
- CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
+ for (Map<String, String> data : input) {
+ CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
+ CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);
assertThat(
channelComputer.channel(
@@ -184,8 +184,8 @@ public class CdcMultiplexRecordChannelComputerTest {
// assert that channel >= 0
int numTests = random.nextInt(10) + 1;
for (int test = 0; test < numTests; test++) {
- Map<String, String> fields = input.get(random.nextInt(input.size()));
- CdcRecord record = new CdcRecord(RowKind.INSERT, fields);
+ Map<String, String> data = input.get(random.nextInt(input.size()));
+ CdcRecord record = new CdcRecord(RowKind.INSERT, data);
int numBuckets = random.nextInt(numChannels * 4) + 1;
for (int i = 0; i < numBuckets; i++) {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
index 9a19013e2..8271ad187 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputerTest.java
@@ -128,9 +128,9 @@ public class CdcRecordChannelComputerTest {
// assert that channel(record) and channel(partition, bucket) gives the same result
- for (Map<String, String> fields : input) {
- CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, fields);
- CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, fields);
+ for (Map<String, String> data : input) {
+ CdcRecord insertRecord = new CdcRecord(RowKind.INSERT, data);
+ CdcRecord deleteRecord = new CdcRecord(RowKind.DELETE, data);
extractor.setRecord(random.nextBoolean() ? insertRecord : deleteRecord);
BinaryRow partition = extractor.partition();
@@ -151,8 +151,8 @@ public class CdcRecordChannelComputerTest {
bucketsPerChannel.put(i, 0);
}
- Map<String, String> fields = input.get(random.nextInt(input.size()));
- extractor.setRecord(new CdcRecord(RowKind.INSERT, fields));
+ Map<String, String> data = input.get(random.nextInt(input.size()));
+ extractor.setRecord(new CdcRecord(RowKind.INSERT, data));
BinaryRow partition = extractor.partition();
int numBuckets = random.nextInt(numChannels * 4) + 1;
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
index 8384b7155..802a3ea9d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordKeyAndBucketExtractorTest.java
@@ -87,19 +87,19 @@ public class CdcRecordKeyAndBucketExtractorTest {
StringData.fromString(v2));
expected.setRecord(rowData);
- Map<String, String> fields = new HashMap<>();
- fields.put("pt1", pt1);
- fields.put("pt2", String.valueOf(pt2));
- fields.put("k1", String.valueOf(k1));
- fields.put("v1", String.valueOf(v1));
- fields.put("k2", k2);
- fields.put("v2", v2);
-
- actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+ Map<String, String> data = new HashMap<>();
+ data.put("pt1", pt1);
+ data.put("pt2", String.valueOf(pt2));
+ data.put("k1", String.valueOf(k1));
+ data.put("v1", String.valueOf(v1));
+ data.put("k2", k2);
+ data.put("v2", v2);
+
+ actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
- actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+ actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
@@ -122,19 +122,19 @@ public class CdcRecordKeyAndBucketExtractorTest {
null, null, k1, v1, StringData.fromString(k2),
StringData.fromString(v2));
expected.setRecord(rowData);
- Map<String, String> fields = new HashMap<>();
- fields.put("pt1", null);
- fields.put("pt2", null);
- fields.put("k1", String.valueOf(k1));
- fields.put("v1", String.valueOf(v1));
- fields.put("k2", k2);
- fields.put("v2", v2);
+ Map<String, String> data = new HashMap<>();
+ data.put("pt1", null);
+ data.put("pt2", null);
+ data.put("k1", String.valueOf(k1));
+ data.put("v1", String.valueOf(v1));
+ data.put("k2", k2);
+ data.put("v2", v2);
- actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+ actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
- actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+ actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
@@ -161,19 +161,19 @@ public class CdcRecordKeyAndBucketExtractorTest {
StringData.fromString(v2));
expected.setRecord(rowData);
- Map<String, String> fields = new HashMap<>();
- fields.put("pt1", "");
- fields.put("pt2", null);
- fields.put("k1", String.valueOf(k1));
- fields.put("v1", String.valueOf(v1));
- fields.put("k2", k2);
- fields.put("v2", v2);
+ Map<String, String> data = new HashMap<>();
+ data.put("pt1", "");
+ data.put("pt2", null);
+ data.put("k1", String.valueOf(k1));
+ data.put("v1", String.valueOf(v1));
+ data.put("k2", k2);
+ data.put("v2", v2);
- actual.setRecord(new CdcRecord(RowKind.INSERT, fields));
+ actual.setRecord(new CdcRecord(RowKind.INSERT, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
- actual.setRecord(new CdcRecord(RowKind.DELETE, fields));
+ actual.setRecord(new CdcRecord(RowKind.DELETE, data));
assertThat(actual.partition()).isEqualTo(expected.partition());
assertThat(actual.bucket()).isEqualTo(expected.bucket());
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 2a1bb4004..8c78ab853 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -172,16 +172,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
t.start();
// check that records should be processed after table is created
- Map<String, String> fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "1");
- fields.put("v", "10");
+ Map<String, String> data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "1");
+ data.put("v", "10");
CdcMultiplexRecord expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
CdcMultiplexRecord actual = runner.poll(1);
@@ -192,15 +190,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
assertThat(actual).isEqualTo(expected);
// after table is created, record should be processed immediately
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "3");
- fields.put("v", "30");
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "3");
+ data.put("v", "30");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.take();
assertThat(actual).isEqualTo(expected);
@@ -227,16 +223,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
t.start();
// check that records should be processed after table is created
- Map<String, String> fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "1");
- fields.put("v", "10");
+ Map<String, String> data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "1");
+ data.put("v", "10");
CdcMultiplexRecord expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
CdcMultiplexRecord actual = runner.poll(1);
@@ -254,15 +248,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
assertThat(operator.writes().size()).isEqualTo(1);
// after table is created, record should be processed immediately
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "3");
- fields.put("v", "30");
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "3");
+ data.put("v", "30");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.take();
assertThat(actual).isEqualTo(expected);
@@ -302,44 +294,38 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// check that records with compatible schema can be processed immediately
- Map<String, String> fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "1");
- fields.put("v", "10");
+ Map<String, String> data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "1");
+ data.put("v", "10");
CdcMultiplexRecord expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
CdcMultiplexRecord actual = runner.take();
assertThat(actual).isEqualTo(expected);
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "2");
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "2");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.take();
assertThat(actual).isEqualTo(expected);
- // check that records with new fields should be processed after schema is updated
+ // check that records with new data should be processed after schema is updated
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "3");
- fields.put("v", "30");
- fields.put("v2", "300");
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "3");
+ data.put("v", "30");
+ data.put("v2", "300");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -383,34 +369,30 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// check that records with compatible schema can be processed immediately
- Map<String, String> fields = new HashMap<>();
- fields.put("k", "1");
- fields.put("v1", "10");
- fields.put("v2", "0.625");
- fields.put("v3", "one");
- fields.put("v4", "b_one");
+ Map<String, String> data = new HashMap<>();
+ data.put("k", "1");
+ data.put("v1", "10");
+ data.put("v2", "0.625");
+ data.put("v3", "one");
+ data.put("v4", "b_one");
CdcMultiplexRecord expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
CdcMultiplexRecord actual = runner.take();
assertThat(actual).isEqualTo(expected);
- // check that records with new fields should be processed after schema is updated
+ // check that records with new data should be processed after schema is updated
// int -> bigint
- fields = new HashMap<>();
- fields.put("k", "2");
- fields.put("v1", "12345678987654321");
- fields.put("v2", "0.25");
+ data = new HashMap<>();
+ data.put("k", "2");
+ data.put("v1", "12345678987654321");
+ data.put("v2", "0.25");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -422,15 +404,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// float -> double
- fields = new HashMap<>();
- fields.put("k", "3");
- fields.put("v1", "100");
- fields.put("v2", "1.0000000000009095");
+ data = new HashMap<>();
+ data.put("k", "3");
+ data.put("v1", "100");
+ data.put("v2", "1.0000000000009095");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -441,15 +421,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// varchar(5) -> varchar(10)
- fields = new HashMap<>();
- fields.put("k", "4");
- fields.put("v1", "40");
- fields.put("v3", "long four");
+ data = new HashMap<>();
+ data.put("k", "4");
+ data.put("v1", "40");
+ data.put("v3", "long four");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -460,15 +438,13 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// varbinary(5) -> varbinary(10)
- fields = new HashMap<>();
- fields.put("k", "5");
- fields.put("v1", "50");
- fields.put("v4", "long five~");
+ data = new HashMap<>();
+ data.put("k", "5");
+ data.put("v1", "50");
+ data.put("v4", "long five~");
expected =
CdcMultiplexRecord.fromCdcRecord(
- databaseName,
- tableId.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ databaseName, tableId.getObjectName(), new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -499,53 +475,53 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// check that records with compatible schema from different tables
// can be processed immediately
- Map<String, String> fields;
+ Map<String, String> data;
// first table record
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "1");
- fields.put("v", "10");
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "1");
+ data.put("v", "10");
CdcMultiplexRecord expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
firstTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
CdcMultiplexRecord actual = runner.take();
assertThat(actual).isEqualTo(expected);
// second table record
- fields = new HashMap<>();
- fields.put("k", "1");
- fields.put("v1", "10");
- fields.put("v2", "0.625");
- fields.put("v3", "one");
- fields.put("v4", "b_one");
+ data = new HashMap<>();
+ data.put("k", "1");
+ data.put("v1", "10");
+ data.put("v2", "0.625");
+ data.put("v3", "one");
+ data.put("v4", "b_one");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
secondTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.take();
assertThat(actual).isEqualTo(expected);
- // check that records with new fields should be processed after schema is updated
+ // check that records with new data should be processed after schema is updated
// int -> bigint
SchemaManager schemaManager;
// first table
- fields = new HashMap<>();
- fields.put("pt", "1");
- fields.put("k", "123456789876543211");
- fields.put("v", "varchar");
+ data = new HashMap<>();
+ data.put("pt", "1");
+ data.put("k", "123456789876543211");
+ data.put("v", "varchar");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
firstTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -556,15 +532,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
assertThat(actual).isEqualTo(expected);
// second table
- fields = new HashMap<>();
- fields.put("k", "2");
- fields.put("v1", "12345678987654321");
- fields.put("v2", "0.25");
+ data = new HashMap<>();
+ data.put("k", "2");
+ data.put("v1", "12345678987654321");
+ data.put("v2", "0.25");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
secondTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -577,15 +553,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// below are schema changes only from the second table
// float -> double
- fields = new HashMap<>();
- fields.put("k", "3");
- fields.put("v1", "100");
- fields.put("v2", "1.0000000000009095");
+ data = new HashMap<>();
+ data.put("k", "3");
+ data.put("v1", "100");
+ data.put("v2", "1.0000000000009095");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
secondTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -597,15 +573,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// varchar(5) -> varchar(10)
- fields = new HashMap<>();
- fields.put("k", "4");
- fields.put("v1", "40");
- fields.put("v3", "long four");
+ data = new HashMap<>();
+ data.put("k", "4");
+ data.put("v1", "40");
+ data.put("v3", "long four");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
secondTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -617,15 +593,15 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// varbinary(5) -> varbinary(10)
- fields = new HashMap<>();
- fields.put("k", "5");
- fields.put("v1", "50");
- fields.put("v4", "long five~");
+ data = new HashMap<>();
+ data.put("k", "5");
+ data.put("v1", "50");
+ data.put("v4", "long five~");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
secondTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -651,33 +627,33 @@ public class CdcRecordStoreMultiWriteOperatorTest {
t.start();
// write records to two tables thus two FileStoreWrite will be created
- Map<String, String> fields;
+ Map<String, String> data;
// first table record
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "1");
- fields.put("v", "10");
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "1");
+ data.put("v", "10");
CdcMultiplexRecord expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
firstTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
// second table record
- fields = new HashMap<>();
- fields.put("k", "1");
- fields.put("v1", "10");
- fields.put("v2", "0.625");
- fields.put("v3", "one");
- fields.put("v4", "b_one");
+ data = new HashMap<>();
+ data.put("k", "1");
+ data.put("v1", "10");
+ data.put("v2", "0.625");
+ data.put("v3", "one");
+ data.put("v4", "b_one");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
secondTable.getObjectName(),
- new CdcRecord(RowKind.INSERT, fields));
+ new CdcRecord(RowKind.INSERT, data));
runner.offer(expected);
// get and check compactExecutor from two FileStoreWrite
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 9af7eabda..f3693fe40 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -106,31 +106,31 @@ public class CdcRecordStoreWriteOperatorTest {
// check that records with compatible schema can be processed immediately
- Map<String, String> fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "1");
- fields.put("v", "10");
- CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+ Map<String, String> data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "1");
+ data.put("v", "10");
+ CdcRecord expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
CdcRecord actual = runner.take();
assertThat(actual).isEqualTo(expected);
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "2");
- expected = new CdcRecord(RowKind.INSERT, fields);
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "2");
+ expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
actual = runner.take();
assertThat(actual).isEqualTo(expected);
- // check that records with new fields should be processed after schema is updated
+ // check that records with new data should be processed after schema is updated
- fields = new HashMap<>();
- fields.put("pt", "0");
- fields.put("k", "3");
- fields.put("v", "30");
- fields.put("v2", "300");
- expected = new CdcRecord(RowKind.INSERT, fields);
+ data = new HashMap<>();
+ data.put("pt", "0");
+ data.put("k", "3");
+ data.put("v", "30");
+ data.put("v2", "300");
+ expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -172,26 +172,26 @@ public class CdcRecordStoreWriteOperatorTest {
// check that records with compatible schema can be processed immediately
- Map<String, String> fields = new HashMap<>();
- fields.put("k", "1");
- fields.put("v1", "10");
- fields.put("v2", "0.625");
- fields.put("v3", "one");
- fields.put("v4", "b_one");
- CdcRecord expected = new CdcRecord(RowKind.INSERT, fields);
+ Map<String, String> data = new HashMap<>();
+ data.put("k", "1");
+ data.put("v1", "10");
+ data.put("v2", "0.625");
+ data.put("v3", "one");
+ data.put("v4", "b_one");
+ CdcRecord expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
CdcRecord actual = runner.take();
assertThat(actual).isEqualTo(expected);
- // check that records with new fields should be processed after schema is updated
+ // check that records with new data should be processed after schema is updated
// int -> bigint
- fields = new HashMap<>();
- fields.put("k", "2");
- fields.put("v1", "12345678987654321");
- fields.put("v2", "0.25");
- expected = new CdcRecord(RowKind.INSERT, fields);
+ data = new HashMap<>();
+ data.put("k", "2");
+ data.put("v1", "12345678987654321");
+ data.put("v2", "0.25");
+ expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -203,11 +203,11 @@ public class CdcRecordStoreWriteOperatorTest {
// float -> double
- fields = new HashMap<>();
- fields.put("k", "3");
- fields.put("v1", "100");
- fields.put("v2", "1.0000000000009095");
- expected = new CdcRecord(RowKind.INSERT, fields);
+ data = new HashMap<>();
+ data.put("k", "3");
+ data.put("v1", "100");
+ data.put("v2", "1.0000000000009095");
+ expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -218,11 +218,11 @@ public class CdcRecordStoreWriteOperatorTest {
// varchar(5) -> varchar(10)
- fields = new HashMap<>();
- fields.put("k", "4");
- fields.put("v1", "40");
- fields.put("v3", "long four");
- expected = new CdcRecord(RowKind.INSERT, fields);
+ data = new HashMap<>();
+ data.put("k", "4");
+ data.put("v1", "40");
+ data.put("v3", "long four");
+ expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
@@ -233,11 +233,11 @@ public class CdcRecordStoreWriteOperatorTest {
// varbinary(5) -> varbinary(10)
- fields = new HashMap<>();
- fields.put("k", "5");
- fields.put("v1", "50");
- fields.put("v4", "long five~");
- expected = new CdcRecord(RowKind.INSERT, fields);
+ data = new HashMap<>();
+ data.put("k", "5");
+ data.put("v1", "50");
+ data.put("v4", "long five~");
+ expected = new CdcRecord(RowKind.INSERT, data);
runner.offer(expected);
actual = runner.poll(1);
assertThat(actual).isNull();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index 525a05096..6a38c1c26 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -114,18 +114,18 @@ public class TestTable {
}
events.add(new TestCdcEvent(tableName, currentDataFieldList(fieldNames, isBigInt)));
} else {
- Map<String, String> fields = new HashMap<>();
+ Map<String, String> data = new HashMap<>();
int key = random.nextInt(numKeys);
- fields.put("k", String.valueOf(key));
+ data.put("k", String.valueOf(key));
int pt = key % numPartitions;
- fields.put("pt", String.valueOf(pt));
+ data.put("pt", String.valueOf(pt));
for (int j = 0; j < fieldNames.size(); j++) {
String fieldName = fieldNames.get(j);
if (isBigInt.get(j)) {
- fields.put(fieldName, String.valueOf(random.nextLong()));
+ data.put(fieldName, String.valueOf(random.nextLong()));
} else {
- fields.put(fieldName, String.valueOf(random.nextInt()));
+ data.put(fieldName, String.valueOf(random.nextInt()));
}
}
@@ -140,8 +140,8 @@ public class TestTable {
shouldInsert = random.nextInt(5) > 0;
}
if (shouldInsert) {
- records.add(new CdcRecord(RowKind.INSERT, fields));
- expected.put(key, fields);
+ records.add(new CdcRecord(RowKind.INSERT, data));
+ expected.put(key, data);
}
}
// Generate test data for append table
@@ -149,8 +149,8 @@ public class TestTable {
if (expected.containsKey(key)) {
records.add(new CdcRecord(RowKind.DELETE, expected.get(key)));
} else {
- records.add(new CdcRecord(RowKind.INSERT, fields));
- expected.put(key, fields);
+ records.add(new CdcRecord(RowKind.INSERT, data));
+ expected.put(key, data);
}
}
events.add(new TestCdcEvent(tableName, records, Objects.hash(tableName, key)));