This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit baf04c40795f9617f3489579c519aee2fe70fa73 Author: Zike Yang <[email protected]> AuthorDate: Thu Apr 29 23:45:00 2021 +0800 [Kinesis]Fix kinesis sink can not retry to send messages (#10420) ### Motivation Currently, when the Kinesis sink connector fails to send a message, it does not retry. As a result, if `retainOrdering` is enabled, no subsequent messages can be sent; each one is skipped with a warning like the following: > 17:09:40.923 [crm/messaging-service/messaging-service-reply-0] WARN org.apache.pulsar.io.kinesis.KinesisSink - Skip acking message to retain ordering with previous failed message prod_extapi.reply.message-Optional[26380226003034] ### Modifications * Add retry logic to the Kinesis sink connector: when `retainOrdering` is enabled and sending a message fails, the sink retries sending it with a backoff delay configured by the new `retryInitialDelayInMillis` and `retryMaxDelayInMillis` options. (cherry picked from commit 345cd33c6977c94a5c425e412a9977b4bd144a84) --- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 52 +++++++++++++++------- .../pulsar/io/kinesis/KinesisSinkConfig.java | 12 +++++ 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index 8682332..0b2a41a 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -39,11 +39,14 @@ import io.netty.util.Recycler.Handle; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.aws.AbstractAwsConnector; 
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; @@ -96,6 +99,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { private static final String defaultPartitionedKey = "default"; private static final int maxPartitionedKeyLength = 256; private SinkContext sinkContext; + private ScheduledExecutorService scheduledExecutor; // private static final int FALSE = 0; private static final int TRUE = 1; @@ -107,11 +111,16 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_"; public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_"; public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_"; - - + + private void sendUserRecord(ProducerSendCallback producerSendCallback) { + ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName, + producerSendCallback.partitionedKey, producerSendCallback.data); + addCallback(addRecordResult, producerSendCallback, directExecutor()); + } + @Override public void write(Record<byte[]> record) throws Exception { - // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering + // kpl-thread captures publish-failure. fail the publish on main pulsar-io-thread to maintain the ordering if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed == TRUE) { LOG.warn("Skip acking message to retain ordering with previous failed message {}-{}", this.streamName, record.getRecordSequence()); @@ -122,10 +131,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { ? 
partitionedKey.substring(0, maxPartitionedKeyLength - 1) : partitionedKey; // partitionedKey Length must be at least one, and at most 256 ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record); - ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName, - partitionedKey, data); - addCallback(addRecordResult, - ProducerSendCallback.create(this, record, System.nanoTime()), directExecutor()); + sendUserRecord(ProducerSendCallback.create(this, record, System.nanoTime(), partitionedKey, data)); if (sinkContext != null) { sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1); sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length); @@ -146,6 +152,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { @Override public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); kinesisSinkConfig = KinesisSinkConfig.load(config); this.sinkContext = sinkContext; @@ -180,16 +187,25 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { private long startTime = 0; private final Handle<ProducerSendCallback> recyclerHandle; private KinesisSink kinesisSink; + private Backoff backoff; + private String partitionedKey; + private ByteBuffer data; private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) { this.recyclerHandle = recyclerHandle; } - static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime) { + static ProducerSendCallback create(KinesisSink kinesisSink, Record<byte[]> resultContext, long startTime, String partitionedKey, ByteBuffer data) { ProducerSendCallback sendCallback = RECYCLER.get(); sendCallback.resultContext = resultContext; sendCallback.kinesisSink = kinesisSink; sendCallback.startTime = startTime; + sendCallback.partitionedKey = partitionedKey; + sendCallback.data 
= data; + if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && sendCallback.backoff == null) { + sendCallback.backoff = new Backoff(kinesisSink.kinesisSinkConfig.getRetryInitialDelayInMillis(), TimeUnit.MILLISECONDS, + kinesisSink.kinesisSinkConfig.getRetryMaxDelayInMillis(), TimeUnit.MILLISECONDS, 0, TimeUnit.SECONDS); + } return sendCallback; } @@ -197,6 +213,10 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { resultContext = null; kinesisSink = null; startTime = 0; + if (backoff != null) + backoff.reset(); + partitionedKey = null; + data = null; recyclerHandle.recycle(this); } @@ -216,12 +236,7 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { if (kinesisSink.sinkContext != null) { kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1); } - if (kinesisSink.kinesisSinkConfig.isRetainOrdering() && kinesisSink.previousPublishFailed == TRUE) { - LOG.warn("Skip acking message to retain ordering with previous failed message {}-{} on shard {}", - kinesisSink.streamName, resultContext.getRecordSequence(), result.getShardId()); - } else { - this.resultContext.ack(); - } + kinesisSink.previousPublishFailed = FALSE; recycle(); } @@ -244,7 +259,14 @@ public class KinesisSink extends AbstractAwsConnector implements Sink<byte[]> { if (kinesisSink.sinkContext != null) { kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1); } - recycle(); + if (backoff != null) { + long nextDelay = backoff.next(); + LOG.info("[{}] Retry to publish message for replicator of {}-{} after {} ms.", kinesisSink.streamName, + resultContext.getPartitionId(), resultContext.getRecordSequence(), nextDelay); + kinesisSink.scheduledExecutor.schedule(() -> kinesisSink.sendUserRecord(this), nextDelay, TimeUnit.MICROSECONDS); + } else { + recycle(); + } } } diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java index ec21dd5..fa00550 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java @@ -64,6 +64,18 @@ public class KinesisSinkConfig extends BaseKinesisConfig implements Serializable help = "A flag to tell Pulsar IO to retain ordering when moving messages from Pulsar to Kinesis") private boolean retainOrdering = false; + @FieldDoc( + required = false, + defaultValue = "100", + help = "The initial delay(in milliseconds) between retries.") + private long retryInitialDelayInMillis = 100; + + @FieldDoc( + required = false, + defaultValue = "60000", + help = "The maximum delay(in milliseconds) between retries.") + private long retryMaxDelayInMillis = 60000; + public static KinesisSinkConfig load(String yamlFile) throws IOException { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);
