http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index b70e844..f20daf5 100644
--- 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -1,24 +1,16 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.daemon.supervisor;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+package org.apache.storm.daemon.supervisor;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,7 +20,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
@@ -37,84 +28,63 @@ import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.LocalState;
 import org.apache.storm.utils.SimpleVersion;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.utils.LocalState;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class BasicContainerTest {
-    public static class CommandRun {
-        final List<String> cmd;
-        final Map<String, String> env;
-        final File pwd;
-        
-        public CommandRun(List<String> cmd, Map<String, String> env, File pwd) 
{
-            this.cmd = cmd;
-            this.env = env;
-            this.pwd = pwd;
+    private static void setSystemProp(String key, String value) {
+        if (value == null) {
+            System.clearProperty(key);
+        } else {
+            System.setProperty(key, value);
         }
     }
-    
-    public static class MockBasicContainer extends BasicContainer {
-        public MockBasicContainer(ContainerType type, Map<String, Object> 
conf, String supervisorId, int supervisorPort,
-                int port, LocalAssignment assignment, 
ResourceIsolationInterface resourceIsolationManager,
-                LocalState localState, String workerId, Map<String, Object> 
topoConf, AdvancedFSOps ops,
-                String profileCmd) throws IOException {
-            super(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, localState,
-                    workerId, topoConf, ops, profileCmd);
-        }
-
-        public final List<CommandRun> profileCmds = new ArrayList<>();
-        public final List<CommandRun> workerCmds = new ArrayList<>();
-        
 
-        @Override
-        protected Map<String, Object> readTopoConf() throws IOException {
-            return new HashMap<>();
-        }
-        
-        @Override
-        public void createNewWorkerId() {
-            super.createNewWorkerId();
-        }
-        
-        @Override
-        public List<String> substituteChildopts(Object value, int memOnheap) {
-            return super.substituteChildopts(value, memOnheap);
-        }
-               
-        @Override
-        protected boolean runProfilingCommand(List<String> command, 
Map<String, String> env, String logPrefix,
-                File targetDir) throws IOException, InterruptedException {
-            profileCmds.add(new CommandRun(command, env, targetDir));
-            return true;
+    private static void checkpoint(Run r, String... newValues) throws 
Exception {
+        if (newValues.length % 2 != 0) {
+            throw new IllegalArgumentException("Parameters are of the form 
system property name, new value");
         }
-        
-        @Override
-        protected void launchWorkerProcess(List<String> command, Map<String, 
String> env, String logPrefix,
-                ExitCodeCallback processExitCallback, File targetDir) throws 
IOException {
-            workerCmds.add(new CommandRun(command, env, targetDir));
+        Map<String, String> orig = new HashMap<>();
+        try {
+            for (int index = 0; index < newValues.length; index += 2) {
+                String key = newValues[index];
+                String value = newValues[index + 1];
+                orig.put(key, System.getProperty(key));
+                setSystemProp(key, value);
+            }
+            r.run();
+        } finally {
+            for (Map.Entry<String, String> entry : orig.entrySet()) {
+                setSystemProp(entry.getKey(), entry.getValue());
+            }
         }
-        
-        @Override
-        protected String javaCmd(String cmd) {
-            //avoid system dependent things
-            return cmd;
+    }
+
+    private static <T> void assertListEquals(List<T> a, List<T> b) {
+        if (a == null) {
+            assertNull(b);
         }
-        
-        @Override
-        protected List<String> frameworkClasspath(SimpleVersion version) {
-            //We are not really running anything so make this
-            // simple to check for
-            return Arrays.asList("FRAMEWORK_CP");
+        if (b == null) {
+            assertNull(a);
         }
-        
-        @Override
-        protected String javaLibraryPath(String stormRoot, Map<String, Object> 
conf) {
-            return "JLP";
+        int commonLen = Math.min(a.size(), b.size());
+        for (int i = 0; i < commonLen; i++) {
+            assertEquals("at index " + i + "\n" + a + " !=\n" + b + "\n", 
a.get(i), b.get(i));
         }
+
+        assertEquals("size of lists don't match \n" + a + " !=\n" + b, 
a.size(), b.size());
     }
-    
+
     @Test
     public void testCreateNewWorkerId() throws Exception {
         final String topoId = "test_topology";
@@ -122,24 +92,25 @@ public class BasicContainerTest {
         final int port = 8080;
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        
+
         Map<String, Object> superConf = new HashMap<>();
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-        
+
         LocalState ls = mock(LocalState.class);
-        
-        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
-                "SUPERVISOR", supervisorPort, port, la, null, ls, null, new 
HashMap<>(), ops, "profile");
+
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf,
+                                                       "SUPERVISOR", 
supervisorPort, port, la, null, ls, null, new HashMap<>(), ops,
+                                                       "profile");
         //null worker id means generate one...
-        
+
         assertNotNull(mc._workerId);
         verify(ls).getApprovedWorkers();
         Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
         expectedNewState.put(mc._workerId, port);
         verify(ls).setApprovedWorkers(expectedNewState);
     }
-    
+
     @Test
     public void testRecovery() throws Exception {
         final String topoId = "test_topology";
@@ -148,23 +119,24 @@ public class BasicContainerTest {
         final int port = 8080;
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        
+
         Map<String, Integer> workerState = new HashMap<String, Integer>();
         workerState.put(workerId, port);
-        
+
         LocalState ls = mock(LocalState.class);
         when(ls.getApprovedWorkers()).thenReturn(workerState);
-        
+
         Map<String, Object> superConf = new HashMap<>();
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-        
-        MockBasicContainer mc = new 
MockBasicContainer(ContainerType.RECOVER_FULL, superConf, 
-                "SUPERVISOR", supervisorPort, port, la, null, ls, null, new 
HashMap<>(), ops, "profile");
-        
+
+        MockBasicContainer mc = new 
MockBasicContainer(ContainerType.RECOVER_FULL, superConf,
+                                                       "SUPERVISOR", 
supervisorPort, port, la, null, ls, null, new HashMap<>(), ops,
+                                                       "profile");
+
         assertEquals(workerId, mc._workerId);
     }
-    
+
     @Test
     public void testRecoveryMiss() throws Exception {
         final String topoId = "test_topology";
@@ -172,22 +144,22 @@ public class BasicContainerTest {
         final int port = 8080;
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        
+
         Map<String, Integer> workerState = new HashMap<String, Integer>();
-        workerState.put("somethingelse", port+1);
-        
+        workerState.put("somethingelse", port + 1);
+
         LocalState ls = mock(LocalState.class);
         when(ls.getApprovedWorkers()).thenReturn(workerState);
-        
+
         try {
-            new MockBasicContainer(ContainerType.RECOVER_FULL, new 
HashMap<String, Object>(), 
-                    "SUPERVISOR", supervisorPort, port, la, null, ls, null, 
new HashMap<>(), null, "profile");
+            new MockBasicContainer(ContainerType.RECOVER_FULL, new 
HashMap<String, Object>(),
+                                   "SUPERVISOR", supervisorPort, port, la, 
null, ls, null, new HashMap<>(), null, "profile");
             fail("Container recovered worker incorrectly");
         } catch (ContainerRecoveryException e) {
             //Expected
         }
     }
-    
+
     @Test
     public void testCleanUp() throws Exception {
         final String topoId = "test_topology";
@@ -196,28 +168,29 @@ public class BasicContainerTest {
         final String workerId = "worker-id";
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        
+
         Map<String, Object> superConf = new HashMap<>();
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-        
+
         Map<String, Integer> workerState = new HashMap<String, Integer>();
         workerState.put(workerId, port);
-        
+
         LocalState ls = mock(LocalState.class);
         when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState));
-        
-        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
-                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, 
new HashMap<>(), ops, "profile");
-        
+
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf,
+                                                       "SUPERVISOR", 
supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
+                                                       "profile");
+
         mc.cleanUp();
-        
+
         assertNull(mc._workerId);
         verify(ls).getApprovedWorkers();
         Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
         verify(ls).setApprovedWorkers(expectedNewState);
     }
-    
+
     @Test
     public void testRunProfiling() throws Exception {
         final long pid = 100;
@@ -228,138 +201,92 @@ public class BasicContainerTest {
         final String stormLocal = ContainerTest.asAbsPath("tmp", "testing");
         final String topoRoot = ContainerTest.asAbsPath(stormLocal, topoId, 
String.valueOf(port));
         final File workerArtifactsPid = ContainerTest.asAbsFile(topoRoot, 
"worker.pid");
-        
+
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
         superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
-        
+
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        
+
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
         
when(ops.slurpString(workerArtifactsPid)).thenReturn(String.valueOf(pid));
-        
+
         LocalState ls = mock(LocalState.class);
-        
-        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
-                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, 
new HashMap<>(), ops, "profile");
-        
+
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf,
+                                                       "SUPERVISOR", 
supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
+                                                       "profile");
+
         //HEAP DUMP
         ProfileRequest req = new ProfileRequest();
         req.set_action(ProfileAction.JMAP_DUMP);
-        
+
         mc.runProfiling(req, false);
-        
+
         assertEquals(1, mc.profileCmds.size());
         CommandRun cmd = mc.profileCmds.get(0);
         mc.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", 
topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
-        
+
         //JSTACK DUMP
         req.set_action(ProfileAction.JSTACK_DUMP);
-        
+
         mc.runProfiling(req, false);
-        
+
         assertEquals(1, mc.profileCmds.size());
         cmd = mc.profileCmds.get(0);
         mc.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "jstack", 
topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
-        
+
         //RESTART
         req.set_action(ProfileAction.JVM_RESTART);
-        
+
         mc.runProfiling(req, false);
-        
+
         assertEquals(1, mc.profileCmds.size());
         cmd = mc.profileCmds.get(0);
         mc.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "kill"), 
cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
-        
+
         //JPROFILE DUMP
         req.set_action(ProfileAction.JPROFILE_DUMP);
-        
+
         mc.runProfiling(req, false);
-        
+
         assertEquals(1, mc.profileCmds.size());
         cmd = mc.profileCmds.get(0);
         mc.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", 
topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
-        
+
         //JPROFILE START
         req.set_action(ProfileAction.JPROFILE_STOP);
-        
+
         mc.runProfiling(req, false);
-        
+
         assertEquals(1, mc.profileCmds.size());
         cmd = mc.profileCmds.get(0);
         mc.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "start"), 
cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
-        
+
         //JPROFILE STOP
         req.set_action(ProfileAction.JPROFILE_STOP);
-        
+
         mc.runProfiling(req, true);
-        
+
         assertEquals(1, mc.profileCmds.size());
         cmd = mc.profileCmds.get(0);
         mc.profileCmds.clear();
         assertEquals(Arrays.asList("profile", String.valueOf(pid), "stop", 
topoRoot), cmd.cmd);
         assertEquals(new File(topoRoot), cmd.pwd);
     }
-    
-    private static void setSystemProp(String key, String value) {
-        if (value == null) {
-            System.clearProperty(key);
-        } else {
-            System.setProperty(key, value);
-        }
-    }
-    
-    private static interface Run {
-        public void run() throws Exception;
-    }
-    
-    private static void checkpoint(Run r, String ... newValues) throws 
Exception {
-        if (newValues.length % 2 != 0) {
-            throw new IllegalArgumentException("Parameters are of the form 
system property name, new value");
-        }
-        Map<String, String> orig = new HashMap<>();
-        try {
-            for (int index = 0; index < newValues.length; index += 2) {
-                String key = newValues[index];
-                String value = newValues[index + 1];
-                orig.put(key, System.getProperty(key));
-                setSystemProp(key, value);
-            }
-            r.run();
-        } finally {
-            for (Map.Entry<String, String> entry: orig.entrySet()) {
-                setSystemProp(entry.getKey(), entry.getValue());
-            }
-        }
-    }
-    
-    private static <T> void assertListEquals(List<T> a, List<T> b) {
-        if (a == null) {
-            assertNull(b);
-        }
-        if (b == null) {
-            assertNull(a);
-        }
-        int commonLen = Math.min(a.size(), b.size());
-        for (int i = 0; i < commonLen; i++) {
-            assertEquals("at index "+i+"\n"+a+" !=\n"+b+"\n", a.get(i), 
b.get(i));
-        }
-        
-        assertEquals("size of lists don't match \n"+a+" !=\n"+b, a.size(), 
b.size());
-    }
-    
+
     @Test
     public void testLaunch() throws Exception {
         final String topoId = "test_topology_current";
@@ -376,86 +303,88 @@ public class BasicContainerTest {
         final String workerConf = ContainerTest.asAbsPath(log4jdir, 
"worker.xml");
         final String workerRoot = ContainerTest.asAbsPath(stormLocal, 
"workers", workerId);
         final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp");
-        
+
         final StormTopology st = new StormTopology();
         st.set_spouts(new HashMap<>());
         st.set_bolts(new HashMap<>());
         st.set_state_spouts(new HashMap<>());
-        byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st));
-        
+        byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st));
+
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
         superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
         superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
         superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
-        
+
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        
+
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
         when(ops.slurp(stormcode)).thenReturn(serializedState);
-        
+
         LocalState ls = mock(LocalState.class);
-        
+
         checkpoint(() -> {
-            MockBasicContainer mc = new 
MockBasicContainer(ContainerType.LAUNCH, superConf,
-                    "SUPERVISOR", supervisorPort, port, la, null, ls, 
workerId, new HashMap<>(), ops, "profile");
-
-            mc.launch();
-
-            assertEquals(1, mc.workerCmds.size());
-            CommandRun cmd = mc.workerCmds.get(0);
-            mc.workerCmds.clear();
-            assertListEquals(Arrays.asList(
-                    "java",
-                    "-cp",
-                    "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
-                    "-Dlogging.sensitivity=S3",
-                    "-Dlogfile.name=worker.log",
-                    "-Dstorm.home=" + stormHome,
-                    "-Dworkers.artifacts=" + stormLocal,
-                    "-Dstorm.id=" + topoId,
-                    "-Dworker.id=" + workerId,
-                    "-Dworker.port=" + port,
-                    "-Dstorm.log.dir=" + stormLogDir,
-                    "-Dlog4j.configurationFile=" + workerConf,
-                    
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
-                    "-Dstorm.local.dir=" + stormLocal,
-                    "-Dworker.memory_limit_mb=768",
-                    "org.apache.storm.LogWriter",
-                    "java",
-                    "-server",
-                    "-Dlogging.sensitivity=S3",
-                    "-Dlogfile.name=worker.log",
-                    "-Dstorm.home=" + stormHome,
-                    "-Dworkers.artifacts=" + stormLocal,
-                    "-Dstorm.id=" + topoId,
-                    "-Dworker.id=" + workerId,
-                    "-Dworker.port=" + port,
-                    "-Dstorm.log.dir=" + stormLogDir,
-                    "-Dlog4j.configurationFile=" + workerConf,
-                    
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
-                    "-Dstorm.local.dir=" + stormLocal,
-                    "-Dworker.memory_limit_mb=768",
-                    "-Dtesting=true",
-                    "-Djava.library.path=JLP",
-                    "-Dstorm.conf.file=",
-                    "-Dstorm.options=",
-                    "-Djava.io.tmpdir="+workerTmpDir,
-                    "-cp",
-                    "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
-                    "org.apache.storm.daemon.worker.Worker",
-                    topoId,
-                    "SUPERVISOR",
-                    String.valueOf(supervisorPort),
-                    String.valueOf(port),
-                    workerId
-                    ), cmd.cmd);
-            assertEquals(new File(workerRoot), cmd.pwd);
-          },
-          "storm.home", stormHome,
-          "storm.log.dir", stormLogDir);
+                       MockBasicContainer mc = new 
MockBasicContainer(ContainerType.LAUNCH, superConf,
+                                                                      
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
+                                                                          
HashMap<>(), ops,
+                                                                      
"profile");
+
+                       mc.launch();
+
+                       assertEquals(1, mc.workerCmds.size());
+                       CommandRun cmd = mc.workerCmds.get(0);
+                       mc.workerCmds.clear();
+                       assertListEquals(Arrays.asList(
+                           "java",
+                           "-cp",
+                           "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                           "-Dlogging.sensitivity=S3",
+                           "-Dlogfile.name=worker.log",
+                           "-Dstorm.home=" + stormHome,
+                           "-Dworkers.artifacts=" + stormLocal,
+                           "-Dstorm.id=" + topoId,
+                           "-Dworker.id=" + workerId,
+                           "-Dworker.port=" + port,
+                           "-Dstorm.log.dir=" + stormLogDir,
+                           "-Dlog4j.configurationFile=" + workerConf,
+                           
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                           "-Dstorm.local.dir=" + stormLocal,
+                           "-Dworker.memory_limit_mb=768",
+                           "org.apache.storm.LogWriter",
+                           "java",
+                           "-server",
+                           "-Dlogging.sensitivity=S3",
+                           "-Dlogfile.name=worker.log",
+                           "-Dstorm.home=" + stormHome,
+                           "-Dworkers.artifacts=" + stormLocal,
+                           "-Dstorm.id=" + topoId,
+                           "-Dworker.id=" + workerId,
+                           "-Dworker.port=" + port,
+                           "-Dstorm.log.dir=" + stormLogDir,
+                           "-Dlog4j.configurationFile=" + workerConf,
+                           
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                           "-Dstorm.local.dir=" + stormLocal,
+                           "-Dworker.memory_limit_mb=768",
+                           "-Dtesting=true",
+                           "-Djava.library.path=JLP",
+                           "-Dstorm.conf.file=",
+                           "-Dstorm.options=",
+                           "-Djava.io.tmpdir=" + workerTmpDir,
+                           "-cp",
+                           "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                           "org.apache.storm.daemon.worker.Worker",
+                           topoId,
+                           "SUPERVISOR",
+                           String.valueOf(supervisorPort),
+                           String.valueOf(port),
+                           workerId
+                       ), cmd.cmd);
+                       assertEquals(new File(workerRoot), cmd.pwd);
+                   },
+                   "storm.home", stormHome,
+                   "storm.log.dir", stormLogDir);
     }
 
     @Test
@@ -482,7 +411,7 @@ public class BasicContainerTest {
 
         // minimum 1.x version of supporting STORM-2448 would be 1.0.4
         st.set_storm_version("1.0.4");
-        byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st));
+        byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st));
 
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
@@ -500,62 +429,64 @@ public class BasicContainerTest {
         LocalState ls = mock(LocalState.class);
 
         checkpoint(() -> {
-                    MockBasicContainer mc = new 
MockBasicContainer(ContainerType.LAUNCH, superConf,
-                            "SUPERVISOR", supervisorPort, port, la, null, ls, 
workerId, new HashMap<>(), ops, "profile");
-
-                    mc.launch();
-
-                    assertEquals(1, mc.workerCmds.size());
-                    CommandRun cmd = mc.workerCmds.get(0);
-                    mc.workerCmds.clear();
-                    assertListEquals(Arrays.asList(
-                            "java",
-                            "-cp",
-                            "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
-                            "-Dlogging.sensitivity=S3",
-                            "-Dlogfile.name=worker.log",
-                            "-Dstorm.home=" + stormHome,
-                            "-Dworkers.artifacts=" + stormLocal,
-                            "-Dstorm.id=" + topoId,
-                            "-Dworker.id=" + workerId,
-                            "-Dworker.port=" + port,
-                            "-Dstorm.log.dir=" + stormLogDir,
-                            "-Dlog4j.configurationFile=" + workerConf,
-                            
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
-                            "-Dstorm.local.dir=" + stormLocal,
-                            "-Dworker.memory_limit_mb=768",
-                            "org.apache.storm.LogWriter",
-                            "java",
-                            "-server",
-                            "-Dlogging.sensitivity=S3",
-                            "-Dlogfile.name=worker.log",
-                            "-Dstorm.home=" + stormHome,
-                            "-Dworkers.artifacts=" + stormLocal,
-                            "-Dstorm.id=" + topoId,
-                            "-Dworker.id=" + workerId,
-                            "-Dworker.port=" + port,
-                            "-Dstorm.log.dir=" + stormLogDir,
-                            "-Dlog4j.configurationFile=" + workerConf,
-                            
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
-                            "-Dstorm.local.dir=" + stormLocal,
-                            "-Dworker.memory_limit_mb=768",
-                            "-Dtesting=true",
-                            "-Djava.library.path=JLP",
-                            "-Dstorm.conf.file=",
-                            "-Dstorm.options=",
-                            "-Djava.io.tmpdir="+workerTmpDir,
-                            "-cp",
-                            "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
-                            "org.apache.storm.daemon.worker",
-                            topoId,
-                            "SUPERVISOR",
-                            String.valueOf(port),
-                            workerId
-                    ), cmd.cmd);
-                    assertEquals(new File(workerRoot), cmd.pwd);
-                },
-                "storm.home", stormHome,
-                "storm.log.dir", stormLogDir);
+                       MockBasicContainer mc = new 
MockBasicContainer(ContainerType.LAUNCH, superConf,
+                                                                      
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
+                                                                          
HashMap<>(), ops,
+                                                                      
"profile");
+
+                       mc.launch();
+
+                       assertEquals(1, mc.workerCmds.size());
+                       CommandRun cmd = mc.workerCmds.get(0);
+                       mc.workerCmds.clear();
+                       assertListEquals(Arrays.asList(
+                           "java",
+                           "-cp",
+                           "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                           "-Dlogging.sensitivity=S3",
+                           "-Dlogfile.name=worker.log",
+                           "-Dstorm.home=" + stormHome,
+                           "-Dworkers.artifacts=" + stormLocal,
+                           "-Dstorm.id=" + topoId,
+                           "-Dworker.id=" + workerId,
+                           "-Dworker.port=" + port,
+                           "-Dstorm.log.dir=" + stormLogDir,
+                           "-Dlog4j.configurationFile=" + workerConf,
+                           
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                           "-Dstorm.local.dir=" + stormLocal,
+                           "-Dworker.memory_limit_mb=768",
+                           "org.apache.storm.LogWriter",
+                           "java",
+                           "-server",
+                           "-Dlogging.sensitivity=S3",
+                           "-Dlogfile.name=worker.log",
+                           "-Dstorm.home=" + stormHome,
+                           "-Dworkers.artifacts=" + stormLocal,
+                           "-Dstorm.id=" + topoId,
+                           "-Dworker.id=" + workerId,
+                           "-Dworker.port=" + port,
+                           "-Dstorm.log.dir=" + stormLogDir,
+                           "-Dlog4j.configurationFile=" + workerConf,
+                           
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                           "-Dstorm.local.dir=" + stormLocal,
+                           "-Dworker.memory_limit_mb=768",
+                           "-Dtesting=true",
+                           "-Djava.library.path=JLP",
+                           "-Dstorm.conf.file=",
+                           "-Dstorm.options=",
+                           "-Djava.io.tmpdir=" + workerTmpDir,
+                           "-cp",
+                           "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                           "org.apache.storm.daemon.worker",
+                           topoId,
+                           "SUPERVISOR",
+                           String.valueOf(port),
+                           workerId
+                       ), cmd.cmd);
+                       assertEquals(new File(workerRoot), cmd.pwd);
+                   },
+                   "storm.home", stormHome,
+                   "storm.log.dir", stormLogDir);
     }
 
     @Test
