Repository: storm
Updated Branches:
  refs/heads/master d2e221ae8 -> db510ae58


[STORM-2744] add in restart timeout for backpressure


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bebc09cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bebc09cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bebc09cf

Branch: refs/heads/master
Commit: bebc09cfb744f760bf14395513145165cd214e77
Parents: 50d55a9
Author: Ethan Li <[email protected]>
Authored: Wed Sep 20 09:33:29 2017 -0500
Committer: Ethan Li <[email protected]>
Committed: Thu Sep 21 18:36:03 2017 +0000

----------------------------------------------------------------------
 .../org/apache/storm/cluster/ClusterUtils.java  | 10 +++++
 .../storm/cluster/IStormClusterState.java       |  2 +-
 .../storm/cluster/StormClusterStateImpl.java    | 33 ++++++++------
 .../org/apache/storm/daemon/worker/Worker.java  | 45 ++++++++++++--------
 .../apache/storm/daemon/worker/WorkerState.java |  4 +-
 5 files changed, 60 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 43b0574..0dc3e20 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -144,6 +144,16 @@ public class ClusterUtils {
         return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
     }
 
+    /**
+     * Get the backpressure znode full path.
+     * @param stormId The topology id
+     * @param shortPath A string in the form of "node-port"
+     * @return The backpressure znode path
+     */
+    public static String backpressurePath(String stormId, String shortPath) {
+        return backpressureStormRoot(stormId) + ZK_SEPERATOR + shortPath;
+    }
+
     public static String errorStormRoot(String stormId) {
         return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 704c9e5..3ece640 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -99,7 +99,7 @@ public interface IStormClusterState {
 
     public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
 
-    public void workerBackpressure(String stormId, String node, Long port, boolean on);
+    public void workerBackpressure(String stormId, String node, Long port, long timestamp);
 
     public boolean topologyBackpressure(String stormId, Runnable callback);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 343b0e6..19ee169 100644
--- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -32,6 +32,7 @@ import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -423,24 +424,26 @@ public class StormClusterStateImpl implements IStormClusterState {
     }
 
     /**
-     * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
+     * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing;
+     * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing;
      * 
      * @param stormId
      * @param node
      * @param port
-     * @param on
+     * @param timestamp
      */
     @Override
-    public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+    public void workerBackpressure(String stormId, String node, Long port, long timestamp) {
         String path = ClusterUtils.backpressurePath(stormId, node, port);
         boolean existed = stateStorage.node_exists(path, false);
         if (existed) {
-            if (on == false)
+            if (timestamp == 0) {
                 stateStorage.delete_node(path);
-
+            }
         } else {
-            if (on == true) {
-                stateStorage.set_ephemeral_node(path, null, acls);
+            if (timestamp > 0) {
+                byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+                stateStorage.set_ephemeral_node(path, data, acls);
             }
         }
     }
@@ -448,7 +451,8 @@ public class StormClusterStateImpl implements IStormClusterState {
     /**
      * Check whether a topology is in throttle-on status or not:
      * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
-     * 
+     * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off.
+     * This will prevent the spouts from getting stuck indefinitely if something wrong happens.
      * @param stormId
      * @param callback
      * @return
@@ -459,14 +463,15 @@ public class StormClusterStateImpl implements IStormClusterState {
             backPressureCallback.put(stormId, callback);
         }
         String path = ClusterUtils.backpressureStormRoot(stormId);
-        List<String> childrens = null;
+        long mostRecentTimestamp = 0;
         if(stateStorage.node_exists(path, false)) {
-            childrens = stateStorage.get_children(path, callback != null);
-        } else {
-            childrens = new ArrayList<>();
+            List<String> children = stateStorage.get_children(path, callback != null);
+            mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+                    .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0);
         }
-        return childrens.size() > 0;
-
+        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) < 30000);
+        LOG.debug("topology backpressure is {}", ret ? "on" : "off");
+        return ret;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 9e7bd0b..d878fc8 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -406,25 +406,36 @@ public class Worker implements Shutdownable, DaemonCommon {
         final List<IRunningExecutor> executors = executorsAtom.get();
         return new WorkerBackpressureCallback() {
             @Override public void onEvent(Object obj) {
-                String topologyId = workerState.topologyId;
-                String assignmentId = workerState.assignmentId;
-                int port = workerState.port;
-                IStormClusterState stormClusterState = workerState.stormClusterState;
-                boolean prevBackpressureFlag = workerState.backpressure.get();
-                boolean currBackpressureFlag = prevBackpressureFlag;
                 if (null != executors) {
-                    currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
-                        .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
-                }
+                    String topologyId = workerState.topologyId;
+                    String assignmentId = workerState.assignmentId;
+                    int port = workerState.port;
+                    IStormClusterState stormClusterState = workerState.stormClusterState;
+                    long prevBackpressureTimestamp = workerState.backpressure.get();
+                    long currTimestamp = System.currentTimeMillis();
+                    long currBackpressureTimestamp = 0;
+                    // the backpressure flag is true if at least one of the disruptor queues has throttle-on
+                    boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream()
+                            .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get());
+
+                    if (backpressureFlag) {
+                        // update the backpressure timestamp every 15 seconds
+                        if ((currTimestamp - prevBackpressureTimestamp) > 15000) {
+                            currBackpressureTimestamp = currTimestamp;
+                        } else {
+                            currBackpressureTimestamp = prevBackpressureTimestamp;
+                        }
+                    }
 
-                if (currBackpressureFlag != prevBackpressureFlag) {
-                    try {
-                        LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag);
-                        stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag);
-                        // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
-                        workerState.backpressure.set(currBackpressureFlag);
-                    } catch (Exception ex) {
-                        LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+                    if (currBackpressureTimestamp != prevBackpressureTimestamp) {
+                        try {
+                            LOG.debug("worker backpressure timestamp changing from {} to {}", prevBackpressureTimestamp, currBackpressureTimestamp);
+                            stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureTimestamp);
+                            // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception
+                            workerState.backpressure.set(currBackpressureTimestamp);
+                        } catch (Exception ex) {
+                            LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 33ea579..f2c09b1 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -211,8 +211,8 @@ public class WorkerState {
     final Map<String, Object> userSharedResources;
     final LoadMapping loadMapping;
     final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
-    // Whether this worker is going slow
-    final AtomicBoolean backpressure = new AtomicBoolean(false);
+    // Whether this worker is going slow. 0 indicates the backpressure is off
+    final AtomicLong backpressure = new AtomicLong(0);
     // If the transfer queue is backed-up
     final AtomicBoolean transferBackpressure = new AtomicBoolean(false);
     // a trigger for synchronization with executors

Reply via email to