This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 10f674b65db43ccb738010f9e20565b5f4c63b9b
Author: wangyufan <[email protected]>
AuthorDate: Tue Dec 29 13:46:23 2020 +0800

    [connector]fix debezium-connector error log (#9063)
    
    ### Motivation
    
    The debezium-connector exception message is not clear.
    
    ### Modifications
    
    When debezium-connector throws an exception, this code is not executed:
    ```
    // getRecordSequence is empty
    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] 
with src sequence id [%s]", topic, throwable.getMessage(), 
record.getRecordSequence().get());
    log.error(errorMsg);
    ```
    
    After the fix:
    ```
    17:42:43,274 ERROR [public/default/debezium-mysql-source-exchange-0] 
[instance: 0] PulsarSink - Failed to publish to topic [public/default/xx.xx.xx] 
with error 
[org.apache.pulsar.client.api.PulsarClientException$InvalidMessageException: 
Message size is bigger than 5242880 bytes]
    ```
    
    (cherry picked from commit 19ff4c3f44d30d2f61fa34b81d1160b524f6b7b3)
---
 .../main/java/org/apache/pulsar/functions/sink/PulsarSink.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index d8a1de9..7939eb5 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -191,11 +191,13 @@ public class PulsarSink<T> implements Sink<T> {
                 String errorMsg = null;
                 if (srcRecord instanceof PulsarRecord) {
                     errorMsg = String.format("Failed to publish to topic [%s] 
with error [%s] with src message id [%s]", topic, throwable.getMessage(), 
((PulsarRecord) srcRecord).getMessageId());
-                    log.error(errorMsg);
                 } else {
-                    errorMsg = String.format("Failed to publish to topic [%s] 
with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), 
record.getRecordSequence().get());
-                    log.error(errorMsg);
+                    errorMsg = String.format("Failed to publish to topic [%s] 
with error [%s]", topic, throwable.getMessage());
+                    if (record.getRecordSequence().isPresent()) {
+                        errorMsg = String.format(errorMsg + " with src 
sequence id [%s]", record.getRecordSequence().get());
+                    }
                 }
+                log.error(errorMsg);
                 stats.incrSinkExceptions(new Exception(errorMsg));
                 return null;
             };

Reply via email to