Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2502#discussion_r167293987

--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) {
         }
     }
 
-    public void refreshThrottle() {
-        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
-        this.throttleOn.set(backpressure);
-    }
-
-    private static double getQueueLoad(DisruptorQueue q) {
-        DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+    private static double getQueueLoad(JCQueue q) {
+        JCQueue.QueueMetrics qMetrics = q.getMetrics();
         return ((double) qMetrics.population()) / qMetrics.capacity();
     }
 
     public void refreshLoad(List<IRunningExecutor> execs) {
-        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds));
+        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
         Long now = System.currentTimeMillis();
         Map<Integer, Double> localLoad = new HashMap<>();
-        for (IRunningExecutor exec: execs) {
+        for (IRunningExecutor exec : execs) {
             double receiveLoad = getQueueLoad(exec.getReceiveQueue());
-            double sendLoad = getQueueLoad(exec.getSendQueue());
-            localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad));
+            localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
         }
 
         Map<Integer, Load> remoteLoad = new HashMap<>();
         cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
         loadMapping.setLocal(localLoad);
         loadMapping.setRemote(remoteLoad);
 
-        if (now > nextUpdate.get()) {
+        if (now > nextLoadUpdate.get()) {
            receiver.sendLoadMetrics(localLoad);
-            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+            nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
         }
     }
 
+    // checks if the tasks which had back pressure are now free again. if so, sends an update to other workers
+    public void refreshBackPressureStatus() {
+        LOG.debug("Checking for change in Backpressure status on worker's tasks");
+        boolean bpSituationChanged = bpTracker.refreshBpTaskList();
+        if (bpSituationChanged) {
+            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+            receiver.sendBackPressureStatus(bpStatus);
+        }
+    }
+
+
     /**
      * we will wait all connections to be ready and then activate the spout/bolt
      * when the worker bootup.
      */
     public void activateWorkerWhenAllConnectionsReady() {
         int delaySecs = 0;
         int recurSecs = 1;
-        refreshActiveTimer.schedule(delaySecs, new Runnable() {
-            @Override public void run() {
+        refreshActiveTimer.schedule(delaySecs,
+            () -> {
                 if (areAllConnectionsReady()) {
                     LOG.info("All connections are ready for worker {}:{} with id {}", assignmentId, port, workerId);
                     isWorkerActive.set(Boolean.TRUE);
                 } else {
                     refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
                 }
             }
-        });
+        );
     }
 
     public void registerCallbacks() {
         LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
         receiver.registerRecv(new DeserializingConnectionCallback(topologyConf, getWorkerTopologyContext(),
-            this::transferLocal));
+            this::transferLocalBatch));
+
+        // Send curr BackPressure status to new clients
+        receiver.registerNewConnectionResponse(
+            () -> {
+                BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+                LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
+                return bpStatus;
+            }
+        );
     }
 
-    public void transferLocal(List<AddressedTuple> tupleBatch) {
-        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
-        for (AddressedTuple tuple : tupleBatch) {
-            Integer executor = taskToShortExecutor.get(tuple.dest);
-            if (null == executor) {
-                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
-                continue;
-            }
-            List<AddressedTuple> current = grouped.get(executor);
-            if (null == current) {
-                current = new ArrayList<>();
-                grouped.put(executor, current);
-            }
-            current.add(tuple);
-        }
+    /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
+    public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
+        return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
+    }
 
-        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
-            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
-            if (null != queue) {
-                queue.publish(entry.getValue());
-            } else {
-                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
-            }
-        }
+    public void flushRemotes() throws InterruptedException {
+        workerTransfer.flushRemotes();
     }
 
-    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
-        if (trySerializeLocal) {
-            assertCanSerialize(serializer, tupleBatch);
-        }
-        List<AddressedTuple> local = new ArrayList<>();
-        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
-        for (AddressedTuple addressedTuple : tupleBatch) {
-            int destTask = addressedTuple.getDest();
-            if (taskIds.contains(destTask)) {
-                // Local task
-                local.add(addressedTuple);
-            } else {
-                // Using java objects directly to avoid performance issues in java code
-                if (! remoteMap.containsKey(destTask)) {
-                    remoteMap.put(destTask, new ArrayList<>());
+    public boolean tryFlushRemotes() {
+        return workerTransfer.tryFlushRemotes();
+    }
+
+    // Receives msgs from remote workers and feeds them to local executors. If any receiving local executor is under Back Pressure,
+    // informs other workers about back pressure situation. Runs in the NettyWorker thread.
+    private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
+        int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent
+
+        for (int i = 0; i < tupleBatch.size(); i++) {
+            AddressedTuple tuple = tupleBatch.get(i);
+            JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
+
+            // 1- try adding to main queue if its overflow is not empty
+            if (queue.isEmptyOverflow()) {
+                if (queue.tryPublish(tuple)) {
+                    continue;
                 }
-                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
             }
-        }
-        if (!local.isEmpty()) {
-            transferLocal(local);
-        }
-        if (!remoteMap.isEmpty()) {
-            transferQueue.publish(remoteMap);
-        }
-    }
+
+            // 2- BP detected (i.e MainQ is full). So try adding to overflow
+            int currOverflowCount = queue.getOverflowCount();
+            if (bpTracker.recordBackPressure(tuple.dest, queue)) {
+                receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
+                lastOverflowCount = currOverflowCount;
+            } else {
 
-    // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
-    public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
-        drainer.add(packets);
-        if (batchEnd) {
-            ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
-            try {
-                readLock.lock();
-                drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
-            } finally {
-                readLock.unlock();
+                if (currOverflowCount - lastOverflowCount > 10000) {
+                    // resend BP status, in case prev notification was missed or reordered
+                    BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+                    receiver.sendBackPressureStatus(bpStatus);
+                    lastOverflowCount = currOverflowCount;
+                    LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
+                }
+            }
+            if (!queue.tryPublishToOverflow(tuple)) {
+                dropMessage(tuple, queue);
             }
-            drainer.clear();
         }
     }
 
+    private void dropMessage(AddressedTuple tuple, JCQueue queue) {
+        ++dropCount;
+        queue.recordMsgDrop();
+        LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", queue.getName(), queue.getOverflowCount(), dropCount, tuple.toString());
--- End diff ---

nit: `tuple.toString()` is not needed, as the logging will do it for you, but since this is a warn log that should almost never be called, it really does not matter.
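For reference, a minimal sketch of the simplified call (same identifiers as in the diff above); SLF4J's `{}` placeholders invoke `toString()` on each argument when the message is actually rendered, so passing the tuple directly is equivalent:

```java
// Sketch only: identical to the line in the diff, minus the explicit toString().
// SLF4J formats each {} placeholder via String.valueOf(arg), so the tuple's
// toString() still ends up in the log message.
LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}",
    queue.getName(), queue.getOverflowCount(), dropCount, tuple);
```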
---