This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6c01ab4cc90ffccc70b9c8e177c0d28ea75a69d1 Author: Zike Yang <[email protected]> AuthorDate: Wed Apr 28 18:06:00 2021 +0800 Fix null error messages of onFailure exception in KinesisSink. (#10416) (cherry picked from commit 89a808cf88d10c8380e69ef129ad9f593d0b5eae) --- .../java/org/apache/pulsar/io/kinesis/KinesisSink.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index bf58cfa..8682332 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -28,6 +28,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel; +import com.amazonaws.services.kinesis.producer.UserRecordFailedException; import com.amazonaws.services.kinesis.producer.UserRecordResult; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; @@ -226,8 +227,19 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { @Override public void onFailure(Throwable exception) { - LOG.error("[{}] Failed to published message for replicator of {}-{}, {} ", kinesisSink.streamName, - resultContext.getPartitionId(), resultContext.getRecordSequence(), exception.getMessage()); + if (exception instanceof UserRecordFailedException) { + // If the exception is UserRecordFailedException, we need to extract it to see real error messages. + UserRecordFailedException failedException = (UserRecordFailedException) exception; + StringBuffer stringBuffer = new StringBuffer(); + failedException.getResult().getAttempts().forEach(attempt -> + stringBuffer.append(String.format("errorMessage:%s, errorCode:%s, delay:%d, duration:%d;", + attempt.getErrorMessage(), attempt.getErrorCode(), attempt.getDelay(), attempt.getDuration()))); + LOG.error("[{}] Failed to published message for replicator of {}-{}: Attempts:{}", kinesisSink.streamName, + resultContext.getPartitionId(), resultContext.getRecordSequence(), stringBuffer.toString()); + } else { + LOG.error("[{}] Failed to published message for replicator of {}-{}, {} ", kinesisSink.streamName, + resultContext.getPartitionId(), resultContext.getRecordSequence(), exception.getMessage()); + } kinesisSink.previousPublishFailed = TRUE; if (kinesisSink.sinkContext != null) { kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
