Repository: storm
Updated Branches:
  refs/heads/master 873028b58 -> a18657d0c


fix STORM-2879


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9655d0dc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9655d0dc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9655d0dc

Branch: refs/heads/master
Commit: 9655d0dc8c4f01e17edc3ff823cf7446dbc9930e
Parents: d644e29
Author: chenyuzhao <chenyuz...@meituan.com>
Authored: Wed Jan 3 15:31:38 2018 +0800
Committer: chenyuzhao <chenyuz...@meituan.com>
Committed: Wed Jan 3 15:31:38 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/storm/daemon/supervisor/Slot.java  | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9655d0dc/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java 
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 4baff66..cb41654 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -987,7 +987,18 @@ public class Slot extends Thread implements AutoCloseable, 
BlobChangingCallback
         Container container = null;
         if (currentAssignment != null) { 
             try {
-                container = containerLauncher.recoverContainer(port, 
currentAssignment, localState);
+                // For now we do not make a transaction when removing a 
topology assignment from local disk, so an overdue
+                // assignment may be left behind.
+                // So we should check if the local disk assignment is valid 
when initializing:
+                // if topology files do not exist, the worker (possibly 
alive) will be reassigned if it is timed out;
+                // if topology files exist but the topology id is invalid, 
just let Supervisor make a sync;
+                // if topology files exist and the topology files are valid, 
recover the container.
+                if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, 
currentAssignment.get_topology_id())) {
+                    container = containerLauncher.recoverContainer(port, 
currentAssignment, localState);
+                } else {
+                    // Make the assignment null to let slot clean up the disk 
assignment.
+                    currentAssignment = null;
+                }
             } catch (ContainerRecoveryException e) {
                 // We could not recover, so container will be null.
             }

Reply via email to