This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new 5e742cf  [GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…
5e742cf is described below

commit 5e742cfd84d01d368b46c5db16688b870c389871
Author: sv2000 <sudarsh...@gmail.com>
AuthorDate: Tue Mar 17 21:37:13 2020 -0700

    [GOBBLIN-1087] Track and report histogram of observed lag from Gobblin…

    Closes #2928 from sv2000/observedLag
---
 gobblin-core-base/build.gradle                     |   3 +-
 gobblin-modules/gobblin-kafka-common/build.gradle  |   9 ++
 .../kafka/HdrHistogramPerformanceBenchmark.java    | 154 +++++++++++++++++++++
 .../gobblin/kafka/client/KafkaConsumerRecord.java  |  17 +--
 .../extractor/extract/kafka/KafkaExtractor.java    |  10 +-
 .../extract/kafka/KafkaExtractorStatsTracker.java  |  89 +++++++++++-
 .../extractor/extract/kafka/KafkaSource.java       |  20 ++-
 .../kafka/KafkaExtractorStatsTrackerTest.java      |  58 ++++++--
 gobblin-runtime/build.gradle                       |   3 +-
 gradle/scripts/dependencyDefinitions.gradle        |   3 +
 10 files changed, 339 insertions(+), 27 deletions(-)

diff --git a/gobblin-core-base/build.gradle b/gobblin-core-base/build.gradle
index 80b0f67..11ca360 100644
--- a/gobblin-core-base/build.gradle
+++ b/gobblin-core-base/build.gradle
@@ -38,8 +38,7 @@ dependencies {

   testCompile externalDependency.testng
   testCompile externalDependency.mockito
-
-  jmh 'org.openjdk.jmh:jmh-core:1.17.3'
+  testCompile externalDependency.jmh
 }

 test {
diff --git a/gobblin-modules/gobblin-kafka-common/build.gradle b/gobblin-modules/gobblin-kafka-common/build.gradle
index d38d7d3..dba07a7 100644
--- a/gobblin-modules/gobblin-kafka-common/build.gradle
+++ b/gobblin-modules/gobblin-kafka-common/build.gradle
@@ -16,6 +16,7 @@
  */

 apply plugin: 'java'
+apply plugin: 'me.champeau.gradle.jmh'

 dependencies {
   compile project(":gobblin-api")
@@ -33,6 +34,7 @@ dependencies {
   compile externalDependency.commonsPool
   compile externalDependency.guava
   compile externalDependency.gson
+  compile externalDependency.hdrHistogram
   compile externalDependency.jacksonCore
   compile externalDependency.jacksonMapper
   compile externalDependency.slf4j
@@ -44,6 +46,7 @@ dependencies {
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.mockito
   testCompile externalDependency.testng
+  testCompile externalDependency.jmh
 }

 configurations {
@@ -59,4 +62,10 @@ test {
   workingDir rootProject.rootDir
 }

+jmh {
+  include = ""
+  zip64 = true
+  duplicateClassesStrategy = "EXCLUDE"
+}
+
 ext.classification="library"
diff --git a/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java b/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java
new file mode 100644
index 0000000..23631f9
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/jmh/java/org/apache/gobblin/source/extractor/extract/kafka/HdrHistogramPerformanceBenchmark.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import org.HdrHistogram.Histogram;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A micro-benchmark to measure the time taken to serialize a {@link Histogram} instance to its String representation.
+ * The benchmark uses a random number generator to produce values drawn from a uniform distribution, an adversarial
+ * pattern for a histogram, since it is likely to produce more count buckets than a skewed distribution would. The
+ * benchmark provides an upper bound on the memory footprint of the histogram, the serialization time, and the size
+ * of the serialized representation.
+ */
+@Warmup (iterations = 3)
+@Measurement (iterations = 10)
+@BenchmarkMode (value = Mode.AverageTime)
+@Fork (value = 1)
+@OutputTimeUnit (TimeUnit.MILLISECONDS)
+@Slf4j
+public class HdrHistogramPerformanceBenchmark {
+
+  @State (value = Scope.Benchmark)
+  public static class HistogramState {
+    private static long MIN_VALUE = 1;
+    private static long MAX_VALUE = TimeUnit.HOURS.toMillis(24);
+
+    private Histogram histogram1;
+    private Histogram histogram2;
+    private Histogram histogram3;
+    private Histogram histogram4;
+
+    private final RandomDataGenerator random = new RandomDataGenerator();
+
+    @Setup (value = Level.Iteration)
+    public void setUp() {
+      this.histogram1 = buildHistogram(1000000);
+      this.histogram2 = buildHistogram(2000000);
+      this.histogram3 = buildHistogram(4000000);
+      this.histogram4 = buildHistogram(10000000);
+    }
+
+    private Histogram buildHistogram(int size) {
+      Histogram histogram = new Histogram(MIN_VALUE, MAX_VALUE, 3);
+      IntStream.range(0, size).mapToLong(i -> random.nextLong(MIN_VALUE, MAX_VALUE))
+          .forEachOrdered(histogram::recordValue);
+      System.out.println("Estimated memory footprint of histogram is: " + histogram.getEstimatedFootprintInBytes());
+      return histogram;
+    }
+
+    @TearDown (value = Level.Iteration)
+    public void tearDown() {
+      this.histogram1.reset();
+      this.histogram2.reset();
+      this.histogram3.reset();
+      this.histogram4.reset();
+    }
+  }
+
+  @Benchmark
+  public String trackHistogram1MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram1);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public String trackHistogram2MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram2);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public String trackHistogram4MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram3);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public String trackHistogram10MToStringConversion(HistogramState histogramState) {
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogramState.histogram4);
+    System.out.println("Histogram serialized string size: " + histogramString.length());
+    return histogramString;
+  }
+
+  @Benchmark
+  public Histogram trackMergeHistogram(HistogramState histogramState) {
+    Histogram histogram = new Histogram(histogramState.MIN_VALUE, histogramState.MAX_VALUE, 3);
+    histogram.add(histogramState.histogram1);
+    histogram.add(histogramState.histogram2);
+    histogram.add(histogramState.histogram3);
+    histogram.add(histogramState.histogram4);
+    return histogram;
+  }
+
+  @Benchmark
+  public Histogram trackBuildHistogram(HistogramState histogramState) {
+    Histogram histogram = new Histogram(histogramState.MIN_VALUE, histogramState.MAX_VALUE, 3);
+    return histogram;
+  }
+
+  @Benchmark
+  public void trackResetHistogram(HistogramState histogramState, Blackhole blackhole) {
+    int dummyVal = 1;
+    histogramState.histogram4.reset();
+    blackhole.consume(dummyVal);
+  }
+
+  public static void main(String[] args) throws Exception {
+    ChainedOptionsBuilder opt = new OptionsBuilder()
+        .include(HdrHistogramPerformanceBenchmark.class.getSimpleName())
+        .warmupIterations(3)
+        .measurementIterations(10);
+    new Runner(opt.build()).run();
+  }
+}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
index cba9fa2..4808436 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/KafkaConsumerRecord.java
@@ -16,6 +16,9 @@
  */
 package org.apache.gobblin.kafka.client;

+import java.util.concurrent.TimeUnit;
+
+
 /**
  * A kafka message/record consumed from {@link GobblinKafkaConsumerClient}. This interface provides APIs to read message
  * metadata. Extension interfaces like {@link DecodeableKafkaRecord} or {@link ByteArrayBasedKafkaRecord} provide APIs
@@ -51,14 +54,6 @@ public interface KafkaConsumerRecord {
     return false;
   }

-  default boolean isTimestampCreateTime() {
-    return false;
-  }
-
-  default boolean isTimestampNone() {
-    return false;
-  }
-
   /**
    * @return Partition id for this record
    */
@@ -69,4 +64,10 @@ public interface KafkaConsumerRecord {
    */
   String getTopic();

+  /**
+   * @param fieldName the field name containing the record creation time.
+   * @param timeUnit the time unit of the timestamp field.
+   * @return the record creation timestamp, if it is available; defaults to 0 otherwise.
+   */
+  public default long getRecordCreationTimestamp(String fieldName, TimeUnit timeUnit) { return 0; }
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index ce0377a..4055fb0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.slf4j.Logger;
@@ -60,6 +61,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {

   private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+  private final String recordCreationTimestampFieldName;
+  private final TimeUnit recordCreationTimestampUnit;
   private Iterator<KafkaConsumerRecord> messageIterator = null;

   @Getter
@@ -105,6 +108,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {

     // The actual high watermark starts with the low watermark
     this.workUnitState.setActualHighWatermark(this.lowWatermark);
+
+    this.recordCreationTimestampFieldName = this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, null);
+    this.recordCreationTimestampUnit = TimeUnit.valueOf(this.workUnitState.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
   }

   @Override
@@ -177,7 +183,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
         D record = decodeKafkaMessage(nextValidMessage);

         this.statsTracker.onDecodeableRecord(this.currentPartitionIdx, readStartTime, decodeStartTime,
-            nextValidMessage.getValueSizeInBytes(), nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L);
+            nextValidMessage.getValueSizeInBytes(), nextValidMessage.isTimestampLogAppend() ? nextValidMessage.getTimestamp() : 0L,
+            (this.recordCreationTimestampFieldName != null) ? nextValidMessage
+                .getRecordCreationTimestamp(this.recordCreationTimestampFieldName, this.recordCreationTimestampUnit) : 0L);
         this.currentPartitionLastSuccessfulRecord = record;
         return record;
       } catch (Throwable t) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 6811fc2..a1f4520 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -16,12 +16,19 @@
  */
 package org.apache.gobblin.source.extractor.extract.kafka;

+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;

@@ -46,6 +53,7 @@ public class KafkaExtractorStatsTracker {
   public static final String TOPIC = "topic";
   public static final String PARTITION = "partition";

+  private static final String EMPTY_STRING = "";
   private static final String GOBBLIN_KAFKA_NAMESPACE = "gobblin.kafka";
   private static final String KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME = "KafkaExtractorTopicMetadata";
   private static final String LOW_WATERMARK = "lowWatermark";
@@ -64,32 +72,71 @@ public class KafkaExtractorStatsTracker {
   private static final String DECODE_RECORD_TIME = "decodeRecordTime";
   private static final String FETCH_MESSAGE_BUFFER_TIME = "fetchMessageBufferTime";
   private static final String LAST_RECORD_HEADER_TIMESTAMP = "lastRecordHeaderTimestamp";
+  private static final String OBSERVED_LATENCY_HISTOGRAM = "observedLatencyHistogram";

   @Getter
   private final Map<KafkaPartition, ExtractorStats> statsMap;
   private final Set<Integer> errorPartitions;
   private final WorkUnitState workUnitState;
   private final TaskEventMetadataGenerator taskEventMetadataGenerator;
+  @Getter
+  private final Histogram observedLatencyHistogram;

   private boolean isSlaConfigured;
   private long recordLevelSlaMillis;
+  //Minimum partition index processed by this task. Statistics that are aggregated across all partitions processed by
+  //the task (e.g. the observed latency histogram) are reported against this partition index.
+  private int minPartitionIdx = Integer.MAX_VALUE;

   //A global count of the number of undecodeable messages encountered by the KafkaExtractor across all Kafka
   //TopicPartitions.
   @Getter
   private int undecodableMessageCount = 0;
   private List<KafkaPartition> partitions;
+  private long maxPossibleLatency;

   public KafkaExtractorStatsTracker(WorkUnitState state, List<KafkaPartition> partitions) {
     this.workUnitState = state;
     this.partitions = partitions;
     this.statsMap = Maps.newHashMapWithExpectedSize(this.partitions.size());
-    this.partitions.forEach(partition -> this.statsMap.put(partition, new ExtractorStats()));
+    this.partitions.forEach(partition -> {
+      this.statsMap.put(partition, new ExtractorStats());
+      if (partition.getId() < minPartitionIdx) {
+        minPartitionIdx = partition.getId();
+      }
+    });
     this.errorPartitions = Sets.newHashSet();
     if (this.workUnitState.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
       this.isSlaConfigured = true;
       this.recordLevelSlaMillis = TimeUnit.MINUTES.toMillis(this.workUnitState.getPropAsLong(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
     }
     this.taskEventMetadataGenerator = TaskEventMetadataUtils.getTaskEventMetadataGenerator(workUnitState);
+    if (state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, KafkaSource.DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED)) {
+      this.observedLatencyHistogram = buildObservedLatencyHistogram(state);
+    } else {
+      this.observedLatencyHistogram = null;
+    }
+  }
+
+  /**
+   * A method that constructs a {@link Histogram} object based on a minimum value, a maximum value and a precision
+   * expressed as a number of significant digits. The returned {@link Histogram} is not an auto-resizing histogram,
+   * and any outliers above the maximum possible value are discarded in favor of bounding the worst-case performance.
+   *
+   * @param state the {@link WorkUnitState} providing the maximum possible latency and the precision.
+   * @return a non auto-resizing {@link Histogram} with a bounded range and precision.
+   */
+  private Histogram buildObservedLatencyHistogram(WorkUnitState state) {
+    this.maxPossibleLatency = TimeUnit.HOURS.toMillis(state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
+        KafkaSource.DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
+    int numSignificantDigits = state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION);
+    if (numSignificantDigits > 5) {
+      log.warn("Precision must be <= 5; setting the observed latency precision to 5.");
+      numSignificantDigits = 5;
+    } else if (numSignificantDigits < 1) {
+      log.warn("Precision must be >= 1; setting the observed latency precision to the default value of 3.");
+      numSignificantDigits = 3;
+    }
+    return new Histogram(1, maxPossibleLatency, numSignificantDigits);
   }

   public int getErrorPartitionCount() {
@@ -161,14 +208,24 @@
    * @param decodeStartTime the time instant immediately before a record decoding begins.
    * @param recordSizeInBytes the size of the decoded record in bytes.
    * @param logAppendTimestamp the log append time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}.
+   * @param recordCreationTimestamp the record creation time of the {@link org.apache.gobblin.kafka.client.KafkaConsumerRecord}.
    */
-  public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes, long logAppendTimestamp) {
+  public void onDecodeableRecord(int partitionIdx, long readStartTime, long decodeStartTime, long recordSizeInBytes, long logAppendTimestamp, long recordCreationTimestamp) {
     this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
       long currentTime = System.nanoTime();
       v.processedRecordCount++;
       v.partitionTotalSize += recordSizeInBytes;
       v.decodeRecordTime += currentTime - decodeStartTime;
       v.readRecordTime += currentTime - readStartTime;
+      if (this.observedLatencyHistogram != null && recordCreationTimestamp > 0) {
+        long observedLatency = System.currentTimeMillis() - recordCreationTimestamp;
+        // Discard outliers larger than maxPossibleLatency to avoid the additional overhead that would otherwise be
+        // incurred by dynamically re-sizing the Histogram when observedLatency exceeds the maximum assumed latency.
+        // Essentially, we trade off accuracy for performance in the pessimistic scenario.
+        if (observedLatency < this.maxPossibleLatency) {
+          this.observedLatencyHistogram.recordValue(observedLatency);
+        }
+      }
       if (this.isSlaConfigured) {
         if (v.slaMissedRecordCount < 0) {
           v.slaMissedRecordCount = 0;
@@ -308,10 +365,35 @@
       tagsForPartition.put(AVG_RECORD_PULL_TIME, Double.toString(-1));
     }

+    //Report the observed latency histogram as part of the tags of the minimum partition index processed by this task.
+    if ((partitionId == minPartitionIdx) && (this.observedLatencyHistogram != null)) {
+      tagsForPartition.put(OBSERVED_LATENCY_HISTOGRAM, convertHistogramToString(this.observedLatencyHistogram));
+    }
     return tagsForPartition;
   }

   /**
+   * A helper method to serialize a {@link Histogram} to its string representation. This method uses the
+   * compressed logging format provided by the {@link org.HdrHistogram.HistogramLogWriter}
+   * to represent the Histogram as a string. Readers can use the {@link org.HdrHistogram.HistogramLogReader} to
+   * deserialize the string back to a {@link Histogram} object.
+   * @param observedLatencyHistogram the {@link Histogram} to serialize.
+   * @return the string representation of the histogram.
+   */
+  @VisibleForTesting
+  public static String convertHistogramToString(Histogram observedLatencyHistogram) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (PrintStream stream = new PrintStream(baos, true, Charsets.UTF_8.name())) {
+      HistogramLogWriter histogramLogWriter = new HistogramLogWriter(stream);
+      histogramLogWriter.outputIntervalHistogram(observedLatencyHistogram);
+      return new String(baos.toByteArray(), Charsets.UTF_8);
+    } catch (UnsupportedEncodingException e) {
+      log.error("Exception encountered when creating PrintStream; returning an empty string", e);
+      return EMPTY_STRING;
+    }
+  }
+
+  /**
    * Emit Tracking events reporting the various statistics to be consumed by a monitoring application.
    * @param context the current {@link MetricContext}
    * @param lowWatermark begin Kafka offset for each topic partition
@@ -398,5 +480,8 @@
     for (int partitionIdx = 0; partitionIdx < this.partitions.size(); partitionIdx++) {
       resetStartFetchEpochTime(partitionIdx);
     }
+    if (this.observedLatencyHistogram != null) {
+      this.observedLatencyHistogram.reset();
+    }
   }
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 087077d..0932be6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -114,7 +114,6 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
   public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
   public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = "previousOffsetFetchEpochTime";
-  public static final String NUM_TOPIC_PARTITIONS = "numTopicPartitions";
   public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = "gobblin.kafka.consumerClient.class";
   public static final String GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION = "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
@@ -125,6 +124,14 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final boolean DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE = false;
   public static final String OFFSET_FETCH_TIMER = "offsetFetchTimer";
   public static final String RECORD_LEVEL_SLA_MINUTES_KEY = "gobblin.kafka.recordLevelSlaMinutes";
+  public static final String MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = "gobblin.kafka.maxobservedLatencyInHours";
+  public static final Integer DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS = 24;
+  public static final String OBSERVED_LATENCY_PRECISION = "gobblin.kafka.observedLatencyPrecision";
+  public static final Integer DEFAULT_OBSERVED_LATENCY_PRECISION = 3;
+  public static final String OBSERVED_LATENCY_MEASUREMENT_ENABLED = "gobblin.kafka.observedLatencyMeasurementEnabled";
+  public static final Boolean DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED = false;
+  public static final String RECORD_CREATION_TIMESTAMP_FIELD = "gobblin.kafka.recordCreationTimestampField";
+  public static final String RECORD_CREATION_TIMESTAMP_UNIT = "gobblin.kafka.recordCreationTimestampUnit";

   private final Set<String> moveToLatestTopics = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
   private final Map<KafkaPartition, Long> previousOffsets = Maps.newConcurrentMap();
@@ -558,6 +565,17 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     if (state.contains(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY)) {
       workUnit.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, state.getProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY));
     }
+    boolean isObservedLatencyMeasurementEnabled = state.getPropAsBoolean(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, DEFAULT_OBSERVED_LATENCY_MEASUREMENT_ENABLED);
+    if (isObservedLatencyMeasurementEnabled) {
+      Preconditions.checkArgument(state.contains(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD), "Missing config key: " +
+          KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD);
+      workUnit.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, isObservedLatencyMeasurementEnabled);
+      workUnit.setProp(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS,
+          state.getPropAsInt(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS, DEFAULT_MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS));
+      workUnit.setProp(KafkaSource.OBSERVED_LATENCY_PRECISION,
+          state.getPropAsInt(KafkaSource.OBSERVED_LATENCY_PRECISION, KafkaSource.DEFAULT_OBSERVED_LATENCY_PRECISION));
+      workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD));
+      workUnit.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, state.getProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name()));
+    }
   }

   private long getPreviousStartFetchEpochTimeForPartition(KafkaPartition partition, SourceState state) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 6502fbe..278ff9b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -16,19 +16,23 @@
  */
 package org.apache.gobblin.source.extractor.extract.kafka;

+import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogReader;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;

-import org.apache.gobblin.configuration.WorkUnitState;
-
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;

+import org.apache.gobblin.configuration.WorkUnitState;
+

 public class KafkaExtractorStatsTrackerTest {
   List<KafkaPartition> kafkaPartitions = new ArrayList<>();
@@ -41,7 +45,8 @@ public class KafkaExtractorStatsTrackerTest {
     kafkaPartitions.add(PARTITION0);
     kafkaPartitions.add(PARTITION1);
     WorkUnitState workUnitState = new WorkUnitState();
-    workUnitState.setProp("gobblin.kafka.recordLevelSlaMinutes", 10L);
+    workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
+    workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
     this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
   }

@@ -77,7 +82,9 @@ public class KafkaExtractorStatsTrackerTest {
     long readStartTime = System.nanoTime();
     Thread.sleep(1);
     long decodeStartTime = System.nanoTime();
-    long logAppendTimestamp = System.currentTimeMillis() - 15 * 60 * 1000L;
+    long currentTimeMillis = System.currentTimeMillis();
+    long logAppendTimestamp = currentTimeMillis - 15 * 60 * 1000L;
+    long recordCreationTimestamp = currentTimeMillis - 16 * 60 * 1000L;
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() == 0);
@@ -85,8 +92,9 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), -1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), -1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), -1);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 0);

-    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 100);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > 0);
@@ -94,16 +102,19 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 1);

     readStartTime = System.nanoTime();
     Thread.sleep(1);
     decodeStartTime = System.nanoTime();
     long previousLogAppendTimestamp = logAppendTimestamp;
-    logAppendTimestamp = System.currentTimeMillis() - 10;
+    currentTimeMillis = System.currentTimeMillis();
+    logAppendTimestamp = currentTimeMillis - 10;
+    recordCreationTimestamp = currentTimeMillis - 20;

     long previousDecodeRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime();
     long previousReadRecordTime = this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getReadRecordTime();
-    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+    this.extractorStatsTracker.onDecodeableRecord(0, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getProcessedRecordCount(), 2);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getPartitionTotalSize(), 200);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getDecodeRecordTime() > previousDecodeRecordTime);
@@ -111,6 +122,7 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getSlaMissedRecordCount(), 1);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMinLogAppendTime(), previousLogAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getMaxLogAppendTime(), logAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 2);
   }

   @Test
@@ -145,10 +157,12 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(0)).getAvgRecordSize(), 100);

     readStartTime = System.nanoTime();
-    long logAppendTimestamp = System.currentTimeMillis() - 10;
+    long currentTimeMillis = System.currentTimeMillis();
+    long logAppendTimestamp = currentTimeMillis - 10;
+    long recordCreationTimestamp = currentTimeMillis - 20;
     Thread.sleep(1);
     long decodeStartTime = System.nanoTime();
-    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100, logAppendTimestamp);
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 100, logAppendTimestamp, recordCreationTimestamp);
     this.extractorStatsTracker.updateStatisticsForCurrentPartition(1, readStartTime, 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getElapsedTime() > 0);
     Assert.assertTrue(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getAvgMillisPerRecord() > 0);
@@ -156,6 +170,7 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getSlaMissedRecordCount(), 0);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMinLogAppendTime(), logAppendTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getStatsMap().get(kafkaPartitions.get(1)).getMaxLogAppendTime(), logAppendTimestamp);
+    Assert.assertEquals(this.extractorStatsTracker.getObservedLatencyHistogram().getTotalCount(), 3);
   }

   @Test (dependsOnMethods = "testUpdateStatisticsForCurrentPartition")
@@ -166,8 +181,10 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(0), 0);
     long readStartTime = System.nanoTime();
     long decodeStartTime = readStartTime + 1;
-    long logAppendTimeStamp = System.currentTimeMillis() - 10;
-    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150, logAppendTimeStamp);
+    long currentTimeMillis = System.currentTimeMillis();
+    long logAppendTimestamp = currentTimeMillis - 10;
+    long recordCreationTimestamp = currentTimeMillis - 20;
+    this.extractorStatsTracker.onDecodeableRecord(1, readStartTime, decodeStartTime, 150, logAppendTimestamp, recordCreationTimestamp);
     Assert.assertEquals(this.extractorStatsTracker.getAvgRecordSize(1), 150);
   }

@@ -184,4 +201,23 @@ public class KafkaExtractorStatsTrackerTest {
     Assert.assertEquals(result.get(PARTITION0).get("testKey"), "testValue");
     Assert.assertFalse(result.get(PARTITION1).containsKey("testKey"));
   }
+
+  @Test
+  public void testConvertHistogramToString() {
+    Histogram histogram = new Histogram(1, 100, 3);
+    histogram.recordValue(3);
+    histogram.recordValue(25);
+    histogram.recordValue(25);
+    histogram.recordValue(92);
+    String histogramString = KafkaExtractorStatsTracker.convertHistogramToString(histogram);
+
+    HistogramLogReader logReader = new HistogramLogReader(new ByteArrayInputStream(histogramString.getBytes(
+        Charsets.UTF_8)));
+    Histogram histogram1 = (Histogram) logReader.nextIntervalHistogram();
+    Assert.assertEquals(histogram1.getTotalCount(), 4);
+    Assert.assertEquals(histogram1.getMaxValue(), 92);
+    Assert.assertEquals(histogram1.getCountAtValue(25), 2);
+    Assert.assertEquals(histogram1.getCountAtValue(3), 1);
+    Assert.assertEquals(histogram1.getCountAtValue(92), 1);
+  }
 }
\ No newline at end of file
diff --git a/gobblin-runtime/build.gradle b/gobblin-runtime/build.gradle
index 0ff442a..306036c 100644
--- a/gobblin-runtime/build.gradle
+++ b/gobblin-runtime/build.gradle
@@ -94,8 +94,7 @@ dependencies {
   testCompile externalDependency.curatorTest
   testCompile externalDependency.mockito
   testRuntime externalDependency.derby
-
-  jmh 'org.openjdk.jmh:jmh-core:1.17.3'
+  testCompile externalDependency.jmh
 }

 // Begin HACK to get around POM being depenendent on the (empty) gobblin-rest-api instead of gobblin-rest-api-rest-client
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 7c83a79..6cc9363 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -62,6 +62,7 @@ ext.externalDependency = [
     "hadoopYarnMiniCluster": "org.apache.hadoop:hadoop-minicluster:" + hadoopVersion,
     "hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" + hadoopVersion,
     "hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
+    "hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
     "helix": "org.apache.helix:helix-core:0.8.2",
     "hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
     "hiveService": "org.apache.hive:hive-service:" + hiveVersion,
@@ -74,6 +75,8 @@ ext.externalDependency = [
     "httpcore": "org.apache.httpcomponents:httpcore:4.4.4",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
     "jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
+    "jmh": "org.openjdk.jmh:jmh-core:1.17.3",
+    "jmhAnnotations": "org.openjdk.jmh:jmh-generator-annprocess:1.17.3",
     "kafka08": "org.apache.kafka:kafka_2.11:" + kafka08Version,
     "kafka08Test": "org.apache.kafka:kafka_2.11:" + kafka08Version + ":test",
     "kafka08Client": "org.apache.kafka:kafka-clients:" + kafka08Version,
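
For anyone wiring up the new observed-latency tracking, the sketch below is a minimal, hypothetical
harness built only from the config keys and APIs added in this commit. The topic name "testTopic",
the field name "time", and the KafkaPartition.Builder construction are illustrative assumptions,
not part of the change:

    package org.apache.gobblin.source.extractor.extract.kafka;

    import java.util.Collections;
    import java.util.concurrent.TimeUnit;

    import org.apache.gobblin.configuration.WorkUnitState;

    /** Illustrative harness; not part of this commit. */
    public class ObservedLatencySetupSketch {
      public static void main(String[] args) {
        WorkUnitState workUnitState = new WorkUnitState();
        // Turn the feature on; it is off by default.
        workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
        // "time" is a placeholder for whichever record field carries the creation timestamp.
        workUnitState.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_FIELD, "time");
        workUnitState.setProp(KafkaSource.RECORD_CREATION_TIMESTAMP_UNIT, TimeUnit.MILLISECONDS.name());
        // Bound the histogram: latencies beyond 24 hours are discarded; 3 significant digits.
        workUnitState.setProp(KafkaSource.MAX_POSSIBLE_OBSERVED_LATENCY_IN_HOURS, 24);
        workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_PRECISION, 3);

        KafkaPartition partition = new KafkaPartition.Builder().withTopicName("testTopic").withId(0).build();
        KafkaExtractorStatsTracker tracker =
            new KafkaExtractorStatsTracker(workUnitState, Collections.singletonList(partition));

        // A record created 5 seconds ago: its observed latency lands in the histogram.
        long readStartTime = System.nanoTime();
        tracker.onDecodeableRecord(0, readStartTime, readStartTime, 100, 0L,
            System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(5));
        System.out.println("Latency samples recorded: " + tracker.getObservedLatencyHistogram().getTotalCount());
      }
    }

In a real job, only gobblin.kafka.observedLatencyMeasurementEnabled=true and
gobblin.kafka.recordCreationTimestampField need to be set: the unit, precision, and maximum
possible latency fall back to MILLISECONDS, 3 digits, and 24 hours respectively, and KafkaSource
copies all of these keys onto the work units it creates.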
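
On the consuming side, the observedLatencyHistogram tag attached to the KafkaExtractorTopicMetadata
event can be turned back into a Histogram with HistogramLogReader, mirroring the round-trip asserted
in testConvertHistogramToString above; a minimal sketch (the decoder class name is hypothetical):

    package org.apache.gobblin.source.extractor.extract.kafka;

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    import org.HdrHistogram.Histogram;
    import org.HdrHistogram.HistogramLogReader;

    /** Illustrative decoder; not part of this commit. */
    public class ObservedLatencyHistogramDecoder {
      /** Decodes the serialized tag value back into a Histogram. */
      public static Histogram decode(String histogramString) {
        HistogramLogReader logReader =
            new HistogramLogReader(new ByteArrayInputStream(histogramString.getBytes(StandardCharsets.UTF_8)));
        // convertHistogramToString writes exactly one interval histogram, so a single read suffices.
        return (Histogram) logReader.nextIntervalHistogram();
      }

      public static void main(String[] args) {
        Histogram original = new Histogram(1, 100, 3);
        original.recordValue(42);
        Histogram decoded = decode(KafkaExtractorStatsTracker.convertHistogramToString(original));
        // e.g. report a tail percentile of the observed lag in milliseconds.
        System.out.println("p99 observed latency (ms): " + decoded.getValueAtPercentile(99.0));
      }
    }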