This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ba1ea66  Add kinesis-sink user metrics to sinkContext (#2169)
ba1ea66 is described below

commit ba1ea665f0a073701df68eeff3a6b2541f6d1fa6
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Jul 16 14:00:46 2018 -0700

    Add kinesis-sink user metrics to sinkContext (#2169)
---
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 34 ++++++++++++++++++----
 1 file changed, 28 insertions(+), 6 deletions(-)

diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index c3b6cdc..c3c2c45 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -88,21 +88,32 @@ public class KinesisSink implements Sink<byte[]> {
     private String streamName;
     private static final String defaultPartitionedKey = "default";
     private static final int maxPartitionedKeyLength = 256;
+    private SinkContext sinkContext;
 
     public static final String ACCESS_KEY_NAME = "accessKey";
     public static final String SECRET_KEY_NAME = "secretKey";
 
+    public static final String METRICS_TOTAL_INCOMING = 
"_kinesis_total_incoming_";
+    public static final String METRICS_TOTAL_INCOMING_BYTES = 
"_kinesis_total_incoming_bytes_";
+    public static final String METRICS_TOTAL_SUCCESS = 
"_kinesis_total_success_";
+    public static final String METRICS_TOTAL_FAILURE = 
"_kinesis_total_failure_";
+         
+    
     @Override
     public void write(Record<byte[]> record) throws Exception {
         String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
+        ByteBuffer data = 
createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
-                partitionedKey,
-                createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
record));
+                partitionedKey, data);
         addCallback(addRecordResult,
-                ProducerSendCallback.create(this.streamName, record, 
System.nanoTime()), directExecutor());
+                ProducerSendCallback.create(this.streamName, record, 
System.nanoTime(), sinkContext), directExecutor());
+        if (sinkContext != null) {
+            sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
+            sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, 
data.array().length);
+        }
         if (LOG.isDebugEnabled()) {
             LOG.debug("Published message to kinesis stream {} with size {}", 
streamName, record.getValue().length);
         }
@@ -120,6 +131,7 @@ public class KinesisSink implements Sink<byte[]> {
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         kinesisSinkConfig = KinesisSinkConfig.load(config);
+        this.sinkContext = sinkContext;
 
         checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), 
"empty kinesis-stream name");
         checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()), "empty 
aws-end-point");
@@ -158,16 +170,19 @@ public class KinesisSink implements Sink<byte[]> {
         private String streamName;
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
+        private SinkContext sinkContext;
 
         private ProducerSendCallback(Handle<ProducerSendCallback> 
recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(String streamName, Record<byte[]> 
resultContext, long startTime) {
+        static ProducerSendCallback create(String streamName, Record<byte[]> 
resultContext, long startTime,
+                SinkContext sinkContext) {
             ProducerSendCallback sendCallback = RECYCLER.get();
             sendCallback.resultContext = resultContext;
             sendCallback.streamName = streamName;
             sendCallback.startTime = startTime;
+            sendCallback.sinkContext = sinkContext;
             return sendCallback;
         }
 
@@ -175,6 +190,7 @@ public class KinesisSink implements Sink<byte[]> {
             resultContext = null;
             streamName = null;
             startTime = 0;
+            sinkContext = null;
             recyclerHandle.recycle(this);
         }
 
@@ -188,10 +204,13 @@ public class KinesisSink implements Sink<byte[]> {
         @Override
         public void onSuccess(UserRecordResult result) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Successfully published message for replicator of 
{}-{} with latency", this.streamName,
-                        result.getShardId(), 
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)));
+                LOG.debug("Successfully published message for {}-{} with 
latency", this.streamName, result.getShardId(),
+                        TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - 
startTime)));
             }
             this.resultContext.ack();
+            if (sinkContext != null) {
+                sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+            }
             recycle();
         }
 
@@ -200,6 +219,9 @@ public class KinesisSink implements Sink<byte[]> {
             LOG.error("[{}] Failed to published message for replicator of 
{}-{} ", streamName,
                     resultContext.getPartitionId(), 
resultContext.getRecordSequence());
             this.resultContext.fail();
+            if (sinkContext != null) {
+                sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
+            }
             recycle();
         }
     }

Reply via email to