This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c5405bb emit maxLag/avgLag in KafkaSupervisor (#6587)
c5405bb is described below
commit c5405bb5929b117b31ad0f4c4ba8c6faf38beb63
Author: Mingming Qiu <[email protected]>
AuthorDate: Wed Nov 28 18:11:14 2018 +0800
emit maxLag/avgLag in KafkaSupervisor (#6587)
* emit maxLag/totalLag/avgLag in KafkaSupervisor
* modify ingest/kafka/totalLag to ingest/kafka/lag for backwards
compatibility
---
docs/content/operations/metrics.md | 2 ++
.../src/main/resources/defaultMetrics.json | 6 ++++++
.../indexing/kafka/supervisor/KafkaSupervisor.java | 22 ++++++++++++++++------
3 files changed, 24 insertions(+), 6 deletions(-)
diff --git a/docs/content/operations/metrics.md
b/docs/content/operations/metrics.md
index e327023..d99d3e5 100644
--- a/docs/content/operations/metrics.md
+++ b/docs/content/operations/metrics.md
@@ -149,6 +149,8 @@ emission period.|
|`ingest/sink/count`|Number of sinks not yet handed off.|dataSource, taskId,
taskType.|1~3|
|`ingest/events/messageGap`|Time gap between the data time in event and
current system time.|dataSource, taskId, taskType.|Greater than 0, depends on
the time carried in event |
|`ingest/kafka/lag`|Applicable for Kafka Indexing Service. Total lag between
the offsets consumed by the Kafka indexing tasks and latest offsets in Kafka
brokers across all partitions. Minimum emission period for this metric is a
minute.|dataSource.|Greater than 0, should not be a very high number |
+|`ingest/kafka/maxLag`|Applicable for Kafka Indexing Service. Maximum lag,
taken over all partitions, between the offsets consumed by the Kafka indexing
tasks and the latest offsets in Kafka brokers. Minimum emission period for this
metric is a minute.|dataSource.|Greater than 0, should not be a very high number |
+|`ingest/kafka/avgLag`|Applicable for Kafka Indexing Service. Average lag per
partition between the offsets consumed by the Kafka indexing tasks and the
latest offsets in Kafka brokers. Minimum emission period for this metric is
a minute.|dataSource.|Greater than 0, should not be a very high number |
Note: If the JVM does not support CPU time measurement for the current thread,
ingest/merge/cpu and ingest/persists/cpu will be 0.
diff --git
a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
index 553e3a2..e28639e 100644
--- a/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
+++ b/extensions-contrib/opentsdb-emitter/src/main/resources/defaultMetrics.json
@@ -99,6 +99,12 @@
"ingest/kafka/lag": [
"dataSource"
],
+ "ingest/kafka/maxLag": [
+ "dataSource"
+ ],
+ "ingest/kafka/avgLag": [
+ "dataSource"
+ ],
"task/run/time": [
"dataSource"
],
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 73fd54c..6992b8d 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -2349,14 +2349,24 @@ public class KafkaSupervisor implements Supervisor
);
}
- long lag = getLagPerPartition(highestCurrentOffsets)
- .values()
- .stream()
- .mapToLong(x -> Math.max(x, 0))
- .sum();
+ Map<Integer, Long> partitionLags =
getLagPerPartition(highestCurrentOffsets);
+ long maxLag = 0, totalLag = 0, avgLag;
+ for (long lag : partitionLags.values()) {
+ if (lag > maxLag) {
+ maxLag = lag;
+ }
+ totalLag += lag;
+ }
+ avgLag = partitionLags.size() == 0 ? 0 : totalLag /
partitionLags.size();
emitter.emit(
- ServiceMetricEvent.builder().setDimension("dataSource",
dataSource).build("ingest/kafka/lag", lag)
+ ServiceMetricEvent.builder().setDimension("dataSource",
dataSource).build("ingest/kafka/lag", totalLag)
+ );
+ emitter.emit(
+ ServiceMetricEvent.builder().setDimension("dataSource",
dataSource).build("ingest/kafka/maxLag", maxLag)
+ );
+ emitter.emit(
+ ServiceMetricEvent.builder().setDimension("dataSource",
dataSource).build("ingest/kafka/avgLag", avgLag)
);
}
catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]