This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new dcab375 [GOBBLIN-1000] Add min and max LogAppendTime to tracking
events emitted from Gobblin Kafka Extractor
dcab375 is described below
commit dcab3758812b3c54e6799ac749bb6d3a6863d95e
Author: sv2000 <[email protected]>
AuthorDate: Mon Dec 9 13:14:38 2019 -0800
[GOBBLIN-1000] Add min and max LogAppendTime to tracking events emitted
from Gobblin Kafka Extractor
Closes #2845 from sv2000/minMaxLogAppendTime
---
.../extract/kafka/KafkaExtractorStatsTracker.java | 15 +++++++++++++++
.../extract/kafka/KafkaExtractorStatsTrackerTest.java | 9 +++++++++
2 files changed, 24 insertions(+)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 28caf84..60ebb85 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -51,6 +51,8 @@ public class KafkaExtractorStatsTracker {
private static final String ELAPSED_TIME = "elapsedTime";
private static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
private static final String SLA_MISSED_RECORD_COUNT = "slaMissedRecordCount";
+ private static final String MIN_LOG_APPEND_TIMESTAMP = "minLogAppendTimestamp";
+ private static final String MAX_LOG_APPEND_TIMESTAMP = "maxLogAppendTimestamp";
private static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
@@ -107,6 +109,8 @@ public class KafkaExtractorStatsTracker {
private long startFetchEpochTime;
private long stopFetchEpochTime;
private long lastSuccessfulRecordHeaderTimestamp;
+ private long minLogAppendTime = -1L;
+ private long maxLogAppendTime = -1L;
}
/**
@@ -163,6 +167,15 @@ public class KafkaExtractorStatsTracker {
if (this.isSlaConfigured) {
if (v.slaMissedRecordCount < 0) {
v.slaMissedRecordCount = 0;
+ v.minLogAppendTime = logAppendTimestamp;
+ v.maxLogAppendTime = logAppendTimestamp;
+ } else {
+ if (logAppendTimestamp < v.minLogAppendTime) {
+ v.minLogAppendTime = logAppendTimestamp;
+ }
+ if (logAppendTimestamp > v.maxLogAppendTime) {
+ v.maxLogAppendTime = logAppendTimestamp;
+ }
}
if (logAppendTimestamp > 0 && (System.currentTimeMillis() - logAppendTimestamp > recordLevelSlaMillis)) {
v.slaMissedRecordCount++;
@@ -267,6 +280,8 @@ public class KafkaExtractorStatsTracker {
Long.toString(stats.getStopFetchEpochTime()));
tagsForPartition.put(PROCESSED_RECORD_COUNT,
Long.toString(stats.getProcessedRecordCount()));
tagsForPartition.put(SLA_MISSED_RECORD_COUNT,
Long.toString(stats.getSlaMissedRecordCount()));
+ tagsForPartition.put(MIN_LOG_APPEND_TIMESTAMP, Long.toString(stats.getMinLogAppendTime()));
+ tagsForPartition.put(MAX_LOG_APPEND_TIMESTAMP, Long.toString(stats.getMaxLogAppendTime()));
tagsForPartition.put(PARTITION_TOTAL_SIZE,
Long.toString(stats.getPartitionTotalSize()));
tagsForPartition.put(AVG_RECORD_SIZE,
Long.toString(stats.getAvgRecordSize()));
tagsForPartition.put(ELAPSED_TIME, Long.toString(stats.getElapsedTime()));
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 2022ba7..f2e0824 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -77,6 +77,8 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
== 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
== 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
-1);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), -1);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), -1);
this.extractorStatsTracker.onDecodeableRecord(0, readStartTime,
decodeStartTime, 100, logAppendTimestamp);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(),
1);
@@ -84,10 +86,13 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
> 0);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
> 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
1);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), logAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
readStartTime = System.nanoTime();
Thread.sleep(1);
decodeStartTime = System.nanoTime();
+ long previousLogAppendTimestamp = logAppendTimestamp;
logAppendTimestamp = System.currentTimeMillis() - 10;
long previousDecodeRecordTime =
this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
long previousReadRecordTime =
this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
@@ -98,6 +103,8 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
> previousDecodeRecordTime);
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
> previousReadRecordTime);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
1);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), previousLogAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
}
@Test
@@ -141,6 +148,8 @@ public class KafkaExtractorStatsTrackerTest {
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord()
> 0);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(),
100);
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getSlaMissedRecordCount(),
0);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(), logAppendTimestamp);
+ Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(), logAppendTimestamp);
}
@Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")