@@ -582,7 +513,7 @@ public class BasicContainerTest {
 
         // minimum 0.x version of supporting STORM-2448 would be 0.10.3
         st.set_storm_version("0.10.3");
-        byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st));
+        byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st));
 
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
@@ -600,62 +531,64 @@ public class BasicContainerTest {
         LocalState ls = mock(LocalState.class);
 
         checkpoint(() -> {
-                    MockBasicContainer mc = new 
MockBasicContainer(ContainerType.LAUNCH, superConf,
-                            "SUPERVISOR", supervisorPort, port, la, null, ls, 
workerId, new HashMap<>(), ops, "profile");
-
-                    mc.launch();
-
-                    assertEquals(1, mc.workerCmds.size());
-                    CommandRun cmd = mc.workerCmds.get(0);
-                    mc.workerCmds.clear();
-                    assertListEquals(Arrays.asList(
-                            "java",
-                            "-cp",
-                            "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
-                            "-Dlogging.sensitivity=S3",
-                            "-Dlogfile.name=worker.log",
-                            "-Dstorm.home=" + stormHome,
-                            "-Dworkers.artifacts=" + stormLocal,
-                            "-Dstorm.id=" + topoId,
-                            "-Dworker.id=" + workerId,
-                            "-Dworker.port=" + port,
-                            "-Dstorm.log.dir=" + stormLogDir,
-                            "-Dlog4j.configurationFile=" + workerConf,
-                            
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
-                            "-Dstorm.local.dir=" + stormLocal,
-                            "-Dworker.memory_limit_mb=768",
-                            "backtype.storm.LogWriter",
-                            "java",
-                            "-server",
-                            "-Dlogging.sensitivity=S3",
-                            "-Dlogfile.name=worker.log",
-                            "-Dstorm.home=" + stormHome,
-                            "-Dworkers.artifacts=" + stormLocal,
-                            "-Dstorm.id=" + topoId,
-                            "-Dworker.id=" + workerId,
-                            "-Dworker.port=" + port,
-                            "-Dstorm.log.dir=" + stormLogDir,
-                            "-Dlog4j.configurationFile=" + workerConf,
-                            
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
-                            "-Dstorm.local.dir=" + stormLocal,
-                            "-Dworker.memory_limit_mb=768",
-                            "-Dtesting=true",
-                            "-Djava.library.path=JLP",
-                            "-Dstorm.conf.file=",
-                            "-Dstorm.options=",
-                            "-Djava.io.tmpdir="+workerTmpDir,
-                            "-cp",
-                            "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
-                            "backtype.storm.daemon.worker",
-                            topoId,
-                            "SUPERVISOR",
-                            String.valueOf(port),
-                            workerId
-                    ), cmd.cmd);
-                    assertEquals(new File(workerRoot), cmd.pwd);
-                },
-                "storm.home", stormHome,
-                "storm.log.dir", stormLogDir);
+                       MockBasicContainer mc = new 
MockBasicContainer(ContainerType.LAUNCH, superConf,
+                                                                      
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new
+                                                                          
HashMap<>(), ops,
+                                                                      
"profile");
+
+                       mc.launch();
+
+                       assertEquals(1, mc.workerCmds.size());
+                       CommandRun cmd = mc.workerCmds.get(0);
+                       mc.workerCmds.clear();
+                       assertListEquals(Arrays.asList(
+                           "java",
+                           "-cp",
+                           "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                           "-Dlogging.sensitivity=S3",
+                           "-Dlogfile.name=worker.log",
+                           "-Dstorm.home=" + stormHome,
+                           "-Dworkers.artifacts=" + stormLocal,
+                           "-Dstorm.id=" + topoId,
+                           "-Dworker.id=" + workerId,
+                           "-Dworker.port=" + port,
+                           "-Dstorm.log.dir=" + stormLogDir,
+                           "-Dlog4j.configurationFile=" + workerConf,
+                           
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                           "-Dstorm.local.dir=" + stormLocal,
+                           "-Dworker.memory_limit_mb=768",
+                           "backtype.storm.LogWriter",
+                           "java",
+                           "-server",
+                           "-Dlogging.sensitivity=S3",
+                           "-Dlogfile.name=worker.log",
+                           "-Dstorm.home=" + stormHome,
+                           "-Dworkers.artifacts=" + stormLocal,
+                           "-Dstorm.id=" + topoId,
+                           "-Dworker.id=" + workerId,
+                           "-Dworker.port=" + port,
+                           "-Dstorm.log.dir=" + stormLogDir,
+                           "-Dlog4j.configurationFile=" + workerConf,
+                           
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
+                           "-Dstorm.local.dir=" + stormLocal,
+                           "-Dworker.memory_limit_mb=768",
+                           "-Dtesting=true",
+                           "-Djava.library.path=JLP",
+                           "-Dstorm.conf.file=",
+                           "-Dstorm.options=",
+                           "-Djava.io.tmpdir=" + workerTmpDir,
+                           "-cp",
+                           "FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
+                           "backtype.storm.daemon.worker",
+                           topoId,
+                           "SUPERVISOR",
+                           String.valueOf(port),
+                           workerId
+                       ), cmd.cmd);
+                       assertEquals(new File(workerRoot), cmd.pwd);
+                   },
+                   "storm.home", stormHome,
+                   "storm.log.dir", stormLogDir);
     }
 
     @Test
