Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2502#discussion_r167293987
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -456,137 +450,135 @@ public void refreshStormActive(Runnable callback) {
         }
     }
-    public void refreshThrottle() {
-        boolean backpressure = stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, this::refreshThrottle);
-        this.throttleOn.set(backpressure);
-    }
-
-    private static double getQueueLoad(DisruptorQueue q) {
-        DisruptorQueue.QueueMetrics qMetrics = q.getMetrics();
+    private static double getQueueLoad(JCQueue q) {
+        JCQueue.QueueMetrics qMetrics = q.getMetrics();
         return ((double) qMetrics.population()) / qMetrics.capacity();
     }
     public void refreshLoad(List<IRunningExecutor> execs) {
-        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(taskIds));
+        Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
         Long now = System.currentTimeMillis();
         Map<Integer, Double> localLoad = new HashMap<>();
-        for (IRunningExecutor exec: execs) {
+        for (IRunningExecutor exec : execs) {
             double receiveLoad = getQueueLoad(exec.getReceiveQueue());
-            double sendLoad = getQueueLoad(exec.getSendQueue());
-            localLoad.put(exec.getExecutorId().get(0).intValue(), Math.max(receiveLoad, sendLoad));
+            localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
         }
         Map<Integer, Load> remoteLoad = new HashMap<>();
         cachedNodeToPortSocket.get().values().stream().forEach(conn -> remoteLoad.putAll(conn.getLoad(remoteTasks)));
         loadMapping.setLocal(localLoad);
         loadMapping.setRemote(remoteLoad);
-        if (now > nextUpdate.get()) {
+        if (now > nextLoadUpdate.get()) {
             receiver.sendLoadMetrics(localLoad);
-            nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+            nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
         }
     }
+    // checks if the tasks which had back pressure are now free again. if so, sends an update to other workers
+    public void refreshBackPressureStatus() {
+        LOG.debug("Checking for change in Backpressure status on worker's tasks");
+        boolean bpSituationChanged = bpTracker.refreshBpTaskList();
+        if (bpSituationChanged) {
+            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+            receiver.sendBackPressureStatus(bpStatus);
+        }
+    }
+
+
     /**
      * we will wait all connections to be ready and then activate the spout/bolt
      * when the worker bootup.
      */
     public void activateWorkerWhenAllConnectionsReady() {
         int delaySecs = 0;
         int recurSecs = 1;
-        refreshActiveTimer.schedule(delaySecs, new Runnable() {
-            @Override public void run() {
+        refreshActiveTimer.schedule(delaySecs,
+            () -> {
                 if (areAllConnectionsReady()) {
                     LOG.info("All connections are ready for worker {}:{} with id {}", assignmentId, port, workerId);
                     isWorkerActive.set(Boolean.TRUE);
                 } else {
                     refreshActiveTimer.schedule(recurSecs, () -> activateWorkerWhenAllConnectionsReady(), false, 0);
                 }
             }
-        });
+        );
     }
     public void registerCallbacks() {
         LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
         receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
             getWorkerTopologyContext(),
-            this::transferLocal));
+            this::transferLocalBatch));
+        // Send curr BackPressure status to new clients
+        receiver.registerNewConnectionResponse(
+            () -> {
+                BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+                LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
+                return bpStatus;
+            }
+        );
     }
-    public void transferLocal(List<AddressedTuple> tupleBatch) {
-        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
-        for (AddressedTuple tuple : tupleBatch) {
-            Integer executor = taskToShortExecutor.get(tuple.dest);
-            if (null == executor) {
-                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
-                continue;
-            }
-            List<AddressedTuple> current = grouped.get(executor);
-            if (null == current) {
-                current = new ArrayList<>();
-                grouped.put(executor, current);
-            }
-            current.add(tuple);
-        }
+    /* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
+    public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
+        return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
+    }
-        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
-            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
-            if (null != queue) {
-                queue.publish(entry.getValue());
-            } else {
-                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
-            }
-        }
+    public void flushRemotes() throws InterruptedException {
+        workerTransfer.flushRemotes();
     }
-    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
-        if (trySerializeLocal) {
-            assertCanSerialize(serializer, tupleBatch);
-        }
-        List<AddressedTuple> local = new ArrayList<>();
-        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
-        for (AddressedTuple addressedTuple : tupleBatch) {
-            int destTask = addressedTuple.getDest();
-            if (taskIds.contains(destTask)) {
-                // Local task
-                local.add(addressedTuple);
-            } else {
-                // Using java objects directly to avoid performance issues in java code
-                if (! remoteMap.containsKey(destTask)) {
-                    remoteMap.put(destTask, new ArrayList<>());
+    public boolean tryFlushRemotes() {
+        return workerTransfer.tryFlushRemotes();
+    }
+
+    // Receives msgs from remote workers and feeds them to local executors. If any receiving local executor is under Back Pressure,
+    // informs other workers about back pressure situation. Runs in the NettyWorker thread.
+    private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
+        int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent
+
+        for (int i = 0; i < tupleBatch.size(); i++) {
+            AddressedTuple tuple = tupleBatch.get(i);
+            JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
+
+            // 1- try adding to main queue if its overflow is not empty
+            if (queue.isEmptyOverflow()) {
+                if (queue.tryPublish(tuple)) {
+                    continue;
                 }
-                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
             }
-        }
-        if (!local.isEmpty()) {
-            transferLocal(local);
-        }
-        if (!remoteMap.isEmpty()) {
-            transferQueue.publish(remoteMap);
-        }
-    }
+            // 2- BP detected (i.e MainQ is full). So try adding to overflow
+            int currOverflowCount = queue.getOverflowCount();
+            if (bpTracker.recordBackPressure(tuple.dest, queue)) {
+                receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
+                lastOverflowCount = currOverflowCount;
+            } else {
-    // TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
-    public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
-        drainer.add(packets);
-        if (batchEnd) {
-            ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
-            try {
-                readLock.lock();
-                drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
-            } finally {
-                readLock.unlock();
+                if (currOverflowCount - lastOverflowCount > 10000) {
+                    // resend BP status, in case prev notification was missed or reordered
+                    BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+                    receiver.sendBackPressureStatus(bpStatus);
+                    lastOverflowCount = currOverflowCount;
+                    LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
+                }
+            }
+            if (!queue.tryPublishToOverflow(tuple)) {
+                dropMessage(tuple, queue);
            }
-            drainer.clear();
        }
    }
+    private void dropMessage(AddressedTuple tuple, JCQueue queue) {
+        ++dropCount;
+        queue.recordMsgDrop();
+        LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", queue.getName(), queue.getOverflowCount(), dropCount, tuple.toString());
--- End diff --
nit: `tuple.toString()` is not needed, as the logging will do it for you, but since this is a warn log that should almost never be called, it really does not matter.
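
For anyone skimming, here is a minimal sketch of what the nit refers to (the class and method names below are made up for illustration, not from `WorkerState`): with SLF4J-style parameterized logging, the `{}` placeholder is filled by calling `toString()` on the argument when the message is formatted, so passing the object itself is enough and also avoids the eager `toString()` call when the log level is disabled.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DropLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(DropLoggingSketch.class);

    void logDrop(Object tuple) {
        // Redundant: toString() runs eagerly, even if WARN is disabled.
        LOG.warn("Dropped Message : {}", tuple.toString());

        // Same output: SLF4J calls toString() on the argument only when
        // the message is actually rendered.
        LOG.warn("Dropped Message : {}", tuple);
    }
}
```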
---