Repository: kafka
Updated Branches:
  refs/heads/0.11.0 4671a486c -> 847098610


KAFKA-5368: Fix skipped record metrics to use rate of sum instead of rate of count

This resolved the issue with Kafka Streams skipped records sensor reporting wrong values.

Jira ticket: https://issues.apache.org/jira/browse/KAFKA-5368

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Hamidreza Afzali <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #3206 from hrafzali/KAFKA-5368_skipped-records-sensor-bug

(cherry picked from commit 0a8b10e27f573612e1d5d6787b2096d3a8528b94)
Signed-off-by: Guozhang Wang <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84709861
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84709861
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84709861

Branch: refs/heads/0.11.0
Commit: 84709861055f677123c88b19a5f1bd44576e2a3f
Parents: 4671a48
Author: Hamidreza Afzali <[email protected]>
Authored: Fri Jun 2 12:20:57 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Jun 2 12:21:10 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/common/metrics/stats/Sum.java  | 45 ++++++++++++++++++++
 .../processor/internals/StreamThread.java       |  3 +-
 2 files changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/84709861/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
new file mode 100644
index 0000000..b40e9cd
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.stats;
+
+import java.util.List;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * A {@link SampledStat} that maintains the sum of what it has seen.
+ */
+public class Sum extends SampledStat {
+
+    public Sum() {
+        super(0);
+    }
+
+    @Override
+    protected void update(Sample sample, MetricConfig config, double value, long now) {
+        sample.value += value;
+    }
+
+    @Override
+    public double combine(List<Sample> samples, MetricConfig config, long now) {
+        double total = 0.0;
+        for (Sample sample : samples)
+            total += sample.value;
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/84709861/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 44cd1b1..624a15e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
@@ -359,7 +360,7 @@ public class StreamThread extends Thread {
             tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(new Count()));
 
             skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
-            skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Count()));
+            skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Sum()));
 
         }