@@ -665,33 +598,112 @@ public class BasicContainerTest {
         int supervisorPort = 6628;
         int port = 9999;
         int memOnheap = 512;
-        
+
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        
+
         Map<String, Object> superConf = new HashMap<>();
-        
+
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
-        
+
         LocalState ls = mock(LocalState.class);
-        
-        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf, 
-                "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, 
new HashMap<>(), ops, "profile");
-        
+
+        MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, 
superConf,
+                                                       "SUPERVISOR", 
supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops,
+                                                       "profile");
+
         assertListEquals(Arrays.asList(
-                "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
-                "-Xms256m",
-                "-Xmx512m"),
-                
mc.substituteChildopts("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log
 -Xms256m -Xmx%HEAP-MEM%m", memOnheap));
-        
+            "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+            "-Xms256m",
+            "-Xmx512m"),
+                         mc.substituteChildopts(
+                             
"-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log
 -Xms256m -Xmx%HEAP-MEM%m",
+                             memOnheap));
+
         assertListEquals(Arrays.asList(
-                "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
-                "-Xms256m",
-                "-Xmx512m"),
-                
mc.substituteChildopts(Arrays.asList("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log","-Xms256m","-Xmx%HEAP-MEM%m"),
 memOnheap));
-        
-        assertListEquals(Collections.emptyList(), 
-                mc.substituteChildopts(null));
+            "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+            "-Xms256m",
+            "-Xmx512m"),
+                         mc.substituteChildopts(Arrays.asList(
+                             
"-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log",
 "-Xms256m",
+                             "-Xmx%HEAP-MEM%m"), memOnheap));
+
+        assertListEquals(Collections.emptyList(),
+                         mc.substituteChildopts(null));
+    }
+
+    private static interface Run {
+        public void run() throws Exception;
+    }
+
+    public static class CommandRun {
+        final List<String> cmd;
+        final Map<String, String> env;
+        final File pwd;
+
+        public CommandRun(List<String> cmd, Map<String, String> env, File pwd) 
{
+            this.cmd = cmd;
+            this.env = env;
+            this.pwd = pwd;
+        }
+    }
+
+    public static class MockBasicContainer extends BasicContainer {
+        public final List<CommandRun> profileCmds = new ArrayList<>();
+        public final List<CommandRun> workerCmds = new ArrayList<>();
+        public MockBasicContainer(ContainerType type, Map<String, Object> 
conf, String supervisorId, int supervisorPort,
+                                  int port, LocalAssignment assignment, 
ResourceIsolationInterface resourceIsolationManager,
+                                  LocalState localState, String workerId, 
Map<String, Object> topoConf, AdvancedFSOps ops,
+                                  String profileCmd) throws IOException {
+            super(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, localState,
+                  workerId, topoConf, ops, profileCmd);
+        }
+
+        @Override
+        protected Map<String, Object> readTopoConf() throws IOException {
+            return new HashMap<>();
+        }
+
+        @Override
+        public void createNewWorkerId() {
+            super.createNewWorkerId();
+        }
+
+        @Override
+        public List<String> substituteChildopts(Object value, int memOnheap) {
+            return super.substituteChildopts(value, memOnheap);
+        }
+
+        @Override
+        protected boolean runProfilingCommand(List<String> command, 
Map<String, String> env, String logPrefix,
+                                              File targetDir) throws 
IOException, InterruptedException {
+            profileCmds.add(new CommandRun(command, env, targetDir));
+            return true;
+        }
+
+        @Override
+        protected void launchWorkerProcess(List<String> command, Map<String, 
String> env, String logPrefix,
+                                           ExitCodeCallback 
processExitCallback, File targetDir) throws IOException {
+            workerCmds.add(new CommandRun(command, env, targetDir));
+        }
+
+        @Override
+        protected String javaCmd(String cmd) {
+            //avoid system dependent things
+            return cmd;
+        }
+
+        @Override
+        protected List<String> frameworkClasspath(SimpleVersion version) {
+            //We are not really running anything so make this
+            // simple to check for
+            return Arrays.asList("FRAMEWORK_CP");
+        }
+
+        @Override
+        protected String javaLibraryPath(String stormRoot, Map<String, Object> 
conf) {
+            return "JLP";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
index c020244..b510838 100644
--- 
a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
+++ 
b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java
@@ -1,25 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The 
ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with 
the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions
+ * and limitations under the License.
  */
-package org.apache.storm.daemon.supervisor;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+package org.apache.storm.daemon.supervisor;
 
+import com.google.common.base.Joiner;
 import java.io.File;
 import java.io.IOException;
 import java.io.StringWriter;
@@ -31,7 +24,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.container.ResourceIsolationInterface;
@@ -41,58 +33,33 @@ import org.apache.storm.generated.ProfileRequest;
 import org.junit.Test;
 import org.yaml.snakeyaml.Yaml;
 
-import com.google.common.base.Joiner;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ContainerTest {
-    public static class MockContainer extends Container {
-        
-        protected MockContainer(ContainerType type, Map<String, Object> conf, 
String supervisorId, int supervisorPort,
-                int port, LocalAssignment assignment, 
ResourceIsolationInterface resourceIsolationManager,
-                String workerId, Map<String, Object> topoConf, AdvancedFSOps 
ops) throws IOException {
-            super(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, workerId,
-                    topoConf, ops);
-        }
-
-        public final List<Long> killedPids = new ArrayList<>();
-        public final List<Long> forceKilledPids = new ArrayList<>();
-        public final Set<Long> allPids = new HashSet<>();
+    private static final Joiner PATH_JOIN = 
Joiner.on(File.separator).skipNulls();
+    private static final String DOUBLE_SEP = File.separator + File.separator;
 
-        @Override
-        protected void kill(long pid) {
-            killedPids.add(pid);
-        }
-        
-        @Override
-        protected void forceKill(long pid) {
-            forceKilledPids.add(pid);
-        }
-        
-        @Override
-        protected Set<Long> getAllPids() throws IOException {
-            return allPids;
-        }
-        
-        @Override
-        public void launch() throws IOException {
-            fail("THIS IS NOT UNDER TEST");
-        }
+    static String asAbsPath(String... parts) {
+        return (File.separator + PATH_JOIN.join(parts)).replace(DOUBLE_SEP, 
File.separator);
+    }
 
-        @Override
-        public void relaunch() throws IOException {
-            fail("THIS IS NOT UNDER TEST");
-        }
+    static File asAbsFile(String... parts) {
+        return new File(asAbsPath(parts));
+    }
 
-        @Override
-        public boolean didMainProcessExit() {
-            fail("THIS IS NOT UNDER TEST");
-            return false;
-        }
+    static String asPath(String... parts) {
+        return PATH_JOIN.join(parts);
+    }
 
-        @Override
-        public boolean runProfiling(ProfileRequest request, boolean stop) 
throws IOException, InterruptedException {
-            fail("THIS IS NOT UNDER TEST");
-            return false;
-        }
+    public static File asFile(String... parts) {
+        return new File(asPath(parts));
     }
 
     @Test
@@ -103,46 +70,28 @@ public class ContainerTest {
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
-        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
-                "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), 
ops);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
+                                             "SUPERVISOR", 6628, 8080, la, 
null, "worker", new HashMap<>(), ops);
         mc.kill();
         assertEquals(Collections.EMPTY_LIST, mc.killedPids);
         assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
         mc.forceKill();
         assertEquals(Collections.EMPTY_LIST, mc.killedPids);
         assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
-        
+
         long pid = 987654321;
         mc.allPids.add(pid);
-        
+
         mc.kill();
         assertEquals(mc.allPids, new HashSet<>(mc.killedPids));
         assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids);
         mc.killedPids.clear();
-        
+
         mc.forceKill();
         assertEquals(Collections.EMPTY_LIST, mc.killedPids);
         assertEquals(mc.allPids, new HashSet<>(mc.forceKilledPids));
     }
-    
-    private static final Joiner PATH_JOIN = 
Joiner.on(File.separator).skipNulls();
-    private static final String DOUBLE_SEP = File.separator + File.separator;  
  
-    static String asAbsPath(String ... parts) {
-        return (File.separator + PATH_JOIN.join(parts)).replace(DOUBLE_SEP, 
File.separator);
-    }
-    
-    static File asAbsFile(String ... parts) {
-        return new File(asAbsPath(parts));
-    }
-    
-    static String asPath(String ... parts) {
-        return PATH_JOIN.join(parts);
-    }
-    
-    public static File asFile(String ... parts) {
-        return new File(asPath(parts));
-    }
-    
+
     @SuppressWarnings("unchecked")
     @Test
     public void testSetup() throws Exception {
@@ -156,11 +105,11 @@ public class ContainerTest {
         final File workerUserFile = asAbsFile(stormLocal, "workers-users", 
workerId);
         final File workerRoot = asAbsFile(stormLocal, "workers", workerId);
         final File distRoot = asAbsFile(stormLocal, "supervisor", "stormdist", 
topoId);
-        
+
         final Map<String, Object> topoConf = new HashMap<>();
         final List<String> topoUsers = Arrays.asList("t-user-a", "t-user-b");
         final List<String> logUsers = Arrays.asList("l-user-a", "l-user-b");
-        
+
         final List<String> topoGroups = Arrays.asList("t-group-a", 
"t-group-b");
         final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b");
 
@@ -168,36 +117,36 @@ public class ContainerTest {
         topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups);
         topoConf.put(DaemonConfig.LOGS_USERS, logUsers);
         topoConf.put(Config.TOPOLOGY_USERS, topoUsers);
-        
+
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
         superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
-        
+
         final StringWriter yamlDump = new StringWriter();
-        
+
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
         when(ops.fileExists(workerArtifacts)).thenReturn(true);
         when(ops.fileExists(workerRoot)).thenReturn(true);
         when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
-        
+
         LocalAssignment la = new LocalAssignment();
         la.set_topology_id(topoId);
         la.set_owner(user);
-        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
-                "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops);
-        
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
+                                             "SUPERVISOR", 6628, 8080, la, 
null, workerId, topoConf, ops);
+
         mc.setup();
-        
+
         //Initial Setup
         verify(ops).forceMkdir(new File(workerRoot, "pids"));
         verify(ops).forceMkdir(new File(workerRoot, "tmp"));
         verify(ops).forceMkdir(new File(workerRoot, "heartbeats"));
         verify(ops).fileExists(workerArtifacts);
-        
+
         //Log file permissions
         verify(ops).getWriter(logMetadataFile);
-        
+
         String yamlResult = yamlDump.toString();
         Yaml yaml = new Yaml();
         Map<String, Object> result = (Map<String, Object>) 
yaml.load(yamlResult);
@@ -205,22 +154,22 @@ public class ContainerTest {
         assertEquals(user, result.get(Config.TOPOLOGY_SUBMITTER_USER));
         HashSet<String> allowedUsers = new HashSet<>(topoUsers);
         allowedUsers.addAll(logUsers);
-        assertEquals(allowedUsers, new 
HashSet<String>((List<String>)result.get(DaemonConfig.LOGS_USERS)));
-        
+        assertEquals(allowedUsers, new HashSet<String>((List<String>) 
result.get(DaemonConfig.LOGS_USERS)));
+
         HashSet<String> allowedGroups = new HashSet<>(topoGroups);
         allowedGroups.addAll(logGroups);
-        assertEquals(allowedGroups, new 
HashSet<String>((List<String>)result.get(DaemonConfig.LOGS_GROUPS)));
-        
+        assertEquals(allowedGroups, new HashSet<String>((List<String>) 
result.get(DaemonConfig.LOGS_GROUPS)));
+
         //Save the current user to help with recovery
         verify(ops).dump(workerUserFile, user);
-        
+
         //Create links to artifacts dir
         verify(ops).createSymlink(new File(workerRoot, "artifacts"), 
workerArtifacts);
-        
-        //Create links to blobs 
+
+        //Create links to blobs
         verify(ops, never()).createSymlink(new File(workerRoot, "resources"), 
new File(distRoot, "resources"));
     }
-    
+
     @Test
     public void testCleanup() throws Exception {
         final int supervisorPort = 6628;
@@ -235,38 +184,88 @@ public class ContainerTest {
         final File workerUserFile = asAbsFile(stormLocal, "workers-users", 
workerId);
         final File workerRoot = asAbsFile(stormLocal, "workers", workerId);
         final File workerPidsRoot = new File(workerRoot, "pids");
-        
+
         final Map<String, Object> topoConf = new HashMap<>();
-        
+
         final Map<String, Object> superConf = new HashMap<>();
         superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
         superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
-        
+
         final StringWriter yamlDump = new StringWriter();
-        
+
         AdvancedFSOps ops = mock(AdvancedFSOps.class);
         when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
         when(ops.fileExists(workerArtifacts)).thenReturn(true);
         when(ops.fileExists(workerRoot)).thenReturn(true);
         when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump);
-        
+
         ResourceIsolationInterface iso = 
mock(ResourceIsolationInterface.class);
-        
+
         LocalAssignment la = new LocalAssignment();
         la.set_owner(user);
         la.set_topology_id(topoId);
-        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, 
-                "SUPERVISOR", supervisorPort, port, la, iso, workerId, 
topoConf, ops);
+        MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf,
+                                             "SUPERVISOR", supervisorPort, 
port, la, iso, workerId, topoConf, ops);
         mc.allPids.add(pid);
-        
+
         mc.cleanUp();
         verify(ops).deleteIfExists(eq(new File(workerPidsRoot, 
String.valueOf(pid))), eq(user), any(String.class));
         verify(iso).releaseResourcesForWorker(workerId);
-        
+
         verify(ops).deleteIfExists(eq(new File(workerRoot, "pids")), eq(user), 
any(String.class));
         verify(ops).deleteIfExists(eq(new File(workerRoot, "tmp")), eq(user), 
any(String.class));
         verify(ops).deleteIfExists(eq(new File(workerRoot, "heartbeats")), 
eq(user), any(String.class));
         verify(ops).deleteIfExists(eq(workerRoot), eq(user), 
any(String.class));
         verify(ops).deleteIfExists(workerUserFile);
     }
