This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit baf04c40795f9617f3489579c519aee2fe70fa73
Author: Zike Yang <[email protected]>
AuthorDate: Thu Apr 29 23:45:00 2021 +0800

    [Kinesis]Fix kinesis sink can not retry to send messages (#10420)
    
    ### Motivation
    
    Currently, when the Kinesis sink connector fails to send a message, it does 
not retry. If `retainOrdering` is enabled, this also prevents all subsequent 
messages from being sent, and warnings like the following are logged:
    > 17:09:40.923 [crm/messaging-service/messaging-service-reply-0] WARN  
org.apache.pulsar.io.kinesis.KinesisSink - Skip acking message to retain 
ordering with previous failed message 
prod_extapi.reply.message-Optional[26380226003034]
    
    
    ### Modifications
    
    * Add retry logic to the Kinesis sink connector: when sending a message 
fails, the connector retries the send.
    
    (cherry picked from commit 345cd33c6977c94a5c425e412a9977b4bd144a84)
---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 52 +++++++++++++++-------
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 12 +++++
 2 files changed, 49 insertions(+), 15 deletions(-)

diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index 8682332..0b2a41a 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -39,11 +39,14 @@ import io.netty.util.Recycler.Handle;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
@@ -96,6 +99,7 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
     private static final String defaultPartitionedKey = "default";
     private static final int maxPartitionedKeyLength = 256;
     private SinkContext sinkContext;
+    private ScheduledExecutorService scheduledExecutor;
     // 
     private static final int FALSE = 0;
     private static final int TRUE = 1;
@@ -107,11 +111,16 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
     public static final String METRICS_TOTAL_INCOMING_BYTES = 
"_kinesis_total_incoming_bytes_";
     public static final String METRICS_TOTAL_SUCCESS = 
"_kinesis_total_success_";
     public static final String METRICS_TOTAL_FAILURE = 
"_kinesis_total_failure_";
-         
-    
+
+    private void sendUserRecord(ProducerSendCallback producerSendCallback) {
+        ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
+                producerSendCallback.partitionedKey, 
producerSendCallback.data);
+        addCallback(addRecordResult, producerSendCallback, directExecutor());
+    }
+
     @Override
     public void write(Record<byte[]> record) throws Exception {
-        // kpl-thread captures publish-failure. fail the publish on main 
pulsar-io-thread to maintain the ordering 
+        // kpl-thread captures publish-failure. fail the publish on main 
pulsar-io-thread to maintain the ordering
         if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == 
TRUE) {
             LOG.warn("Skip acking message to retain ordering with previous 
failed message {}-{}", this.streamName,
                     record.getRecordSequence());
@@ -122,10 +131,7 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
         ByteBuffer data = 
createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
-        ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
-                partitionedKey, data);
-        addCallback(addRecordResult,
-                ProducerSendCallback.create(this, record, System.nanoTime()), 
directExecutor());
+        sendUserRecord(ProducerSendCallback.create(this, record, 
System.nanoTime(), partitionedKey, data));
         if (sinkContext != null) {
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
             sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, 
data.array().length);
@@ -146,6 +152,7 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
         kinesisSinkConfig = KinesisSinkConfig.load(config);
         this.sinkContext = sinkContext;
 
@@ -180,16 +187,25 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
         private KinesisSink kinesisSink;
+        private Backoff backoff;
+        private String partitionedKey;
+        private ByteBuffer data;
 
         private ProducerSendCallback(Handle<ProducerSendCallback> 
recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(KinesisSink kinesisSink, 
Record<byte[]> resultContext, long startTime) {
+        static ProducerSendCallback create(KinesisSink kinesisSink, 
Record<byte[]> resultContext, long startTime, String partitionedKey, ByteBuffer 
data) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.resultContext = resultContext;
             sendCallback.kinesisSink = kinesisSink;
             sendCallback.startTime = startTime;
+            sendCallback.partitionedKey = partitionedKey;
+            sendCallback.data = data;
+            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && 
sendCallback.backoff == null) {
+                sendCallback.backoff = new 
Backoff(kinesisSink.kinesisSinkConfig.getRetryInitialDelayInMillis(), 
TimeUnit.MILLISECONDS,
+                        
kinesisSink.kinesisSinkConfig.getRetryMaxDelayInMillis(), 
TimeUnit.MILLISECONDS, 0, TimeUnit.SECONDS);
+            }
             return sendCallback;
         }
 
@@ -197,6 +213,10 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
             resultContext = null;
             kinesisSink = null;
             startTime = 0;
+            if (backoff != null)
+                backoff.reset();
+            partitionedKey = null;
+            data = null;
             recyclerHandle.recycle(this);
         }
 
@@ -216,12 +236,7 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
             }
-            if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && 
kinesisSink.previousPublishFailed == TRUE) {
-                LOG.warn("Skip acking message to retain ordering with previous 
failed message {}-{} on shard {}",
-                        kinesisSink.streamName, 
resultContext.getRecordSequence(), result.getShardId());
-            } else {
-                this.resultContext.ack();
-            }
+            kinesisSink.previousPublishFailed = FALSE;
             recycle();
         }
 
@@ -244,7 +259,14 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<byte[]> {
             if (kinesisSink.sinkContext != null) {
                 kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
             }
-            recycle();
+            if (backoff != null) {
+                long nextDelay = backoff.next();
+                LOG.info("[{}] Retry to publish message for replicator of 
{}-{} after {} ms.", kinesisSink.streamName,
+                        resultContext.getPartitionId(), 
resultContext.getRecordSequence(), nextDelay);
+                kinesisSink.scheduledExecutor.schedule(() -> 
kinesisSink.sendUserRecord(this), nextDelay, TimeUnit.MICROSECONDS);
+            } else {
+                recycle();
+            }
         }
     }
 
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index ec21dd5..fa00550 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -64,6 +64,18 @@ public class KinesisSinkConfig extends BaseKinesisConfig 
implements Serializable
         help = "A flag to tell Pulsar IO to retain ordering when moving 
messages from Pulsar to Kinesis")
     private boolean retainOrdering = false;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "100",
+            help = "The initial delay(in milliseconds) between retries.")
+    private long retryInitialDelayInMillis = 100;
+
+    @FieldDoc(
+            required = false,
+            defaultValue = "60000",
+            help = "The maximum delay(in milliseconds) between retries.")
+    private long retryMaxDelayInMillis = 60000;
+
     public static KinesisSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);

Reply via email to