This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new ba5ed2e [GOBBLIN-1193] Ensure that ingestion latency is 0 when no records are … ba5ed2e is described below commit ba5ed2ef55d172b2b0192c1f3d6edafa1876d59b Author: sv2000 <sudarsh...@gmail.com> AuthorDate: Mon Jun 15 17:38:50 2020 -0700 [GOBBLIN-1193] Ensure that ingestion latency is 0 when no records are … Closes #3041 from sv2000/kafkaExtractorLatency --- .../extract/kafka/KafkaExtractorStatsTracker.java | 16 ++++++++++++++-- .../extract/kafka/KafkaExtractorStatsTrackerTest.java | 16 +++++++++++++++- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java index 77cca0c..25c94d6 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java @@ -327,19 +327,31 @@ public class KafkaExtractorStatsTracker { if (partitionStats.getStopFetchEpochTime() > aggregateExtractorStats.getMaxStopFetchEpochTime()) { aggregateExtractorStats.setMaxStopFetchEpochTime(partitionStats.getStopFetchEpochTime()); } - long partitionLatency = partitionStats.getStopFetchEpochTime() - partitionStats.getMinLogAppendTime(); + + long partitionLatency = 0L; + //Check if there are any records consumed from this KafkaPartition. 
+ if (partitionStats.getMinLogAppendTime() > 0) { + partitionLatency = partitionStats.getStopFetchEpochTime() - partitionStats.getMinLogAppendTime(); + } + if (aggregateExtractorStats.getMaxIngestionLatency() < partitionLatency) { aggregateExtractorStats.setMaxIngestionLatency(partitionLatency); } + if (aggregateExtractorStats.getMinLogAppendTime() > partitionStats.getMinLogAppendTime()) { aggregateExtractorStats.setMinLogAppendTime(partitionStats.getMinLogAppendTime()); } + if (aggregateExtractorStats.getMaxLogAppendTime() < partitionStats.getMaxLogAppendTime()) { aggregateExtractorStats.setMaxLogAppendTime(partitionStats.getMaxLogAppendTime()); } + aggregateExtractorStats.setProcessedRecordCount(aggregateExtractorStats.getProcessedRecordCount() + partitionStats.getProcessedRecordCount()); aggregateExtractorStats.setNumBytesConsumed(aggregateExtractorStats.getNumBytesConsumed() + partitionStats.getPartitionTotalSize()); - aggregateExtractorStats.setSlaMissedRecordCount(aggregateExtractorStats.getSlaMissedRecordCount() + partitionStats.getSlaMissedRecordCount()); + + if (partitionStats.getSlaMissedRecordCount() > 0) { + aggregateExtractorStats.setSlaMissedRecordCount(aggregateExtractorStats.getSlaMissedRecordCount() + partitionStats.getSlaMissedRecordCount()); + } } private Map<String, String> createTagsForPartition(int partitionId, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark, MultiLongWatermark nextWatermark) { diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java index 67e1146..9cecc59 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java +++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java @@ -211,12 +211,26 @@ public class KafkaExtractorStatsTrackerTest { Assert.assertTrue(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES) >= 15); } - @Test (dependsOnMethods = "testGetAvgRecordSize") + @Test (dependsOnMethods = "testGetMaxLatency") public void testGetConsumptionRateMBps() { double a = this.extractorStatsTracker.getConsumptionRateMBps(); Assert.assertEquals((new Double(Math.ceil(a * epochDurationMs * 1024 * 1024) / 1000)).longValue(), 300L); } + @Test (dependsOnMethods = "testGetConsumptionRateMBps") + public void testGetMaxLatencyNoRecordsInEpoch() { + //Close the previous epoch + this.extractorStatsTracker.reset(); + Long readStartTime = System.nanoTime(); + //Call update on partitions 1 and 2 with no records consumed from each partition + this.extractorStatsTracker.updateStatisticsForCurrentPartition(0, readStartTime, 0); + this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, 0); + //Close the epoch + this.extractorStatsTracker.reset(); + //Ensure the max latency is 0 when there are no records + Assert.assertEquals(this.extractorStatsTracker.getMaxIngestionLatency(TimeUnit.MINUTES), 0L); + } + @Test public void testGenerateTagsForPartitions() throws Exception { MultiLongWatermark lowWatermark = new MultiLongWatermark(Arrays.asList(new Long(10), new Long(20)));