This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 185107f STORM-3793 add metric to track backpressure status for tasks
(#3411)
185107f is described below
commit 185107f478c75ec9a78b90d036c9bcd639152494
Author: agresch <[email protected]>
AuthorDate: Wed Sep 22 10:24:23 2021 -0500
STORM-3793 add metric to track backpressure status for tasks (#3411)
* STORM-3793 add metric to track backpressure status for tasks
---
docs/Metrics.md | 4 +++
.../storm/daemon/worker/BackPressureTracker.java | 32 +++++++++++++++++-----
.../apache/storm/daemon/worker/WorkerState.java | 5 ++--
.../daemon/worker/BackPressureTrackerTest.java | 23 ++++++++++++----
4 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 11e5f27..5467638 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -199,6 +199,10 @@ This metric records how much time a spout was idle because
more tuples than `top
This metric records how much time a spout was idle because back-pressure
indicated that downstream queues in the topology were too full. This is the
total time in milliseconds, not the average amount of time and is not
sub-sampled. This is similar to skipped-throttle-ms in Storm 1.x.
+##### `__backpressure-last-overflow-count`
+
+This metric reports the overflow count recorded the last time the backpressure (BP) status was sent, with a
minimum value of 1 while a task has backpressure on, and 0 when it does not.
+
##### `skipped-inactive-ms`
This metric records how much time a spout was idle because the topology was
deactivated. This is the total time in milliseconds, not the average amount of
time and is not sub-sampled.
diff --git
a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index 3c590e5..e191870 100644
---
a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++
b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -18,18 +18,17 @@
package org.apache.storm.daemon.worker;
+import com.codahale.metrics.Gauge;
import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-
import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
import org.apache.storm.utils.JCQueue;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +40,14 @@ public class BackPressureTracker {
private final Map<Integer, BackpressureState> tasks;
private final String workerId;
- public BackPressureTracker(String workerId, Map<Integer, JCQueue>
localTasksToQueues) {
+ public BackPressureTracker(String workerId, Map<Integer, JCQueue>
localTasksToQueues,
+ StormMetricRegistry metricRegistry,
Map<Integer, String> taskToComponent) {
this.workerId = workerId;
this.tasks = localTasksToQueues.entrySet().stream()
.collect(Collectors.toMap(
entry -> entry.getKey(),
- entry -> new BackpressureState(entry.getValue())));
+ entry -> new BackpressureState(entry.getValue(),
entry.getKey(),
+ taskToComponent.get(entry.getKey()), metricRegistry)));
}
public BackpressureState getBackpressureState(Integer taskId) {
@@ -108,7 +109,6 @@ public class BackPressureTracker {
state.lastOverflowCount = value;
}
-
public static class BackpressureState {
private final JCQueue queue;
@@ -118,8 +118,26 @@ public class BackPressureTracker {
private int lastOverflowCount = 0;
- BackpressureState(JCQueue queue) {
+ BackpressureState(JCQueue queue, Integer taskId, String componentId,
StormMetricRegistry metricRegistry) {
this.queue = queue;
+
+ // System bolt is not a part of backpressure.
+ if (taskId >= 0) {
+ if (componentId == null) {
+ throw new RuntimeException("Missing componentId for task "
+ taskId);
+ }
+
+ Gauge<Integer> bpOverflowCount = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ if (backpressure.get()) {
+ return Math.max(1, lastOverflowCount);
+ }
+ return 0;
+ }
+ };
+ metricRegistry.gauge("__backpressure-last-overflow-count",
bpOverflowCount, componentId, taskId);
+ }
}
@Override
diff --git
a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f7aa451..660e79f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -232,7 +232,8 @@ public class WorkerState {
}
int maxTaskId = getMaxTaskId(componentToSortedTasks);
this.workerTransfer = new WorkerTransfer(this, topologyConf,
maxTaskId);
- this.bpTracker = new BackPressureTracker(workerId,
taskToExecutorQueue);
+
+ this.bpTracker = new BackPressureTracker(workerId,
taskToExecutorQueue, metricRegistry, taskToComponent);
this.deserializedWorkerHooks = deserializeWorkerHooks();
LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId,
port);
IConnectionCallback cb = new
DeserializingConnectionCallback(topologyConf,
@@ -584,7 +585,6 @@ public class WorkerState {
receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
bpTracker.setLastOverflowCount(bpState, currOverflowCount);
} else {
-
if (currOverflowCount -
bpTracker.getLastOverflowCount(bpState) > RESEND_BACKPRESSURE_SIZE) {
// resend BP status, in case prev notification was missed
or reordered
BackPressureStatus bpStatus = bpTracker.getCurrStatus();
@@ -593,6 +593,7 @@ public class WorkerState {
LOG.debug("Re-sent BackPressure Status. OverflowCount =
{}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
}
}
+
if (!queue.tryPublishToOverflow(tuple)) {
dropMessage(tuple, queue);
}
diff --git
a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
index f642c54..04450d7 100644
---
a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
+++
b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState;
import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.metrics2.StormMetricRegistry;
import
org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
import org.apache.storm.utils.JCQueue;
import org.junit.Test;
@@ -40,7 +41,9 @@ public class BackPressureTrackerTest {
int taskIdNoBackPressure = 1;
JCQueue noBackPressureQueue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
- Collections.singletonMap(taskIdNoBackPressure,
noBackPressureQueue));
+ Collections.singletonMap(taskIdNoBackPressure,
noBackPressureQueue),
+ new StormMetricRegistry(),
+ Collections.singletonMap(taskIdNoBackPressure,
"testComponent"));
BackPressureStatus status = tracker.getCurrStatus();
@@ -57,7 +60,11 @@ public class BackPressureTrackerTest {
JCQueue backPressureQueue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
ImmutableMap.of(
taskIdNoBackPressure, noBackPressureQueue,
- taskIdBackPressure, backPressureQueue));
+ taskIdBackPressure, backPressureQueue), new StormMetricRegistry(),
+ ImmutableMap.of(
+ taskIdNoBackPressure, "NoBackPressureComponent",
+ taskIdBackPressure, "BackPressureComponent")
+ );
BackpressureState state =
tracker.getBackpressureState(taskIdBackPressure);
boolean backpressureChanged = tracker.recordBackPressure(state);
@@ -74,7 +81,8 @@ public class BackPressureTrackerTest {
int taskId = 1;
JCQueue queue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
ImmutableMap.of(
- taskId, queue));
+ taskId, queue), new StormMetricRegistry(),
+ ImmutableMap.of(taskId, "component-1"));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
@@ -92,7 +100,8 @@ public class BackPressureTrackerTest {
JCQueue queue = mock(JCQueue.class);
when(queue.isEmptyOverflow()).thenReturn(true);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
ImmutableMap.of(
- taskId, queue));
+ taskId, queue), new StormMetricRegistry(),
+ ImmutableMap.of(taskId, "component-1"));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
@@ -110,7 +119,8 @@ public class BackPressureTrackerTest {
JCQueue queue = mock(JCQueue.class);
when(queue.isEmptyOverflow()).thenReturn(false);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
ImmutableMap.of(
- taskId, queue));
+ taskId, queue), new StormMetricRegistry(),
+ ImmutableMap.of(taskId, "component-1"));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
@@ -128,7 +138,8 @@ public class BackPressureTrackerTest {
int overflow = 5;
JCQueue queue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
ImmutableMap.of(
- taskId, queue));
+ taskId, queue), new StormMetricRegistry(),
+ ImmutableMap.of(taskId, "component-1"));
BackpressureState state = tracker.getBackpressureState(taskId);
tracker.recordBackPressure(state);
tracker.setLastOverflowCount(state, overflow);