Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2433#discussion_r152897084
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
 ---
    @@ -0,0 +1,119 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.supervisor.timer;
    +
    +import org.apache.storm.daemon.supervisor.Supervisor;
    +import org.apache.storm.daemon.supervisor.SupervisorUtils;
    +import org.apache.storm.generated.LSWorkerHeartbeat;
    +import org.apache.storm.generated.SupervisorWorkerHeartbeat;
    +import org.apache.storm.generated.SupervisorWorkerHeartbeats;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.NimbusClient;
    +import org.apache.thrift.TException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Runnable reporting local worker reported heartbeats to master, 
supervisor should take care the of the heartbeats
    + * integrity for the master heartbeats recovery, a non-null node id means 
that the heartbeats are full,
    + * and master can go on to check and wait others nodes when doing a 
heartbeats recovery.
    + */
    +public class ReportWorkerHeartbeats implements Runnable {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(ReportWorkerHeartbeats.class);
    +
    +    private Supervisor supervisor;
    +    private Map<String, Object> conf;
    +
    +    public ReportWorkerHeartbeats(Map<String, Object> conf, Supervisor 
supervisor) {
    +        this.conf = conf;
    +        this.supervisor = supervisor;
    +    }
    +
    +    @Override
    +    public void run() {
    +        SupervisorWorkerHeartbeats supervisorWorkerHeartbeats = 
getAndResetWorkerHeartbeats();
    +        reportWorkerHeartbeats(supervisorWorkerHeartbeats);
    +    }
    +
    +    private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() {
    +        Map<String, LSWorkerHeartbeat> localHeartbeats;
    +        try {
    +            localHeartbeats = 
SupervisorUtils.readWorkerHeartbeats(this.conf);
    +            return getSupervisorWorkerHeartbeatsFromLocal(localHeartbeats);
    +        } catch (Exception e) {
    +            LOG.error("Read local worker heartbeats error, skipping 
heartbeats for this round, msg:{}", e.getMessage());
    +            return null;
    +        }
    +    }
    +
    +    private SupervisorWorkerHeartbeats 
getSupervisorWorkerHeartbeatsFromLocal(Map<String, LSWorkerHeartbeat> 
localHeartbeats) {
    +        SupervisorWorkerHeartbeats supervisorWorkerHeartbeats = new 
SupervisorWorkerHeartbeats();
    +
    +        List<SupervisorWorkerHeartbeat> heartbeatList = new ArrayList<>();
    +
    +        for (Map.Entry<String, LSWorkerHeartbeat> lsWorkerHeartbeatEntry : 
localHeartbeats.entrySet()) {
    --- End diff --
    
    We could simplify logic using stream:
    
    ```
    List<SupervisorWorkerHeartbeat> heartbeatList = 
localHeartbeats.values().stream()
    .filter(Objects::nonNull).map(hb -> {
        SupervisorWorkerHeartbeat supervisorWorkerHeartbeat = new 
SupervisorWorkerHeartbeat();
        
supervisorWorkerHeartbeat.set_storm_id(lsWorkerHeartbeat.get_topology_id());
        
supervisorWorkerHeartbeat.set_executors(lsWorkerHeartbeat.get_executors());
        
supervisorWorkerHeartbeat.set_time_secs(lsWorkerHeartbeat.get_time_secs());
        return supervisorWorkerHeartbeat;
    }).collect(toList());
    ```
    
    or at least use values() instead of entrySet() given that we don't use keys 
at all.


---

Reply via email to