This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a057fd1c46 [cdc] add exception message for CdcRecordStoreMultiWriteOperator (#4734)
a057fd1c46 is described below
commit a057fd1c46aa08ad1e88e5acf0a56bf816518bf4
Author: LsomeYeah <[email protected]>
AuthorDate: Wed Dec 18 11:30:48 2024 +0800
[cdc] add exception message for CdcRecordStoreMultiWriteOperator (#4734)
---
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 9387a82938..a4b4e82840 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -174,7 +174,7 @@ public class CdcRecordStoreMultiWriteOperator
try {
write.write(optionalConverted.get());
} catch (Exception e) {
- throw new IOException(e);
+ throw new IOException("Exception occurs for writing record to
table: " + tableId, e);
}
}
@@ -235,12 +235,17 @@ public class CdcRecordStoreMultiWriteOperator
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();
StoreSinkWrite write = entry.getValue();
- committables.addAll(
- write.prepareCommit(waitCompaction, checkpointId).stream()
- .map(
- committable ->
-
MultiTableCommittable.fromCommittable(key, committable))
- .collect(Collectors.toList()));
+ try {
+ committables.addAll(
+ write.prepareCommit(waitCompaction,
checkpointId).stream()
+ .map(
+ committable ->
+
MultiTableCommittable.fromCommittable(
+ key, committable))
+ .collect(Collectors.toList()));
+ } catch (Exception e) {
+ throw new IOException("Failed to prepare commit for table: " +
key.toString(), e);
+ }
}
return committables;
}