Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2338#discussion_r140519416
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---
    @@ -423,50 +424,63 @@ public void supervisorHeartbeat(String supervisorId, 
SupervisorInfo info) {
         }
     
         /**
    -     * if znode exists and to be not on?, delete; if exists and on?, do 
nothing; if not exists and to be on?, create; if not exists and not on?, do 
nothing;
    -     * 
    -     * @param stormId
    -     * @param node
    -     * @param port
    -     * @param on
    +     * If znode exists and timestamp is 0, delete;
    +     * if exists and timestamp is larger than 0, update the timestamp;
    +     * if not exists and timestamp is larger than 0, create the znode and 
set the timestamp;
    +     * if not exists and timestamp is 0, do nothing.
    +     * @param stormId The topology Id
    +     * @param node The node id
    +     * @param port The port number
    +     * @param timestamp The backpressure timestamp. 0 means turning off 
the worker backpressure
          */
         @Override
    -    public void workerBackpressure(String stormId, String node, Long port, 
boolean on) {
    +    public void workerBackpressure(String stormId, String node, Long port, 
long timestamp) {
             String path = ClusterUtils.backpressurePath(stormId, node, port);
             boolean existed = stateStorage.node_exists(path, false);
             if (existed) {
    -            if (on == false)
    +            if (timestamp == 0) {
                     stateStorage.delete_node(path);
    -
    +            } else {
    +                byte[] data = 
ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    +                stateStorage.set_data(path, data, acls);
    +            }
             } else {
    -            if (on == true) {
    -                stateStorage.set_ephemeral_node(path, null, acls);
    +            if (timestamp > 0) {
    +                byte[] data = 
ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
    +                stateStorage.set_ephemeral_node(path, data, acls);
                 }
             }
         }
     
         /**
          * Check whether a topology is in throttle-on status or not:
          * if the backpresure/storm-id dir is not empty, this topology has 
throttle-on, otherwise throttle-off.
    -     * 
    -     * @param stormId
    -     * @param callback
    -     * @return
    +     * But if the backpresure/storm-id dir is not empty and has not been 
updated for more than timeoutMs, we treat it as throttle-off.
    +     * This will prevent the spouts from getting stuck indefinitely if 
something wrong happens.
    +     * @param stormId The topology Id
    +     * @param timeoutMs How long until the backpressure znode is invalid.
    +     * @param callback The callback function
    +     * @return True is backpresure/storm-id dir is not empty and at least 
one of the backpressure znodes has not timed out; false otherwise.
          */
         @Override
    -    public boolean topologyBackpressure(String stormId, Runnable callback) 
{
    +    public boolean topologyBackpressure(String stormId, long timeoutMs, 
Runnable callback) {
             if (callback != null) {
                 backPressureCallback.put(stormId, callback);
             }
             String path = ClusterUtils.backpressureStormRoot(stormId);
    -        List<String> childrens = null;
    +        long mostRecentTimestamp = 0;
             if(stateStorage.node_exists(path, false)) {
    -            childrens = stateStorage.get_children(path, callback != null);
    -        } else {
    -            childrens = new ArrayList<>();
    -        }
    -        return childrens.size() > 0;
    -
    +            List<String> children = stateStorage.get_children(path, 
callback != null);
    +            mostRecentTimestamp = children.stream()
    +                    .map(childPath -> 
stateStorage.get_data(ClusterUtils.backpressurePath(stormId, childPath), false))
    +                    .filter(data -> data != null)
    +                    .mapToLong(data -> ByteBuffer.wrap(data).getLong())
    +                    .max()
    +                    .orElse(0);
    +        }
    +        boolean ret =  ((System.currentTimeMillis() - mostRecentTimestamp) 
< timeoutMs);
    --- End diff --
    
    nit extra space after =


---

Reply via email to