This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 10f674b65db43ccb738010f9e20565b5f4c63b9b Author: wangyufan <[email protected]> AuthorDate: Tue Dec 29 13:46:23 2020 +0800 [connector]fix debezium-connector error log (#9063) ### Motivation The debezium-connector exception message is not clear. ### Modifications When debezium-connector throws an exception, the following code is not executed: ``` // getRecordSequence is empty errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get()); log.error(errorMsg); ``` After the fix: ``` 17:42:43,274 ERROR [public/default/debezium-mysql-source-exchange-0] [instance: 0] PulsarSink - Failed to publish to topic [public/default/xx.xx.xx] with error [org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: Message size is bigger than 5242880 bytes] ``` (cherry picked from commit 19ff4c3f44d30d2f61fa34b81d1160b524f6b7b3) --- .../main/java/org/apache/pulsar/functions/sink/PulsarSink.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index d8a1de9..7939eb5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -191,11 +191,13 @@ public class PulsarSink<T> implements Sink<T> { String errorMsg = null; if (srcRecord instanceof PulsarRecord) { errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId()); - log.error(errorMsg); } else { - errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get()); - 
log.error(errorMsg); + errorMsg = String.format("Failed to publish to topic [%s] with error [%s]", topic, throwable.getMessage()); + if (record.getRecordSequence().isPresent()) { + errorMsg = String.format(errorMsg + " with src sequence id [%s]", record.getRecordSequence().get()); + } } + log.error(errorMsg); stats.incrSinkExceptions(new Exception(errorMsg)); return null; };
