Repository: storm
Updated Branches:
  refs/heads/master 4ce6f04e8 -> e0056c7d6


http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java
 
b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java
new file mode 100644
index 0000000..5265cf6
--- /dev/null
+++ 
b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+public class BasicContainerTest {
+    /**
+     * Record of a single command captured by {@link MockBasicContainer}:
+     * the argument list, the environment map and the directory that was
+     * passed along with it. All fields are final and set once in the
+     * constructor.
+     */
+    public static class CommandRun {
+        final List<String> cmd;
+        final Map<String, String> env;
+        final File pwd;
+        
+        /**
+         * @param cmd the command line, one element per argument
+         * @param env the environment passed with the command
+         * @param pwd the target directory passed with the command
+         */
+        public CommandRun(List<String> cmd, Map<String, String> env, File pwd) 
{
+            this.cmd = cmd;
+            this.env = env;
+            this.pwd = pwd;
+        }
+    }
+    
+    /**
+     * A {@link BasicContainer} for tests. Instead of running profiling or
+     * worker commands it records them in {@link #profileCmds} and
+     * {@link #workerCmds}, and it returns fixed values for the java command,
+     * the framework classpath and the java library path so expected command
+     * lines can be written as literals.
+     */
+    public static class MockBasicContainer extends BasicContainer {
+        public MockBasicContainer(ContainerType type, Map<String, Object> 
conf, String supervisorId, int port,
+                LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager, LocalState localState,
+                String workerId, Map<String, Object> topoConf, AdvancedFSOps 
ops, String profileCmd)
+                throws IOException {
+            super(type, conf, supervisorId, port, assignment, 
resourceIsolationManager, localState, workerId, topoConf, ops,
+                    profileCmd);
+        }
+
+        // Commands handed to runProfilingCommand, in call order.
+        public final List<CommandRun> profileCmds = new ArrayList<>();
+        // Commands handed to launchWorkerProcess, in call order.
+        public final List<CommandRun> workerCmds = new ArrayList<>();
+        
+
+        /** Returns a fresh empty map rather than delegating to the superclass. */
+        @Override
+        protected Map<String, Object> readTopoConf() throws IOException {
+            return new HashMap<>();
+        }
+        
+        // Pure delegation, re-declared public here
+        // (presumably to make it callable from tests - confirm against BasicContainer).
+        @Override
+        public void createNewWorkerId() {
+            super.createNewWorkerId();
+        }
+        
+        // Pure delegation, re-declared public here; exercised by testSubstChildOpts.
+        @Override
+        public List<String> substituteChildopts(Object value, int memOnheap) {
+            return super.substituteChildopts(value, memOnheap);
+        }
+               
+        /**
+         * Records the profiling command instead of running it.
+         *
+         * @return always {@code true}
+         */
+        @Override
+        protected boolean runProfilingCommand(List<String> command, 
Map<String, String> env, String logPrefix,
+                File targetDir) throws IOException, InterruptedException {
+            profileCmds.add(new CommandRun(command, env, targetDir));
+            return true;
+        }
+        
+        /** Records the worker launch command instead of starting a process. */
+        @Override
+        protected void launchWorkerProcess(List<String> command, Map<String, 
String> env, String logPrefix,
+                ExitCodeCallback processExitCallback, File targetDir) throws 
IOException {
+            workerCmds.add(new CommandRun(command, env, targetDir));
+        }
+        
+        /** Returns the command name unchanged. */
+        @Override
+        protected String javaCmd(String cmd) {
+            //avoid system dependent things
+            return cmd;
+        }
+        
+        /** Returns a single fixed placeholder entry. */
+        @Override
+        protected List<String> frameworkClasspath() {
+            //We are not really running anything so make this
+            // simple to check for
+            return Arrays.asList("FRAMEWORK_CP");
+        }
+        
+        /** Returns the fixed placeholder "JLP". */
+        @Override
+        protected String javaLibraryPath(String stormRoot, Map<String, Object> 
conf) {
+            return "JLP";
+        }
+    }
+    
+    /**
+     * Constructs a LAUNCH container with a null worker id and checks that a
+     * worker id was assigned and that the LocalState mock was asked for the
+     * approved workers and then given a map of the new worker id to the port.
+     */
+    @Test
+    public void testCreateNewWorkerId() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        LocalState ls = mock(LocalState.class);
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
+                "SUPERVISOR", port, la, null, ls, null, new HashMap<>(), ops, 
"profile");
+        //null worker id means generate one...
+        
+        assertNotNull(mc._workerId);
+        verify(ls).getApprovedWorkers();
+        // The only approved worker written back is the newly generated one.
+        Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
+        expectedNewState.put(mc._workerId, port);
+        verify(ls).setApprovedWorkers(expectedNewState);
+    }
+    
+    /**
+     * Constructs a RECOVER_FULL container with a null worker id while the
+     * LocalState mock reports "myWorker" as approved for the same port, and
+     * checks that the container ends up with that worker id.
+     */
+    @Test
+    public void testRecovery() throws Exception {
+        final String topoId = "test_topology";
+        final String workerId = "myWorker";
+        final int port = 8080;
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        // Approved-workers state: worker id -> port.
+        Map<String, Integer> workerState = new HashMap<String, Integer>();
+        workerState.put(workerId, port);
+        
+        LocalState ls = mock(LocalState.class);
+        when(ls.getApprovedWorkers()).thenReturn(workerState);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        MockBasicContainer mc = new 
MockBasicContainer(ContainerType.RECOVER_FULL, superConf, 
+                "SUPERVISOR", port, la, null, ls, null, new HashMap<>(), ops, 
"profile");
+        
+        assertEquals(workerId, mc._workerId);
+    }
+    
+    /**
+     * Constructs a RECOVER_FULL container when the only approved worker is
+     * registered for a different port (port + 1) and expects the constructor
+     * to throw {@link ContainerRecoveryException}.
+     */
+    @Test
+    public void testRecoveryMiss() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        // Only entry is for another port, so nothing matches this container.
+        Map<String, Integer> workerState = new HashMap<String, Integer>();
+        workerState.put("somethingelse", port+1);
+        
+        LocalState ls = mock(LocalState.class);
+        when(ls.getApprovedWorkers()).thenReturn(workerState);
+        
+        try {
+            new MockBasicContainer(ContainerType.RECOVER_FULL, new 
HashMap<String, Object>(), 
+                    "SUPERVISOR", port, la, null, ls, null, new HashMap<>(), 
null, "profile");
+            fail("Container recovered worker incorrectly");
+        } catch (ContainerRecoveryException e) {
+            //Expected
+        }
+    }
+    
+    /**
+     * Constructs a LAUNCH container with an explicit worker id that is
+     * present in the approved-workers state, calls {@code cleanUp()}, and
+     * checks that the worker id is cleared and that an empty approved-workers
+     * map is written back to the LocalState mock.
+     */
+    @Test
+    public void testCleanUp() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        final String workerId = "worker-id";
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        Map<String, Integer> workerState = new HashMap<String, Integer>();
+        workerState.put(workerId, port);
+        
+        LocalState ls = mock(LocalState.class);
+        // Hand out a copy so workerState itself is not the map the mock returns.
+        when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState));
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
+                "SUPERVISOR", port, la, null, ls, workerId, new HashMap<>(), 
ops, "profile");
+        
+        mc.cleanUp();
+        
+        assertNull(mc._workerId);
+        verify(ls).getApprovedWorkers();
+        // After clean up no workers should remain approved.
+        Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
+        verify(ls).setApprovedWorkers(expectedNewState);
+    }
+    
+    /**
+     * Runs each profiling action against a mocked container and checks the
+     * command recorded by {@link MockBasicContainer}: the profile command,
+     * the pid read from the mocked worker.pid file, the sub-command, and
+     * (for most actions) the worker artifacts directory, which is also the
+     * expected working directory.
+     */
+    @Test
+    public void testRunProfiling() throws Exception {
+        final long pid = 100;
+        final String topoId = "test_topology";
+        final int port = 8080;
+        final String workerId = "worker-id";
+        final String stormLocal = ContainerTest.asAbsPath("tmp", "testing");
+        final String topoRoot = ContainerTest.asAbsPath(stormLocal, topoId, 
String.valueOf(port));
+        final File workerArtifactsPid = ContainerTest.asAbsFile(topoRoot, 
"worker.pid");
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
when(ops.slurpString(workerArtifactsPid)).thenReturn(String.valueOf(pid));
+        
+        LocalState ls = mock(LocalState.class);
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
+                "SUPERVISOR", port, la, null, ls, workerId, new HashMap<>(), 
ops, "profile");
+        
+        //HEAP DUMP
+        ProfileRequest req = new ProfileRequest();
+        req.set_action(ProfileAction.JMAP_DUMP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        CommandRun cmd = mc.profileCmds.get(0);
+        // Reset the recorder so the next action starts from an empty list.
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", 
topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JSTACK DUMP
+        req.set_action(ProfileAction.JSTACK_DUMP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "jstack", 
topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //RESTART
+        req.set_action(ProfileAction.JVM_RESTART);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "kill"), 
cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JPROFILE DUMP
+        req.set_action(ProfileAction.JPROFILE_DUMP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", 
topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JPROFILE START
+        // NOTE(review): "start" is requested with JPROFILE_STOP and
+        // stop=false; the assertion below expects the "start" sub-command.
+        // Presumably the stop flag selects start vs. stop - confirm against
+        // BasicContainer.runProfiling.
+        req.set_action(ProfileAction.JPROFILE_STOP);
+        
+        mc.runProfiling(req, false);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "start"), 
cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+        
+        //JPROFILE STOP
+        req.set_action(ProfileAction.JPROFILE_STOP);
+        
+        mc.runProfiling(req, true);
+        
+        assertEquals(1, mc.profileCmds.size());
+        cmd = mc.profileCmds.get(0);
+        mc.profileCmds.clear();
+        assertEquals(Arrays.asList("profile", String.valueOf(pid), "stop", 
topoRoot), cmd.cmd);
+        assertEquals(new File(topoRoot), cmd.pwd);
+    }
+    
+    /**
+     * Sets a JVM system property, treating a null value as "remove it".
+     *
+     * @param key the system property name
+     * @param value the value to set, or null to clear the property
+     */
+    private static void setSystemProp(String key, String value) {
+        if (value != null) {
+            System.setProperty(key, value);
+            return;
+        }
+        // A null value means the property should not exist at all.
+        System.clearProperty(key);
+    }
+    
+    /** A block of test code that may throw any exception. */
+    private interface Run {
+        void run() throws Exception;
+    }
+    
+    /**
+     * Runs {@code r} with the given system properties temporarily changed
+     * and restores the previous values afterwards, even if {@code r} throws.
+     *
+     * @param r the code to run while the properties are overridden
+     * @param newValues alternating property name and new value; a null
+     *     value clears the property for the duration of the run
+     * @throws IllegalArgumentException if an odd number of strings is given
+     * @throws Exception whatever {@code r} throws
+     */
+    private static void checkpoint(Run r, String ... newValues) throws 
Exception {
+        if (newValues.length % 2 != 0) {
+            throw new IllegalArgumentException("Parameters are of the form 
system property name, new value");
+        }
+        // Previous value per property; null means it was not set before.
+        Map<String, String> orig = new HashMap<>();
+        try {
+            for (int index = 0; index < newValues.length; index += 2) {
+                String key = newValues[index];
+                String value = newValues[index + 1];
+                orig.put(key, System.getProperty(key));
+                setSystemProp(key, value);
+            }
+            r.run();
+        } finally {
+            // Only properties recorded above are restored, so a failure
+            // part-way through the loop still rolls back what was changed.
+            for (Map.Entry<String, String> entry: orig.entrySet()) {
+                setSystemProp(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+    
+    /**
+     * Asserts that two lists contain equal elements in the same order.
+     * Elements are compared one index at a time so a failure names the first
+     * differing index; sizes are compared last so a length mismatch is still
+     * reported when the common prefix matches.
+     *
+     * @param a the expected list, may be null
+     * @param b the actual list, may be null
+     */
+    private static <T> void assertListEquals(List<T> a, List<T> b) {
+        if (a == null || b == null) {
+            // Two nulls are equal; one null is a failure. Return either way
+            // so the size/element checks below never dereference null.
+            assertSame("only one of the lists is null\n" + a + " !=\n" + b,
+                    a, b);
+            return;
+        }
+        int commonLen = Math.min(a.size(), b.size());
+        for (int i = 0; i < commonLen; i++) {
+            assertEquals("at index " + i + "\n" + a + " !=\n" + b + "\n",
+                    a.get(i), b.get(i));
+        }
+
+        assertEquals("size of lists don't match \n" + a + " !=\n" + b,
+                a.size(), b.size());
+    }
+    
+    /**
+     * Launches a mocked container with the storm.home and storm.log.dir
+     * system properties temporarily overridden and checks the exact worker
+     * command line recorded by {@link MockBasicContainer}, as well as the
+     * directory it would have been launched from.
+     */
+    @Test
+    public void testLaunch() throws Exception {
+        final String topoId = "test_topology";
+        final int port = 8080;
+        final String stormHome = ContainerTest.asAbsPath("tmp", "storm-home");
+        final String stormLogDir = ContainerTest.asFile(".", 
"target").getCanonicalPath();
+        final String workerId = "worker-id";
+        final String stormLocal = ContainerTest.asAbsPath("tmp", 
"storm-local");
+        final String distRoot = ContainerTest.asAbsPath(stormLocal, 
"supervisor", "stormdist", topoId);
+        final File stormcode = new File(distRoot, "stormcode.ser");
+        final File stormjar = new File(distRoot, "stormjar.jar");
+        final String log4jdir = ContainerTest.asAbsPath(stormHome, "conf");
+        final String workerConf = ContainerTest.asAbsPath(log4jdir, 
"worker.xml");
+        final String workerRoot = ContainerTest.asAbsPath(stormLocal, 
"workers", workerId);
+        final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp");
+        
+        // An empty topology, serialized the way the mocked slurp() returns it.
+        final StormTopology st = new StormTopology();
+        st.set_spouts(new HashMap<>());
+        st.set_bolts(new HashMap<>());
+        st.set_state_spouts(new HashMap<>());
+        byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st));
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        superConf.put(Config.STORM_LOG4J2_CONF_DIR, log4jdir);
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        when(ops.slurp(stormcode)).thenReturn(serializedState);
+        
+        LocalState ls = mock(LocalState.class);
+        
+        
+        // Run with storm.home / storm.log.dir overridden; checkpoint restores them.
+        checkpoint(() -> {
+            MockBasicContainer mc = new 
MockBasicContainer(ContainerType.LAUNCH, superConf, 
+                    "SUPERVISOR", port, la, null, ls, workerId, new 
HashMap<>(), ops, "profile");
+
+            mc.launch();
+
+            assertEquals(1, mc.workerCmds.size());
+            CommandRun cmd = mc.workerCmds.get(0);
+            mc.workerCmds.clear();
+            // Expected: a LogWriter java command wrapping the worker java command.
+            assertListEquals(Arrays.asList(
+                    "java",
+                    "-cp",
+                    "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                    "-Dlogging.sensitivity=S3",
+                    "-Dlogfile.name=worker.log",
+                    "-Dstorm.home=" + stormHome,
+                    "-Dworkers.artifacts=" + stormLocal,
+                    "-Dstorm.id=" + topoId,
+                    "-Dworker.id=" + workerId,
+                    "-Dworker.port=" + port,
+                    "-Dstorm.log.dir=" + stormLogDir,
+                    "-Dlog4j.configurationFile=" + workerConf,
+                    
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                    "-Dstorm.local.dir=" + stormLocal,
+                    "org.apache.storm.LogWriter",
+                    "java",
+                    "-server",
+                    "-Dlogging.sensitivity=S3",
+                    "-Dlogfile.name=worker.log",
+                    "-Dstorm.home=" + stormHome,
+                    "-Dworkers.artifacts=" + stormLocal,
+                    "-Dstorm.id=" + topoId,
+                    "-Dworker.id=" + workerId,
+                    "-Dworker.port=" + port,
+                    "-Dstorm.log.dir=" + stormLogDir,
+                    "-Dlog4j.configurationFile=" + workerConf,
+                    
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                    "-Dstorm.local.dir=" + stormLocal,
+                    "-Djava.library.path=JLP",
+                    "-Dstorm.conf.file=",
+                    "-Dstorm.options=",
+                    "-Djava.io.tmpdir="+workerTmpDir,
+                    "-cp",
+                    "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                    "org.apache.storm.daemon.worker", 
+                    topoId, 
+                    "SUPERVISOR",
+                    String.valueOf(port),
+                    workerId
+                    ), cmd.cmd);
+            assertEquals(new File(workerRoot), cmd.pwd);
+          }, 
+          "storm.home", stormHome,
+          "storm.log.dir", stormLogDir);
+    }
+    
+    /**
+     * Checks child-opts substitution for a space separated string, for a
+     * list of strings, and for null. With port 9999, topology id "s-01",
+     * worker id "w-01" and 512 on-heap memory the expected results replace
+     * %ID% and %WORKER-PORT% with 9999, %TOPOLOGY-ID% with s-01,
+     * %WORKER-ID% with w-01 and %HEAP-MEM% with 512; null yields an empty
+     * list.
+     */
+    @Test
+    public void testSubstChildOpts() throws Exception {
+        String workerId = "w-01";
+        String topoId = "s-01";
+        int port = 9999;
+        int memOnheap = 512;
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        
+        Map<String, Object> superConf = new HashMap<>();
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        
+        LocalState ls = mock(LocalState.class);
+        
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
+                "SUPERVISOR", port, la, null, ls, workerId, new HashMap<>(), 
ops, "profile");
+        
+        // Single string input: expected to be split into separate options.
+        assertListEquals(Arrays.asList(
+                "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+                "-Xms256m",
+                "-Xmx512m"),
+                
mc.substituteChildopts("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log
 -Xms256m -Xmx%HEAP-MEM%m", memOnheap));
+        
+        // List input: each element substituted in place.
+        assertListEquals(Arrays.asList(
+                "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+                "-Xms256m",
+                "-Xmx512m"),
+                
mc.substituteChildopts(Arrays.asList("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log","-Xms256m","-Xmx%HEAP-MEM%m"),
 memOnheap));
+        
+        // Null input: expected to produce an empty list.
+        assertListEquals(Collections.emptyList(), 
+                mc.substituteChildopts(null));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java 
b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
new file mode 100644
index 0000000..b1adcd8
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.daemon.supervisor.Container.ContainerType;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.junit.Test;
+import org.yaml.snakeyaml.Yaml;
+
+import com.google.common.base.Joiner;
+
+public class ContainerTest {
+    /**
+     * A {@link Container} for tests. Kill requests are recorded per pid
+     * instead of being carried out, the set of pids is supplied by the test
+     * through {@link #allPids}, and the methods not covered by this test
+     * class fail immediately if they are called.
+     */
+    public static class MockContainer extends Container {
+        
+        protected MockContainer(ContainerType type, Map<String, Object> conf, 
String supervisorId, int port,
+                LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager, String workerId,
+                Map<String, Object> topoConf, AdvancedFSOps ops) throws 
IOException {
+            super(type, conf, supervisorId, port, assignment, 
resourceIsolationManager, workerId, topoConf, ops);
+        }
+
+        // Pids passed to kill(long), in call order.
+        public final List<Long> killedPids = new ArrayList<>();
+        // Pids passed to forceKill(long), in call order.
+        public final List<Long> forceKilledPids = new ArrayList<>();
+        // Pids reported by getAllPids(); tests add to this directly.
+        public final Set<Long> allPids = new HashSet<>();
+
+        /** Records the pid instead of killing a process. */
+        @Override
+        protected void kill(long pid) {
+            killedPids.add(pid);
+        }
+        
+        /** Records the pid instead of force killing a process. */
+        @Override
+        protected void forceKill(long pid) {
+            forceKilledPids.add(pid);
+        }
+        
+        /** Returns the test-controlled pid set itself, not a copy. */
+        @Override
+        protected Set<Long> getAllPids() throws IOException {
+            return allPids;
+        }
+        
+        @Override
+        public void launch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+        }
+
+        @Override
+        public void relaunch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+        }
+
+        @Override
+        public boolean didMainProcessExit() {
+            fail("THIS IS NOT UNDER TEST");
+            // Not reached when fail() throws; present to satisfy the return type.
+            return false;
+        }
+
+        @Override
+        public boolean runProfiling(ProfileRequest request, boolean stop) 
throws IOException, InterruptedException {
+            fail("THIS IS NOT UNDER TEST");
+            // Not reached when fail() throws; present to satisfy the return type.
+            return false;
+        }
+    }
+
+    /**
+     * Checks that {@code kill()} and {@code forceKill()} do nothing while
+     * the container reports no pids, and that once a pid is reported each
+     * call reaches only its own per-pid hook for exactly that pid.
+     */
+    @Test
+    public void testKill() throws Exception {
+        final String topoId = "test_topology";
+        final Map<String, Object> superConf = new HashMap<>();
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", 8080, la, null, "worker", new HashMap<>(), ops);
+        // No pids yet: neither call should record anything.
+        mc.kill();
+        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
+        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
+        mc.forceKill();
+        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
+        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
+        
+        long pid = 987654321;
+        mc.allPids.add(pid);
+        
+        mc.kill();
+        assertEquals(mc.allPids, new HashSet<>(mc.killedPids));
+        assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
+        // Reset so the forceKill check below sees only its own effect.
+        mc.killedPids.clear();
+        
+        mc.forceKill();
+        assertEquals(Collections.EMPTY_LIST, mc.killedPids);
+        assertEquals(mc.allPids, new HashSet<>(mc.forceKilledPids));
+    }
+    
+    /** Joins path parts with the platform separator, skipping null parts. */
+    private static final Joiner PATH_JOIN =
+            Joiner.on(File.separator).skipNulls();
+    /** Two consecutive separators, collapsed by {@link #asAbsPath}. */
+    private static final String DOUBLE_SEP = File.separator + File.separator;
+
+    /**
+     * Builds a path that starts with the separator from the given parts,
+     * replacing doubled separators with a single one.
+     */
+    static String asAbsPath(String ... parts) {
+        String joined = PATH_JOIN.join(parts);
+        String rooted = File.separator + joined;
+        return rooted.replace(DOUBLE_SEP, File.separator);
+    }
+
+    /** Same as {@link #asAbsPath} but wrapped in a {@link File}. */
+    static File asAbsFile(String ... parts) {
+        String absPath = asAbsPath(parts);
+        return new File(absPath);
+    }
+
+    /** Joins the parts with the separator; no leading separator is added. */
+    static String asPath(String ... parts) {
+        String joined = PATH_JOIN.join(parts);
+        return joined;
+    }
+
+    /** Same as {@link #asPath} but wrapped in a {@link File}. */
+    public static File asFile(String ... parts) {
+        String path = asPath(parts);
+        return new File(path);
+    }
+    
+    /**
+     * Calls {@code setup()} on a mocked container and checks the file system
+     * operations requested from the AdvancedFSOps mock: the pids, tmp and
+     * heartbeats directories, the worker.yaml log metadata (worker id,
+     * submitter user, and the union of topology and log users/groups), the
+     * saved worker user file, and the artifacts and resources symlinks.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSetup() throws Exception {
+        final int port = 8080;
+        final String topoId = "test_topology";
+        final String workerId = "worker_id";
+        final String user = "me";
+        final String stormLocal = asAbsPath("tmp", "testing");
+        final File workerArtifacts = asAbsFile(stormLocal, topoId, 
String.valueOf(port));
+        final File logMetadataFile = new File(workerArtifacts, "worker.yaml");
+        final File workerUserFile = asAbsFile(stormLocal, "workers-users", 
workerId);
+        final File workerRoot = asAbsFile(stormLocal, "workers", workerId);
+        final File distRoot = asAbsFile(stormLocal, "supervisor", "stormdist", 
topoId);
+        
+        final Map<String, Object> topoConf = new HashMap<>();
+        final List<String> topoUsers = Arrays.asList("t-user-a", "t-user-b");
+        final List<String> logUsers = Arrays.asList("l-user-a", "l-user-b");
+        
+        final List<String> topoGroups = Arrays.asList("t-group-a", 
"t-group-b");
+        final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b");
+        
+        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        topoConf.put(Config.LOGS_GROUPS, logGroups);
+        topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups);
+        topoConf.put(Config.LOGS_USERS, logUsers);
+        topoConf.put(Config.TOPOLOGY_USERS, topoUsers);
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        
+        // Captures whatever is written to the log metadata file.
+        final StringWriter yamlDump = new StringWriter();
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        when(ops.fileExists(workerArtifacts)).thenReturn(true);
+        when(ops.fileExists(workerRoot)).thenReturn(true);
+        when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", 8080, la, null, workerId, topoConf, ops);
+        
+        mc.setup();
+        
+        //Initial Setup
+        verify(ops).forceMkdir(new File(workerRoot, "pids"));
+        verify(ops).forceMkdir(new File(workerRoot, "tmp"));
+        verify(ops).forceMkdir(new File(workerRoot, "heartbeats"));
+        verify(ops).fileExists(workerArtifacts);
+        
+        //Log file permissions
+        verify(ops).getWriter(logMetadataFile);
+        
+        String yamlResult = yamlDump.toString();
+        Yaml yaml = new Yaml();
+        Map<String, Object> result = (Map<String, Object>) 
yaml.load(yamlResult);
+        assertEquals(workerId, result.get("worker-id"));
+        assertEquals(user, result.get(Config.TOPOLOGY_SUBMITTER_USER));
+        // Compared as sets so the order of the written users does not matter.
+        HashSet<String> allowedUsers = new HashSet<>(topoUsers);
+        allowedUsers.addAll(logUsers);
+        assertEquals(allowedUsers, new 
HashSet<String>((List<String>)result.get(Config.LOGS_USERS)));
+        
+        // Same for groups.
+        HashSet<String> allowedGroups = new HashSet<>(topoGroups);
+        allowedGroups.addAll(logGroups);
+        assertEquals(allowedGroups, new 
HashSet<String>((List<String>)result.get(Config.LOGS_GROUPS)));
+        
+        //Save the current user to help with recovery
+        verify(ops).dump(workerUserFile, user);
+        
+        //Create links to artifacts dir
+        verify(ops).createSymlink(new File(workerRoot, "artifacts"), 
workerArtifacts);
+        
+        //Create links to blobs 
+        verify(ops).createSymlink(new File(workerRoot, "resources"), new 
File(distRoot, "resources"));
+    }
+    
+    /**
+     * Calls {@code cleanUp()} on a mocked container that reports one pid and
+     * checks that the pid file, the pids, tmp and heartbeats directories and
+     * the worker root are deleted as the submitter user, that the worker
+     * user file is deleted, and that the resource isolation mock is asked to
+     * release the worker's resources.
+     */
+    @Test
+    public void testCleanup() throws Exception {
+        final int port = 8080;
+        final long pid = 100;
+        final String topoId = "test_topology";
+        final String workerId = "worker_id";
+        final String user = "me";
+        final String stormLocal = asAbsPath("tmp", "testing");
+        final File workerArtifacts = asAbsFile(stormLocal, topoId, 
String.valueOf(port));
+        final File logMetadataFile = new File(workerArtifacts, "worker.yaml");
+        final File workerUserFile = asAbsFile(stormLocal, "workers-users", 
workerId);
+        final File workerRoot = asAbsFile(stormLocal, "workers", workerId);
+        final File workerPidsRoot = new File(workerRoot, "pids");
+        
+        final Map<String, Object> topoConf = new HashMap<>();
+        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        
+        final Map<String, Object> superConf = new HashMap<>();
+        superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
+        
+        final StringWriter yamlDump = new StringWriter();
+        
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
+        when(ops.fileExists(workerArtifacts)).thenReturn(true);
+        when(ops.fileExists(workerRoot)).thenReturn(true);
+        when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
+        
+        ResourceIsolationInterface iso = 
mock(ResourceIsolationInterface.class);
+        
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
+                "SUPERVISOR", port, la, iso, workerId, topoConf, ops);
+        // Make the mock report one pid so its pid file is expected to go too.
+        mc.allPids.add(pid);
+        
+        mc.cleanUp();
+        verify(ops).deleteIfExists(eq(new File(workerPidsRoot, 
String.valueOf(pid))), eq(user), any(String.class));
+        verify(iso).releaseResourcesForWorker(workerId);
+        
+        verify(ops).deleteIfExists(eq(new File(workerRoot, "pids")), eq(user), 
any(String.class));
+        verify(ops).deleteIfExists(eq(new File(workerRoot, "tmp")), eq(user), 
any(String.class));
+        verify(ops).deleteIfExists(eq(new File(workerRoot, "heartbeats")), 
eq(user), any(String.class));
+        verify(ops).deleteIfExists(eq(workerRoot), eq(user), 
any(String.class));
+        verify(ops).deleteIfExists(workerUserFile);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java 
b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
new file mode 100644
index 0000000..24ccda5
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java
@@ -0,0 +1,515 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.daemon.supervisor.Slot.StaticState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.daemon.supervisor.Slot.DynamicState;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+
+public class SlotTest {
+    static WorkerResources mkWorkerResources(Double cpu, Double mem_on_heap, 
Double mem_off_heap) { // Test helper: builds a WorkerResources, setting only the fields whose argument is non-null.
+        WorkerResources resources = new WorkerResources();
+        if (cpu != null) {
+            resources.set_cpu(cpu);
+        }
+        
+        if (mem_on_heap != null) {
+            resources.set_mem_on_heap(mem_on_heap);
+        }
+        
+        if (mem_off_heap != null) {
+            resources.set_mem_off_heap(mem_off_heap);
+        }
+        return resources; // fields whose argument was null are left exactly as the constructor made them
+    }
+    
+    static LSWorkerHeartbeat mkWorkerHB(String id, int port, 
List<ExecutorInfo> exec, Integer timeSecs) { // Test helper: builds an LSWorkerHeartbeat from the given topology id, port, executors and time in seconds.
+        LSWorkerHeartbeat ret = new LSWorkerHeartbeat();
+        ret.set_topology_id(id);
+        ret.set_port(port);
+        ret.set_executors(exec);
+        ret.set_time_secs(timeSecs); // NOTE(review): timeSecs is a boxed Integer; if the setter takes an int, null would fail on unboxing — confirm
+        return ret;
+    }
+    
+    static List<ExecutorInfo> mkExecutorInfoList(int ... executors) { // Test helper: returns one ExecutorInfo per argument, in argument order.
+        ArrayList<ExecutorInfo> ret = new ArrayList<>(executors.length);
+        for (int exec : executors) {
+            ExecutorInfo execInfo = new ExecutorInfo();
+            execInfo.set_task_start(exec); // start and end are the same value, so each entry spans a single task id
+            execInfo.set_task_end(exec);
+            ret.add(execInfo);
+        }
+        return ret;
+    }
+    
+    static LocalAssignment mkLocalAssignment(String id, List<ExecutorInfo> 
exec, WorkerResources resources) { // Test helper: builds a LocalAssignment with the given topology id and executors.
+        LocalAssignment ret = new LocalAssignment();
+        ret.set_topology_id(id);
+        ret.set_executors(exec);
+        if (resources != null) { // resources are optional; a null argument leaves them unset
+            ret.set_resources(resources);
+        }
+        return ret;
+    }
+    
+    @Test
+    public void testEquivilant() { // Checks Slot.equivalent() on null, identical, reordered and differing assignments. NOTE(review): the name misspells "Equivalent".
+        LocalAssignment a = mkLocalAssignment("A", 
mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment aResized = mkLocalAssignment("A", 
mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 200.0, 100.0)); // same as a except mem_on_heap is 200.0
+        LocalAssignment b = mkLocalAssignment("B", 
mkExecutorInfoList(1,2,3,4,5,6), mkWorkerResources(100.0, 100.0, 100.0));
+        LocalAssignment bReordered = mkLocalAssignment("B", 
mkExecutorInfoList(6,5,4,3,2,1), mkWorkerResources(100.0, 100.0, 100.0)); // same as b with the executor list reversed
+
+        assertTrue(Slot.equivalent(null, null)); // two missing assignments count as equivalent
+        assertTrue(Slot.equivalent(a, a));
+        assertTrue(Slot.equivalent(b, bReordered)); // executor order is expected not to matter, in either argument order
+        assertTrue(Slot.equivalent(bReordered, b));
+        
+        assertFalse(Slot.equivalent(a, aResized)); // a change in resources alone is expected to break equivalence
+        assertFalse(Slot.equivalent(aResized, a));
+        assertFalse(Slot.equivalent(a, null)); // null on one side only is never equivalent
+        assertFalse(Slot.equivalent(null, b));
+        assertFalse(Slot.equivalent(a, b)); // different topology id and executor list
+    }
+    
+    @Test
+    public void testEmptyToEmpty() throws Exception { // An empty slot with nothing assigned is expected to stay EMPTY after handleEmpty.
+        Time.startSimulatingAutoAdvanceOnSleep(1010); // simulated clock; presumably each sleep advances it by 1010 ms — confirm against Time
+        try {
+            ILocalizer localizer = mock(ILocalizer.class);
+            LocalState state = mock(LocalState.class);
+            ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
+            ISupervisor iSuper = mock(ISupervisor.class);
+            StaticState staticState = new StaticState(localizer, 1000, 1000, 
1000, 1000,
+                    containerLauncher, "localhost", 8080, iSuper, state);
+            DynamicState dynamicState = new DynamicState(null, null, null); // all three inputs absent (the other tests pass current assignment, container, new assignment here)
+            DynamicState nextState = Slot.handleEmpty(dynamicState, 
staticState);
+            assertEquals(MachineState.EMPTY, nextState.state);
+            assertTrue(Time.currentTimeMillis() > 1000); // the simulated clock moved, so handleEmpty presumably slept once
+        } finally {
+            Time.stopSimulating(); // always leave simulated-time mode, even when an assertion fails
+        }
+    }
+    
+    @Test
+    public void testLaunchContainerFromEmpty() throws Exception { // Walks an empty slot with a new assignment through both localization states and worker start to RUNNING.
+        Time.startSimulatingAutoAdvanceOnSleep(1010); // simulated clock; presumably each sleep advances it by 1010 ms — confirm against Time
+        try {
+            int port = 8080;
+            String topoId = "NEW";
+            List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment newAssignment = 
+                    mkLocalAssignment(topoId, execList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            Container container = mock(Container.class);
+            LocalState state = mock(LocalState.class);
+            ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
+            when(containerLauncher.launchContainer(port, newAssignment, 
state)).thenReturn(container); // launching this exact assignment on this port yields the mocked container
+            LSWorkerHeartbeat hb = mkWorkerHB(topoId, port, execList, 
Time.currentTimeSecs()); // heartbeat stamped with the current simulated time and matching the assignment
+            when(container.readHeartbeat()).thenReturn(hb, hb);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> baseFuture = mock(Future.class);
+            when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, 
port)).thenReturn(baseFuture);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> blobFuture = mock(Future.class);
+            when(localizer.requestDownloadTopologyBlobs(newAssignment, 
port)).thenReturn(blobFuture);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(null, null, null)
+                    .withNewAssignment(newAssignment); // start with nothing running and one pending new assignment
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState); // step 1: the base topology blob download is requested
+            verify(localizer).requestDownloadBaseTopologyBlobs(newAssignment, 
port);
+            assertEquals(MachineState.WAITING_FOR_BASIC_LOCALIZATION, 
nextState.state);
+            assertSame("pendingDownload not set properly", baseFuture, 
nextState.pendingDownload);
+            assertEquals(newAssignment, nextState.pendingLocalization);
+            assertEquals(0, Time.currentTimeMillis()); // no simulated time has elapsed so far
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 2: base download is awaited, then the topology blob download is requested
+            verify(baseFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(localizer).requestDownloadTopologyBlobs(newAssignment, 
port);
+            assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, 
nextState.state);
+            assertSame("pendingDownload not set properly", blobFuture, 
nextState.pendingDownload);
+            assertEquals(newAssignment, nextState.pendingLocalization);
+            assertEquals(0, Time.currentTimeMillis());
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 3: blob download is awaited, then the container is launched
+            verify(blobFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(containerLauncher).launchContainer(port, newAssignment, 
state);
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, 
nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization); // the pending assignment has become the current one
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertEquals(0, Time.currentTimeMillis());
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 4: with the stubbed heartbeat available the slot is expected to reach RUNNING
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertEquals(0, Time.currentTimeMillis()); // still no simulated time elapsed on the way to RUNNING
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 5: stays RUNNING; the simulated clock now advances
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 6: still RUNNING after another simulated interval
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(newAssignment, nextState.currentAssignment);
+            assertSame(container, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 2000);
+        } finally {
+            Time.stopSimulating(); // always leave simulated-time mode, even when an assertion fails
+        }
+    }
+
+
+    @Test
+    public void testRelaunch() throws Exception { // A worker whose heartbeat is 10 s old is expected to be killed, force-killed, relaunched and to return to RUNNING.
+        Time.startSimulatingAutoAdvanceOnSleep(1010); // simulated clock; presumably each sleep advances it by 1010 ms — confirm against Time
+        try {
+            int port = 8080;
+            String topoId = "CURRENT";
+            List<ExecutorInfo> execList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment assignment = 
+                    mkLocalAssignment(topoId, execList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            Container container = mock(Container.class);
+            ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
+            LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, 
Time.currentTimeSecs()-10); // heartbeat dated 10 seconds before the current simulated time
+            LSWorkerHeartbeat goodhb = mkWorkerHB(topoId, port, execList, 
Time.currentTimeSecs());
+            when(container.readHeartbeat()).thenReturn(oldhb, oldhb, goodhb, 
goodhb); // consecutive stubbing: the first two reads see the old heartbeat, later reads the fresh one
+            when(container.areAllProcessesDead()).thenReturn(false, true); // first check reports live processes, every later check reports all dead
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            LocalState state = mock(LocalState.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(assignment, 
container, assignment); // the same assignment is passed as both current and new, so nothing is being rescheduled
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState); // step 1: the old heartbeat leads to a kill request
+            assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
+            verify(container).kill();
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 2: processes are still reported alive, so a force kill follows
+            assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state);
+            verify(container).forceKill();
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 3: processes are now reported dead, so the same container is relaunched
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, 
nextState.state);
+            verify(container).relaunch();
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 4: still waiting for the worker to start
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, 
nextState.state);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 5: the slot is expected to be RUNNING again
+            assertEquals(MachineState.RUNNING, nextState.state);
+        } finally {
+            Time.stopSimulating(); // always leave simulated-time mode, even when an assertion fails
+        }
+    }
+    
+    @Test
+    public void testReschedule() throws Exception { // A running slot given a different assignment is expected to kill the old worker, localize the new topology, launch it and reach RUNNING.
+        Time.startSimulatingAutoAdvanceOnSleep(1010); // simulated clock; presumably each sleep advances it by 1010 ms — confirm against Time
+        try {
+            int port = 8080;
+            String cTopoId = "CURRENT";
+            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment cAssignment = 
+                    mkLocalAssignment(cTopoId, cExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            
+            Container cContainer = mock(Container.class);
+            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, 
Time.currentTimeSecs());
+            when(cContainer.readHeartbeat()).thenReturn(chb);
+            when(cContainer.areAllProcessesDead()).thenReturn(false, true); // first check reports live processes, every later check reports all dead
+            
+            String nTopoId = "NEW";
+            List<ExecutorInfo> nExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment nAssignment = 
+                    mkLocalAssignment(nTopoId, nExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            Container nContainer = mock(Container.class);
+            LocalState state = mock(LocalState.class);
+            ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
+            when(containerLauncher.launchContainer(port, nAssignment, 
state)).thenReturn(nContainer);
+            LSWorkerHeartbeat nhb = mkWorkerHB(nTopoId, 100, nExecList, 
Time.currentTimeSecs()); // NOTE(review): this heartbeat uses port 100 while the slot port is 8080 — confirm whether that is intentional
+            when(nContainer.readHeartbeat()).thenReturn(nhb, nhb);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> baseFuture = mock(Future.class);
+            when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, 
port)).thenReturn(baseFuture);
+            
+            @SuppressWarnings("unchecked")
+            Future<Void> blobFuture = mock(Future.class);
+            when(localizer.requestDownloadTopologyBlobs(nAssignment, 
port)).thenReturn(blobFuture);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, nAssignment); // the current worker is running while a different assignment is pending
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState); // step 1: the old worker is killed and the new base blobs are requested in the same step
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).kill();
+            verify(localizer).requestDownloadBaseTopologyBlobs(nAssignment, 
port);
+            assertSame("pendingDownload not set properly", baseFuture, 
nextState.pendingDownload);
+            assertEquals(nAssignment, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 2: processes are still reported alive, so a force kill follows
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).forceKill();
+            assertSame("pendingDownload not set properly", baseFuture, 
nextState.pendingDownload);
+            assertEquals(nAssignment, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 3: old container is cleaned up and its localized slot released
+            assertEquals(MachineState.WAITING_FOR_BASIC_LOCALIZATION, 
nextState.state);
+            verify(cContainer).cleanUp();
+            verify(localizer).releaseSlotFor(cAssignment, port);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 4: base download is awaited, then the topology blob download is requested
+            assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, 
nextState.state);
+            verify(baseFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(localizer).requestDownloadTopologyBlobs(nAssignment, port);
+            assertSame("pendingDownload not set properly", blobFuture, 
nextState.pendingDownload);
+            assertEquals(nAssignment, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 5: blob download is awaited, then the new container is launched
+            verify(blobFuture).get(1000, TimeUnit.MILLISECONDS);
+            verify(containerLauncher).launchContainer(port, nAssignment, 
state);
+            assertEquals(MachineState.WAITING_FOR_WORKER_START, 
nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization); // the pending assignment has become the current one
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 6: the new worker is expected to be RUNNING
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 7: stays RUNNING while simulated time advances
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 8: still RUNNING after another simulated interval
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertSame("pendingDownload is not null", null, 
nextState.pendingDownload);
+            assertSame(null, nextState.pendingLocalization);
+            assertSame(nAssignment, nextState.currentAssignment);
+            assertSame(nContainer, nextState.container);
+            assertTrue(Time.currentTimeMillis() > 4000);
+        } finally {
+            Time.stopSimulating(); // always leave simulated-time mode, even when an assertion fails
+        }
+    }
+
+    
+    @Test
+    public void testRunningToEmpty() throws Exception { // A running slot whose assignment is removed is expected to kill the worker, clean up and settle in EMPTY.
+        Time.startSimulatingAutoAdvanceOnSleep(1010); // simulated clock; presumably each sleep advances it by 1010 ms — confirm against Time
+        try {
+            int port = 8080;
+            String cTopoId = "CURRENT";
+            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment cAssignment = 
+                    mkLocalAssignment(cTopoId, cExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            
+            Container cContainer = mock(Container.class);
+            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, 
Time.currentTimeSecs());
+            when(cContainer.readHeartbeat()).thenReturn(chb);
+            when(cContainer.areAllProcessesDead()).thenReturn(false, true); // first check reports live processes, every later check reports all dead
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            LocalState state = mock(LocalState.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, null); // a worker is running and there is no new assignment
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState); // step 1: the worker is killed; nothing is downloaded because nothing replaces it
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).kill();
+            verify(localizer, never()).requestDownloadBaseTopologyBlobs(null, 
port);
+            assertSame("pendingDownload not set properly", null, 
nextState.pendingDownload);
+            assertEquals(null, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 2: processes are still reported alive, so a force kill follows
+            assertEquals(MachineState.KILL, nextState.state);
+            verify(cContainer).forceKill();
+            assertSame("pendingDownload not set properly", null, 
nextState.pendingDownload);
+            assertEquals(null, nextState.pendingLocalization);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 3: container is cleaned up, its slot released, and the state becomes EMPTY
+            assertEquals(MachineState.EMPTY, nextState.state);
+            verify(cContainer).cleanUp();
+            verify(localizer).releaseSlotFor(cAssignment, port);
+            assertEquals(null, nextState.container);
+            assertEquals(null, nextState.currentAssignment);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 4: stays EMPTY while simulated time advances
+            assertEquals(MachineState.EMPTY, nextState.state);
+            assertEquals(null, nextState.container);
+            assertEquals(null, nextState.currentAssignment);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 5: still EMPTY
+            assertEquals(MachineState.EMPTY, nextState.state);
+            assertEquals(null, nextState.container);
+            assertEquals(null, nextState.currentAssignment);
+            assertTrue(Time.currentTimeMillis() > 3000); // NOTE(review): same bound as the previous step; possibly 4000 was intended — confirm
+        } finally {
+            Time.stopSimulating(); // always leave simulated-time mode, even when an assertion fails
+        }
+    }
+    
+    @Test
+    public void testRunWithProfileActions() throws Exception { // A RUNNING slot given a JPROFILE_STOP request is expected to run profiling at once, keep it pending, then finish it after the request's time stamp.
+        Time.startSimulatingAutoAdvanceOnSleep(1010); // simulated clock; presumably each sleep advances it by 1010 ms — confirm against Time
+        try {
+            int port = 8080;
+            String cTopoId = "CURRENT";
+            List<ExecutorInfo> cExecList =  mkExecutorInfoList(1,2,3,4,5);
+            LocalAssignment cAssignment = 
+                    mkLocalAssignment(cTopoId, cExecList, 
mkWorkerResources(100.0, 100.0, 100.0));
+            
+            Container cContainer = mock(Container.class);
+            LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, 
Time.currentTimeSecs()+100); //NOT going to timeout for a while
+            when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, 
chb, chb);
+            when(cContainer.runProfiling(any(ProfileRequest.class), 
anyBoolean())).thenReturn(true); // every profiling call reports success
+            
+            ILocalizer localizer = mock(ILocalizer.class);
+            ContainerLauncher containerLauncher = 
mock(ContainerLauncher.class);
+            
+            ISupervisor iSuper = mock(ISupervisor.class);
+            LocalState state = mock(LocalState.class);
+            StaticState staticState = new StaticState(localizer, 5000, 120000, 
1000, 1000,
+                    containerLauncher, "localhost", port, iSuper, state);
+            Set<TopoProfileAction> profileActions = new HashSet<>();
+            ProfileRequest request = new ProfileRequest();
+            request.set_action(ProfileAction.JPROFILE_STOP);
+            NodeInfo info = new NodeInfo();
+            info.set_node("localhost"); // same host and port as the slot under test
+            info.add_to_port(port);
+            request.set_nodeInfo(info);
+            request.set_time_stamp(Time.currentTimeMillis() + 3000);//3 
seconds from now
+            
+            TopoProfileAction profile = new TopoProfileAction(cTopoId, 
request);
+            profileActions.add(profile);
+            Set<TopoProfileAction> expectedPending = new HashSet<>(); // separate set holding the same action, used for the equality assertions below
+            expectedPending.add(profile);
+            
+            
+            DynamicState dynamicState = new DynamicState(cAssignment, 
cContainer, cAssignment).withProfileActions(profileActions, 
Collections.<TopoProfileAction> emptySet()); // the action is requested and nothing is pending yet
+            
+            DynamicState nextState = Slot.stateMachineStep(dynamicState, 
staticState); // step 1: profiling is invoked with the flag false and the action becomes pending
+            assertEquals(MachineState.RUNNING, nextState.state);
+            verify(cContainer).runProfiling(request, false);
+            assertEquals(expectedPending, nextState.pendingStopProfileActions);
+            assertEquals(expectedPending, nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 1000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 2: the request's time stamp is not reached, the action stays pending
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertEquals(expectedPending, nextState.pendingStopProfileActions);
+            assertEquals(expectedPending, nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 2000);
+            
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 3: still pending
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertEquals(expectedPending, nextState.pendingStopProfileActions);
+            assertEquals(expectedPending, nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 3000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 4: the time stamp has passed; profiling is invoked with the flag true and both sets empty out
+            assertEquals(MachineState.RUNNING, nextState.state);
+            verify(cContainer).runProfiling(request, true);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.pendingStopProfileActions);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 4000);
+            
+            nextState = Slot.stateMachineStep(nextState, staticState); // step 5: nothing left to do, the slot stays RUNNING
+            assertEquals(MachineState.RUNNING, nextState.state);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.pendingStopProfileActions);
+            assertEquals(Collections.<TopoProfileAction> emptySet(), 
nextState.profileActions);
+            assertTrue(Time.currentTimeMillis() > 5000);
+        } finally {
+            Time.stopSimulating(); // always leave simulated-time mode, even when an assertion fails
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java 
b/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java
new file mode 100644
index 0000000..98a43e4
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.Time;
+import org.junit.Test;
+
+public class ReportErrorTest { // Tests that ReportError throttles how many errors it forwards to the cluster state per interval.
+
+    @Test
+    public void testReport() { // With a limit of 4 errors per 10-second interval, the 5th error is dropped until the interval has passed.
+        final String topo = "topology";
+        final String comp = "component";
+        final Long port = new Long(8080); // NOTE(review): the Long(long) constructor is deprecated since Java 9; Long.valueOf is the usual replacement
+        final AtomicLong errorCount = new AtomicLong(0l); // NOTE(review): lowercase 'l' suffix is easily misread as the digit 1
+        
+        WorkerTopologyContext context = mock(WorkerTopologyContext.class);
+        when(context.getThisWorkerPort()).thenReturn(port.intValue());
+        
+        IStormClusterState state = mock(IStormClusterState.class);
+        doAnswer((invocation) -> errorCount.incrementAndGet())
+            .when(state).reportError(eq(topo), eq(comp), any(String.class), 
eq(port), any(Throwable.class)); // counts every reportError call made for this topology, component and port
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS, 10);
+        conf.put(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL, 4);
+        
+        Time.startSimulating(); // simulated clock, moved only by the explicit advanceTime calls below
+        try {
+            ReportError report = new ReportError(conf, state, topo, comp, 
context);
+            report.report(new RuntimeException("ERROR-1")); // the first four errors are each forwarded
+            assertEquals(1, errorCount.get());
+            report.report(new RuntimeException("ERROR-2"));
+            assertEquals(2, errorCount.get());
+            report.report(new RuntimeException("ERROR-3"));
+            assertEquals(3, errorCount.get());
+            report.report(new RuntimeException("ERROR-4"));
+            assertEquals(4, errorCount.get());
+            //Too fast: this is the 5th error in the interval and the limit is 4, so it is not reported
+            report.report(new RuntimeException("ERROR-5"));
+            assertEquals(4, errorCount.get());
+            Time.advanceTime(9000); // presumably milliseconds: 9 s is still inside the 10 s interval
+            report.report(new RuntimeException("ERROR-6"));
+            assertEquals(4, errorCount.get());
+            Time.advanceTime(2000); // 11 s in total, so the interval has passed and reporting resumes
+            report.report(new RuntimeException("ERROR-7"));
+            assertEquals(5, errorCount.get());
+        } finally {
+            Time.stopSimulating(); // always leave simulated-time mode, even when an assertion fails
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java 
b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
new file mode 100644
index 0000000..41c17c8
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.utils.ConfigUtils;
+import org.junit.Test;
+
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Unit tests for {@link AsyncLocalizer}.
+ *
+ * All collaborators (Localizer, AdvancedFSOps, ClientBlobStore) and the
+ * ConfigUtils/Utils singletons are Mockito mocks. Each test installs the
+ * mocked singletons through setInstance and puts the saved instances back in
+ * a finally block.
+ */
+public class AsyncLocalizerTest {
+
+    /**
+     * Requests the base topology blobs for an assignment and checks the calls
+     * made on the mocks: the blob store is prepared and shut down, the jar,
+     * code and conf keys are each downloaded to a path under the storm local
+     * dir, "resources" is extracted from the jar, a directory is moved to the
+     * topology root, the code dir is set up, and nothing is deleted.
+     */
+    @Test
+    public void testRequestDownloadBaseTopologyBlobs() throws Exception {
+        final String topoId = "TOPO";
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        ExecutorInfo ei = new ExecutorInfo();
+        ei.set_task_start(1);
+        ei.set_task_end(1);
+        la.add_to_executors(ei);
+        final int port = 8080;
+        // Blob keys the test expects to be downloaded for this topology.
+        final String jarKey = topoId + "-stormjar.jar";
+        final String codeKey = topoId + "-stormcode.ser";
+        final String confKey = topoId + "-stormconf.ser";
+        final String stormLocal = "/tmp/storm-local/";
+        final String stormRoot = stormLocal+topoId+"/";
+        final File fStormRoot = new File(stormRoot);
+        ClientBlobStore blobStore = mock(ClientBlobStore.class);
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.SUPERVISOR_BLOBSTORE, ClientBlobStore.class.getName());
+        conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, 
DefaultPrincipalToLocal.class.getName());
+        conf.put(Config.STORM_CLUSTER_MODE, "distributed");
+        conf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        Localizer localizer = mock(Localizer.class);
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        ConfigUtils mockedCU = mock(ConfigUtils.class);
+        Utils mockedU = mock(Utils.class);
+        
+        // The topology conf starts as a copy of the supervisor conf.
+        Map<String, Object> topoConf = new HashMap<>(conf);
+        
+        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+        // Keep the values returned by setInstance so they can be restored below.
+        ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
+        Utils origUtils = Utils.setInstance(mockedU);
+        try {
+            when(mockedCU.supervisorStormDistRootImpl(conf, 
topoId)).thenReturn(stormRoot);
+            when(mockedCU.supervisorLocalDirImpl(conf)).thenReturn(stormLocal);
+            
when(mockedU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
+            when(mockedCU.readSupervisorStormConfImpl(conf, 
topoId)).thenReturn(topoConf);
+
+            // Wait at most 20 seconds for the asynchronous download to finish.
+            Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port);
+            f.get(20, TimeUnit.SECONDS);
+            // We should be done now...
+            
+            verify(blobStore).prepare(conf);
+            verify(mockedU).downloadResourcesAsSupervisorImpl(eq(jarKey), 
startsWith(stormLocal), eq(blobStore));
+            verify(mockedU).downloadResourcesAsSupervisorImpl(eq(codeKey), 
startsWith(stormLocal), eq(blobStore));
+            verify(mockedU).downloadResourcesAsSupervisorImpl(eq(confKey), 
startsWith(stormLocal), eq(blobStore));
+            verify(blobStore).shutdown();
+            //Extracting the dir from the jar
+            verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), 
eq("resources"), any(File.class));
+            verify(ops).moveDirectoryPreferAtomic(any(File.class), 
eq(fStormRoot));
+            verify(ops).setupStormCodeDir(topoConf, fStormRoot);
+            
+            // NOTE(review): presumably deletion only happens on a failed
+            // download; confirm against AsyncLocalizer.
+            verify(ops, never()).deleteIfExists(any(File.class));
+        } finally {
+            al.shutdown();
+            ConfigUtils.setInstance(orig);
+            Utils.setInstance(origUtils);
+        }
+    }
+
+    /**
+     * Requests the topology blobs listed under TOPOLOGY_BLOBSTORE_MAP and
+     * checks the calls made on the mocks: the user's file cache dir is looked
+     * up, tested for existence and created, the blobs are fetched through the
+     * Localizer, and createSymlink is called with "simple.txt" under the
+     * topology root and the blob's ".current" file.
+     */
+    @Test
+    public void testRequestDownloadTopologyBlobs() throws Exception {
+        final String topoId = "TOPO-12345";
+        LocalAssignment la = new LocalAssignment();
+        la.set_topology_id(topoId);
+        ExecutorInfo ei = new ExecutorInfo();
+        ei.set_task_start(1);
+        ei.set_task_end(1);
+        la.add_to_executors(ei);
+        final String topoName = "TOPO";
+        final int port = 8080;
+        final String user = "user";
+        final String simpleLocalName = "simple.txt";
+        final String simpleKey = "simple";
+        
+        final String stormLocal = "/tmp/storm-local/";
+        final File userDir = new File(stormLocal, user);
+        final String stormRoot = stormLocal+topoId+"/";
+        
+        final String localizerRoot = "/tmp/storm-localizer/";
+        final String simpleLocalFile = localizerRoot + user + "/simple";
+        final String simpleCurrentLocalFile = localizerRoot + user + 
"/simple.current";
+       
+        // An empty topology: no spouts, bolts or state spouts.
+        final StormTopology st = new StormTopology();
+        st.set_spouts(new HashMap<>());
+        st.set_bolts(new HashMap<>());
+        st.set_state_spouts(new HashMap<>());
+ 
+        // One blob entry: key "simple", local name "simple.txt", uncompress false.
+        Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
+        Map<String, Object> simple = new HashMap<>();
+        simple.put("localname", simpleLocalName);
+        simple.put("uncompress", false);
+        topoBlobMap.put(simpleKey, simple);
+        
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.STORM_LOCAL_DIR, stormLocal);
+        Localizer localizer = mock(Localizer.class);
+        AdvancedFSOps ops = mock(AdvancedFSOps.class);
+        ConfigUtils mockedCU = mock(ConfigUtils.class);
+        Utils mockedU = mock(Utils.class);
+        
+        // Topology conf = supervisor conf plus blob map, submitter user and name.
+        Map<String, Object> topoConf = new HashMap<>(conf);
+        topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
+        topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user);
+        topoConf.put(Config.TOPOLOGY_NAME, topoName);
+        
+        // The list the mocked Localizer.getBlobs is stubbed to return.
+        List<LocalizedResource> localizedList = new ArrayList<>();
+        LocalizedResource simpleLocal = new LocalizedResource(simpleKey, 
simpleLocalFile, false);
+        localizedList.add(simpleLocal);
+        
+        AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops);
+        // Keep the values returned by setInstance so they can be restored below.
+        ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
+        Utils origUtils = Utils.setInstance(mockedU);
+        try {
+            when(mockedCU.supervisorStormDistRootImpl(conf, 
topoId)).thenReturn(stormRoot);
+            when(mockedCU.readSupervisorStormConfImpl(conf, 
topoId)).thenReturn(topoConf);
+            when(mockedCU.readSupervisorTopologyImpl(conf, topoId, 
ops)).thenReturn(st);
+            
+            when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir);
+            
+            when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), 
eq(userDir))).thenReturn(localizedList);
+            
+            // Wait at most 20 seconds for the asynchronous download to finish.
+            Future<Void> f = al.requestDownloadTopologyBlobs(la, port);
+            f.get(20, TimeUnit.SECONDS);
+            // We should be done now...
+            
+            verify(localizer).getLocalUserFileCacheDir(user);
+            // fileExists is not stubbed, so the mock returns false; presumably
+            // that is why forceMkdir is expected next.
+            verify(ops).fileExists(userDir);
+            verify(ops).forceMkdir(userDir);
+            
+            verify(localizer).getBlobs(any(List.class), eq(user), 
eq(topoName), eq(userDir));
+
+            verify(ops).createSymlink(new File(stormRoot, simpleLocalName), 
new File(simpleCurrentLocalFile));
+        } finally {
+            al.shutdown();
+            ConfigUtils.setInstance(orig);
+            Utils.setInstance(origUtils);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
 
b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
index ab2c9af..04b5ab2 100644
--- 
a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
+++ 
b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java
@@ -34,8 +34,8 @@ public class LocalizedResourceRetentionSetTest {
     // check adding reference to local resource with topology of same name
     localresource2.addReference(("topo2"));
 
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, false);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, false);
     lrretset.addResources(lrset);
     assertEquals("number to clean is not 0", 0, 
lrretset.getSizeWithNoReferences());
     localresource1.removeReference(("topo1"));
