This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ba1ea66 Add kinesis-sink user metrics to sinkContext (#2169)
ba1ea66 is described below
commit ba1ea665f0a073701df68eeff3a6b2541f6d1fa6
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Jul 16 14:00:46 2018 -0700
Add kinesis-sink user metrics to sinkContext (#2169)
---
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 34 ++++++++++++++++++----
1 file changed, 28 insertions(+), 6 deletions(-)
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index c3b6cdc..c3c2c45 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -88,21 +88,32 @@ public class KinesisSink implements Sink<byte[]> {
private String streamName;
private static final String defaultPartitionedKey = "default";
private static final int maxPartitionedKeyLength = 256;
+ private SinkContext sinkContext;
public static final String ACCESS_KEY_NAME = "accessKey";
public static final String SECRET_KEY_NAME = "secretKey";
+ public static final String METRICS_TOTAL_INCOMING =
"_kinesis_total_incoming_";
+ public static final String METRICS_TOTAL_INCOMING_BYTES =
"_kinesis_total_incoming_bytes_";
+ public static final String METRICS_TOTAL_SUCCESS =
"_kinesis_total_success_";
+ public static final String METRICS_TOTAL_FAILURE =
"_kinesis_total_failure_";
+
+
@Override
public void write(Record<byte[]> record) throws Exception {
String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least
one, and at most 256
+ ByteBuffer data =
createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
ListenableFuture<UserRecordResult> addRecordResult =
kinesisProducer.addUserRecord(this.streamName,
- partitionedKey,
- createKinesisMessage(kinesisSinkConfig.getMessageFormat(),
record));
+ partitionedKey, data);
addCallback(addRecordResult,
- ProducerSendCallback.create(this.streamName, record,
System.nanoTime()), directExecutor());
+ ProducerSendCallback.create(this.streamName, record,
System.nanoTime(), sinkContext), directExecutor());
+ if (sinkContext != null) {
+ sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
+ sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES,
data.array().length);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Published message to kinesis stream {} with size {}",
streamName, record.getValue().length);
}
@@ -120,6 +131,7 @@ public class KinesisSink implements Sink<byte[]> {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
kinesisSinkConfig = KinesisSinkConfig.load(config);
+ this.sinkContext = sinkContext;
checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()),
"empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()), "empty
aws-end-point");
@@ -158,16 +170,19 @@ public class KinesisSink implements Sink<byte[]> {
private String streamName;
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
+ private SinkContext sinkContext;
private ProducerSendCallback(Handle<ProducerSendCallback>
recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
- static ProducerSendCallback create(String streamName, Record<byte[]>
resultContext, long startTime) {
+ static ProducerSendCallback create(String streamName, Record<byte[]>
resultContext, long startTime,
+ SinkContext sinkContext) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.resultContext = resultContext;
sendCallback.streamName = streamName;
sendCallback.startTime = startTime;
+ sendCallback.sinkContext = sinkContext;
return sendCallback;
}
@@ -175,6 +190,7 @@ public class KinesisSink implements Sink<byte[]> {
resultContext = null;
streamName = null;
startTime = 0;
+ sinkContext = null;
recyclerHandle.recycle(this);
}
@@ -188,10 +204,13 @@ public class KinesisSink implements Sink<byte[]> {
@Override
public void onSuccess(UserRecordResult result) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully published message for replicator of
{}-{} with latency", this.streamName,
- result.getShardId(),
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)));
+ LOG.debug("Successfully published message for {}-{} with
latency", this.streamName, result.getShardId(),
+ TimeUnit.NANOSECONDS.toMillis((System.nanoTime() -
startTime)));
}
this.resultContext.ack();
+ if (sinkContext != null) {
+ sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+ }
recycle();
}
@@ -200,6 +219,9 @@ public class KinesisSink implements Sink<byte[]> {
LOG.error("[{}] Failed to published message for replicator of
{}-{} ", streamName,
resultContext.getPartitionId(),
resultContext.getRecordSequence());
this.resultContext.fail();
+ if (sinkContext != null) {
+ sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
+ }
recycle();
}
}