Update SyncSupervisorEvent.java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d0551d0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d0551d0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d0551d0

Branch: refs/heads/master
Commit: 7d0551d0501bf625a9ae8f592df71150bd160540
Parents: afd2e3f 7536489
Author: xiaojian.fxj <[email protected]>
Authored: Fri Mar 25 13:19:27 2016 +0800
Committer: xiaojian.fxj <[email protected]>
Committed: Fri Mar 25 13:37:36 2016 +0800

----------------------------------------------------------------------
 .../org/apache/storm/daemon/local_supervisor.clj     |  2 +-
 .../apache/storm/daemon/supervisor/Supervisor.java   |  2 +-
 .../storm/daemon/supervisor/SyncProcessEvent.java    |  7 +++----
 .../daemon/supervisor/SyncSupervisorEvent.java       | 16 +++++-----------
 .../supervisor/timer/SupervisorHeartbeat.java        |  4 ++--
 5 files changed, 12 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/7d0551d0/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
index 41fa01d,feb8e03..fb4e7ab
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java
@@@ -82,8 -82,8 +82,7 @@@ public class SyncProcessEvent implement
      public SyncProcessEvent(SupervisorData supervisorData) {
          init(supervisorData);
      }
--
--    //TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java
++
      public void init(SupervisorData supervisorData){
          this.supervisorData = supervisorData;
          this.localState = supervisorData.getLocalState();
http://git-wip-us.apache.org/repos/asf/storm/blob/7d0551d0/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
index 4549d4d,4f33c85..b53db06
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java
@@@ -204,17 -204,11 +204,11 @@@ public class SyncSupervisorEvent implem
          for (Integer port : intersectAssignment.keySet()) {
              List<ExecutorInfo> existExecutors = existingAssignment.get(port).get_executors();
              List<ExecutorInfo> newExecutors = newAssignment.get(port).get_executors();
-             if (newExecutors.size() != existExecutors.size()) {
-                 syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
-                 continue;
+             Set<ExecutorInfo> setExitExecutors = new HashSet<>(existExecutors);
+             Set<ExecutorInfo> setNewExecutors = new HashSet<>(newExecutors);
 -            if (setExitExecutors != setNewExecutors){
++            if (!setExitExecutors.equals(setNewExecutors)){
+                 syncProcesses.killWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
              }
-             for (ExecutorInfo executorInfo : newExecutors) {
-                 if (!existExecutors.contains(executorInfo)) {
-                     syncProcesses.shutWorker(supervisorData, supervisorData.getWorkerManager(), vaildPortToWorkerIds.get(port));
-                     break;
-                 }
-             }
-         }
          }
