This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3112c9768 [cdc] Combine mode should throw better exception to dynamic
bucket table (#3291)
3112c9768 is described below
commit 3112c97688f1f0da6b98f961a28e0402c7adaeb4
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 10:57:08 2024 +0800
[cdc] Combine mode should throw better exception to dynamic bucket table
(#3291)
---
.../paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java | 9 +++++++++
.../paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java | 4 +++-
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
index dd83c1a0e..9ade79ec6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputer.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.ChannelComputer;
@@ -77,6 +78,14 @@ public class CdcMultiplexRecordChannelComputer implements
ChannelComputer<CdcMul
LOG.error("Failed to get table " + id.getFullName());
return null;
}
+
+ if (table.bucketMode() != BucketMode.FIXED) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Combine mode Sink only supports FIXED
bucket mode, but %s is %s",
+ table.name(), table.bucketMode()));
+ }
+
CdcRecordChannelComputer channelComputer =
new CdcRecordChannelComputer(table.schema());
channelComputer.setup(numChannels);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index af544def0..869947f65 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -190,7 +190,9 @@ public class CdcRecordStoreMultiWriteOperator
if (table.bucketMode() != BucketMode.FIXED) {
throw new UnsupportedOperationException(
- "Unified Sink only supports FIXED bucket mode, but is " +
table.bucketMode());
+ String.format(
+ "Combine mode Sink only supports FIXED bucket
mode, but %s is %s",
+ table.name(), table.bucketMode()));
}
return table;
}