This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new e59a17006 [FLINK-37352][pipeline-connector/paimon] Supports write full 
change log to Paimon table
e59a17006 is described below

commit e59a170064051051088c3275fe44b208fbd5763a
Author: Kunni <[email protected]>
AuthorDate: Mon Mar 10 15:37:02 2025 +0800

    [FLINK-37352][pipeline-connector/paimon] Supports write full change log to 
Paimon table
    
    This closes #3935
---
 .../cdc/connectors/paimon/sink/v2/PaimonEvent.java | 30 +++++++++------
 .../sink/v2/PaimonRecordEventSerializer.java       |  7 ++--
 .../connectors/paimon/sink/v2/PaimonWriter.java    |  7 +++-
 .../paimon/sink/v2/PaimonWriterHelper.java         | 45 ++++++++++++++++++++++
 .../paimon/sink/v2/PaimonSinkITCase.java           | 13 +++++--
 5 files changed, 81 insertions(+), 21 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
index d23ca7e76..1ac14698e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEvent.java
@@ -20,35 +20,41 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 
+import java.util.List;
+
 /** Contains the data to be written for {@link PaimonWriter}. */
 public class PaimonEvent {
 
     // Identifier for the Paimon table to be written.
     Identifier tableId;
 
-    // The actual record to be written to Paimon table.
-    GenericRow genericRow;
+    // The actual records to be written to Paimon table, contains full 
changelog(before/after).
+    List<GenericRow> genericRows;
 
     // if true, means that table schema has changed right before this 
genericRow.
     boolean shouldRefreshSchema;
     int bucket;
 
-    public PaimonEvent(Identifier tableId, GenericRow genericRow) {
+    public PaimonEvent(Identifier tableId, List<GenericRow> genericRows) {
         this.tableId = tableId;
-        this.genericRow = genericRow;
+        this.genericRows = genericRows;
         this.shouldRefreshSchema = false;
     }
 
-    public PaimonEvent(Identifier tableId, GenericRow genericRow, boolean 
shouldRefreshSchema) {
+    public PaimonEvent(
+            Identifier tableId, List<GenericRow> genericRows, boolean 
shouldRefreshSchema) {
         this.tableId = tableId;
-        this.genericRow = genericRow;
+        this.genericRows = genericRows;
         this.shouldRefreshSchema = shouldRefreshSchema;
     }
 
     public PaimonEvent(
-            Identifier tableId, GenericRow genericRow, boolean 
shouldRefreshSchema, int bucket) {
+            Identifier tableId,
+            List<GenericRow> genericRows,
+            boolean shouldRefreshSchema,
+            int bucket) {
         this.tableId = tableId;
-        this.genericRow = genericRow;
+        this.genericRows = genericRows;
         this.shouldRefreshSchema = shouldRefreshSchema;
         this.bucket = bucket;
     }
@@ -69,12 +75,12 @@ public class PaimonEvent {
         this.shouldRefreshSchema = shouldRefreshSchema;
     }
 
-    public GenericRow getGenericRow() {
-        return genericRow;
+    public List<GenericRow> getGenericRows() {
+        return genericRows;
     }
 
-    public void setGenericRow(GenericRow genericRow) {
-        this.genericRow = genericRow;
+    public void setGenericRows(List<GenericRow> genericRows) {
+        this.genericRows = genericRows;
     }
 
     public int getBucket() {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
index 983a680bd..d0eceabcd 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonRecordEventSerializer.java
@@ -32,6 +32,7 @@ import org.apache.paimon.data.GenericRow;
 
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -79,11 +80,11 @@ public class PaimonRecordEventSerializer implements 
PaimonRecordSerializer<Event
             return new PaimonEvent(tableId, null, true);
         } else if (event instanceof DataChangeEvent) {
             DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-            GenericRow genericRow =
-                    PaimonWriterHelper.convertEventToGenericRow(
+            List<GenericRow> genericRows =
+                    PaimonWriterHelper.convertEventToFullGenericRows(
                             dataChangeEvent,
                             
schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
-            return new PaimonEvent(tableId, genericRow, false, bucket);
+            return new PaimonEvent(tableId, genericRows, false, bucket);
         } else {
             throw new IllegalArgumentException(
                     "failed to convert Input into PaimonEvent, unsupported 
event: " + event);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index fcf522468..d1c3fa561 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
@@ -126,7 +127,7 @@ public class PaimonWriter<InputT>
                 throw new RuntimeException(e);
             }
         }
-        if (paimonEvent.getGenericRow() != null) {
+        if (paimonEvent.getGenericRows() != null) {
             FileStoreTable table;
             table = getTable(tableId);
             if (memoryPoolFactory == null) {
@@ -155,7 +156,9 @@ public class PaimonWriter<InputT>
                                 return storeSinkWrite;
                             });
             try {
-                write.write(paimonEvent.getGenericRow(), 
paimonEvent.getBucket());
+                for (GenericRow genericRow : paimonEvent.getGenericRows()) {
+                    write.write(genericRow, paimonEvent.getBucket());
+                }
             } catch (Exception e) {
                 throw new IOException(e);
             }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
index 986125780..36bb9fbbc 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java
@@ -179,6 +179,51 @@ public class PaimonWriterHelper {
         return genericRow;
     }
 
+    /**
+     * Creates the full changelog {@link GenericRow}s of a {@link DataChangeEvent}
+     * for {@link PaimonWriter}.
+     *
+     * <p>INSERT yields one {@link RowKind#INSERT} row built from {@code after()};
+     * DELETE yields one {@link RowKind#DELETE} row built from {@code before()};
+     * UPDATE and REPLACE yield a {@link RowKind#UPDATE_BEFORE} row (only when a
+     * before image is present) followed by a {@link RowKind#UPDATE_AFTER} row.
+     *
+     * @param dataChangeEvent the change event to convert
+     * @param fieldGetters getters used to read each field, indexed by position
+     * @return the rows to write, in the order they have to be written
+     * @throws IllegalArgumentException if the operation type is not supported
+     */
+    public static List<GenericRow> convertEventToFullGenericRows(
+            DataChangeEvent dataChangeEvent,
+            List<RecordData.FieldGetter> fieldGetters) {
+        List<GenericRow> fullGenericRows = new ArrayList<>();
+        switch (dataChangeEvent.op()) {
+            case INSERT:
+                fullGenericRows.add(
+                        convertRecordDataToGenericRow(
+                                dataChangeEvent.after(),
+                                fieldGetters,
+                                RowKind.INSERT));
+                break;
+            case UPDATE:
+            case REPLACE:
+                {
+                    // NOTE(review): events created without a before image
+                    // (presumably REPLACE events - confirm against
+                    // DataChangeEvent) would otherwise hit a
+                    // NullPointerException in the row conversion, so the
+                    // UPDATE_BEFORE row is emitted only when it exists.
+                    RecordData before = dataChangeEvent.before();
+                    if (before != null) {
+                        fullGenericRows.add(
+                                convertRecordDataToGenericRow(
+                                        before,
+                                        fieldGetters,
+                                        RowKind.UPDATE_BEFORE));
+                    }
+                    fullGenericRows.add(
+                            convertRecordDataToGenericRow(
+                                    dataChangeEvent.after(),
+                                    fieldGetters,
+                                    RowKind.UPDATE_AFTER));
+                    break;
+                }
+            case DELETE:
+                fullGenericRows.add(
+                        convertRecordDataToGenericRow(
+                                dataChangeEvent.before(),
+                                fieldGetters,
+                                RowKind.DELETE));
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        "don't support type of " + dataChangeEvent.op());
+        }
+        return fullGenericRows;
+    }
+
+    private static GenericRow convertRecordDataToGenericRow(
+            RecordData recordData, List<RecordData.FieldGetter> fieldGetters, 
RowKind rowKind) {
+        GenericRow genericRow = new GenericRow(rowKind, recordData.getArity());
+        for (int i = 0; i < recordData.getArity(); i++) {
+            genericRow.setField(i, 
fieldGetters.get(i).getFieldOrNull(recordData));
+        }
+        return genericRow;
+    }
+
     /** A helper class for {@link PaimonWriter} to create FieldGetter and 
GenericRow. */
     public static class BinaryFieldDataGetter implements 
RecordData.FieldGetter {
         private final int fieldPos;
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index bc257d0db..adb43483d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -209,19 +209,24 @@ public class PaimonSinkITCase {
                         table1,
                         Arrays.asList(Tuple2.of(STRING(), "2"), 
Tuple2.of(STRING(), "2")),
                         Arrays.asList(Tuple2.of(STRING(), "2"), 
Tuple2.of(STRING(), "x"))));
-        Assertions.assertThat(fetchResults(table1))
-                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", 
"x"));
+        if (enableDeleteVector) {
+            Assertions.assertThat(fetchResults(table1))
+                    .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "2", 
"x"));
+        } else {
+            Assertions.assertThat(fetchResults(table1))
+                    
.containsExactlyInAnyOrder(Row.ofKind(RowKind.UPDATE_AFTER, "2", "x"));
+        }
 
         if (enableDeleteVector) {
             
Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName()))
                     .containsExactlyInAnyOrder(
-                            Row.ofKind(RowKind.INSERT, 1L), 
Row.ofKind(RowKind.INSERT, 3L));
+                            Row.ofKind(RowKind.INSERT, 1L), 
Row.ofKind(RowKind.INSERT, 4L));
         } else {
             
Assertions.assertThat(fetchMaxSequenceNumber(table1.getTableName()))
                     .containsExactlyInAnyOrder(
                             Row.ofKind(RowKind.INSERT, 1L),
                             Row.ofKind(RowKind.INSERT, 2L),
-                            Row.ofKind(RowKind.INSERT, 3L));
+                            Row.ofKind(RowKind.INSERT, 4L));
         }
     }
 

Reply via email to