This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ba5ed2e  [GOBBLIN-1193] Ensure that ingestion latency is 0 when no records are …
ba5ed2e is described below

commit ba5ed2ef55d172b2b0192c1f3d6edafa1876d59b
Author: sv2000 <sudarsh...@gmail.com>
AuthorDate: Mon Jun 15 17:38:50 2020 -0700

    [GOBBLIN-1193] Ensure that ingestion latency is 0 when no records are …
    
    Closes #3041 from sv2000/kafkaExtractorLatency
---
 .../extract/kafka/KafkaExtractorStatsTracker.java        | 16 ++++++++++++++--
 .../extract/kafka/KafkaExtractorStatsTrackerTest.java    | 16 +++++++++++++++-
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 77cca0c..25c94d6 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -327,19 +327,31 @@ public class KafkaExtractorStatsTracker {
     if (partitionStats.getStopFetchEpochTime() > 
aggregateExtractorStats.getMaxStopFetchEpochTime()) {
       
aggregateExtractorStats.setMaxStopFetchEpochTime(partitionStats.getStopFetchEpochTime());
     }
-    long partitionLatency = partitionStats.getStopFetchEpochTime() - 
partitionStats.getMinLogAppendTime();
+
+    long partitionLatency = 0L;
+    //Check if there are any records consumed from this KafkaPartition.
+    if (partitionStats.getMinLogAppendTime() > 0) {
+      partitionLatency = partitionStats.getStopFetchEpochTime() - 
partitionStats.getMinLogAppendTime();
+    }
+
     if (aggregateExtractorStats.getMaxIngestionLatency() < partitionLatency) {
       aggregateExtractorStats.setMaxIngestionLatency(partitionLatency);
     }
+
     if (aggregateExtractorStats.getMinLogAppendTime() > 
partitionStats.getMinLogAppendTime()) {
       
aggregateExtractorStats.setMinLogAppendTime(partitionStats.getMinLogAppendTime());
     }
+
     if (aggregateExtractorStats.getMaxLogAppendTime() < 
partitionStats.getMaxLogAppendTime()) {
       
aggregateExtractorStats.setMaxLogAppendTime(partitionStats.getMaxLogAppendTime());
     }
+
     
aggregateExtractorStats.setProcessedRecordCount(aggregateExtractorStats.getProcessedRecordCount()
 + partitionStats.getProcessedRecordCount());
     
aggregateExtractorStats.setNumBytesConsumed(aggregateExtractorStats.getNumBytesConsumed()
 + partitionStats.getPartitionTotalSize());
-    
aggregateExtractorStats.setSlaMissedRecordCount(aggregateExtractorStats.getSlaMissedRecordCount()
 + partitionStats.getSlaMissedRecordCount());
+
+    if (partitionStats.getSlaMissedRecordCount() > 0) {
+      
aggregateExtractorStats.setSlaMissedRecordCount(aggregateExtractorStats.getSlaMissedRecordCount()
 + partitionStats.getSlaMissedRecordCount());
+    }
   }
 
   private Map<String, String> createTagsForPartition(int partitionId, 
MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, 
MultiLongWatermark nextWatermark) {
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 67e1146..9cecc59 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -211,12 +211,26 @@ public class KafkaExtractorStatsTrackerTest {
     
Assert.assertTrue(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES)
 >= 15);
   }
 
-  @Test (dependsOnMethods = "testGetAvgRecordSize")
+  @Test (dependsOnMethods = "testGetMaxLatency")
   public void testGetConsumptionRateMBps() {
     double a = this.extractorStatsTracker.getConsumptionRateMBps();
     Assert.assertEquals((new Double(Math.ceil(a * epochDurationMs * 1024 * 
1024) / 1000)).longValue(), 300L);
   }
 
+  @Test (dependsOnMethods = "testGetConsumptionRateMBps")
+  public void testGetMaxLatencyNoRecordsInEpoch() {
+    //Close the previous epoch
+    this.extractorStatsTracker.reset();
+    Long readStartTime = System.nanoTime();
+    //Call update on partitions 1 and 2 with no records consumed from each partition
+    this.extractorStatsTracker.updateStatisticsForCurrentPartition(0, 
readStartTime, 0);
+    this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, 
readStartTime, 0);
+    //Close the epoch
+    this.extractorStatsTracker.reset();
+    //Ensure the max latency is 0 when there are no records
+    
Assert.assertEquals(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES),
 0L);
+  }
+
   @Test
   public void testGenerateTagsForPartitions() throws Exception {
     MultiLongWatermark lowWatermark = new MultiLongWatermark(Arrays.asList(new 
Long(10), new Long(20)));

Reply via email to