rdhabalia closed pull request #2169: Add kinesis-sink user metrics to
sinkContext
URL: https://github.com/apache/incubator-pulsar/pull/2169
This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index c3b6cdcd26..c3c2c452fb 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -88,21 +88,32 @@
private String streamName;
private static final String defaultPartitionedKey = "default";
private static final int maxPartitionedKeyLength = 256;
+ private SinkContext sinkContext;
public static final String ACCESS_KEY_NAME = "accessKey";
public static final String SECRET_KEY_NAME = "secretKey";
+ public static final String METRICS_TOTAL_INCOMING =
"_kinesis_total_incoming_";
+ public static final String METRICS_TOTAL_INCOMING_BYTES =
"_kinesis_total_incoming_bytes_";
+ public static final String METRICS_TOTAL_SUCCESS =
"_kinesis_total_success_";
+ public static final String METRICS_TOTAL_FAILURE =
"_kinesis_total_failure_";
+
+
@Override
public void write(Record<byte[]> record) throws Exception {
String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least
one, and at most 256
+ ByteBuffer data =
createKinesisMessage(kinesisSinkConfig.getMessageFormat(), record);
ListenableFuture<UserRecordResult> addRecordResult =
kinesisProducer.addUserRecord(this.streamName,
- partitionedKey,
- createKinesisMessage(kinesisSinkConfig.getMessageFormat(),
record));
+ partitionedKey, data);
addCallback(addRecordResult,
- ProducerSendCallback.create(this.streamName, record,
System.nanoTime()), directExecutor());
+ ProducerSendCallback.create(this.streamName, record,
System.nanoTime(), sinkContext), directExecutor());
+ if (sinkContext != null) {
+ sinkContext.recordMetric(METRICS_TOTAL_INCOMING, 1);
+ sinkContext.recordMetric(METRICS_TOTAL_INCOMING_BYTES,
data.array().length);
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Published message to kinesis stream {} with size {}",
streamName, record.getValue().length);
}
@@ -120,6 +131,7 @@ public void close() throws IOException {
@Override
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
kinesisSinkConfig = KinesisSinkConfig.load(config);
+ this.sinkContext = sinkContext;
checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()),
"empty kinesis-stream name");
checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint()), "empty
aws-end-point");
@@ -158,16 +170,19 @@ protected AWSCredentialsProvider
createCredentialProvider(String awsCredentialPl
private String streamName;
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
+ private SinkContext sinkContext;
private ProducerSendCallback(Handle<ProducerSendCallback>
recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
- static ProducerSendCallback create(String streamName, Record<byte[]>
resultContext, long startTime) {
+ static ProducerSendCallback create(String streamName, Record<byte[]>
resultContext, long startTime,
+ SinkContext sinkContext) {
ProducerSendCallback sendCallback = RECYCLER.get();
sendCallback.resultContext = resultContext;
sendCallback.streamName = streamName;
sendCallback.startTime = startTime;
+ sendCallback.sinkContext = sinkContext;
return sendCallback;
}
@@ -175,6 +190,7 @@ private void recycle() {
resultContext = null;
streamName = null;
startTime = 0;
+ sinkContext = null;
recyclerHandle.recycle(this);
}
@@ -188,10 +204,13 @@ protected ProducerSendCallback
newObject(Handle<ProducerSendCallback> handle) {
@Override
public void onSuccess(UserRecordResult result) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Successfully published message for replicator of
{}-{} with latency", this.streamName,
- result.getShardId(),
TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - startTime)));
+ LOG.debug("Successfully published message for {}-{} with
latency", this.streamName, result.getShardId(),
+ TimeUnit.NANOSECONDS.toMillis((System.nanoTime() -
startTime)));
}
this.resultContext.ack();
+ if (sinkContext != null) {
+ sinkContext.recordMetric(METRICS_TOTAL_SUCCESS, 1);
+ }
recycle();
}
@@ -200,6 +219,9 @@ public void onFailure(Throwable exception) {
LOG.error("[{}] Failed to published message for replicator of
{}-{} ", streamName,
resultContext.getPartitionId(),
resultContext.getRecordSequence());
this.resultContext.fail();
+ if (sinkContext != null) {
+ sinkContext.recordMetric(METRICS_TOTAL_FAILURE, 1);
+ }
recycle();
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services