This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6c01ab4cc90ffccc70b9c8e177c0d28ea75a69d1
Author: Zike Yang <[email protected]>
AuthorDate: Wed Apr 28 18:06:00 2021 +0800

    Fix null error messages of onFailure exception in KinesisSink. (#10416)
    
    
    (cherry picked from commit 89a808cf88d10c8380e69ef129ad9f593d0b5eae)
---
 .../java/org/apache/pulsar/io/kinesis/KinesisSink.java   | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index bf58cfa..8682332 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -28,6 +28,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;
+import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
 import com.amazonaws.services.kinesis.producer.UserRecordResult;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -226,8 +227,19 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
 
         @Override
         public void onFailure(Throwable exception) {
-            LOG.error("[{}] Failed to published message for replicator of 
{}-{}, {} ", kinesisSink.streamName,
-                    resultContext.getPartitionId(), 
resultContext.getRecordSequence(), exception.getMessage());
+            if (exception instanceof UserRecordFailedException) {
+                // If the exception is UserRecordFailedException, we need to 
extract it to see real error messages.
+                UserRecordFailedException failedException = 
(UserRecordFailedException) exception;
+                StringBuffer stringBuffer = new StringBuffer();
+                failedException.getResult().getAttempts().forEach(attempt ->
+                        stringBuffer.append(String.format("errorMessage:%s, 
errorCode:%s, delay:%d, duration:%d;",
+                                attempt.getErrorMessage(), 
attempt.getErrorCode(), attempt.getDelay(), attempt.getDuration())));
+                LOG.error("[{}] Failed to published message for replicator of 
{}-{}: Attempts:{}", kinesisSink.streamName,
+                        resultContext.getPartitionId(), 
resultContext.getRecordSequence(), stringBuffer.toString());
+            } else {
+                LOG.error("[{}] Failed to published message for replicator of 
{}-{}, {} ", kinesisSink.streamName,
+                        resultContext.getPartitionId(), 
resultContext.getRecordSequence(), exception.getMessage());
+            }
             kinesisSink.previousPublishFailed = TRUE;
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);

Reply via email to