This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new dcab375  [GOBBLIN-1000] Add min and max LogAppendTime to tracking events emitted from Gobblin Kafka Extractor[]
dcab375 is described below

commit dcab3758812b3c54e6799ac749bb6d3a6863d95e
Author: sv2000 <[email protected]>
AuthorDate: Mon Dec 9 13:14:38 2019 -0800

    [GOBBLIN-1000] Add min and max LogAppendTime to tracking events emitted from Gobblin Kafka Extractor[]
    
    Closes #2845 from sv2000/minMaxLogAppendTime
---
 .../extract/kafka/KafkaExtractorStatsTracker.java         | 15 +++++++++++++++
 .../extract/kafka/KafkaExtractorStatsTrackerTest.java     |  9 +++++++++
 2 files changed, 24 insertions(+)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 28caf84..60ebb85 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -51,6 +51,8 @@ public class KafkaExtractorStatsTracker {
   private static final String ELAPSED_TIME = "elapsedTime";
   private static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
   private static final String SLA_MISSED_RECORD_COUNT = "slaMissedRecordCount";
+  private static final String MIN_LOG_APPEND_TIMESTAMP = 
"minLogAppendTimestamp";
+  private static final String MAX_LOG_APPEND_TIMESTAMP = 
"maxLogAppendTimestamp";
   private static final String UNDECODABLE_MESSAGE_COUNT = 
"undecodableMessageCount";
   private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
   private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
@@ -107,6 +109,8 @@ public class KafkaExtractorStatsTracker {
     private long startFetchEpochTime;
     private long stopFetchEpochTime;
     private long lastSuccessfulRecordHeaderTimestamp;
+    private long minLogAppendTime = -1L;
+    private long maxLogAppendTime = -1L;
   }
 
   /**
@@ -163,6 +167,15 @@ public class KafkaExtractorStatsTracker {
       if (this.isSlaConfigured) {
         if (v.slaMissedRecordCount < 0) {
           v.slaMissedRecordCount = 0;
+          v.minLogAppendTime = logAppendTimestamp;
+          v.maxLogAppendTime = logAppendTimestamp;
+        } else {
+          if (logAppendTimestamp < v.minLogAppendTime) {
+            v.minLogAppendTime = logAppendTimestamp;
+          }
+          if (logAppendTimestamp > v.maxLogAppendTime) {
+            v.maxLogAppendTime = logAppendTimestamp;
+          }
         }
         if (logAppendTimestamp > 0 && (System.currentTimeMillis() - 
logAppendTimestamp > recordLevelSlaMillis)) {
           v.slaMissedRecordCount++;
@@ -267,6 +280,8 @@ public class KafkaExtractorStatsTracker {
         Long.toString(stats.getStopFetchEpochTime()));
     tagsForPartition.put(PROCESSED_RECORD_COUNT, 
Long.toString(stats.getProcessedRecordCount()));
     tagsForPartition.put(SLA_MISSED_RECORD_COUNT, 
Long.toString(stats.getSlaMissedRecordCount()));
+    tagsForPartition.put(MIN_LOG_APPEND_TIMESTAMP, 
Long.toString(stats.getMinLogAppendTime()));
+    tagsForPartition.put(MAX_LOG_APPEND_TIMESTAMP, 
Long.toString(stats.getMaxLogAppendTime()));
     tagsForPartition.put(PARTITION_TOTAL_SIZE, 
Long.toString(stats.getPartitionTotalSize()));
     tagsForPartition.put(AVG_RECORD_SIZE, 
Long.toString(stats.getAvgRecordSize()));
     tagsForPartition.put(ELAPSED_TIME, Long.toString(stats.getElapsedTime()));
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 2022ba7..f2e0824 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -77,6 +77,8 @@ public class KafkaExtractorStatsTrackerTest {
     
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
 == 0);
     
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
 == 0);
     
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
 -1);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(),
 -1);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(),
 -1);
 
     this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, 
decodeStartTime, 100, logAppendTimestamp);
     
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(),
 1);
@@ -84,10 +86,13 @@ public class KafkaExtractorStatsTrackerTest {
     
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
 > 0);
     
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
 > 0);
     
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
 1);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(),
 logAppendTimestamp);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(),
 logAppendTimestamp);
 
     readStartTime = System.nanoTime();
     Thread.sleep(1);
     decodeStartTime = System.nanoTime();
+    long previousLogAppendTimestamp = logAppendTimestamp;
     logAppendTimestamp = System.currentTimeMillis() - 10;
     long previousDecodeRecordTime = 
this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
     long previousReadRecordTime = 
this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
@@ -98,6 +103,8 @@ public class KafkaExtractorStatsTrackerTest {
     
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime()
 > previousDecodeRecordTime);
     
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime()
 > previousReadRecordTime);
     
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(),
 1);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(),
 previousLogAppendTimestamp);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(),
 logAppendTimestamp);
   }
 
   @Test
@@ -141,6 +148,8 @@ public class KafkaExtractorStatsTrackerTest {
     
Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord()
 > 0);
     
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgRecordSize(),
 100);
     
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getSlaMissedRecordCount(),
 0);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(),
 logAppendTimestamp);
+    
Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(),
 logAppendTimestamp);
   }
 
   @Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")

Reply via email to