This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new fa22282 KAFKA-9175: Update MirrorMaker 2 topic/partition metrics
(#7688)
fa22282 is described below
commit fa2228292dd5ce120f4a87d9d4250f2d8964958a
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Nov 13 20:36:25 2019 +0000
KAFKA-9175: Update MirrorMaker 2 topic/partition metrics (#7688)
Reviewers: Manikumar Reddy <[email protected]>, Ryanne Dolan
<[email protected]>
Co-authored-by: Mickael Maison <[email protected]>
Co-authored-by: Edoardo Comar <[email protected]>
---
.../apache/kafka/connect/mirror/MirrorMetrics.java | 32 ++++++++++++----------
1 file changed, 17 insertions(+), 15 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
index 51ddafc..ea9d2f7 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMetrics.java
@@ -20,12 +20,11 @@ import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.metrics.stats.Value;
-import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
@@ -48,6 +47,9 @@ class MirrorMetrics implements AutoCloseable {
private static final MetricNameTemplate RECORD_COUNT = new
MetricNameTemplate(
"record-count", SOURCE_CONNECTOR_GROUP,
"Number of source records replicated to the target cluster.",
PARTITION_TAGS);
+ private static final MetricNameTemplate RECORD_RATE = new
MetricNameTemplate(
+ "record-rate", SOURCE_CONNECTOR_GROUP,
+ "Average number of source records replicated to the target cluster
per second.", PARTITION_TAGS);
private static final MetricNameTemplate RECORD_AGE = new
MetricNameTemplate(
"record-age-ms", SOURCE_CONNECTOR_GROUP,
"The age of incoming source records when replicated to the target
cluster.", PARTITION_TAGS);
@@ -60,6 +62,9 @@ class MirrorMetrics implements AutoCloseable {
private static final MetricNameTemplate RECORD_AGE_AVG = new
MetricNameTemplate(
"record-age-ms-avg", SOURCE_CONNECTOR_GROUP,
"The average age of incoming source records when replicated to the
target cluster.", PARTITION_TAGS);
+ private static final MetricNameTemplate BYTE_COUNT = new
MetricNameTemplate(
+ "byte-count", SOURCE_CONNECTOR_GROUP,
+ "Number of bytes replicated to the target cluster.",
PARTITION_TAGS);
private static final MetricNameTemplate BYTE_RATE = new MetricNameTemplate(
"byte-rate", SOURCE_CONNECTOR_GROUP,
"Average number of bytes replicated per second.", PARTITION_TAGS);
@@ -134,7 +139,7 @@ class MirrorMetrics implements AutoCloseable {
}
void recordBytes(TopicPartition topicPartition, long bytes) {
- partitionMetrics.get(topicPartition).byteRateSensor.record((double)
bytes);
+ partitionMetrics.get(topicPartition).byteSensor.record((double) bytes);
}
void checkpointLatency(TopicPartition topicPartition, String group, long
millis) {
@@ -152,39 +157,36 @@ class MirrorMetrics implements AutoCloseable {
private class PartitionMetrics {
private final Sensor recordSensor;
- private final Sensor byteRateSensor;
+ private final Sensor byteSensor;
private final Sensor recordAgeSensor;
private final Sensor replicationLatencySensor;
- private final TopicPartition topicPartition;
-
+
PartitionMetrics(TopicPartition topicPartition) {
- this.topicPartition = topicPartition;
+ String prefix = topicPartition.topic() + "-" +
topicPartition.partition() + "-";
Map<String, String> tags = new LinkedHashMap<>();
tags.put("target", target);
tags.put("topic", topicPartition.topic());
tags.put("partition",
Integer.toString(topicPartition.partition()));
- recordSensor = metrics.sensor("record-count");
- recordSensor.add(metrics.metricInstance(RECORD_COUNT, tags), new
WindowedCount());
+ recordSensor = metrics.sensor(prefix + "records-sent");
+ recordSensor.add(new Meter(metrics.metricInstance(RECORD_RATE,
tags), metrics.metricInstance(RECORD_COUNT, tags)));
- byteRateSensor = metrics.sensor("byte-rate");
- byteRateSensor.add(metrics.metricInstance(BYTE_RATE, tags), new
Rate());
+ byteSensor = metrics.sensor(prefix + "bytes-sent");
+ byteSensor.add(new Meter(metrics.metricInstance(BYTE_RATE, tags),
metrics.metricInstance(BYTE_COUNT, tags)));
- recordAgeSensor = metrics.sensor("record-age");
+ recordAgeSensor = metrics.sensor(prefix + "record-age");
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE, tags), new
Value());
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MAX, tags),
new Max());
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MIN, tags),
new Min());
recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_AVG, tags),
new Avg());
- replicationLatencySensor = metrics.sensor("replication-latency");
+ replicationLatencySensor = metrics.sensor(prefix +
"replication-latency");
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY, tags),
new Value());
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MAX,
tags), new Max());
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MIN,
tags), new Min());
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_AVG,
tags), new Avg());
}
-
-
}
private class GroupMetrics {