+
+    public static class MockContainer extends Container {
+
+        public final List<Long> killedPids = new ArrayList<>();
+        public final List<Long> forceKilledPids = new ArrayList<>();
+        public final Set<Long> allPids = new HashSet<>();
+        protected MockContainer(ContainerType type, Map<String, Object> conf, 
String supervisorId, int supervisorPort,
+                                int port, LocalAssignment assignment, 
ResourceIsolationInterface resourceIsolationManager,
+                                String workerId, Map<String, Object> topoConf, 
AdvancedFSOps ops) throws IOException {
+            super(type, conf, supervisorId, supervisorPort, port, assignment, 
resourceIsolationManager, workerId,
+                  topoConf, ops);
+        }
+
+        @Override
+        protected void kill(long pid) {
+            killedPids.add(pid);
+        }
+
+        @Override
+        protected void forceKill(long pid) {
+            forceKilledPids.add(pid);
+        }
+
+        @Override
+        protected Set<Long> getAllPids() throws IOException {
+            return allPids;
+        }
+
+        @Override
+        public void launch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+        }
+
+        @Override
+        public void relaunch() throws IOException {
+            fail("THIS IS NOT UNDER TEST");
+        }
+
+        @Override
+        public boolean didMainProcessExit() {
+            fail("THIS IS NOT UNDER TEST");
+            return false;
+        }
+
+        @Override
+        public boolean runProfiling(ProfileRequest request, boolean stop) 
throws IOException, InterruptedException {
+            fail("THIS IS NOT UNDER TEST");
+            return false;
+        }
+    }
 }

Reply via email to