Repository: storm Updated Branches: refs/heads/master 5e69300f6 -> 282a68af0
[STORM-2983] fix the issue that the inter-worker transfer thread is not spun up on some cases on RAS cluster Because the number of workers are dynamically calculated on RAS and is not populated to SupervisorStormConf Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5516f291 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5516f291 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5516f291 Branch: refs/heads/master Commit: 5516f291a635243d7ab2d34a4d967bb6b218fc4f Parents: e6a3ac6 Author: Ethan Li <ethanopensou...@gmail.com> Authored: Thu Mar 29 19:33:18 2018 -0500 Committer: Ethan Li <ethanopensou...@gmail.com> Committed: Wed Apr 4 10:37:40 2018 -0500 ---------------------------------------------------------------------- .../src/jvm/org/apache/storm/daemon/worker/Worker.java | 9 +++++++-- .../src/jvm/org/apache/storm/daemon/worker/WorkerState.java | 4 ++++ 2 files changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/5516f291/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index 94ea9af..65915d0 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -27,8 +27,10 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -36,6 +38,7 @@ import java.util.stream.Collectors; import javax.security.auth.Subject; +import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.ObjectUtils; import org.apache.storm.Config; @@ -221,9 +224,11 @@ public class Worker implements Shutdownable, DaemonCommon { } executorsAtom.set(newExecutors); - // This thread will send out messages destined for remote tasks (on other workers) - if ( ( (Long)topologyConf.get(Config.TOPOLOGY_WORKERS) ) > 1 ) { + // If there are no remote tasks, don't start the thread. + Set<Integer> remoteTasks = Sets.difference(new HashSet<>( workerState.getOutboundTasks()), + new HashSet<>(workerState.getLocalTaskIds())); + if (!remoteTasks.isEmpty()) { transferThread = workerState.makeTransferThread(); transferThread.setName("Worker-Transfer"); } http://git-wip-us.apache.org/repos/asf/storm/blob/5516f291/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index 992dcbc..8d8c62f 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -718,6 +718,10 @@ public class WorkerState { return outboundTasks; } + public Set<Integer> getOutboundTasks() { + return this.outboundTasks; + } + public void haltWorkerTransfer() { workerTransfer.haltTransferThd(); }