This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit c8c8d8da58c2b471fd040e4222d2074ff220f91e
Author: JingsongLi <[email protected]>
AuthorDate: Sun May 24 19:13:37 2026 +0800

    [hotfix] Fix tests error in master
---
 .../cdc/CdcDynamicTableParsingProcessFunction.java | 17 +++++---------
 .../cdc/RichCdcMultiplexRecordEventParser.java     |  5 ++++-
 .../paimon/flink/DynamicBucketTableITCase.java     | 26 +++++++++++++++++-----
 3 files changed, 30 insertions(+), 18 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index f6c1b4733f..d56963731f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -104,16 +103,12 @@ public class CdcDynamicTableParsingProcessFunction<T> 
extends ProcessFunction<T,
                         schema -> {
                             Identifier identifier = new Identifier(database, 
tableName);
                             try {
-                                Table ignore = catalog.getTable(identifier);
-                            } catch (Catalog.TableNotExistException e) {
-                                try {
-                                    catalog.createTable(identifier, schema, 
true);
-                                } catch (Exception ex) {
-                                    LOG.error(
-                                            "Cannot create newly added Paimon 
table {}",
-                                            identifier.getFullName(),
-                                            ex);
-                                }
+                                catalog.createTable(identifier, schema, true);
+                            } catch (Exception e) {
+                                LOG.error(
+                                        "Cannot create newly added Paimon 
table {}",
+                                        identifier.getFullName(),
+                                        e);
                             }
                         });
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index f219aec903..f14f873fed 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -50,6 +50,7 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     @Nullable private final Pattern dbIncludingPattern;
     @Nullable private final Pattern dbExcludingPattern;
     private final TableNameConverter tableNameConverter;
+    private final Set<String> createdTables = new HashSet<>();
 
     private final Map<String, RichEventParser> parsers = new HashMap<>();
     private final Set<String> includedTables = new HashSet<>();
@@ -198,6 +199,8 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     }
 
     private boolean shouldCreateCurrentTable() {
-        return shouldSynchronizeCurrentTable && 
!record.cdcSchema().fields().isEmpty();
+        return shouldSynchronizeCurrentTable
+                && !record.cdcSchema().fields().isEmpty()
+                && createdTables.add(parseTableName());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index 30506495f6..d561450f04 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -86,19 +86,33 @@ public class DynamicBucketTableITCase extends 
CatalogITCaseBase {
     @Test
     public void testWriteWithAssignerParallelism() {
         sql(
-                "INSERT INTO T /*+ 
OPTIONS('dynamic-bucket.initial-buckets'='3') */ "
-                        + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), 
(1, 5, 5)");
+                "CREATE TABLE T_INIT3 ("
+                        + "pt INT, pk INT, v INT, "
+                        + "PRIMARY KEY (pt, pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3', "
+                        + " 'dynamic-bucket.initial-buckets'='3' "
+                        + ")");
+        sql("INSERT INTO T_INIT3 VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 
4, 4), (1, 5, 5)");
         // initial-buckets is 3, but parallelism is 2, will use 2
-        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+        assertThat(sql("SELECT DISTINCT bucket FROM T_INIT3$files"))
                 .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
     }
 
     @Test
     public void testWriteWithAssignerParallelism1() {
         sql(
-                "INSERT INTO T /*+ 
OPTIONS('dynamic-bucket.initial-buckets'='1') */ "
-                        + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), 
(1, 5, 5)");
-        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+                "CREATE TABLE T_INIT1 ("
+                        + "pt INT, pk INT, v INT, "
+                        + "PRIMARY KEY (pt, pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3', "
+                        + " 'dynamic-bucket.initial-buckets'='1' "
+                        + ")");
+        sql("INSERT INTO T_INIT1 VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 
4, 4), (1, 5, 5)");
+        assertThat(sql("SELECT DISTINCT bucket FROM T_INIT1$files"))
                 .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
     }
 

Reply via email to