[STORM-2744] Address review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/884eb61d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/884eb61d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/884eb61d Branch: refs/heads/master Commit: 884eb61ddab0c3da7ef587793b8543ba0f906451 Parents: bebc09c Author: Ethan Li <[email protected]> Authored: Fri Sep 22 09:38:12 2017 -0500 Committer: Ethan Li <[email protected]> Committed: Fri Sep 22 09:38:46 2017 -0500 ---------------------------------------------------------------------- conf/defaults.yaml | 2 + .../src/jvm/org/apache/storm/Config.java | 17 ++++++++ .../storm/cluster/IStormClusterState.java | 2 +- .../storm/cluster/StormClusterStateImpl.java | 41 ++++++++++++-------- .../org/apache/storm/daemon/worker/Worker.java | 17 ++++---- .../apache/storm/daemon/worker/WorkerState.java | 6 ++- 6 files changed, 60 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index c6ef390..16df356 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -190,6 +190,8 @@ task.backpressure.poll.secs: 30 topology.backpressure.enable: false backpressure.disruptor.high.watermark: 0.9 backpressure.disruptor.low.watermark: 0.4 +backpressure.znode.timeout.secs: 30 +backpressure.znode.update.freq.secs: 15 zmq.threads: 1 zmq.linger.millis: 5000 http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 5623698..7149631 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ 
b/storm-client/src/jvm/org/apache/storm/Config.java @@ -129,6 +129,23 @@ public class Config extends HashMap<String, Object> { public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark"; /** + * How long until the backpressure znode is considered invalid. + * It is measured using the data (timestamp) of the znode, not the ctime (creation time) or mtime (modification time), etc. + * This must be larger than BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS. + */ + @isInteger + @isPositiveNumber + public static final String BACKPRESSURE_ZNODE_TIMEOUT_SECS = "backpressure.znode.timeout.secs"; + + /** + * How often the data (timestamp) of the backpressure znode is updated. + * If the worker's backpressure status (on/off) changes, the znode is updated regardless of this interval. + */ + @isInteger + @isPositiveNumber + public static final String BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS = "backpressure.znode.update.freq.secs"; + + /** * A list of users that are allowed to interact with the topology.
To use this set * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer */ http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java index 3ece640..674865b 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java @@ -101,7 +101,7 @@ public interface IStormClusterState { public void workerBackpressure(String stormId, String node, Long port, long timestamp); - public boolean topologyBackpressure(String stormId, Runnable callback); + public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback); public void setupBackpressure(String stormId); http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 19ee169..4d7e2a5 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -424,13 +424,14 @@ public class StormClusterStateImpl implements IStormClusterState { } /** - * if znode exists and timestamp is 0?, delete; if exists and timestamp is larger than 0?, do nothing; - * if not exists and timestamp is larger than 0?, create the node and set the timestamp; if not exists and timestamp is 0?, do nothing; - * - * @param stormId - * @param node - * @param port - * @param timestamp + * If znode exists and timestamp is 
0, delete; + * if exists and timestamp is larger than 0, update the timestamp; + * if not exists and timestamp is larger than 0, create the znode and set the timestamp; + * if not exists and timestamp is 0, do nothing. + * @param stormId The topology Id + * @param node The node id + * @param port The port number + * @param timestamp The backpressure timestamp. 0 means turning off the worker backpressure */ @Override public void workerBackpressure(String stormId, String node, Long port, long timestamp) { @@ -439,6 +440,9 @@ public class StormClusterStateImpl implements IStormClusterState { if (existed) { if (timestamp == 0) { stateStorage.delete_node(path); + } else { + byte[] data = ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array(); + stateStorage.set_data(path, data, acls); } } else { if (timestamp > 0) { @@ -451,14 +455,15 @@ public class StormClusterStateImpl implements IStormClusterState { /** * Check whether a topology is in throttle-on status or not: * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off. - * But if the backpresure/storm-id dir is not empty and has not been updated for more than 30s, we treat it as throttle-off. + * But if the backpressure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off. * This will prevent the spouts from getting stuck indefinitely if something wrong happens. - * @param stormId - * @param callback - * @return + * @param stormId The topology Id + * @param timeoutMs How long until the backpressure znode is invalid. + * @param callback The callback function + * @return True if the backpressure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
*/ @Override - public boolean topologyBackpressure(String stormId, Runnable callback) { + public boolean topologyBackpressure(String stormId, long timeoutMs, Runnable callback) { if (callback != null) { backPressureCallback.put(stormId, callback); } @@ -466,10 +471,14 @@ public class StormClusterStateImpl implements IStormClusterState { long mostRecentTimestamp = 0; if(stateStorage.node_exists(path, false)) { List<String> children = stateStorage.get_children(path, callback != null); - mostRecentTimestamp = children.stream().map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false)) - .filter(data -> data != null).mapToLong(data -> ByteBuffer.wrap(data).getLong()).max().orElse(0); - } - boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < 30000); + mostRecentTimestamp = children.stream() + .map(childPath -> stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false)) + .filter(data -> data != null) + .mapToLong(data -> ByteBuffer.wrap(data).getLong()) + .max() + .orElse(0); + } + boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp) < timeoutMs); LOG.debug("topology backpressure is {}", ret ? 
"on" : "off"); return ret; } http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index d878fc8..519e7ce 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -95,7 +95,8 @@ public class Worker implements Shutdownable, DaemonCommon { private AtomicReference<List<IRunningExecutor>> executorsAtom; private Thread transferThread; private WorkerBackpressureThread backpressureThread; - + // How long until the backpressure znode is invalid. + private long backpressureZnodeTimeoutMs; private AtomicReference<Credentials> credentialsAtom; private Subject subject; private Collection<IAutoCredentials> autoCreds; @@ -152,6 +153,7 @@ public class Worker implements Shutdownable, DaemonCommon { } autoCreds = AuthUtils.GetAutoCredentials(topologyConf); subject = AuthUtils.populateSubject(null, autoCreds, initCreds); + backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000; Subject.doAs(subject, new PrivilegedExceptionAction<Object>() { @Override public Object run() throws Exception { @@ -224,11 +226,11 @@ public class Worker implements Shutdownable, DaemonCommon { workerState.transferQueue .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK))); - WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(); + WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf); backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback); if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) { 
backpressureThread.start(); - stormClusterState.topologyBackpressure(topologyId, workerState::refreshThrottle); + stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle); int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS)); workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle); @@ -366,7 +368,7 @@ public class Worker implements Shutdownable, DaemonCommon { } public void checkThrottleChanged() { - boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, this::checkThrottleChanged); + boolean throttleOn = workerState.stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::checkThrottleChanged); workerState.throttleOn.set(throttleOn); } @@ -402,8 +404,9 @@ public class Worker implements Shutdownable, DaemonCommon { /** * make a handler that checks and updates worker's backpressure flag */ - private WorkerBackpressureCallback mkBackpressureHandler() { + private WorkerBackpressureCallback mkBackpressureHandler(Map<String, Object> topologyConf) { final List<IRunningExecutor> executors = executorsAtom.get(); + final long updateFreqMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_UPDATE_FREQ_SECS)) * 1000; return new WorkerBackpressureCallback() { @Override public void onEvent(Object obj) { if (null != executors) { @@ -419,8 +422,8 @@ public class Worker implements Shutdownable, DaemonCommon { .map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> (op1 || op2)).get()); if (backpressureFlag) { - // update the backpressure timestamp every 15 seconds - if ((currTimestamp - prevBackpressureTimestamp) > 15000) { + // update the backpressure timestamp every updateFreqMs ms + if ((currTimestamp - prevBackpressureTimestamp) > updateFreqMs) { currBackpressureTimestamp = currTimestamp; } else { currBackpressureTimestamp = prevBackpressureTimestamp; 
http://git-wip-us.apache.org/repos/asf/storm/blob/884eb61d/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index f2c09b1..d679ee8 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -213,6 +213,8 @@ public class WorkerState { final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions; // Whether this worker is going slow. 0 indicates the backpressure is off final AtomicLong backpressure = new AtomicLong(0); + // How long until the backpressure znode is invalid. + final long backpressureZnodeTimeoutMs; // If the transfer queue is backed-up final AtomicBoolean transferBackpressure = new AtomicBoolean(false); // a trigger for synchronization with executors @@ -298,6 +300,7 @@ public class WorkerState { } Collections.sort(taskIds); this.topologyConf = topologyConf; + this.backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000; this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf)); this.systemTopology = StormCommon.systemTopology(topologyConf, topology); this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf); @@ -333,6 +336,7 @@ public class WorkerState { LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE); } this.drainer = new TransferDrainer(); + } public void refreshConnections() { @@ -441,7 +445,7 @@ public class WorkerState { } public void refreshThrottle() { - boolean backpressure = stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle); + boolean backpressure = 
stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle); this.throttleOn.set(backpressure); }
