Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2338#discussion_r140519416
--- Diff:
storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
@@ -423,50 +424,63 @@ public void supervisorHeartbeat(String supervisorId,
SupervisorInfo info) {
}
/**
- * if znode exists and to be not on?, delete; if exists and on?, do
nothing; if not exists and to be on?, create; if not exists and not on?, do
nothing;
- *
- * @param stormId
- * @param node
- * @param port
- * @param on
+ * If znode exists and timestamp is 0, delete;
+ * if exists and timestamp is larger than 0, update the timestamp;
+ * if not exists and timestamp is larger than 0, create the znode and
set the timestamp;
+ * if not exists and timestamp is 0, do nothing.
+ * @param stormId The topology Id
+ * @param node The node id
+ * @param port The port number
+ * @param timestamp The backpressure timestamp. 0 means turning off
the worker backpressure
*/
@Override
- public void workerBackpressure(String stormId, String node, Long port,
boolean on) {
+ public void workerBackpressure(String stormId, String node, Long port,
long timestamp) {
String path = ClusterUtils.backpressurePath(stormId, node, port);
boolean existed = stateStorage.node_exists(path, false);
if (existed) {
- if (on == false)
+ if (timestamp == 0) {
stateStorage.delete_node(path);
-
+ } else {
+ byte[] data =
ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+ stateStorage.set_data(path, data, acls);
+ }
} else {
- if (on == true) {
- stateStorage.set_ephemeral_node(path, null, acls);
+ if (timestamp > 0) {
+ byte[] data =
ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
+ stateStorage.set_ephemeral_node(path, data, acls);
}
}
}
/**
* Check whether a topology is in throttle-on status or not:
* if the backpressure/storm-id dir is not empty, this topology has
throttle-on, otherwise throttle-off.
- *
- * @param stormId
- * @param callback
- * @return
+ * But if the backpressure/storm-id dir is not empty and has not been
updated for more than timeoutMs, we treat it as throttle-off.
+ * This will prevent the spouts from getting stuck indefinitely if
something goes wrong.
+ * @param stormId The topology Id
+ * @param timeoutMs How long until the backpressure znode is invalid.
+ * @param callback The callback function
+ * @return True if backpressure/storm-id dir is not empty and at least
one of the backpressure znodes has not timed out; false otherwise.
*/
@Override
- public boolean topologyBackpressure(String stormId, Runnable callback)
{
+ public boolean topologyBackpressure(String stormId, long timeoutMs,
Runnable callback) {
if (callback != null) {
backPressureCallback.put(stormId, callback);
}
String path = ClusterUtils.backpressureStormRoot(stormId);
- List<String> childrens = null;
+ long mostRecentTimestamp = 0;
if(stateStorage.node_exists(path, false)) {
- childrens = stateStorage.get_children(path, callback != null);
- } else {
- childrens = new ArrayList<>();
- }
- return childrens.size() > 0;
-
+ List<String> children = stateStorage.get_children(path,
callback != null);
+ mostRecentTimestamp = children.stream()
+ .map(childPath ->
stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
+ .filter(data -> data != null)
+ .mapToLong(data -> ByteBuffer.wrap(data).getLong())
+ .max()
+ .orElse(0);
+ }
+ boolean ret = ((System.currentTimeMillis() - mostRecentTimestamp)
< timeoutMs);
--- End diff --
Nit: there is an extra space after `=`.
---