This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a8aedc8 KAFKA-8696: clean up Sum/Count/Total metrics (#7057)
a8aedc8 is described below
commit a8aedc85ebfadcf1472acafe2e0311a73d3040be
Author: John Roesler <[email protected]>
AuthorDate: Tue Jul 23 18:54:20 2019 -0500
KAFKA-8696: clean up Sum/Count/Total metrics (#7057)
* Clean up one redundant and one misplaced metric
* Clarify the relationship among these metrics to avoid future confusion
Reviewers: Matthias J. Sax <[email protected]>, Bill Bejeck
<[email protected]>, Guozhang Wang <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 4 +-
.../kafka/clients/consumer/internals/Fetcher.java | 4 +-
.../kafka/common/metrics/MeasurableStat.java | 2 +-
.../apache/kafka/common/metrics/stats/Count.java | 30 +++-------
.../common/metrics/stats}/CumulativeCount.java | 22 +++-----
.../stats/{Total.java => CumulativeSum.java} | 19 ++++---
.../apache/kafka/common/metrics/stats/Meter.java | 24 ++++----
.../apache/kafka/common/metrics/stats/Rate.java | 27 ++-------
.../org/apache/kafka/common/metrics/stats/Sum.java | 30 +++-------
.../apache/kafka/common/metrics/stats/Total.java | 36 +++---------
.../stats/{Count.java => WindowedCount.java} | 26 +++------
.../metrics/stats/{Sum.java => WindowedSum.java} | 11 ++--
.../org/apache/kafka/common/network/Selector.java | 18 +++---
.../kafka/common/metrics/JmxReporterTest.java | 14 ++---
.../kafka/common/metrics/KafkaMbeanTest.java | 8 +--
.../apache/kafka/common/metrics/MetricsTest.java | 64 +++++++++++-----------
.../apache/kafka/common/metrics/SensorTest.java | 4 +-
.../kafka/common/metrics/stats/MeterTest.java | 2 +-
.../java/org/apache/kafka/test/MetricsBench.java | 4 +-
.../org/apache/kafka/connect/runtime/Worker.java | 14 ++---
.../kafka/connect/runtime/WorkerSinkTask.java | 10 ++--
.../kafka/connect/runtime/WorkerSourceTask.java | 6 +-
.../runtime/distributed/DistributedHerder.java | 4 +-
.../runtime/errors/ErrorHandlingMetrics.java | 16 +++---
.../main/scala/kafka/network/SocketServer.scala | 5 +-
.../scala/kafka/server/ClientQuotaManager.scala | 4 +-
.../streams/kstream/internals/metrics/Sensors.java | 8 +--
.../streams/processor/internals/StreamTask.java | 8 +--
.../internals/metrics/StreamsMetricsImpl.java | 5 +-
.../processor/internals/RecordCollectorTest.java | 4 +-
.../processor/internals/StandbyTaskTest.java | 4 +-
.../apache/kafka/streams/TopologyTestDriver.java | 8 +--
32 files changed, 185 insertions(+), 260 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d5faa6e..b92a4a6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -42,9 +42,9 @@ import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
@@ -961,7 +961,7 @@ public abstract class AbstractCoordinator implements
Closeable {
}
protected Meter createMeter(Metrics metrics, String groupName, String
baseName, String descriptiveName) {
- return new Meter(new Count(),
+ return new Meter(new WindowedCount(),
metrics.metricName(baseName + "-rate", groupName,
String.format("The number of %s per second",
descriptiveName)),
metrics.metricName(baseName + "-total", groupName,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 08ab9fb..d4d028d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -47,10 +47,10 @@ import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -1657,7 +1657,7 @@ public class Fetcher<K, V> implements Closeable {
this.fetchLatency = metrics.sensor("fetch-latency");
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg),
new Avg());
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax),
new Max());
- this.fetchLatency.add(new Meter(new Count(),
metrics.metricInstance(metricsRegistry.fetchRequestRate),
+ this.fetchLatency.add(new Meter(new WindowedCount(),
metrics.metricInstance(metricsRegistry.fetchRequestRate),
metrics.metricInstance(metricsRegistry.fetchRequestTotal)));
this.recordsFetchLag = metrics.sensor("records-lag");
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
b/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
index aedac9a..035449e 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MeasurableStat.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.metrics;
/**
* A MeasurableStat is a {@link Stat} that is also {@link Measurable} (i.e.
can produce a single floating point value).
* This is the interface used for most of the simple statistics such as {@link
org.apache.kafka.common.metrics.stats.Avg},
- * {@link org.apache.kafka.common.metrics.stats.Max}, {@link
org.apache.kafka.common.metrics.stats.Count}, etc.
+ * {@link org.apache.kafka.common.metrics.stats.Max}, {@link
org.apache.kafka.common.metrics.stats.CumulativeCount}, etc.
*/
public interface MeasurableStat extends Stat, Measurable {
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
index 3da91c4..2bef7cf 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
@@ -16,30 +16,14 @@
*/
package org.apache.kafka.common.metrics.stats;
-import java.util.List;
-
-import org.apache.kafka.common.metrics.MetricConfig;
-
/**
* A {@link SampledStat} that maintains a simple count of what it has seen.
+ * This is a special kind of {@link WindowedSum} that always records a value
of {@code 1} instead of the provided value.
+ *
+ * See also {@link CumulativeCount} for a non-sampled version of this metric.
+ *
+ * @deprecated since 2.4. Use {@link WindowedCount} instead.
*/
-public class Count extends SampledStat {
-
- public Count() {
- super(0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value,
long now) {
- sample.value += 1.0;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now)
{
- double total = 0.0;
- for (Sample sample : samples)
- total += sample.value;
- return total;
- }
-
+@Deprecated
+public class Count extends WindowedCount {
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java
similarity index 66%
rename from
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
rename to
clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java
index 2c12c2b..85591b5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
+++
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeCount.java
@@ -14,25 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.processor.internals.metrics;
+package org.apache.kafka.common.metrics.stats;
-import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
/**
- * A non-SampledStat version of Count for measuring -total metrics in streams
+ * A non-sampled version of {@link WindowedCount} maintained over all time.
+ *
+ * This is a special kind of {@link CumulativeSum} that always records {@code
1} instead of the provided value.
+ * In other words, it counts the number of
+ * {@link CumulativeCount#record(MetricConfig, double, long)} invocations,
+ * instead of summing the recorded values.
*/
-public class CumulativeCount implements MeasurableStat {
-
- private double count = 0.0;
-
+public class CumulativeCount extends CumulativeSum {
@Override
public void record(final MetricConfig config, final double value, final
long timeMs) {
- count += 1;
- }
-
- @Override
- public double measure(final MetricConfig config, final long now) {
- return count;
+ super.record(config, 1, timeMs);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java
similarity index 73%
copy from clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
copy to
clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java
index b8a83f5..13f12a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
+++
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/CumulativeSum.java
@@ -20,28 +20,31 @@ import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
/**
- * An un-windowed cumulative total maintained over all time.
+ * A non-sampled cumulative total maintained over all time.
+ * This is a non-sampled version of {@link WindowedSum}.
+ *
+ * See also {@link CumulativeCount} if you just want to increment the value by
1 on each recording.
*/
-public class Total implements MeasurableStat {
+public class CumulativeSum implements MeasurableStat {
private double total;
- public Total() {
- this.total = 0.0;
+ public CumulativeSum() {
+ total = 0.0;
}
- public Total(double value) {
- this.total = value;
+ public CumulativeSum(double value) {
+ total = value;
}
@Override
public void record(MetricConfig config, double value, long now) {
- this.total += value;
+ total += value;
}
@Override
public double measure(MetricConfig config, long now) {
- return this.total;
+ return total;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
index 91d4461..a6bdc9f 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
@@ -23,48 +23,46 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.CompoundStat;
import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.stats.Rate.SampledTotal;
/**
* A compound stat that includes a rate metric and a cumulative total metric.
*/
public class Meter implements CompoundStat {
-
private final MetricName rateMetricName;
private final MetricName totalMetricName;
private final Rate rate;
- private final Total total;
+ private final CumulativeSum total;
/**
- * Construct a Meter with seconds as time unit and {@link SampledTotal}
stats for Rate
+ * Construct a Meter with seconds as time unit
*/
public Meter(MetricName rateMetricName, MetricName totalMetricName) {
- this(TimeUnit.SECONDS, new SampledTotal(), rateMetricName,
totalMetricName);
+ this(TimeUnit.SECONDS, new WindowedSum(), rateMetricName,
totalMetricName);
}
/**
- * Construct a Meter with provided time unit and {@link SampledTotal}
stats for Rate
+ * Construct a Meter with provided time unit
*/
public Meter(TimeUnit unit, MetricName rateMetricName, MetricName
totalMetricName) {
- this(unit, new SampledTotal(), rateMetricName, totalMetricName);
+ this(unit, new WindowedSum(), rateMetricName, totalMetricName);
}
/**
- * Construct a Meter with seconds as time unit and provided {@link
SampledStat} stats for Rate
+ * Construct a Meter with seconds as time unit
*/
public Meter(SampledStat rateStat, MetricName rateMetricName, MetricName
totalMetricName) {
this(TimeUnit.SECONDS, rateStat, rateMetricName, totalMetricName);
}
/**
- * Construct a Meter with provided time unit and provided {@link
SampledStat} stats for Rate
+ * Construct a Meter with provided time unit
*/
public Meter(TimeUnit unit, SampledStat rateStat, MetricName
rateMetricName, MetricName totalMetricName) {
- if (!(rateStat instanceof SampledTotal) && !(rateStat instanceof
Count)) {
- throw new IllegalArgumentException("Meter is supported only for
SampledTotal and Count");
+ if (!(rateStat instanceof WindowedSum)) {
+ throw new IllegalArgumentException("Meter is supported only for
WindowedCount or WindowedSum.");
}
- this.total = new Total();
+ this.total = new CumulativeSum();
this.rate = new Rate(unit, rateStat);
this.rateMetricName = rateMetricName;
this.totalMetricName = totalMetricName;
@@ -81,7 +79,7 @@ public class Meter implements CompoundStat {
public void record(MetricConfig config, double value, long timeMs) {
rate.record(config, value, timeMs);
// Total metrics with Count stat should record 1.0 (as recorded in the
count)
- double totalValue = (rate.stat instanceof Count) ? 1.0 : value;
+ double totalValue = (rate.stat instanceof WindowedCount) ? 1.0 : value;
total.record(config, totalValue, timeMs);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index a56734c..604f860 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.metrics.stats;
-import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
@@ -40,7 +39,7 @@ public class Rate implements MeasurableStat {
}
public Rate(TimeUnit unit) {
- this(unit, new SampledTotal());
+ this(unit, new WindowedSum());
}
public Rate(SampledStat stat) {
@@ -115,24 +114,10 @@ public class Rate implements MeasurableStat {
}
}
- public static class SampledTotal extends SampledStat {
-
- public SampledTotal() {
- super(0.0d);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double
value, long timeMs) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long
now) {
- double total = 0.0;
- for (Sample sample : samples)
- total += sample.value;
- return total;
- }
-
+ /**
+ * @deprecated since 2.4. Use {@link WindowedSum} instead.
+ */
+ @Deprecated
+ public static class SampledTotal extends WindowedSum {
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
index b40e9cd..17188b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
@@ -16,30 +16,14 @@
*/
package org.apache.kafka.common.metrics.stats;
-import java.util.List;
-
-import org.apache.kafka.common.metrics.MetricConfig;
-
/**
* A {@link SampledStat} that maintains the sum of what it has seen.
+ * This is a sampled version of {@link CumulativeSum}.
+ *
+ * See also {@link WindowedCount} if you want to increment the value by 1 on
each recording.
+ *
+ * @deprecated since 2.4. Use {@link WindowedSum} instead.
*/
-public class Sum extends SampledStat {
-
- public Sum() {
- super(0);
- }
-
- @Override
- protected void update(Sample sample, MetricConfig config, double value,
long now) {
- sample.value += value;
- }
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now)
{
- double total = 0.0;
- for (Sample sample : samples)
- total += sample.value;
- return total;
- }
-
+@Deprecated
+public class Sum extends WindowedSum {
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
index b8a83f5..23f7d04 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java
@@ -16,32 +16,14 @@
*/
package org.apache.kafka.common.metrics.stats;
-import org.apache.kafka.common.metrics.MeasurableStat;
-import org.apache.kafka.common.metrics.MetricConfig;
-
/**
- * An un-windowed cumulative total maintained over all time.
+ * A non-sampled cumulative total maintained over all time.
+ * This is a non-sampled version of {@link WindowedSum}.
+ *
+ * See also {@link CumulativeCount} if you just want to increment the value by
1 on each recording.
+ *
+ * @deprecated since 2.4. Use {@link CumulativeSum} instead.
*/
-public class Total implements MeasurableStat {
-
- private double total;
-
- public Total() {
- this.total = 0.0;
- }
-
- public Total(double value) {
- this.total = value;
- }
-
- @Override
- public void record(MetricConfig config, double value, long now) {
- this.total += value;
- }
-
- @Override
- public double measure(MetricConfig config, long now) {
- return this.total;
- }
-
-}
+@Deprecated
+public class Total extends CumulativeSum {
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java
similarity index 70%
copy from clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
copy to
clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java
index 3da91c4..825f404 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java
+++
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedCount.java
@@ -16,30 +16,20 @@
*/
package org.apache.kafka.common.metrics.stats;
-import java.util.List;
-
import org.apache.kafka.common.metrics.MetricConfig;
/**
* A {@link SampledStat} that maintains a simple count of what it has seen.
+ * This is a special kind of {@link WindowedSum} that always records a value
of {@code 1} instead of the provided value.
+ * In other words, it counts the number of
+ * {@link WindowedCount#record(MetricConfig, double, long)} invocations,
+ * instead of summing the recorded values.
+ *
+ * See also {@link CumulativeCount} for a non-sampled version of this metric.
*/
-public class Count extends SampledStat {
-
- public Count() {
- super(0);
- }
-
+public class WindowedCount extends WindowedSum {
@Override
protected void update(Sample sample, MetricConfig config, double value,
long now) {
- sample.value += 1.0;
+ super.update(sample, config, 1.0, now);
}
-
- @Override
- public double combine(List<Sample> samples, MetricConfig config, long now)
{
- double total = 0.0;
- for (Sample sample : samples)
- total += sample.value;
- return total;
- }
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java
similarity index 86%
copy from clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
copy to
clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java
index b40e9cd..14aa562 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Sum.java
+++
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/WindowedSum.java
@@ -16,16 +16,19 @@
*/
package org.apache.kafka.common.metrics.stats;
-import java.util.List;
-
import org.apache.kafka.common.metrics.MetricConfig;
+import java.util.List;
+
/**
* A {@link SampledStat} that maintains the sum of what it has seen.
+ * This is a sampled version of {@link CumulativeSum}.
+ *
+ * See also {@link WindowedCount} if you want to increment the value by 1 on
each recording.
*/
-public class Sum extends SampledStat {
+public class WindowedSum extends SampledStat {
- public Sum() {
+ public WindowedSum() {
super(0);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 20e24b7..2652906 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -23,11 +23,11 @@ import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.SampledStat;
-import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -1084,7 +1084,7 @@ public class Selector implements Selectable,
AutoCloseable {
"successful-authentication-no-reauth-total", metricGrpName,
"The total number of connections with successful
authentication where the client does not support re-authentication",
metricTags);
-
this.successfulAuthenticationNoReauth.add(successfulAuthenticationNoReauthMetricName,
new Total());
+
this.successfulAuthenticationNoReauth.add(successfulAuthenticationNoReauthMetricName,
new CumulativeSum());
this.failedAuthentication = sensor("failed-authentication:" +
tagsSuffix);
this.failedAuthentication.add(createMeter(metrics, metricGrpName,
metricTags,
@@ -1105,13 +1105,13 @@ public class Selector implements Selectable,
AutoCloseable {
this.reauthenticationLatency.add(reauthenticationLatencyAvgMetricName, new
Avg());
this.bytesTransferred = sensor("bytes-sent-received:" +
tagsSuffix);
- bytesTransferred.add(createMeter(metrics, metricGrpName,
metricTags, new Count(),
+ bytesTransferred.add(createMeter(metrics, metricGrpName,
metricTags, new WindowedCount(),
"network-io", "network operations (reads or writes) on all
connections"));
this.bytesSent = sensor("bytes-sent:" + tagsSuffix,
bytesTransferred);
this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
"outgoing-byte", "outgoing bytes sent to all servers"));
- this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
new Count(),
+ this.bytesSent.add(createMeter(metrics, metricGrpName, metricTags,
new WindowedCount(),
"request", "requests sent"));
MetricName metricName = metrics.metricName("request-size-avg",
metricGrpName, "The average size of requests sent.", metricTags);
this.bytesSent.add(metricName, new Avg());
@@ -1122,11 +1122,11 @@ public class Selector implements Selectable,
AutoCloseable {
this.bytesReceived.add(createMeter(metrics, metricGrpName,
metricTags,
"incoming-byte", "bytes read off all sockets"));
this.bytesReceived.add(createMeter(metrics, metricGrpName,
metricTags,
- new Count(), "response", "responses received"));
+ new WindowedCount(), "response", "responses received"));
this.selectTime = sensor("select-time:" + tagsSuffix);
this.selectTime.add(createMeter(metrics, metricGrpName, metricTags,
- new Count(), "select", "times the I/O layer checked for
new I/O to perform"));
+ new WindowedCount(), "select", "times the I/O layer
checked for new I/O to perform"));
metricName = metrics.metricName("io-wait-time-ns-avg",
metricGrpName, "The average length of time the I/O thread spent waiting for a
socket ready for reads or writes in nanoseconds.", metricTags);
this.selectTime.add(metricName, new Avg());
this.selectTime.add(createIOThreadRatioMeter(metrics,
metricGrpName, metricTags, "io-wait", "waiting"));
@@ -1187,7 +1187,7 @@ public class Selector implements Selectable,
AutoCloseable {
nodeRequest = sensor(nodeRequestName);
nodeRequest.add(createMeter(metrics, metricGrpName, tags,
"outgoing-byte", "outgoing bytes"));
- nodeRequest.add(createMeter(metrics, metricGrpName, tags,
new Count(), "request", "requests sent"));
+ nodeRequest.add(createMeter(metrics, metricGrpName, tags,
new WindowedCount(), "request", "requests sent"));
MetricName metricName =
metrics.metricName("request-size-avg", metricGrpName, "The average size of
requests sent.", tags);
nodeRequest.add(metricName, new Avg());
metricName = metrics.metricName("request-size-max",
metricGrpName, "The maximum size of any request sent.", tags);
@@ -1196,7 +1196,7 @@ public class Selector implements Selectable,
AutoCloseable {
String nodeResponseName = "node-" + connectionId +
".bytes-received";
Sensor nodeResponse = sensor(nodeResponseName);
nodeResponse.add(createMeter(metrics, metricGrpName, tags,
"incoming-byte", "incoming bytes"));
- nodeResponse.add(createMeter(metrics, metricGrpName, tags,
new Count(), "response", "responses received"));
+ nodeResponse.add(createMeter(metrics, metricGrpName, tags,
new WindowedCount(), "response", "responses received"));
String nodeTimeName = "node-" + connectionId + ".latency";
Sensor nodeRequestTime = sensor(nodeTimeName);
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index c6e112a..37d1182 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.junit.Test;
import javax.management.MBeanServer;
@@ -43,7 +43,7 @@ public class JmxReporterTest {
Sensor sensor = metrics.sensor("kafka.requests");
sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new
Avg());
- sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new
Total());
+ sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new
CumulativeSum());
assertTrue(server.isRegistered(new ObjectName(":type=grp1")));
assertEquals(Double.NaN, server.getAttribute(new
ObjectName(":type=grp1"), "pack.bean1.avg"));
@@ -79,11 +79,11 @@ public class JmxReporterTest {
metrics.addReporter(new JmxReporter());
Sensor sensor = metrics.sensor("kafka.requests");
- sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo*"), new Total());
- sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo+"), new Total());
- sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo?"), new Total());
- sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo:"), new Total());
- sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo%"), new Total());
+ sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo*"), new CumulativeSum());
+ sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo+"), new CumulativeSum());
+ sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo?"), new CumulativeSum());
+ sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo:"), new CumulativeSum());
+ sensor.add(metrics.metricName("name", "group", "desc", "id",
"foo%"), new CumulativeSum());
assertTrue(server.isRegistered(new
ObjectName(":type=group,id=\"foo\\*\"")));
assertEquals(0.0, server.getAttribute(new
ObjectName(":type=group,id=\"foo\\*\""), "name"));
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
index dd494c8..ea1178e 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMbeanTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.common.metrics;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -51,9 +51,9 @@ public class KafkaMbeanTest {
metrics.addReporter(new JmxReporter());
sensor = metrics.sensor("kafka.requests");
countMetricName = metrics.metricName("pack.bean1.count", "grp1");
- sensor.add(countMetricName, new Count());
+ sensor.add(countMetricName, new WindowedCount());
sumMetricName = metrics.metricName("pack.bean1.sum", "grp1");
- sensor.add(sumMetricName, new Sum());
+ sensor.add(sumMetricName, new WindowedSum());
}
@After
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 98b468a..0b1ea85 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -44,7 +44,7 @@ import java.util.function.Function;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Min;
@@ -52,9 +52,9 @@ import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.metrics.stats.SimpleRate;
-import org.apache.kafka.common.metrics.stats.Sum;
-import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.MockTime;
import org.junit.After;
@@ -119,15 +119,15 @@ public class MetricsTest {
s.add(metrics.metricName("test.min", "grp1"), new Min());
s.add(new Meter(TimeUnit.SECONDS, metrics.metricName("test.rate",
"grp1"),
metrics.metricName("test.total", "grp1")));
- s.add(new Meter(TimeUnit.SECONDS, new Count(),
metrics.metricName("test.occurences", "grp1"),
+ s.add(new Meter(TimeUnit.SECONDS, new WindowedCount(),
metrics.metricName("test.occurences", "grp1"),
metrics.metricName("test.occurences.total", "grp1")));
- s.add(metrics.metricName("test.count", "grp1"), new Count());
+ s.add(metrics.metricName("test.count", "grp1"), new WindowedCount());
s.add(new Percentiles(100, -100, 100, BucketSizing.CONSTANT,
new Percentile(metrics.metricName("test.median",
"grp1"), 50.0),
new
Percentile(metrics.metricName("test.perc99_9", "grp1"), 99.9)));
Sensor s2 = metrics.sensor("test.sensor2");
- s2.add(metrics.metricName("s2.total", "grp1"), new Total());
+ s2.add(metrics.metricName("s2.total", "grp1"), new CumulativeSum());
s2.record(5.0);
int sum = 0;
@@ -162,15 +162,15 @@ public class MetricsTest {
@Test
public void testHierarchicalSensors() {
Sensor parent1 = metrics.sensor("test.parent1");
- parent1.add(metrics.metricName("test.parent1.count", "grp1"), new
Count());
+ parent1.add(metrics.metricName("test.parent1.count", "grp1"), new
WindowedCount());
Sensor parent2 = metrics.sensor("test.parent2");
- parent2.add(metrics.metricName("test.parent2.count", "grp1"), new
Count());
+ parent2.add(metrics.metricName("test.parent2.count", "grp1"), new
WindowedCount());
Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
- child1.add(metrics.metricName("test.child1.count", "grp1"), new
Count());
+ child1.add(metrics.metricName("test.child1.count", "grp1"), new
WindowedCount());
Sensor child2 = metrics.sensor("test.child2", parent1);
- child2.add(metrics.metricName("test.child2.count", "grp1"), new
Count());
+ child2.add(metrics.metricName("test.child2.count", "grp1"), new
WindowedCount());
Sensor grandchild = metrics.sensor("test.grandchild", child1);
- grandchild.add(metrics.metricName("test.grandchild.count", "grp1"),
new Count());
+ grandchild.add(metrics.metricName("test.grandchild.count", "grp1"),
new WindowedCount());
/* increment each sensor one time */
parent1.record();
@@ -222,15 +222,15 @@ public class MetricsTest {
public void testRemoveSensor() {
int size = metrics.metrics().size();
Sensor parent1 = metrics.sensor("test.parent1");
- parent1.add(metrics.metricName("test.parent1.count", "grp1"), new
Count());
+ parent1.add(metrics.metricName("test.parent1.count", "grp1"), new
WindowedCount());
Sensor parent2 = metrics.sensor("test.parent2");
- parent2.add(metrics.metricName("test.parent2.count", "grp1"), new
Count());
+ parent2.add(metrics.metricName("test.parent2.count", "grp1"), new
WindowedCount());
Sensor child1 = metrics.sensor("test.child1", parent1, parent2);
- child1.add(metrics.metricName("test.child1.count", "grp1"), new
Count());
+ child1.add(metrics.metricName("test.child1.count", "grp1"), new
WindowedCount());
Sensor child2 = metrics.sensor("test.child2", parent2);
- child2.add(metrics.metricName("test.child2.count", "grp1"), new
Count());
+ child2.add(metrics.metricName("test.child2.count", "grp1"), new
WindowedCount());
Sensor grandChild1 = metrics.sensor("test.gchild2", child2);
- grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new
Count());
+ grandChild1.add(metrics.metricName("test.gchild2.count", "grp1"), new
WindowedCount());
Sensor sensor = metrics.getSensor("test.parent1");
assertNotNull(sensor);
@@ -268,10 +268,10 @@ public class MetricsTest {
@Test
public void testRemoveInactiveMetrics() {
Sensor s1 = metrics.sensor("test.s1", null, 1);
- s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
+ s1.add(metrics.metricName("test.s1.count", "grp1"), new
WindowedCount());
Sensor s2 = metrics.sensor("test.s2", null, 3);
- s2.add(metrics.metricName("test.s2.count", "grp1"), new Count());
+ s2.add(metrics.metricName("test.s2.count", "grp1"), new
WindowedCount());
Metrics.ExpireSensorTask purger = metrics.new ExpireSensorTask();
purger.run();
@@ -309,7 +309,7 @@ public class MetricsTest {
// After purging, it should be possible to recreate a metric
s1 = metrics.sensor("test.s1", null, 1);
- s1.add(metrics.metricName("test.s1.count", "grp1"), new Count());
+ s1.add(metrics.metricName("test.s1.count", "grp1"), new
WindowedCount());
assertNotNull("Sensor test.s1 must be present",
metrics.getSensor("test.s1"));
assertNotNull("MetricName test.s1.count must be present",
metrics.metrics().get(metrics.metricName("test.s1.count",
"grp1")));
@@ -318,8 +318,8 @@ public class MetricsTest {
@Test
public void testRemoveMetric() {
int size = metrics.metrics().size();
- metrics.addMetric(metrics.metricName("test1", "grp1"), new Count());
- metrics.addMetric(metrics.metricName("test2", "grp1"), new Count());
+ metrics.addMetric(metrics.metricName("test1", "grp1"), new
WindowedCount());
+ metrics.addMetric(metrics.metricName("test2", "grp1"), new
WindowedCount());
assertNotNull(metrics.removeMetric(metrics.metricName("test1",
"grp1")));
assertNull(metrics.metrics().get(metrics.metricName("test1", "grp1")));
@@ -333,7 +333,7 @@ public class MetricsTest {
@Test
public void testEventWindowing() {
- Count count = new Count();
+ WindowedCount count = new WindowedCount();
MetricConfig config = new MetricConfig().eventWindow(1).samples(2);
count.record(config, 1.0, time.milliseconds());
count.record(config, 1.0, time.milliseconds());
@@ -344,7 +344,7 @@ public class MetricsTest {
@Test
public void testTimeWindowing() {
- Count count = new Count();
+ WindowedCount count = new WindowedCount();
MetricConfig config = new MetricConfig().timeWindow(1,
TimeUnit.MILLISECONDS).samples(2);
count.record(config, 1.0, time.milliseconds());
time.sleep(1);
@@ -397,8 +397,8 @@ public class MetricsTest {
*/
@Test
public void testSampledStatReturnsInitialValueWhenNoValuesExist() {
- Count count = new Count();
- Rate.SampledTotal sampledTotal = new Rate.SampledTotal();
+ WindowedCount count = new WindowedCount();
+ WindowedSum sampledTotal = new WindowedSum();
long windowMs = 100;
int samples = 2;
MetricConfig config = new MetricConfig().timeWindow(windowMs,
TimeUnit.MILLISECONDS).samples(samples);
@@ -415,14 +415,14 @@ public class MetricsTest {
@Test(expected = IllegalArgumentException.class)
public void testDuplicateMetricName() {
metrics.sensor("test").add(metrics.metricName("test", "grp1"), new
Avg());
- metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new
Total());
+ metrics.sensor("test2").add(metrics.metricName("test", "grp1"), new
CumulativeSum());
}
@Test
public void testQuotas() {
Sensor sensor = metrics.sensor("test");
- sensor.add(metrics.metricName("test1.total", "grp1"), new Total(), new
MetricConfig().quota(Quota.upperBound(5.0)));
- sensor.add(metrics.metricName("test2.total", "grp1"), new Total(), new
MetricConfig().quota(Quota.lowerBound(0.0)));
+ sensor.add(metrics.metricName("test1.total", "grp1"), new
CumulativeSum(), new MetricConfig().quota(Quota.upperBound(5.0)));
+ sensor.add(metrics.metricName("test2.total", "grp1"), new
CumulativeSum(), new MetricConfig().quota(Quota.lowerBound(0.0)));
sensor.record(5.0);
try {
sensor.record(1.0);
@@ -503,7 +503,7 @@ public class MetricsTest {
MetricName countRateMetricName = metrics.metricName("test.count.rate",
"grp1");
MetricName countTotalMetricName =
metrics.metricName("test.count.total", "grp1");
s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName));
- s.add(new Meter(TimeUnit.SECONDS, new Count(), countRateMetricName,
countTotalMetricName));
+ s.add(new Meter(TimeUnit.SECONDS, new WindowedCount(),
countRateMetricName, countTotalMetricName));
KafkaMetric totalMetric = metrics.metrics().get(totalMetricName);
KafkaMetric countTotalMetric =
metrics.metrics().get(countTotalMetricName);
@@ -825,10 +825,10 @@ public class MetricsTest {
sensor.add(metrics.metricName("test.metric.avg", "avg",
tags), new Avg());
break;
case TOTAL:
- sensor.add(metrics.metricName("test.metric.total",
"total", tags), new Total());
+ sensor.add(metrics.metricName("test.metric.total",
"total", tags), new CumulativeSum());
break;
case COUNT:
- sensor.add(metrics.metricName("test.metric.count",
"count", tags), new Count());
+ sensor.add(metrics.metricName("test.metric.count",
"count", tags), new WindowedCount());
break;
case MAX:
sensor.add(metrics.metricName("test.metric.max", "max",
tags), new Max());
@@ -843,7 +843,7 @@ public class MetricsTest {
sensor.add(metrics.metricName("test.metric.simpleRate",
"simpleRate", tags), new SimpleRate());
break;
case SUM:
- sensor.add(metrics.metricName("test.metric.sum", "sum",
tags), new Sum());
+ sensor.add(metrics.metricName("test.metric.sum", "sum",
tags), new WindowedSum());
break;
case VALUE:
sensor.add(metrics.metricName("test.metric.value",
"value", tags), new Value());
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
index 8e3dfeb..fc7cfe2 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
@@ -128,7 +128,7 @@ public class SensorTest {
}
// note that adding a different metric with the same name is also a no-op
- assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"),
new Sum()));
+ assertTrue(sensor.add(metrics.metricName("test-metric", "test-group"),
new WindowedSum()));
// so after all this, we still just have the original metric registered
assertEquals(1, sensor.metrics().size());
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
index 8204771..27198ea 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
@@ -44,7 +44,7 @@ public class MeterTest {
assertEquals(rateMetricName, rate.name());
assertEquals(totalMetricName, total.name());
Rate rateStat = (Rate) rate.stat();
- Total totalStat = (Total) total.stat();
+ CumulativeSum totalStat = (CumulativeSum) total.stat();
MetricConfig config = new MetricConfig();
double nextValue = 0.0;
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 379db2f..93cbf6d 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -21,11 +21,11 @@ import java.util.Arrays;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
public class MetricsBench {
@@ -37,7 +37,7 @@ public class MetricsBench {
Sensor child = metrics.sensor("child", parent);
for (Sensor sensor : Arrays.asList(parent, child)) {
sensor.add(metrics.metricName(sensor.name() + ".avg", "grp1"),
new Avg());
- sensor.add(metrics.metricName(sensor.name() + ".count",
"grp1"), new Count());
+ sensor.add(metrics.metricName(sensor.name() + ".count",
"grp1"), new WindowedCount());
sensor.add(metrics.metricName(sensor.name() + ".max", "grp1"),
new Max());
sensor.add(new Percentiles(1024,
0.0,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f848a18..fd90dd1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -24,8 +24,8 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Frequencies;
-import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
@@ -863,13 +863,13 @@ public class Worker {
connectorStartupResults.add(connectorStartupResultFrequencies);
connectorStartupAttempts =
metricGroup.sensor("connector-startup-attempts");
-
connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal),
new Total());
+
connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal),
new CumulativeSum());
connectorStartupSuccesses =
metricGroup.sensor("connector-startup-successes");
-
connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal),
new Total());
+
connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal),
new CumulativeSum());
connectorStartupFailures =
metricGroup.sensor("connector-startup-failures");
-
connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal),
new Total());
+
connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal),
new CumulativeSum());
MetricName taskFailurePct =
metricGroup.metricName(registry.taskStartupFailurePercentage);
MetricName taskSuccessPct =
metricGroup.metricName(registry.taskStartupSuccessPercentage);
@@ -878,13 +878,13 @@ public class Worker {
taskStartupResults.add(taskStartupResultFrequencies);
taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
-
taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal),
new Total());
+
taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal),
new CumulativeSum());
taskStartupSuccesses =
metricGroup.sensor("task-startup-successes");
-
taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal),
new Total());
+
taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal),
new CumulativeSum());
taskStartupFailures = metricGroup.sensor("task-startup-failures");
-
taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal),
new Total());
+
taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal),
new CumulativeSum());
}
void close() {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index f21c500..395c93e 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -27,9 +27,9 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.SchemaAndValue;
@@ -705,11 +705,11 @@ class WorkerSinkTask extends WorkerTask {
sinkRecordRead = metricGroup.sensor("sink-record-read");
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadRate), new
Rate());
-
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new
Total());
+
sinkRecordRead.add(metricGroup.metricName(registry.sinkRecordReadTotal), new
CumulativeSum());
sinkRecordSend = metricGroup.sensor("sink-record-send");
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendRate), new
Rate());
-
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new
Total());
+
sinkRecordSend.add(metricGroup.metricName(registry.sinkRecordSendTotal), new
CumulativeSum());
sinkRecordActiveCount =
metricGroup.sensor("sink-record-active-count");
sinkRecordActiveCount.add(metricGroup.metricName(registry.sinkRecordActiveCount),
new Value());
@@ -724,11 +724,11 @@ class WorkerSinkTask extends WorkerTask {
offsetCompletion = metricGroup.sensor("offset-commit-completion");
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate),
new Rate());
-
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal),
new Total());
+
offsetCompletion.add(metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal),
new CumulativeSum());
offsetCompletionSkip =
metricGroup.sensor("offset-commit-completion-skip");
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate),
new Rate());
-
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal),
new Total());
+
offsetCompletionSkip.add(metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal),
new CumulativeSum());
putBatchTime = metricGroup.sensor("put-batch-time");
putBatchTime.add(metricGroup.metricName(registry.sinkRecordPutBatchTimeMax),
new Max());
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6e94c6f..6e1152b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -24,9 +24,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
@@ -591,11 +591,11 @@ class WorkerSourceTask extends WorkerTask {
sourceRecordPoll = metricGroup.sensor("source-record-poll");
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollRate), new
Rate());
-
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal),
new Total());
+
sourceRecordPoll.add(metricGroup.metricName(registry.sourceRecordPollTotal),
new CumulativeSum());
sourceRecordWrite = metricGroup.sensor("source-record-write");
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteRate),
new Rate());
-
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal),
new Total());
+
sourceRecordWrite.add(metricGroup.metricName(registry.sourceRecordWriteTotal),
new CumulativeSum());
pollTime = metricGroup.sensor("poll-batch-time");
pollTime.add(metricGroup.metricName(registry.sourceRecordPollBatchTimeMax), new
Max());
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index b1a41c0..86caeeb 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -1483,7 +1483,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
});
rebalanceCompletedCounts =
metricGroup.sensor("completed-rebalance-count");
-
rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal),
new Total());
+
rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal),
new CumulativeSum());
rebalanceTime = metricGroup.sensor("rebalance-time");
rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeMax), new Max());
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
index c589012..0deecd1 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/ErrorHandlingMetrics.java
@@ -17,7 +17,7 @@
package org.apache.kafka.connect.runtime.errors;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectMetrics;
@@ -62,25 +62,25 @@ public class ErrorHandlingMetrics {
metricGroup.close();
recordProcessingFailures = metricGroup.sensor("total-record-failures");
-
recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures),
new Total());
+
recordProcessingFailures.add(metricGroup.metricName(registry.recordProcessingFailures),
new CumulativeSum());
recordProcessingErrors = metricGroup.sensor("total-record-errors");
-
recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors),
new Total());
+
recordProcessingErrors.add(metricGroup.metricName(registry.recordProcessingErrors),
new CumulativeSum());
recordsSkipped = metricGroup.sensor("total-records-skipped");
- recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped),
new Total());
+ recordsSkipped.add(metricGroup.metricName(registry.recordsSkipped),
new CumulativeSum());
retries = metricGroup.sensor("total-retries");
- retries.add(metricGroup.metricName(registry.retries), new Total());
+ retries.add(metricGroup.metricName(registry.retries), new
CumulativeSum());
errorsLogged = metricGroup.sensor("total-errors-logged");
- errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new
Total());
+ errorsLogged.add(metricGroup.metricName(registry.errorsLogged), new
CumulativeSum());
dlqProduceRequests =
metricGroup.sensor("deadletterqueue-produce-requests");
-
dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new
Total());
+
dlqProduceRequests.add(metricGroup.metricName(registry.dlqProduceRequests), new
CumulativeSum());
dlqProduceFailures =
metricGroup.sensor("deadletterqueue-produce-failures");
-
dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new
Total());
+
dlqProduceFailures.add(metricGroup.metricName(registry.dlqProduceFailures), new
CumulativeSum());
metricGroup.addValueMetric(registry.lastErrorTimestamp, now ->
lastErrorTime);
}
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 619d260..69f02ae 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -39,8 +39,7 @@ import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.{KafkaException, Reconfigurable}
import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.metrics.stats.Meter
-import org.apache.kafka.common.metrics.stats.Total
+import org.apache.kafka.common.metrics.stats.{CumulativeSum, Meter}
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders,
KafkaChannel, ListenerName, ListenerReconfigurable, Selectable, Send, Selector
=> KSelector}
import org.apache.kafka.common.protocol.ApiKeys
@@ -712,7 +711,7 @@ private[kafka] class Processor(val id: Int,
Map(NetworkProcessorMetricTag -> id.toString)
)
- val expiredConnectionsKilledCount = new Total()
+ val expiredConnectionsKilledCount = new CumulativeSum()
private val expiredConnectionsKilledCountMetricName =
metrics.metricName("expired-connections-killed-count", "socket-server-metrics",
metricTags)
metrics.addMetric(expiredConnectionsKilledCountMetricName,
expiredConnectionsKilledCount)
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 959144c..5451b5c 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -27,7 +27,7 @@ import kafka.utils.{Logging, ShutdownableThread}
import org.apache.kafka.common.{Cluster, MetricName}
import org.apache.kafka.common.metrics._
import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
+import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity,
ClientQuotaType}
@@ -179,7 +179,7 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
private val delayQueueSensor = metrics.sensor(quotaType + "-delayQueue")
delayQueueSensor.add(metrics.metricName("queue-size",
quotaType.toString,
- "Tracks the size of the delay queue"), new Total())
+ "Tracks the size of the delay queue"), new CumulativeSum())
start() // Use start method to keep spotbugs happy
private def start() {
throttledChannelReaper.start()
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 4ecaeb4..038b8ac 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -19,10 +19,10 @@ package org.apache.kafka.streams.kstream.internals.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Sum;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -106,7 +106,7 @@ public class Sensors {
"The average number of occurrence of suppression-emit
operation per second.",
tags
),
- new Rate(TimeUnit.SECONDS, new Sum())
+ new Rate(TimeUnit.SECONDS, new WindowedSum())
);
sensor.add(
new MetricName(
@@ -115,7 +115,7 @@ public class Sensors {
"The total number of occurrence of suppression-emit
operations.",
tags
),
- new Total()
+ new CumulativeSum()
);
return sensor;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 210412b..836330f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -28,9 +28,10 @@ import
org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -43,7 +44,6 @@ import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -112,7 +112,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
);
taskCommitTimeSensor.add(
new MetricName("commit-rate", group, "The average number of
occurrence of commit operation per second.", tagMap),
- new Rate(TimeUnit.SECONDS, new Count())
+ new Rate(TimeUnit.SECONDS, new WindowedCount())
);
taskCommitTimeSensor.add(
new MetricName("commit-total", group, "The total number of
occurrence of commit operations.", tagMap),
@@ -123,7 +123,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
taskEnforcedProcessSensor = metrics.taskLevelSensor(taskName,
"enforced-processing", Sensor.RecordingLevel.DEBUG, parent);
taskEnforcedProcessSensor.add(
new MetricName("enforced-processing-rate", group, "The
average number of occurrence of enforced-processing operation per second.",
tagMap),
- new Rate(TimeUnit.SECONDS, new Count())
+ new Rate(TimeUnit.SECONDS, new WindowedCount())
);
taskEnforcedProcessSensor.add(
new MetricName("enforced-processing-total", group, "The
total number of occurrence of enforced-processing operations.", tagMap),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 46b5669..b6bfcc5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -22,9 +22,10 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.streams.StreamsMetrics;
import java.util.Arrays;
@@ -445,7 +446,7 @@ public class StreamsMetricsImpl implements StreamsMetrics {
descriptionOfRate,
tags
),
- new Rate(TimeUnit.SECONDS, new Count())
+ new Rate(TimeUnit.SECONDS, new WindowedCount())
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index a7da2cb..47dd61b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -33,7 +33,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
@@ -215,7 +215,7 @@ public class RecordCollectorTest {
final Sensor sensor = metrics.sensor("skipped-records");
final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister();
final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
- sensor.add(metricName, new Sum());
+ sensor.add(metricName, new WindowedSum());
final RecordCollector collector = new RecordCollectorImpl(
"test",
logContext,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 0400128..2faa078 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -692,7 +692,7 @@ public class StandbyTaskTest {
private MetricName setupCloseTaskMetric() {
final MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
final Sensor sensor = streamsMetrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO);
- sensor.add(metricName, new Total());
+ sensor.add(metricName, new CumulativeSum());
return metricName;
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 48c038c..2b1428f 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -32,9 +32,9 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -289,12 +289,12 @@ public class TopologyTestDriver implements Closeable {
threadLevelGroup,
"The average per-second number of skipped records",
streamsMetrics.tagMap()),
- new Rate(TimeUnit.SECONDS, new Count()));
+ new Rate(TimeUnit.SECONDS, new WindowedCount()));
skippedRecordsSensor.add(new MetricName("skipped-records-total",
threadLevelGroup,
"The total number of skipped records",
streamsMetrics.tagMap()),
- new Total());
+ new CumulativeSum());
final ThreadCache cache = new ThreadCache(
new LogContext("topology-test-driver "),
Math.max(0,
streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),