This is an automated email from the ASF dual-hosted git repository.
yuxiqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a24b34f48 [FLINK-39684] Fix PaimonWriterHelper NullPointerException
when handling REPLACE events (#4398)
a24b34f48 is described below
commit a24b34f48fe900b134b8f4a9c7081d9aea5cbed4
Author: yuxiqian <[email protected]>
AuthorDate: Mon May 18 10:39:41 2026 +0800
[FLINK-39684] Fix PaimonWriterHelper NullPointerException when handling
REPLACE events (#4398)
---
.../paimon/sink/v2/PaimonWriterHelper.java | 8 +-
.../paimon/sink/v2/PaimonWriterHelperTest.java | 85 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 1 deletion(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
index 1e34c689b..abed564fe 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -250,7 +250,6 @@ public class PaimonWriterHelper {
break;
}
case UPDATE:
- case REPLACE:
{
if (hasPrimaryKey) {
fullGenericRows.add(
@@ -264,6 +263,13 @@ public class PaimonWriterHelper {
dataChangeEvent.after(), fieldGetters,
RowKind.UPDATE_AFTER));
break;
}
+ case REPLACE:
+ {
+ fullGenericRows.add(
+ convertRecordDataToGenericRow(
+ dataChangeEvent.after(), fieldGetters,
RowKind.UPDATE_AFTER));
+ break;
+ }
case DELETE:
{
if (hasPrimaryKey) {
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
index b48761970..41accd73a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelperTest.java
@@ -200,6 +200,91 @@ class PaimonWriterHelperTest {
Assertions.assertThat(genericRow.getRowKind()).isEqualTo(RowKind.INSERT);
}
+ @Test
+ void testConvertEventToFullGenericRowsOfDataChangeTypes() {
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.STRING())
+ .physicalColumn("col2", DataTypes.STRING())
+ .build();
+ List<RecordData.FieldGetter> fieldGetters =
+ PaimonWriterHelper.createFieldGetters(schema,
ZoneId.systemDefault());
+ TableId tableId = TableId.parse("database.table");
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(),
DataTypes.STRING()));
+ BinaryRecordData beforeData =
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
BinaryStringData.fromString("old")
+ });
+ BinaryRecordData afterData =
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("1"),
BinaryStringData.fromString("new")
+ });
+
+ // INSERT: single INSERT row regardless of hasPrimaryKey
+ DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId,
afterData);
+ List<GenericRow> rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, true);
+ Assertions.assertThat(rows).hasSize(1);
+
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.INSERT);
+
+ rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, false);
+ Assertions.assertThat(rows).hasSize(1);
+
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.INSERT);
+
+ // REPLACE: single UPDATE_AFTER row regardless of hasPrimaryKey
+ dataChangeEvent = DataChangeEvent.replaceEvent(tableId, afterData,
null);
+ rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, true);
+ Assertions.assertThat(rows).hasSize(1);
+
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+
Assertions.assertThat(rows.get(0).getString(1)).isEqualTo(BinaryString.fromString("new"));
+
+ rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, false);
+ Assertions.assertThat(rows).hasSize(1);
+
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+
+ // UPDATE with primary key: UPDATE_BEFORE + UPDATE_AFTER
+ dataChangeEvent = DataChangeEvent.updateEvent(tableId, beforeData,
afterData);
+ rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, true);
+ Assertions.assertThat(rows).hasSize(2);
+
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+
Assertions.assertThat(rows.get(0).getString(1)).isEqualTo(BinaryString.fromString("old"));
+
Assertions.assertThat(rows.get(1).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+
Assertions.assertThat(rows.get(1).getString(1)).isEqualTo(BinaryString.fromString("new"));
+
+ // UPDATE without primary key: only UPDATE_AFTER
+ rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, false);
+ Assertions.assertThat(rows).hasSize(1);
+
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.UPDATE_AFTER);
+
+ // DELETE with primary key: single DELETE row
+ dataChangeEvent = DataChangeEvent.deleteEvent(tableId, beforeData);
+ rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, true);
+ Assertions.assertThat(rows).hasSize(1);
+
Assertions.assertThat(rows.get(0).getRowKind()).isEqualTo(RowKind.DELETE);
+
+ // DELETE without primary key: empty (no rows)
+ rows =
+ PaimonWriterHelper.convertEventToFullGenericRows(
+ dataChangeEvent, fieldGetters, false);
+ Assertions.assertThat(rows).isEmpty();
+ }
+
@Test
void testConvertEventToGenericRowWithNestedRow() {
// Define the inner row type with an integer and a map