This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new dca94bdd7 [FLINK-36790][cdc-connector][paimon] Set waitCompaction to true in PaimonWriter to avoid CME problem
dca94bdd7 is described below

commit dca94bdd71bf9872d403e70b5325932e94b4d555
Author: stayrascal <stayras...@users.noreply.github.com>
AuthorDate: Mon Jan 6 22:31:13 2025 +0800

    [FLINK-36790][cdc-connector][paimon] Set waitCompaction to true in PaimonWriter to avoid CME problem
    
    This closes #3760
    
    Co-authored-by: wuzhiping <wuzhiping....@bytedance.com>
---
 .../connectors/paimon/sink/v2/PaimonWriter.java    |  2 +-
 .../paimon/sink/v2/PaimonSinkITCase.java           | 56 ++++++++++++----------
 2 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index aa4dd2cb2..fcf522468 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -96,7 +96,7 @@ public class PaimonWriter<InputT>
         for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
             Identifier key = entry.getKey();
             StoreSinkWrite write = entry.getValue();
-            boolean waitCompaction = false;
+            boolean waitCompaction = true;
             committables.addAll(
                     // here we set it to lastCheckpointId+1 to
                     // avoid prepareCommit the same checkpointId with the 
first round.
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 5635dcfd8..3a554ef2f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -58,7 +58,7 @@ import org.apache.paimon.options.Options;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -140,7 +140,7 @@ public class PaimonSinkITCase {
                 .dropDatabase(TEST_DATABASE, true, true);
     }
 
-    private List<Event> createTestEvents() throws SchemaEvolveException {
+    private List<Event> createTestEvents(boolean enableDeleteVectors) throws 
SchemaEvolveException {
         List<Event> testEvents = new ArrayList<>();
         // create table
         Schema schema =
@@ -149,6 +149,7 @@ public class PaimonSinkITCase {
                         .physicalColumn("col2", DataTypes.STRING())
                         .primaryKey("col1")
                         .option("bucket", "1")
+                        .option("deletion-vectors.enabled", 
String.valueOf(enableDeleteVectors))
                         .build();
         CreateTableEvent createTableEvent = new CreateTableEvent(table1, 
schema);
         testEvents.add(createTableEvent);
@@ -180,8 +181,8 @@ public class PaimonSinkITCase {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testSinkWithDataChange(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, 
false"})
+    public void testSinkWithDataChange(String metastore, boolean 
enableDeleteVector)
             throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -192,7 +193,7 @@ public class PaimonSinkITCase {
         Committer<MultiTableCommittable> committer = 
paimonSink.createCommitter();
 
         // insert
-        for (Event event : createTestEvents()) {
+        for (Event event : createTestEvents(enableDeleteVector)) {
             writer.write(event, null);
         }
         writer.flush(false);
@@ -215,7 +216,7 @@ public class PaimonSinkITCase {
         // delete
         Event event =
                 DataChangeEvent.deleteEvent(
-                        TableId.tableId("test", "table1"),
+                        table1,
                         generator.generate(
                                 new Object[] {
                                     BinaryStringData.fromString("1"),
@@ -240,7 +241,7 @@ public class PaimonSinkITCase {
         // update
         event =
                 DataChangeEvent.updateEvent(
-                        TableId.tableId("test", "table1"),
+                        table1,
                         generator.generate(
                                 new Object[] {
                                     BinaryStringData.fromString("2"),
@@ -273,17 +274,19 @@ public class PaimonSinkITCase {
                 .collect()
                 .forEachRemaining(result::add);
         // Each commit will generate one sequence number(equal to 
checkpointId).
-        Assertions.assertEquals(
-                Arrays.asList(
-                        Row.ofKind(RowKind.INSERT, 1L),
-                        Row.ofKind(RowKind.INSERT, 2L),
-                        Row.ofKind(RowKind.INSERT, 3L)),
-                result);
+        List<Row> expected =
+                enableDeleteVector
+                        ? Collections.singletonList(Row.ofKind(RowKind.INSERT, 
3L))
+                        : Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1L),
+                                Row.ofKind(RowKind.INSERT, 2L),
+                                Row.ofKind(RowKind.INSERT, 3L));
+        Assertions.assertEquals(expected, result);
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testSinkWithSchemaChange(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, 
false"})
+    public void testSinkWithSchemaChange(String metastore, boolean 
enableDeleteVector)
             throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -294,7 +297,7 @@ public class PaimonSinkITCase {
         Committer<MultiTableCommittable> committer = 
paimonSink.createCommitter();
 
         // 1. receive only DataChangeEvents during one checkpoint
-        for (Event event : createTestEvents()) {
+        for (Event event : createTestEvents(enableDeleteVector)) {
             writer.write(event, null);
         }
         writer.flush(false);
@@ -427,8 +430,8 @@ public class PaimonSinkITCase {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testSinkWithMultiTables(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, 
false"})
+    public void testSinkWithMultiTables(String metastore, boolean 
enableDeleteVector)
             throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -437,7 +440,7 @@ public class PaimonSinkITCase {
                         catalogOptions, new 
PaimonRecordEventSerializer(ZoneId.systemDefault()));
         PaimonWriter<Event> writer = paimonSink.createWriter(new 
MockInitContext());
         Committer<MultiTableCommittable> committer = 
paimonSink.createCommitter();
-        List<Event> testEvents = createTestEvents();
+        List<Event> testEvents = createTestEvents(enableDeleteVector);
         // create table
         TableId table2 = TableId.tableId("test", "table2");
         Schema schema =
@@ -492,8 +495,8 @@ public class PaimonSinkITCase {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testDuplicateCommitAfterRestore(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, 
false"})
+    public void testDuplicateCommitAfterRestore(String metastore, boolean 
enableDeleteVector)
             throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -504,7 +507,7 @@ public class PaimonSinkITCase {
         Committer<MultiTableCommittable> committer = 
paimonSink.createCommitter();
 
         // insert
-        for (Event event : createTestEvents()) {
+        for (Event event : createTestEvents(enableDeleteVector)) {
             writer.write(event, null);
         }
         writer.flush(false);
@@ -553,8 +556,13 @@ public class PaimonSinkITCase {
                 .execute()
                 .collect()
                 .forEachRemaining(result::add);
-        // 8 APPEND and 1 COMPACT
-        Assertions.assertEquals(result.size(), 9);
+        if (enableDeleteVector) {
+            // Each APPEND will trigger COMPACT once enable deletion-vectors.
+            Assertions.assertEquals(16, result.size());
+        } else {
+            // 8 APPEND and 1 COMPACT
+            Assertions.assertEquals(9, result.size());
+        }
         result.clear();
 
         tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")

Reply via email to