@@ -64,9 +64,9 @@ public class LocalizedResourceRetentionSetTest {
     // check adding reference to local resource with topology of same name
     localresource2.addReference(("topo1"));
     localresource2.setSize(10);
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, false);
-    lrset.addResource("archive1", archiveresource1, true);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, false);
+    lrset.add("archive1", archiveresource1, true);
 
     lrretset.addResources(lrset);
     assertEquals("number to clean is not 2", 2, 
lrretset.getSizeWithNoReferences());

http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java 
b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
index 7f20d19..550d695 100644
--- 
a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
+++ 
b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java
@@ -37,9 +37,9 @@ public class LocalizedResourceSetTest {
     LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false, "topo1");
     LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", true, "topo1");
     assertEquals("size is wrong", 0, lrset.getSize());
-    lrset.addResource("key1", localresource1, false);
+    lrset.add("key1", localresource1, false);
     assertEquals("size is wrong", 1, lrset.getSize());
-    lrset.addResource("key2", localresource2, true);
+    lrset.add("key2", localresource2, true);
     assertEquals("size is wrong", 2, lrset.getSize());
   }
 
@@ -48,8 +48,8 @@ public class LocalizedResourceSetTest {
     LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
     LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false, "topo1");
     LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", true, "topo1");
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, true);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, true);
     assertEquals("get doesn't return same object", localresource1, 
lrset.get("key1", false));
     assertEquals("get doesn't return same object", localresource2, 
lrset.get("key2", true));
 
@@ -60,8 +60,8 @@ public class LocalizedResourceSetTest {
     LocalizedResourceSet lrset = new LocalizedResourceSet(user1);
     LocalizedResource localresource1 = new LocalizedResource("key1", 
"testfile1", false, "topo1");
     LocalizedResource localresource2 = new LocalizedResource("key2", 
"testfile2", true, "topo1");
-    lrset.addResource("key1", localresource1, false);
-    lrset.addResource("key2", localresource2, true);
+    lrset.add("key1", localresource1, false);
+    lrset.add("key2", localresource2, true);
     assertEquals("doesn't exist", true, lrset.exists("key1", false));
     assertEquals("doesn't exist", true, lrset.exists("key2", true));
     boolean val = lrset.remove(localresource1);

Reply via email to