This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 185107f  STORM-3793 add metric to track backpressure status for tasks 
(#3411)
185107f is described below

commit 185107f478c75ec9a78b90d036c9bcd639152494
Author: agresch <[email protected]>
AuthorDate: Wed Sep 22 10:24:23 2021 -0500

    STORM-3793 add metric to track backpressure status for tasks (#3411)
    
    * STORM-3793 add metric to track backpressure status for tasks
---
 docs/Metrics.md                                    |  4 +++
 .../storm/daemon/worker/BackPressureTracker.java   | 32 +++++++++++++++++-----
 .../apache/storm/daemon/worker/WorkerState.java    |  5 ++--
 .../daemon/worker/BackPressureTrackerTest.java     | 23 ++++++++++++----
 4 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/docs/Metrics.md b/docs/Metrics.md
index 11e5f27..5467638 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -199,6 +199,10 @@ This metric records how much time a spout was idle because 
more tuples than `top
 
 This metric records how much time a spout was idle because back-pressure 
indicated that downstream queues in the topology were too full.  This is the 
total time in milliseconds, not the average amount of time and is not 
sub-sampled. This is similar to skipped-throttle-ms in Storm 1.x.
 
+##### `__backpressure-last-overflow-count`
+
+This metric indicates the overflow count last time BP status was sent, with a 
minimum value of 1 if a task has backpressure on.
+
 ##### `skipped-inactive-ms`
 
 This metric records how much time a spout was idle because the topology was 
deactivated.  This is the total time in milliseconds, not the average amount of 
time and is not sub-sampled.
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index 3c590e5..e191870 100644
--- 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -18,18 +18,17 @@
 
 package org.apache.storm.daemon.worker;
 
+import com.codahale.metrics.Gauge;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-
 import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.storm.utils.JCQueue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,12 +40,14 @@ public class BackPressureTracker {
     private final Map<Integer, BackpressureState> tasks;
     private final String workerId;
 
-    public BackPressureTracker(String workerId, Map<Integer, JCQueue> 
localTasksToQueues) {
+    public BackPressureTracker(String workerId, Map<Integer, JCQueue> 
localTasksToQueues,
+                               StormMetricRegistry metricRegistry, 
Map<Integer, String> taskToComponent) {
         this.workerId = workerId;
         this.tasks = localTasksToQueues.entrySet().stream()
             .collect(Collectors.toMap(
                 entry -> entry.getKey(),
-                entry -> new BackpressureState(entry.getValue())));
+                entry -> new BackpressureState(entry.getValue(), 
entry.getKey(),
+                        taskToComponent.get(entry.getKey()), metricRegistry)));
     }
 
     public BackpressureState getBackpressureState(Integer taskId) {
@@ -108,7 +109,6 @@ public class BackPressureTracker {
         state.lastOverflowCount = value;
     }
 
-
     
     public static class BackpressureState {
         private final JCQueue queue;
@@ -118,8 +118,26 @@ public class BackPressureTracker {
         private int lastOverflowCount = 0;
 
 
-        BackpressureState(JCQueue queue) {
+        BackpressureState(JCQueue queue, Integer taskId, String componentId, 
StormMetricRegistry metricRegistry) {
             this.queue = queue;
+
+            // System bolt is not a part of backpressure.
+            if (taskId >= 0) {
+                if (componentId == null) {
+                    throw new RuntimeException("Missing componentId for task " 
+ taskId);
+                }
+
+                Gauge<Integer> bpOverflowCount = new Gauge<Integer>() {
+                    @Override
+                    public Integer getValue() {
+                        if (backpressure.get()) {
+                            return Math.max(1, lastOverflowCount);
+                        }
+                        return 0;
+                    }
+                };
+                metricRegistry.gauge("__backpressure-last-overflow-count", 
bpOverflowCount, componentId, taskId);
+            }
         }
 
         @Override
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f7aa451..660e79f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -232,7 +232,8 @@ public class WorkerState {
         }
         int maxTaskId = getMaxTaskId(componentToSortedTasks);
         this.workerTransfer = new WorkerTransfer(this, topologyConf, 
maxTaskId);
-        this.bpTracker = new BackPressureTracker(workerId, 
taskToExecutorQueue);
+
+        this.bpTracker = new BackPressureTracker(workerId, 
taskToExecutorQueue, metricRegistry, taskToComponent);
         this.deserializedWorkerHooks = deserializeWorkerHooks();
         LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, 
port);
         IConnectionCallback cb = new 
DeserializingConnectionCallback(topologyConf,
@@ -584,7 +585,6 @@ public class WorkerState {
                 receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
                 bpTracker.setLastOverflowCount(bpState, currOverflowCount);
             } else {
-
                 if (currOverflowCount - 
bpTracker.getLastOverflowCount(bpState) > RESEND_BACKPRESSURE_SIZE) {
                     // resend BP status, in case prev notification was missed 
or reordered
                     BackPressureStatus bpStatus = bpTracker.getCurrStatus();
@@ -593,6 +593,7 @@ public class WorkerState {
                     LOG.debug("Re-sent BackPressure Status. OverflowCount = 
{}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
                 }
             }
+
             if (!queue.tryPublishToOverflow(tuple)) {
                 dropMessage(tuple, queue);
             }
diff --git 
a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
 
b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
index f642c54..04450d7 100644
--- 
a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
+++ 
b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 
 import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState;
 import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import 
org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.storm.utils.JCQueue;
 import org.junit.Test;
@@ -40,7 +41,9 @@ public class BackPressureTrackerTest {
         int taskIdNoBackPressure = 1;
         JCQueue noBackPressureQueue = mock(JCQueue.class);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
-                Collections.singletonMap(taskIdNoBackPressure, 
noBackPressureQueue));
+                Collections.singletonMap(taskIdNoBackPressure, 
noBackPressureQueue),
+                new StormMetricRegistry(),
+                Collections.singletonMap(taskIdNoBackPressure, 
"testComponent"));
 
         BackPressureStatus status = tracker.getCurrStatus();
 
@@ -57,7 +60,11 @@ public class BackPressureTrackerTest {
         JCQueue backPressureQueue = mock(JCQueue.class);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, 
ImmutableMap.of(
             taskIdNoBackPressure, noBackPressureQueue,
-            taskIdBackPressure, backPressureQueue));
+            taskIdBackPressure, backPressureQueue), new StormMetricRegistry(),
+                ImmutableMap.of(
+                        taskIdNoBackPressure, "NoBackPressureComponent",
+                        taskIdBackPressure, "BackPressureComponent")
+        );
 
         BackpressureState state = 
tracker.getBackpressureState(taskIdBackPressure);
         boolean backpressureChanged = tracker.recordBackPressure(state);
@@ -74,7 +81,8 @@ public class BackPressureTrackerTest {
         int taskId = 1;
         JCQueue queue = mock(JCQueue.class);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, 
ImmutableMap.of(
-            taskId, queue));
+            taskId, queue), new StormMetricRegistry(),
+                ImmutableMap.of(taskId, "component-1"));
         BackpressureState state = tracker.getBackpressureState(taskId);
         tracker.recordBackPressure(state);
 
@@ -92,7 +100,8 @@ public class BackPressureTrackerTest {
         JCQueue queue = mock(JCQueue.class);
         when(queue.isEmptyOverflow()).thenReturn(true);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, 
ImmutableMap.of(
-            taskId, queue));
+            taskId, queue), new StormMetricRegistry(),
+                ImmutableMap.of(taskId, "component-1"));
         BackpressureState state = tracker.getBackpressureState(taskId);
         tracker.recordBackPressure(state);
 
@@ -110,7 +119,8 @@ public class BackPressureTrackerTest {
         JCQueue queue = mock(JCQueue.class);
         when(queue.isEmptyOverflow()).thenReturn(false);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, 
ImmutableMap.of(
-            taskId, queue));
+            taskId, queue), new StormMetricRegistry(),
+                ImmutableMap.of(taskId, "component-1"));
         BackpressureState state = tracker.getBackpressureState(taskId);
         tracker.recordBackPressure(state);
 
@@ -128,7 +138,8 @@ public class BackPressureTrackerTest {
         int overflow = 5;
         JCQueue queue = mock(JCQueue.class);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, 
ImmutableMap.of(
-            taskId, queue));
+            taskId, queue), new StormMetricRegistry(),
+                ImmutableMap.of(taskId, "component-1"));
         BackpressureState state = tracker.getBackpressureState(taskId);
         tracker.recordBackPressure(state);
         tracker.setLastOverflowCount(state, overflow);

Reply via email to