Repository: storm Updated Branches: refs/heads/master 873028b58 -> a18657d0c
fix STORM-2879 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9655d0dc Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9655d0dc Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9655d0dc Branch: refs/heads/master Commit: 9655d0dc8c4f01e17edc3ff823cf7446dbc9930e Parents: d644e29 Author: chenyuzhao <chenyuz...@meituan.com> Authored: Wed Jan 3 15:31:38 2018 +0800 Committer: chenyuzhao <chenyuz...@meituan.com> Committed: Wed Jan 3 15:31:38 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/storm/daemon/supervisor/Slot.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9655d0dc/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index 4baff66..cb41654 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -987,7 +987,18 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback Container container = null; if (currentAssignment != null) { try { - container = containerLauncher.recoverContainer(port, currentAssignment, localState); + // For now we do not make a transaction when removing a topology assignment from local, an overdue + // assignment may be left on local disk. 
+ // So we should check whether the local disk assignment is valid when initializing: + // if the topology files do not exist, the worker (possibly still alive) will be reassigned once it times out; + // if the topology files exist but the topology id is invalid, just let the Supervisor perform a sync; + // if the topology files exist and the topology id is valid, recover the container. + if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) { + container = containerLauncher.recoverContainer(port, currentAssignment, localState); + } else { + // Set the assignment to null so the slot cleans up the stale on-disk assignment. + currentAssignment = null; + } } catch (ContainerRecoveryException e) { //We could not recover container will be null. }