This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


commit c8c8d8da58c2b471fd040e4222d2074ff220f91e
Author: JingsongLi <[email protected]>
AuthorDate: Sun May 24 19:13:37 2026 +0800

    [hotfix] Fix tests error in master
---
 .../cdc/CdcDynamicTableParsingProcessFunction.java | 17 +++++--------
 .../cdc/RichCdcMultiplexRecordEventParser.java     |  5 ++++-
 .../paimon/flink/DynamicBucketTableITCase.java     | 26 +++++++++++++++++-----
 3 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index f6c1b4733f..d56963731f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -104,16 +103,12 @@ public class CdcDynamicTableParsingProcessFunction<T> extends ProcessFunction<T,
                         schema -> {
                             Identifier identifier = new Identifier(database, tableName);
                             try {
-                                Table ignore = catalog.getTable(identifier);
-                            } catch (Catalog.TableNotExistException e) {
-                                try {
-                                    catalog.createTable(identifier, schema, true);
-                                } catch (Exception ex) {
-                                    LOG.error(
-                                            "Cannot create newly added Paimon table {}",
-                                            identifier.getFullName(),
-                                            ex);
-                                }
+                                catalog.createTable(identifier, schema, true);
+                            } catch (Exception e) {
+                                LOG.error(
+                                        "Cannot create newly added Paimon table {}",
+                                        identifier.getFullName(),
+                                        e);
                             }
                         });
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index f219aec903..f14f873fed 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -50,6 +50,7 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
     @Nullable private final Pattern dbIncludingPattern;
     @Nullable private final Pattern dbExcludingPattern;
     private final TableNameConverter tableNameConverter;
+    private final Set<String> createdTables = new HashSet<>();
 
     private final Map<String, RichEventParser> parsers = new HashMap<>();
     private final Set<String> includedTables = new HashSet<>();
@@ -198,6 +199,8 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
     }
 
     private boolean shouldCreateCurrentTable() {
-        return shouldSynchronizeCurrentTable && !record.cdcSchema().fields().isEmpty();
+        return shouldSynchronizeCurrentTable
+                && !record.cdcSchema().fields().isEmpty()
+                && createdTables.add(parseTableName());
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
index 30506495f6..d561450f04 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DynamicBucketTableITCase.java
@@ -86,19 +86,33 @@ public class DynamicBucketTableITCase extends CatalogITCaseBase {
     @Test
     public void testWriteWithAssignerParallelism() {
         sql(
-                "INSERT INTO T /*+ OPTIONS('dynamic-bucket.initial-buckets'='3') */ "
-                        + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
+                "CREATE TABLE T_INIT3 ("
+                        + "pt INT, pk INT, v INT, "
+                        + "PRIMARY KEY (pt, pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3', "
+                        + " 'dynamic-bucket.initial-buckets'='3' "
+                        + ")");
+        sql("INSERT INTO T_INIT3 VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
         // initial-buckets is 3, but parallelism is 2, will use 2
-        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+        assertThat(sql("SELECT DISTINCT bucket FROM T_INIT3$files"))
                 .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
     }
 
     @Test
     public void testWriteWithAssignerParallelism1() {
         sql(
-                "INSERT INTO T /*+ OPTIONS('dynamic-bucket.initial-buckets'='1') */ "
-                        + "VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
-        assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
+                "CREATE TABLE T_INIT1 ("
+                        + "pt INT, pk INT, v INT, "
+                        + "PRIMARY KEY (pt, pk) NOT ENFORCED"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket'='-1', "
+                        + " 'dynamic-bucket.target-row-num'='3', "
+                        + " 'dynamic-bucket.initial-buckets'='1' "
+                        + ")");
+        sql("INSERT INTO T_INIT1 VALUES (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5)");
+        assertThat(sql("SELECT DISTINCT bucket FROM T_INIT1$files"))
                 .containsExactlyInAnyOrder(Row.of(0), Row.of(1));
     }
 
