This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3112c9768 [cdc] Combine mode should throw better exception to dynamic bucket table (#3291)
3112c9768 is described below

commit 3112c97688f1f0da6b98f961a28e0402c7adaeb4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 10:57:08 2024 +0800

    [cdc] Combine mode should throw better exception to dynamic bucket table (#3291)
---
 .../paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java | 9 +++++++++
 .../paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java  | 4 +++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
index dd83c1a0e..9ade79ec6 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 
@@ -77,6 +78,14 @@ public class CdcMultiplexRecordChannelComputer implements ChannelComputer<CdcMul
                         LOG.error("Failed to get table " + id.getFullName());
                         return null;
                     }
+
+                    if (table.bucketMode() != BucketMode.FIXED) {
+                        throw new UnsupportedOperationException(
+                                String.format(
+                                        "Combine mode Sink only supports FIXED bucket mode, but %s is %s",
+                                        table.name(), table.bucketMode()));
+                    }
+
                     CdcRecordChannelComputer channelComputer =
                             new CdcRecordChannelComputer(table.schema());
                     channelComputer.setup(numChannels);
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index af544def0..869947f65 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -190,7 +190,9 @@ public class CdcRecordStoreMultiWriteOperator
 
         if (table.bucketMode() != BucketMode.FIXED) {
             throw new UnsupportedOperationException(
-                    "Unified Sink only supports FIXED bucket mode, but is " + table.bucketMode());
+                    String.format(
+                            "Combine mode Sink only supports FIXED bucket mode, but %s is %s",
+                            table.name(), table.bucketMode()));
         }
         return table;
     }

Reply via email to