Repository: storm Updated Branches: refs/heads/master d2e221ae8 -> db510ae58
[STORM-2744] add in restart timeout for backpressure Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bebc09cf Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bebc09cf Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bebc09cf Branch: refs/heads/master Commit: bebc09cfb744f760bf14395513145165cd214e77 Parents: 50d55a9 Author: Ethan Li <[email protected]> Authored: Wed Sep 20 09:33:29 2017 -0500 Committer: Ethan Li <[email protected]> Committed: Thu Sep 21 18:36:03 2017 +0000 ---------------------------------------------------------------------- .../org/apache/storm/cluster/ClusterUtils.java | 10 +++++ .../storm/cluster/IStormClusterState.java | 2 +- .../storm/cluster/StormClusterStateImpl.java | 33 ++++++++------ .../org/apache/storm/daemon/worker/Worker.java | 45 ++++++++++++-------- .../apache/storm/daemon/worker/WorkerState.java | 4 +- 5 files changed, 60 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java index 43b0574..0dc3e20 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java @@ -144,6 +144,16 @@ public class ClusterUtils { return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port; } + /** + * Get the backpressure znode full path. + * @param stormId The topology id + * @param shortPath A string in the form of "node-port" + * @return The backpressure znode path + */ + public static String backpressurePath(String stormId, String shortPath) { + return backpressureStormRoot(stormId) + ZK_SEPERATOR + shortPath; + } + public static String errorStormRoot(String stormId) { return ERRORS_SUBTREE + ZK_SEPERATOR + stormId; } http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java index 704c9e5..3ece640 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java @@ -99,7 +99,7 @@ public interface IStormClusterState { public void supervisorHeartbeat(String supervisorId, SupervisorInfo info); - public void workerBackpressure(String stormId, String node, Long port, boolean on); + public void workerBackpressure(String stormId, String node, Long port, long timestamp); public boolean topologyBackpressure(String stormId, Runnable callback); http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 343b0e6..19ee169 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -32,6 +32,7 @@ import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -423,24 +424,26 @@ public class StormClusterStateImpl implements IStormClusterState { } /** - * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing; + * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing; + * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing; * * @param stormId * @param node * @param port - * @param on + * @param timestamp */ @Override - public void workerBackpressure(String stormId, String node, Long port, boolean on) { + public void workerBackpressure(String stormId, String node, Long port, long timestamp) { String path = ClusterUtils.backpressurePath(stormId, node, port); boolean existed = stateStorage.node_exists(path, false); if (existed) { - if (on == false) + if (timestamp == 0) { stateStorage.delete_node(path); - + } } else { - if (on == true) { - stateStorage.set_ephemeral_node(path, null, acls); + if (timestamp > 0) { + byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array(); + stateStorage.set_ephemeral_node(path, data, acls); } } } @@ -448,7 +451,8 @@ public class StormClusterStateImpl implements IStormClusterState { /** * Check whether a topology is in throttle-on status or not: * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off. - * + * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off. + * This will prevent the spouts from getting stuck indefinitely if something wrong happens. * @param stormId * @param callback * @return @@ -459,14 +463,15 @@ public class StormClusterStateImpl implements IStormClusterState { backPressureCallback.put(stormId, callback); } String path = ClusterUtils.backpressureStormRoot(stormId); - List<String> childrens = null; + long mostRecentTimestamp = 0; if(stateStorage.node_exists(path, false)) { - childrens = stateStorage.get_children(path, callback != null); - } else { - childrens = new ArrayList<>(); + List<String> children = stateStorage.get_children(path, callback != null); + mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false)) + .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0); } - return childrens.size() > 0; - + boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < 30000); + LOG.debug("topology backpressure is {}", ret ? "on" : "off"); + return ret; } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 9e7bd0b..d878fc8 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -406,25 +406,36 @@ public class Worker implements Shutdownable, DaemonCommon { final List<IRunningExecutor> executors = executorsAtom.get(); return new WorkerBackpressureCallback() { @Override public void onEvent(Object obj) { - String topologyId = workerState.topologyId; - String assignmentId = workerState.assignmentId; - int port = workerState.port; - IStormClusterState stormClusterState = workerState.stormClusterState; - boolean prevBackpressureFlag = workerState.backpressure.get(); - boolean currBackpressureFlag = prevBackpressureFlag; if (null != executors) { - currBackpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream() - .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get()); - } + String topologyId = workerState.topologyId; + String assignmentId = workerState.assignmentId; + int port = workerState.port; + IStormClusterState stormClusterState = workerState.stormClusterState; + long prevBackpressureTimestamp = workerState.backpressure.get(); + long currTimestamp = System.currentTimeMillis(); + long currBackpressureTimestamp = 0; + // the backpressure flag is true if at least one of the disruptor queues has throttle-on + boolean backpressureFlag = workerState.transferQueue.getThrottleOn() || (executors.stream() + .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get()); + + if (backpressureFlag) { + // update the backpressure timestamp every 15 seconds + if ((currTimestamp - prevBackpressureTimestamp) > 15000) { + currBackpressureTimestamp = currTimestamp; + } else { + currBackpressureTimestamp = prevBackpressureTimestamp; + } + } - if (currBackpressureFlag != prevBackpressureFlag) { - try { - LOG.debug("worker backpressure flag changing from {} to {}", prevBackpressureFlag, currBackpressureFlag); - stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureFlag); - // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception - workerState.backpressure.set(currBackpressureFlag); - } catch (Exception ex) { - LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex); + if (currBackpressureTimestamp != prevBackpressureTimestamp) { + try { + LOG.debug("worker backpressure timestamp changing from {} to {}", prevBackpressureTimestamp, currBackpressureTimestamp); + stormClusterState.workerBackpressure(topologyId, assignmentId, (long) port, currBackpressureTimestamp); + // doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception + workerState.backpressure.set(currBackpressureTimestamp); + } catch (Exception ex) { + LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", ex); + } } } } http://git-wip-us.apache.org/repos/asf/storm/blob/bebc09cf/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index 33ea579..f2c09b1 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -211,8 +211,8 @@ public class WorkerState { final Map<String, Object> userSharedResources; final LoadMapping loadMapping; final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions; - // Whether this worker is going slow - final AtomicBoolean backpressure = new AtomicBoolean(false); + // Whether this worker is going slow. 0 indicates the backpressure is off + final AtomicLong backpressure = new AtomicLong(0); // If the transfer queue is backed-up final AtomicBoolean transferBackpressure = new AtomicBoolean(false); // a trigger for synchronization with executors
