This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7acc625f1 [FLINK-39718][pipeline][paimon] Paimon pipeline sink fails with distributed source when target table does not exist. (#4406)
7acc625f1 is described below

commit 7acc625f1ed9ab974f74cc60449c75886c0ca583
Author: Thorne <[email protected]>
AuthorDate: Sun May 31 14:22:46 2026 +0800

    [FLINK-39718][pipeline][paimon] Paimon pipeline sink fails with distributed source when target table does not exist. (#4406)
---
 .../cdc/connectors/paimon/sink/PaimonDataSink.java |  2 +-
 .../connectors/paimon/sink/PaimonHashFunction.java | 56 ++++++++++++++--------
 .../paimon/sink/PaimonHashFunctionProvider.java    |  9 +---
 .../paimon/sink/PaimonHashFunctionTest.java        | 54 ++-------------------
 4 files changed, 43 insertions(+), 78 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
index 5a95f1efd..f48879605 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
@@ -85,6 +85,6 @@ public class PaimonDataSink implements DataSink, Serializable 
{
     @Override
     public HashFunctionProvider<DataChangeEvent> 
getDataChangeEventHashFunctionProvider(
             int parallelism) {
-        return new PaimonHashFunctionProvider(options, zoneId, parallelism);
+        return new PaimonHashFunctionProvider(zoneId, parallelism);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
index 405a07c39..b17071942 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
@@ -19,28 +19,32 @@ package org.apache.flink.cdc.connectors.paimon.sink;
 
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
 import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
 
-import org.apache.paimon.AppendOnlyFileStore;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.sink.RowAssignerChannelComputer;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 
 import java.io.Serializable;
 import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffle 
{@link DataChangeEvent}
  * by hash of PrimaryKey.
+ *
+ * <p>Table type (append-only vs. primary-key) is inferred directly from the 
CDC {@link Schema}
+ * instead of querying the Paimon catalog. This avoids a {@code 
TableNotExistException} when the
+ * target table has not yet been created by {@code MetadataApplier}, which can 
happen in distributed
+ * pipeline topologies where pre-partitioning precedes schema coordination.
  */
 public class PaimonHashFunction implements HashFunction<DataChangeEvent>, 
Serializable {
 
@@ -52,23 +56,17 @@ public class PaimonHashFunction implements 
HashFunction<DataChangeEvent>, Serial
 
     private final int parallelism;
 
-    public PaimonHashFunction(
-            Options options, TableId tableId, Schema schema, ZoneId zoneId, 
int parallelism) {
+    public PaimonHashFunction(Schema schema, ZoneId zoneId, int parallelism) {
         this.parallelism = parallelism;
-        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
-        FileStoreTable table;
-        try {
-            table = (FileStoreTable) 
catalog.getTable(Identifier.fromString(tableId.toString()));
-        } catch (Catalog.TableNotExistException e) {
-            throw new RuntimeException(e);
-        }
-        if (table instanceof AppendOnlyFileStore) {
+        if (schema.primaryKeys().isEmpty()) {
+            // Append-only table: spread events randomly across subtasks.
             this.fieldGetters = null;
-            channelComputer = null;
+            this.channelComputer = null;
         } else {
             this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, 
zoneId);
-            channelComputer = new RowAssignerChannelComputer(table.schema(), 
parallelism);
-            channelComputer.setup(parallelism);
+            TableSchema tableSchema = buildTableSchema(schema);
+            this.channelComputer = new RowAssignerChannelComputer(tableSchema, 
parallelism);
+            this.channelComputer.setup(parallelism);
         }
     }
 
@@ -83,4 +81,22 @@ public class PaimonHashFunction implements 
HashFunction<DataChangeEvent>, Serial
             return ThreadLocalRandom.current().nextInt(parallelism);
         }
     }
+
+    private static TableSchema buildTableSchema(Schema schema) {
+        List<Column> columns = schema.getColumns();
+        List<DataField> dataFields = new ArrayList<>(columns.size());
+        for (int i = 0; i < columns.size(); i++) {
+            Column col = columns.get(i);
+            dataFields.add(
+                    new DataField(i, col.getName(), 
TypeUtils.toPaimonDataType(col.getType())));
+        }
+        return new TableSchema(
+                0L,
+                dataFields,
+                dataFields.size() - 1,
+                schema.partitionKeys(),
+                schema.primaryKeys(),
+                Collections.emptyMap(),
+                "");
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
index 5f641f409..eeaecb648 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
@@ -23,8 +23,6 @@ import org.apache.flink.cdc.common.function.HashFunction;
 import org.apache.flink.cdc.common.function.HashFunctionProvider;
 import org.apache.flink.cdc.common.schema.Schema;
 
-import org.apache.paimon.options.Options;
-
 import javax.annotation.Nullable;
 
 import java.time.ZoneId;
@@ -32,20 +30,17 @@ import java.time.ZoneId;
 /** A {@link HashFunctionProvider} implementation for {@link PaimonDataSink}. 
*/
 public class PaimonHashFunctionProvider implements 
HashFunctionProvider<DataChangeEvent> {
 
-    private final Options options;
-
     private final ZoneId zoneId;
 
     private final int parallelism;
 
-    public PaimonHashFunctionProvider(Options options, ZoneId zoneId, int 
parallelism) {
-        this.options = options;
+    public PaimonHashFunctionProvider(ZoneId zoneId, int parallelism) {
         this.zoneId = zoneId;
         this.parallelism = parallelism;
     }
 
     @Override
     public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId 
tableId, Schema schema) {
-        return new PaimonHashFunction(options, tableId, schema, zoneId, 
parallelism);
+        return new PaimonHashFunction(schema, zoneId, parallelism);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
index d600149d3..cfd09279f 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -18,67 +18,29 @@
 package org.apache.flink.cdc.connectors.paimon.sink;
 
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.sink.MetadataApplier;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.options.Options;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
 
-import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link PaimonHashFunction}. */
 class PaimonHashFunctionTest {
 
-    @TempDir public static Path temporaryFolder;
-
-    private Catalog catalog;
-
-    private Options catalogOptions;
-
     private static final String TEST_DATABASE = "test";
 
-    @BeforeEach
-    public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
-        catalogOptions = new Options();
-        String warehouse =
-                new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
-        catalogOptions.setString("warehouse", warehouse);
-        catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
-        catalog.createDatabase(TEST_DATABASE, true);
-    }
-
-    @AfterEach
-    public void afterEach() throws Exception {
-        catalog.dropDatabase(TEST_DATABASE, true, true);
-        catalog.close();
-    }
-
     @Test
     public void testHashCodeForAppendOnlyTable() throws IOException {
         TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
-        Map<String, String> tableOptions = new HashMap<>();
-        MetadataApplier metadataApplier =
-                new PaimonMetadataApplier(catalogOptions, tableOptions, new 
HashMap<>());
         Schema schema =
                 Schema.newBuilder()
                         .physicalColumn("col1", DataTypes.STRING().notNull())
@@ -86,12 +48,10 @@ class PaimonHashFunctionTest {
                         .physicalColumn("pt", DataTypes.STRING())
                         .physicalColumn("variantCol", DataTypes.VARIANT())
                         .build();
-        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
-        metadataApplier.applySchemaChange(createTableEvent);
         BinaryRecordDataGenerator generator =
                 new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
-        PaimonHashFunction hashFunction =
-                new PaimonHashFunction(catalogOptions, tableId, schema, 
ZoneId.systemDefault(), 4);
+        // No primary keys: append-only table. No catalog access required.
+        PaimonHashFunction hashFunction = new PaimonHashFunction(schema, 
ZoneId.systemDefault(), 4);
         DataChangeEvent dataChangeEvent1 =
                 DataChangeEvent.insertEvent(
                         tableId,
@@ -138,10 +98,6 @@ class PaimonHashFunctionTest {
     @Test
     void testHashCodeForFixedBucketTable() {
         TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
-        Map<String, String> tableOptions = new HashMap<>();
-        tableOptions.put("bucket", "10");
-        MetadataApplier metadataApplier =
-                new PaimonMetadataApplier(catalogOptions, tableOptions, new 
HashMap<>());
         Schema schema =
                 Schema.newBuilder()
                         .physicalColumn("col1", DataTypes.STRING().notNull())
@@ -150,12 +106,10 @@ class PaimonHashFunctionTest {
                         .primaryKey("col1", "pt")
                         .partitionKey("pt")
                         .build();
-        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, 
schema);
-        metadataApplier.applySchemaChange(createTableEvent);
         BinaryRecordDataGenerator generator =
                 new 
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
-        PaimonHashFunction hashFunction =
-                new PaimonHashFunction(catalogOptions, tableId, schema, 
ZoneId.systemDefault(), 4);
+        // Primary keys present: table-aware hashing. No catalog access 
required.
+        PaimonHashFunction hashFunction = new PaimonHashFunction(schema, 
ZoneId.systemDefault(), 4);
         DataChangeEvent dataChangeEvent1 =
                 DataChangeEvent.insertEvent(
                         tableId,

Reply via email to