This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ba1ea66 Add kinesis-sink user metrics to sinkContext (#2169) ba1ea66 is described below commit ba1ea665f0a073701df68eeff3a6b2541f6d1fa6 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Jul 16 14:00:46 2018 -0700 Add kinesis-sink user metrics to sinkContext (#2169) --- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 34 ++++++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index c3b6cdc..c3c2c45 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -88,21 +88,32 @@ public class KinesisSink implements Sink<byte[]> { private String streamName; private static final String defaultPartitionedKey = "default"; private static final int maxPartitionedKeyLength = 256; + private SinkContext sinkContext; public static final String ACCESS_KEY_NAME = "accessKey"; public static final String SECRET_KEY_NAME = "secretKey"; + public static final String METRICS_TOTAL_INCOMING = "_kinesis_total_incoming_"; + public static final String METRICS_TOTAL_INCOMING_BYTES = "_kinesis_total_incoming_bytes_"; + public static final String METRICS_TOTAL_SUCCESS = "_kinesis_total_success_"; + public static final String METRICS_TOTAL_FAILURE = "_kinesis_total_failure_"; + + @Override public void write(Record<byte[]> record) throws Exception { String partitionedKey = record.getKey().orElse(defaultPartitionedKey); partitionedKey = partitionedKey.length() > maxPartitionedKeyLength ? 
partitionedKey.substring(0, maxPartitionedKeyLength - 1) : partitionedKey; // partitionedKey Length must be at least one, and at most 256 + ByteBuffer data = createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record); ListenableFuture<UserRecordResult> addRecordResult = kinesisProducer.addUserRecord(this.streamName, - partitionedKey, - createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record)); + partitionedKey, data); addCallback(addRecordResult, - ProducerSendCallback.create(this.streamName, record, System.nanoTime()), directExecutor()); + ProducerSendCallback.create(this.streamName, record, System.nanoTime(), sinkContext), directExecutor()); + if (sinkContext != null) { + sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1); + sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES, data.array().length); + } if (LOG.isDebugEnabled()) { LOG.debug("Published message to kinesis stream {} with size {}", streamName, record.getValue().length); } @@ -120,6 +131,7 @@ public class KinesisSink implements Sink<byte[]> { @Override public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { kinesisSinkConfig = KinesisSinkConfig.load(config); + this.sinkContext = sinkContext; checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name"); checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()), "empty aws-end-point"); @@ -158,16 +170,19 @@ public class KinesisSink implements Sink<byte[]> { private String streamName; private long startTime = 0; private final Handle<ProducerSendCallback> recyclerHandle; + private SinkContext sinkContext; private ProducerSendCallback(Handle<ProducerSendCallback> recyclerHandle) { this.recyclerHandle = recyclerHandle; } - static ProducerSendCallback create(String streamName, Record<byte[]> resultContext, long startTime) { + static ProducerSendCallback create(String streamName, Record<byte[]> resultContext, long startTime, + SinkContext sinkContext) { 
ProducerSendCallback sendCallback = RECYCLER.get(); sendCallback.resultContext = resultContext; sendCallback.streamName = streamName; sendCallback.startTime = startTime; + sendCallback.sinkContext = sinkContext; return sendCallback; } @@ -175,6 +190,7 @@ public class KinesisSink implements Sink<byte[]> { resultContext = null; streamName = null; startTime = 0; + sinkContext = null; recyclerHandle.recycle(this); } @@ -188,10 +204,13 @@ public class KinesisSink implements Sink<byte[]> { @Override public void onSuccess(UserRecordResult result) { if (LOG.isDebugEnabled()) { - LOG.debug("Successfully published message for replicator of {}-{} with latency", this.streamName, - result.getShardId(), TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime))); + LOG.debug("Successfully published message for {}-{} with latency", this.streamName, result.getShardId(), + TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime))); } this.resultContext.ack(); + if (sinkContext != null) { + sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1); + } recycle(); } @@ -200,6 +219,9 @@ public class KinesisSink implements Sink<byte[]> { LOG.error("[{}] Failed to published message for replicator of {}-{} ", streamName, resultContext.getPartitionId(), resultContext.getRecordSequence()); this.resultContext.fail(); + if (sinkContext != null) { + sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1); + } recycle(); } }