Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167293987
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) {
             }
         }
     
    -    public void refreshThrottle() {
    -        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
    -        this.throttleOn.set(backpressure);
    -    }
    -
    -    private static double getQueueLoad(DisruptorQueue q) {
    -        DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
    +    private static double getQueueLoad(JCQueue q) {
    +        JCQueue.QueueMetrics qMetrics = q.getMetrics();
             return ((double) qMetrics.population()) / qMetrics.capacity();
         }
     
         public void refreshLoad(List<IRunningExecutor> execs) {
    -        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds));
    +        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
             Long now = System.currentTimeMillis();
             Map<Integer, Double> localLoad = new HashMap<>();
    -        for (IRunningExecutor exec: execs) {
    +        for (IRunningExecutor exec : execs) {
                 double receiveLoad = getQueueLoad(exec.getReceiveQueue());
    -            double sendLoad = getQueueLoad(exec.getSendQueue());
    -            localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad));
    +            localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
             }
     
             Map<Integer, Load> remoteLoad = new HashMap<>();
             cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
             loadMapping.setLocal(localLoad);
             loadMapping.setRemote(remoteLoad);
     
    -        if (now > nextUpdate.get()) {
    +        if (now > nextLoadUpdate.get()) {
                 receiver.sendLoadMetrics(localLoad);
    -            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
    +            nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
             }
         }
     
    +    // Checks if the tasks which had back pressure are now free again. If so, sends an update to other workers.
    +    public void refreshBackPressureStatus() {
    +        LOG.debug("Checking for change in Backpressure status on worker's tasks");
    +        boolean bpSituationChanged = bpTracker.refreshBpTaskList();
    +        if (bpSituationChanged) {
    +            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
    +            receiver.sendBackPressureStatus(bpStatus);
    +        }
    +    }
    +
    +
         /**
          * We will wait for all connections to be ready and then activate the spout/bolt
          * when the worker boots up.
          */
         public void activateWorkerWhenAllConnectionsReady() {
             int delaySecs = 0;
             int recurSecs = 1;
    -        refreshActiveTimer.schedule(delaySecs, new Runnable() {
    -            @Override public void run() {
    +        refreshActiveTimer.schedule(delaySecs,
    +            () -> {
                     if (areAllConnectionsReady()) {
                         LOG.info("All connections are ready for worker {}:{} 
with id {}", assignmentId, port, workerId);
                         isWorkerActive.set(Boolean.TRUE);
                     } else {
                         refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
                     }
                 }
    -        });
    +        );
         }
     
         public void registerCallbacks() {
             LOG.info("Registering IConnectionCallbacks for {}:{}", 
assignmentId, port);
             receiver.registerRecv(new 
DeserializingConnectionCallback(topologyConf,
                 getWorkerTopologyContext(),
    -            this::transferLocal));
    +            this::transferLocalBatch));
    +        // Send curr BackPressure status to new clients
    +        receiver.registerNewConnectionResponse(
    +            () -> {
    +                BackPressureStatus bpStatus = bpTracker.getCurrStatus();
    +                LOG.info("Sending BackPressure status to new client. 
BPStatus: {}", bpStatus);
    +                return bpStatus;
    +            }
    +        );
         }
     
    -    public void transferLocal(List<AddressedTuple> tupleBatch) {
    -        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
    -        for (AddressedTuple tuple : tupleBatch) {
    -            Integer executor = taskToShortExecutor.get(tuple.dest);
    -            if (null == executor) {
    -                LOG.warn("Received invalid messages for unknown tasks. 
Dropping... ");
    -                continue;
    -            }
    -            List<AddressedTuple> current = grouped.get(executor);
    -            if (null == current) {
    -                current = new ArrayList<>();
    -                grouped.put(executor, current);
    -            }
    -            current.add(tuple);
    -        }
    +    /* Not a blocking call. If it cannot emit, it will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null. */
    +    public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
    +        return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
    +    }
     
    -        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
    -            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
    -            if (null != queue) {
    -                queue.publish(entry.getValue());
    -            } else {
    -                LOG.warn("Received invalid messages for unknown tasks. 
Dropping... ");
    -            }
    -        }
    +    public void flushRemotes() throws InterruptedException {
    +        workerTransfer.flushRemotes();
         }
     
    -    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
    -        if (trySerializeLocal) {
    -            assertCanSerialize(serializer, tupleBatch);
    -        }
    -        List<AddressedTuple> local = new ArrayList<>();
    -        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
    -        for (AddressedTuple addressedTuple : tupleBatch) {
    -            int destTask = addressedTuple.getDest();
    -            if (taskIds.contains(destTask)) {
    -                // Local task
    -                local.add(addressedTuple);
    -            } else {
    -                // Using java objects directly to avoid performance issues in java code
    -                if (! remoteMap.containsKey(destTask)) {
    -                    remoteMap.put(destTask, new ArrayList<>());
    +    public boolean tryFlushRemotes() {
    +        return workerTransfer.tryFlushRemotes();
    +    }
    +
    +    // Receives messages from remote workers and feeds them to local executors. If any receiving local executor is under
    +    // back pressure, informs other workers about the back pressure situation. Runs in the NettyWorker thread.
    +    private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
    +        int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent
    +
    +        for (int i = 0; i < tupleBatch.size(); i++) {
    +            AddressedTuple tuple = tupleBatch.get(i);
    +            JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
    +
    +            // 1- try adding to the main queue if its overflow is empty
    +            if (queue.isEmptyOverflow()) {
    +                if (queue.tryPublish(tuple)) {
    +                    continue;
                     }
    -                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
                 }
    -        }
     
    -        if (!local.isEmpty()) {
    -            transferLocal(local);
    -        }
    -        if (!remoteMap.isEmpty()) {
    -            transferQueue.publish(remoteMap);
    -        }
    -    }
    +            // 2- BP detected (i.e. MainQ is full). So try adding to overflow
    +            int currOverflowCount = queue.getOverflowCount();
    +            if (bpTracker.recordBackPressure(tuple.dest, queue)) {
    +                receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
    +                lastOverflowCount = currOverflowCount;
    +            } else {
     
    -    // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
    -    public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
    -        drainer.add(packets);
    -        if (batchEnd) {
    -            ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
    -            try {
    -                readLock.lock();
    -                drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
    -            } finally {
    -                readLock.unlock();
    +                if (currOverflowCount - lastOverflowCount > 10000) {
    +                    // resend BP status, in case prev notification was missed or reordered
    +                    BackPressureStatus bpStatus = bpTracker.getCurrStatus();
    +                    receiver.sendBackPressureStatus(bpStatus);
    +                    lastOverflowCount = currOverflowCount;
    +                    LOG.debug("Re-sent BackPressure Status. OverflowCount 
= {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
    +                }
    +            }
    +            if (!queue.tryPublishToOverflow(tuple)) {
    +                dropMessage(tuple, queue);
                 }
    -            drainer.clear();
             }
         }
     
    +    private void dropMessage(AddressedTuple tuple, JCQueue queue) {
    +        ++dropCount;
    +        queue.recordMsgDrop();
    +        LOG.warn("Dropping message as overflow threshold has reached for Q 
= {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", 
queue.getName(), queue.getOverflowCount(), dropCount, tuple.toString());
    --- End diff ---
    
    nit: `tuple.toString()` is not needed, as the logging will do it for you, but since this is a warn log that should almost never be called it really does not matter.
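
    For illustration, a minimal, self-contained sketch of the difference. The `Tuple` class below is a hypothetical stand-in for `AddressedTuple`, used only to make the evaluation order visible; the lazy formatting of the `{}` placeholder is standard SLF4J behavior:

    ```java
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LazyLoggingSketch {
        private static final Logger LOG = LoggerFactory.getLogger(LazyLoggingSketch.class);

        // Hypothetical stand-in for AddressedTuple, only to show when toString() runs.
        static class Tuple {
            @Override
            public String toString() {
                System.out.println("toString() invoked");
                return "Tuple[dest=1]";
            }
        }

        public static void main(String[] args) {
            Tuple tuple = new Tuple();

            // Eager: toString() is evaluated at the call site,
            // whether or not the WARN level is enabled.
            LOG.warn("Dropping message: {}", tuple.toString());

            // Lazy: SLF4J fills the {} placeholder and calls toString()
            // only if the message is actually logged.
            LOG.warn("Dropping message: {}", tuple);
        }
    }
    ```

    With WARN filtered out, the first call still pays for building the string while the second does not; since this drop path should be rare, the savings are indeed negligible here.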

