This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a057fd1c46 [cdc] add exception message for CdcRecordStoreMultiWriteOperator (#4734)
a057fd1c46 is described below

commit a057fd1c46aa08ad1e88e5acf0a56bf816518bf4
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Dec 18 11:30:48 2024 +0800

    [cdc] add exception message for CdcRecordStoreMultiWriteOperator (#4734)
---
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java    | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 9387a82938..a4b4e82840 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -174,7 +174,7 @@ public class CdcRecordStoreMultiWriteOperator
         try {
             write.write(optionalConverted.get());
         } catch (Exception e) {
-            throw new IOException(e);
+            throw new IOException("Exception occurs for writing record to table: " + tableId, e);
         }
     }
 
@@ -235,12 +235,17 @@ public class CdcRecordStoreMultiWriteOperator
         for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
             Identifier key = entry.getKey();
             StoreSinkWrite write = entry.getValue();
-            committables.addAll(
-                    write.prepareCommit(waitCompaction, checkpointId).stream()
-                            .map(
-                                    committable ->
-                                            MultiTableCommittable.fromCommittable(key, committable))
-                            .collect(Collectors.toList()));
+            try {
+                committables.addAll(
+                        write.prepareCommit(waitCompaction, checkpointId).stream()
+                                .map(
+                                        committable ->
+                                                MultiTableCommittable.fromCommittable(
+                                                        key, committable))
+                                .collect(Collectors.toList()));
+            } catch (Exception e) {
+                throw new IOException("Failed to prepare commit for table: " + key.toString(), e);
+            }
         }
         return committables;
     }

Reply via email to