This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new 569f260  [GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when…
569f260 is described below

commit 569f260ad23325a393b4b8a273951e907c09d5d3
Author: Hung Tran <hut...@linkedin.com>
AuthorDate: Fri Mar 27 11:34:20 2020 -0700

    [GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when…

    Closes #2941 from htran1/kafka-missing-avg-fetch-time
---
 .../extractor/extract/kafka/KafkaExtractor.java   | 10 +++++-
 .../extract/kafka/KafkaExtractorStatsTracker.java | 25 ++-------------
 .../kafka/KafkaExtractorStatsTrackerTest.java     | 37 +++++++++++++++++++---
 3 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 4055fb0..2006a57 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import lombok.Getter;
 
@@ -319,8 +321,14 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.statsTracker.getErrorPartitionCount());
     this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.statsTracker.getUndecodableMessageCount());
     this.workUnitState.setActualHighWatermark(this.nextWatermark);
+
+    // Need to call this even when not emitting metrics because some state, such as the average pull time,
+    // is updated when the tags are generated
+    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = this.statsTracker.generateTagsForPartitions(
+        this.lowWatermark, this.highWatermark, this.nextWatermark, Maps.newHashMap());
+
     if (isInstrumentationEnabled()) {
-      this.statsTracker.emitTrackingEvents(getMetricContext(), this.lowWatermark, this.highWatermark, this.nextWatermark);
+      this.statsTracker.emitTrackingEvents(getMetricContext(), tagsForPartitionsMap);
     }
   }

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index a1f4520..2913b54 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -396,30 +396,9 @@ public class KafkaExtractorStatsTracker {
 
   /**
    * Emit Tracking events reporting the various statistics to be consumed by a monitoring application.
   * @param context the current {@link MetricContext}
-   * @param lowWatermark begin Kafka offset for each topic partition
-   * @param highWatermark the expected last Kafka offset for each topic partition to be consumed by the Extractor
-   * @param nextWatermark the offset of next valid message for each Kafka topic partition consumed by the Extractor
+   * @param tagsForPartitionsMap tags for each partition
    */
-  public void emitTrackingEvents(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
-      MultiLongWatermark nextWatermark) {
-    emitTrackingEventsWithAdditionalTags(context, lowWatermark, highWatermark, nextWatermark, Maps.newHashMap());
-  }
-
-  /**
-   * Emit Tracking events reporting the various statistics to be consumed by a monitoring application, with additional
-   * map representing tags beyond what are constructed in {@link #createTagsForPartition(int, MultiLongWatermark, MultiLongWatermark, MultiLongWatermark) }
-   *
-   * Choose to not to make createTagsForPartition extensible to avoid additional derived class just for additional k-v pairs
-   * in the tag maps.
-   *
-   * @param additionalTags caller-provided mapping from {@link KafkaPartition} to {@link Map<String, String>}, which will
-   *        be merged with result of {@link #createTagsForPartition}.
-   */
-  public void emitTrackingEventsWithAdditionalTags(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
-      MultiLongWatermark nextWatermark, Map<KafkaPartition, Map<String, String>> additionalTags) {
-    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap =
-        generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, additionalTags);
-
+  public void emitTrackingEvents(MetricContext context, Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap) {
     for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
       EventSubmitter.Builder eventSubmitterBuilder = new EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
       eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState, KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));

diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 278ff9b..1f959f2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -33,10 +33,11 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.gobblin.configuration.WorkUnitState;
 
-
+@Test(singleThreaded = true)
 public class KafkaExtractorStatsTrackerTest {
   List<KafkaPartition> kafkaPartitions = new ArrayList<>();
   private KafkaExtractorStatsTracker extractorStatsTracker;
+  private WorkUnitState workUnitState;
   final static KafkaPartition PARTITION0 = new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build();
   final static KafkaPartition PARTITION1 = new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build();
@@ -44,9 +45,9 @@ public class KafkaExtractorStatsTrackerTest {
   public void setUp() {
     kafkaPartitions.add(PARTITION0);
     kafkaPartitions.add(PARTITION1);
-    WorkUnitState workUnitState = new WorkUnitState();
-    workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
-    workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
+    this.workUnitState = new WorkUnitState();
+    this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
+    this.workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
     this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
   }
@@ -195,8 +196,36 @@ public class KafkaExtractorStatsTrackerTest {
     MultiLongWatermark nextWatermark = new MultiLongWatermark(Arrays.asList(new Long(15), new Long(25)));
     Map<KafkaPartition, Map<String, String>> addtionalTags = ImmutableMap.of(PARTITION0, ImmutableMap.of("testKey", "testValue"));
+
+    this.workUnitState.removeProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, 0));
+    this.workUnitState.removeProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, 0));
+    KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, PARTITION0, 0);
+
+    KafkaExtractorStatsTracker.ExtractorStats extractorStats = this.extractorStatsTracker.getStatsMap()
+        .get(kafkaPartitions.get(0));
+
+    extractorStats.setStartFetchEpochTime(1000);
+    extractorStats.setStopFetchEpochTime(10000);
+    extractorStats.setAvgMillisPerRecord(10.1);
     Map<KafkaPartition, Map<String, String>> result = extractorStatsTracker.generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, addtionalTags);
+
+    // generateTagsForPartitions will set the following in the workUnitState
+    Assert.assertEquals(this.workUnitState.getPropAsLong(
+        KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, 0)),
+        extractorStats.getStartFetchEpochTime());
+    Assert.assertEquals(this.workUnitState.getPropAsLong(
+        KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, 0)),
+        extractorStats.getStopFetchEpochTime());
+    Assert.assertEquals(KafkaUtils.getPartitionAvgRecordMillis(this.workUnitState, PARTITION0),
+        extractorStats.getAvgMillisPerRecord());
+
+    // restore values since other tests check for them
+    extractorStats.setStartFetchEpochTime(0);
+    extractorStats.setStopFetchEpochTime(0);
+    extractorStats.setAvgMillisPerRecord(-1);
+
     Assert.assertTrue(result.get(PARTITION0).containsKey("testKey"));
     Assert.assertEquals(result.get(PARTITION0).get("testKey"), "testValue");
     Assert.assertFalse(result.get(PARTITION1).containsKey("testKey"));
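
For quick reference, the net effect on the extractor-side call pattern is sketched below. This fragment is assembled from the KafkaExtractor hunk above and is not additional code in the commit: generateTagsForPartitions(...) is now invoked unconditionally, since generating the tags also records per-partition state such as the average fetch time into the WorkUnitState, and the pre-built tag map is handed to emitTrackingEvents(...) only when instrumentation is enabled. The enclosing method is not shown in the hunk header, so it is omitted here, and the empty additionalTags map stands in for whatever extra tags a caller might merge in.

    // Illustrative fragment only, mirroring the KafkaExtractor change above.
    Map<KafkaPartition, Map<String, String>> additionalTags = Maps.newHashMap();

    // Always generate the tags: as the new comment notes, this also updates state such as
    // the average pull time, so it must run even when metrics are not emitted.
    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap =
        this.statsTracker.generateTagsForPartitions(
            this.lowWatermark, this.highWatermark, this.nextWatermark, additionalTags);

    // Emit tracking events only when instrumentation is enabled.
    if (isInstrumentationEnabled()) {
      this.statsTracker.emitTrackingEvents(getMetricContext(), tagsForPartitionsMap);
    }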