Repository: storm Updated Branches: refs/heads/1.x-branch 8363c40dc -> 00e303fc9
fix STORM-2879 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/501efb30 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/501efb30 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/501efb30 Branch: refs/heads/1.x-branch Commit: 501efb30a75916ca3edfb88d3a7bca21ae36c77e Parents: c2b3673 Author: chenyuzhao <chenyuz...@meituan.com> Authored: Sat Jan 6 08:21:58 2018 +0800 Committer: chenyuzhao <chenyuz...@meituan.com> Committed: Sat Jan 6 08:21:58 2018 +0800 ---------------------------------------------------------------------- .../jvm/org/apache/storm/daemon/supervisor/Slot.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/501efb30/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java index 11f800a..60bcef3 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java @@ -638,7 +638,18 @@ public class Slot extends Thread implements AutoCloseable { Container container = null; if (currentAssignment != null) { try { - container = containerLauncher.recoverContainer(port, currentAssignment, localState); + // For now we do not make a transaction when removing a topology assignment from local, an overdue + // assignment may be left on local disk. 
+ // So we should check whether the on-disk assignment is valid when initializing: + // if the topology files do not exist, the (possibly still alive) worker will be reassigned if it has timed out; + // if the topology files exist but the topology id is invalid, just let the Supervisor perform a sync; + // if the topology files exist and are valid, recover the container. + if (SupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) { + container = containerLauncher.recoverContainer(port, currentAssignment, localState); + } else { + // Set the assignment to null so the slot cleans up the on-disk assignment. + currentAssignment = null; + } } catch (ContainerRecoveryException e) { //We could not recover container will be null. }