This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e7919f053e9b4fefe3b8d4c6498a18f31f4813ff Author: wangyufan <[email protected]> AuthorDate: Tue Dec 29 13:46:23 2020 +0800 [connector]fix debezium-connector error log (#9063) ### Motivation debezium-connector exception message not clear. ### Modifications when debezium-connector throw exception, these code will not be execute: ``` // getRecordSequence is empty errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get()); log.error(errorMsg); ``` After repair: ``` 17:42:43,274 ERROR [public/default/debezium-mysql-source-exchange-0] [instance: 0] PulsarSink - Failed to publish to topic [public/default/xx.xx.xx] with error [org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes] ``` (cherry picked from commit 19ff4c3f44d30d2f61fa34b81d1160b524f6b7b3) --- .../main/java/org/apache/pulsar/functions/sink/PulsarSink.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index d94db28..0495989 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -153,11 +153,13 @@ public class PulsarSink<T> implements Sink<T> { String errorMsg = null; if (srcRecord instanceof PulsarRecord) { errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId()); - log.error(errorMsg); } else { - errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get()); - log.error(errorMsg); + errorMsg = String.format("Failed to publish to topic [%s] with error [%s]", topic, throwable.getMessage()); + if (record.getRecordSequence().isPresent()) { + errorMsg = String.format(errorMsg + " with src sequence id [%s]", record.getRecordSequence().get()); + } } + log.error(errorMsg); stats.incrSinkExceptions(new Exception(errorMsg)); return null; };
