rdhabalia closed pull request #2285: kinesis-sink: manage msg ordering for
publish callback failure
URL: https://github.com/apache/incubator-pulsar/pull/2285
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dc70b98c5c..67de21a4ee 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -44,6 +44,7 @@
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@@ -89,6 +90,12 @@
private static final String defaultPartitionedKey = "default";
private static final int maxPartitionedKeyLength = 256;
private SinkContext sinkContext;
+ //
+ private static final int FALSE = 0;
+ private static final int TRUE = 1;
+ private volatile int previousPublishFailed = FALSE;
+ private static final AtomicIntegerFieldUpdater<KinesisSink>
IS_PUBLISH_FAILED =
+ AtomicIntegerFieldUpdater.newUpdater(KinesisSink.class,
"previousPublishFailed");
public static final String ACCESS_KEY_NAME = "accessKey";
public static final String SECRET_KEY_NAME = "secretKey";
@@ -101,6 +108,12 @@
@Override
public void write(Record<byte[]> record) throws Exception {
+ // kpl-thread captures publish-failure. fail the publish on main
pulsar-io-thread to maintain the ordering
+ if (kinesisSinkConfig.isRetainOrdering() && previousPublishFailed ==
TRUE) {
+ LOG.warn("Skip acking message to retain ordering with previous
failed message {}-{}", this.streamName,
+ record.getRecordSequence());
+ throw new IllegalStateException("kinesis queue has publish
failure");
+ }
String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
@@ -109,7 +122,7 @@ public void write(Record<byte[]> record) throws Exception {
ListenableFuture<UserRecordResult> addRecordResult =
kinesisProducer.addUserRecord(this.streamName,
partitionedKey, data);
addCallback(addRecordResult,
- ProducerSendCallback.create(this.streamName, record,
System.nanoTime(), sinkContext), directExecutor());
+ ProducerSendCallback.create(this, record, System.nanoTime()),
directExecutor());
if (sinkContext != null) {
sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES,
data.array().length);
@@ -151,6 +164,7 @@ public void open(Map<String, Object> config, SinkContext
sinkContext) throws Exc
this.streamName = kinesisSinkConfig.getAwsKinesisStreamName();
this.kinesisProducer = new KinesisProducer(kinesisConfig);
+ IS_PUBLISH_FAILED.set(this, FALSE);
LOG.info("Kinesis sink started. {}",
(ReflectionToStringBuilder.toString(kinesisConfig,
ToStringStyle.SHORT_PREFIX_STYLE)));
}
@@ -167,30 +181,26 @@ protected AWSCredentialsProvider
createCredentialProvider(String awsCredentialPl
private static final class ProducerSendCallback implements
FutureCallback<UserRecordResult> {
private Record<byte[]> resultContext;
- private String streamName;
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
- private SinkContext sinkContext;
+ private KinesisSink kinesisSink;
private ProducerSendCallback(Handle<ProducerSendCallback>
recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
- static ProducerSendCallback create(String streamName, Record<byte[]>
resultContext, long startTime,
- SinkContext sinkContext) {
+ static ProducerSendCallback create(KinesisSink kinesisSink,
Record<byte[]> resultContext, long startTime) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.resultContext = resultContext;
- sendCallback.streamName = streamName;
+ sendCallback.kinesisSink = kinesisSink;
sendCallback.startTime = startTime;
- sendCallback.sinkContext = sinkContext;
return sendCallback;
}
private void recycle() {
resultContext = null;
- streamName = null;
+ kinesisSink = null;
startTime = 0;
- sinkContext = null;
recyclerHandle.recycle(this);
}
@@ -204,23 +214,28 @@ protected ProducerSendCallback
newObject(Handle<ProducerSendCallback> handle) {
@Override
public void onSuccess(UserRecordResult result) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully published message for {}-{} with
latency", this.streamName, result.getShardId(),
+ LOG.debug("Successfully published message for {}-{} with
latency {}", kinesisSink.streamName, result.getShardId(),
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() -
startTime)));
}
- this.resultContext.ack();
- if (sinkContext != null) {
- sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+ if (kinesisSink.sinkContext != null) {
+ kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+ }
+ if (kinesisSink.kinesisSinkConfig.isRetainOrdering() &&
kinesisSink.previousPublishFailed == TRUE) {
+ LOG.warn("Skip acking message to retain ordering with previous
failed message {}-{} on shard {}",
+ kinesisSink.streamName,
resultContext.getRecordSequence(), result.getShardId());
+ } else {
+ this.resultContext.ack();
}
recycle();
}
@Override
public void onFailure(Throwable exception) {
- LOG.error("[{}] Failed to published message for replicator of
{}-{} ", streamName,
+ LOG.error("[{}] Failed to published message for replicator of
{}-{} ", kinesisSink.streamName,
resultContext.getPartitionId(),
resultContext.getRecordSequence());
- this.resultContext.fail();
- if (sinkContext != null) {
- sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
+ kinesisSink.previousPublishFailed = TRUE;
+ if (kinesisSink.sinkContext != null) {
+ kinesisSink.sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
}
recycle();
}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index bf5f2ea883..ba476ab099 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -45,6 +45,7 @@
private String awsCredentialPluginName;
private String awsCredentialPluginParam;
private MessageFormat messageFormat = MessageFormat.ONLY_RAW_PAYLOAD; //
default : ONLY_RAW_PAYLOAD
+ private boolean retainOrdering;
public static KinesisSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services