Repository: storm
Updated Branches:
  refs/heads/master 5e69300f6 -> 282a68af0


[STORM-2983] fix the issue that the inter-worker transfer thread is not spun up 
on some cases on RAS cluster

Because the number of workers are dynamically calculated on RAS and is not 
populated to SupervisorStormConf


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5516f291
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5516f291
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5516f291

Branch: refs/heads/master
Commit: 5516f291a635243d7ab2d34a4d967bb6b218fc4f
Parents: e6a3ac6
Author: Ethan Li <ethanopensou...@gmail.com>
Authored: Thu Mar 29 19:33:18 2018 -0500
Committer: Ethan Li <ethanopensou...@gmail.com>
Committed: Wed Apr 4 10:37:40 2018 -0500

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/daemon/worker/Worker.java      | 9 +++++++--
 .../src/jvm/org/apache/storm/daemon/worker/WorkerState.java | 4 ++++
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5516f291/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index 94ea9af..65915d0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -27,8 +27,10 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -36,6 +38,7 @@ import java.util.stream.Collectors;
 
 import javax.security.auth.Subject;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.storm.Config;
@@ -221,9 +224,11 @@ public class Worker implements Shutdownable, DaemonCommon {
         }
         executorsAtom.set(newExecutors);
 
-
         // This thread will send out messages destined for remote tasks (on 
other workers)
-        if ( ( (Long)topologyConf.get(Config.TOPOLOGY_WORKERS) ) > 1 ) {
+        // If there are no remote tasks, don't start the thread.
+        Set<Integer> remoteTasks = Sets.difference(new HashSet<>( 
workerState.getOutboundTasks()),
+                new HashSet<>(workerState.getLocalTaskIds()));
+        if (!remoteTasks.isEmpty()) {
             transferThread = workerState.makeTransferThread();
             transferThread.setName("Worker-Transfer");
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/5516f291/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git 
a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java 
b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 992dcbc..8d8c62f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -718,6 +718,10 @@ public class WorkerState {
         return outboundTasks;
     }
 
+    public Set<Integer> getOutboundTasks() {
+        return this.outboundTasks;
+    }
+
     public void haltWorkerTransfer() {
         workerTransfer.haltTransferThd();
     }

Reply via email to