http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java index b70e844..f20daf5 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java @@ -1,24 +1,16 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.daemon.supervisor; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +package org.apache.storm.daemon.supervisor; import java.io.File; import java.io.IOException; @@ -28,7 +20,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.container.ResourceIsolationInterface; @@ -37,84 +28,63 @@ import org.apache.storm.generated.LocalAssignment; import org.apache.storm.generated.ProfileAction; import org.apache.storm.generated.ProfileRequest; import org.apache.storm.generated.StormTopology; +import org.apache.storm.utils.LocalState; import org.apache.storm.utils.SimpleVersion; import org.apache.storm.utils.Utils; -import org.apache.storm.utils.LocalState; import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class BasicContainerTest { - public static class CommandRun { - final List<String> cmd; - final Map<String, String> env; - final File pwd; - - public CommandRun(List<String> cmd, Map<String, String> env, File pwd) { - this.cmd = cmd; - this.env = env; - this.pwd = pwd; + private static void setSystemProp(String key, String value) { + if (value == null) { + System.clearProperty(key); + } else { + System.setProperty(key, value); } } - - public static class MockBasicContainer extends BasicContainer { - public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, - int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, - LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, - String profileCmd) throws IOException { - super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, - workerId, topoConf, ops, profileCmd); - } - - public final List<CommandRun> profileCmds = new ArrayList<>(); - public final List<CommandRun> workerCmds = new ArrayList<>(); - - @Override - protected Map<String, Object> readTopoConf() throws IOException { - return new HashMap<>(); - } - - @Override - public void createNewWorkerId() { - super.createNewWorkerId(); - } - - @Override - public List<String> substituteChildopts(Object value, int memOnheap) { - return super.substituteChildopts(value, memOnheap); - } - - @Override - protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, - File targetDir) throws IOException, InterruptedException { - profileCmds.add(new CommandRun(command, env, targetDir)); - return true; + private static void checkpoint(Run r, String... newValues) throws Exception { + if (newValues.length % 2 != 0) { + throw new IllegalArgumentException("Parameters are of the form system property name, new value"); } - - @Override - protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix, - ExitCodeCallback processExitCallback, File targetDir) throws IOException { - workerCmds.add(new CommandRun(command, env, targetDir)); + Map<String, String> orig = new HashMap<>(); + try { + for (int index = 0; index < newValues.length; index += 2) { + String key = newValues[index]; + String value = newValues[index + 1]; + orig.put(key, System.getProperty(key)); + setSystemProp(key, value); + } + r.run(); + } finally { + for (Map.Entry<String, String> entry : orig.entrySet()) { + setSystemProp(entry.getKey(), entry.getValue()); + } } - - @Override - protected String javaCmd(String cmd) { - //avoid system dependent things - return cmd; + } + + private static <T> void assertListEquals(List<T> a, List<T> b) { + if (a == null) { + assertNull(b); } - - @Override - protected List<String> frameworkClasspath(SimpleVersion version) { - //We are not really running anything so make this - // simple to check for - return Arrays.asList("FRAMEWORK_CP"); + if (b == null) { + assertNull(a); } - - @Override - protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) { - return "JLP"; + int commonLen = Math.min(a.size(), b.size()); + for (int i = 0; i < commonLen; i++) { + assertEquals("at index " + i + "\n" + a + " !=\n" + b + "\n", a.get(i), b.get(i)); } + + assertEquals("size of lists don't match \n" + a + " !=\n" + b, a.size(), b.size()); } - + @Test public void testCreateNewWorkerId() throws Exception { final String topoId = "test_topology"; @@ -122,24 +92,25 @@ public class BasicContainerTest { final int port = 8080; LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - + Map<String, Object> superConf = new HashMap<>(); AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); - + LocalState ls = mock(LocalState.class); - - MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops, "profile"); + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops, + "profile"); //null worker id means generate one... - + assertNotNull(mc._workerId); verify(ls).getApprovedWorkers(); Map<String, Integer> expectedNewState = new HashMap<String, Integer>(); expectedNewState.put(mc._workerId, port); verify(ls).setApprovedWorkers(expectedNewState); } - + @Test public void testRecovery() throws Exception { final String topoId = "test_topology"; @@ -148,23 +119,24 @@ public class BasicContainerTest { final int port = 8080; LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - + Map<String, Integer> workerState = new HashMap<String, Integer>(); workerState.put(workerId, port); - + LocalState ls = mock(LocalState.class); when(ls.getApprovedWorkers()).thenReturn(workerState); - + Map<String, Object> superConf = new HashMap<>(); AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); - - MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops, "profile"); - + + MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), ops, + "profile"); + assertEquals(workerId, mc._workerId); } - + @Test public void testRecoveryMiss() throws Exception { final String topoId = "test_topology"; @@ -172,22 +144,22 @@ public class BasicContainerTest { final int port = 8080; LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - + Map<String, Integer> workerState = new HashMap<String, Integer>(); - workerState.put("somethingelse", port+1); - + workerState.put("somethingelse", port + 1); + LocalState ls = mock(LocalState.class); when(ls.getApprovedWorkers()).thenReturn(workerState); - + try { - new MockBasicContainer(ContainerType.RECOVER_FULL, new HashMap<String, Object>(), - "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), null, "profile"); + new MockBasicContainer(ContainerType.RECOVER_FULL, new HashMap<String, Object>(), + "SUPERVISOR", supervisorPort, port, la, null, ls, null, new HashMap<>(), null, "profile"); fail("Container recovered worker incorrectly"); } catch (ContainerRecoveryException e) { //Expected } } - + @Test public void testCleanUp() throws Exception { final String topoId = "test_topology"; @@ -196,28 +168,29 @@ public class BasicContainerTest { final String workerId = "worker-id"; LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - + Map<String, Object> superConf = new HashMap<>(); AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); - + Map<String, Integer> workerState = new HashMap<String, Integer>(); workerState.put(workerId, port); - + LocalState ls = mock(LocalState.class); when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState)); - - MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); - + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, + "profile"); + mc.cleanUp(); - + assertNull(mc._workerId); verify(ls).getApprovedWorkers(); Map<String, Integer> expectedNewState = new HashMap<String, Integer>(); verify(ls).setApprovedWorkers(expectedNewState); } - + @Test public void testRunProfiling() throws Exception { final long pid = 100; @@ -228,138 +201,92 @@ public class BasicContainerTest { final String stormLocal = ContainerTest.asAbsPath("tmp", "testing"); final String topoRoot = ContainerTest.asAbsPath(stormLocal, topoId, String.valueOf(port)); final File workerArtifactsPid = ContainerTest.asAbsFile(topoRoot, "worker.pid"); - + final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); - + LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - + AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); when(ops.slurpString(workerArtifactsPid)).thenReturn(String.valueOf(pid)); - + LocalState ls = mock(LocalState.class); - - MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); - + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, + "profile"); + //HEAP DUMP ProfileRequest req = new ProfileRequest(); req.set_action(ProfileAction.JMAP_DUMP); - + mc.runProfiling(req, false); - + assertEquals(1, mc.profileCmds.size()); CommandRun cmd = mc.profileCmds.get(0); mc.profileCmds.clear(); assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", topoRoot), cmd.cmd); assertEquals(new File(topoRoot), cmd.pwd); - + //JSTACK DUMP req.set_action(ProfileAction.JSTACK_DUMP); - + mc.runProfiling(req, false); - + assertEquals(1, mc.profileCmds.size()); cmd = mc.profileCmds.get(0); mc.profileCmds.clear(); assertEquals(Arrays.asList("profile", String.valueOf(pid), "jstack", topoRoot), cmd.cmd); assertEquals(new File(topoRoot), cmd.pwd); - + //RESTART req.set_action(ProfileAction.JVM_RESTART); - + mc.runProfiling(req, false); - + assertEquals(1, mc.profileCmds.size()); cmd = mc.profileCmds.get(0); mc.profileCmds.clear(); assertEquals(Arrays.asList("profile", String.valueOf(pid), "kill"), cmd.cmd); assertEquals(new File(topoRoot), cmd.pwd); - + //JPROFILE DUMP req.set_action(ProfileAction.JPROFILE_DUMP); - + mc.runProfiling(req, false); - + assertEquals(1, mc.profileCmds.size()); cmd = mc.profileCmds.get(0); mc.profileCmds.clear(); assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", topoRoot), cmd.cmd); assertEquals(new File(topoRoot), cmd.pwd); - + //JPROFILE START req.set_action(ProfileAction.JPROFILE_STOP); - + mc.runProfiling(req, false); - + assertEquals(1, mc.profileCmds.size()); cmd = mc.profileCmds.get(0); mc.profileCmds.clear(); assertEquals(Arrays.asList("profile", String.valueOf(pid), "start"), cmd.cmd); assertEquals(new File(topoRoot), cmd.pwd); - + //JPROFILE STOP req.set_action(ProfileAction.JPROFILE_STOP); - + mc.runProfiling(req, true); - + assertEquals(1, mc.profileCmds.size()); cmd = mc.profileCmds.get(0); mc.profileCmds.clear(); assertEquals(Arrays.asList("profile", String.valueOf(pid), "stop", topoRoot), cmd.cmd); assertEquals(new File(topoRoot), cmd.pwd); } - - private static void setSystemProp(String key, String value) { - if (value == null) { - System.clearProperty(key); - } else { - System.setProperty(key, value); - } - } - - private static interface Run { - public void run() throws Exception; - } - - private static void checkpoint(Run r, String ... newValues) throws Exception { - if (newValues.length % 2 != 0) { - throw new IllegalArgumentException("Parameters are of the form system property name, new value"); - } - Map<String, String> orig = new HashMap<>(); - try { - for (int index = 0; index < newValues.length; index += 2) { - String key = newValues[index]; - String value = newValues[index + 1]; - orig.put(key, System.getProperty(key)); - setSystemProp(key, value); - } - r.run(); - } finally { - for (Map.Entry<String, String> entry: orig.entrySet()) { - setSystemProp(entry.getKey(), entry.getValue()); - } - } - } - - private static <T> void assertListEquals(List<T> a, List<T> b) { - if (a == null) { - assertNull(b); - } - if (b == null) { - assertNull(a); - } - int commonLen = Math.min(a.size(), b.size()); - for (int i = 0; i < commonLen; i++) { - assertEquals("at index "+i+"\n"+a+" !=\n"+b+"\n", a.get(i), b.get(i)); - } - - assertEquals("size of lists don't match \n"+a+" !=\n"+b, a.size(), b.size()); - } - + @Test public void testLaunch() throws Exception { final String topoId = "test_topology_current"; @@ -376,86 +303,88 @@ public class BasicContainerTest { final String workerConf = ContainerTest.asAbsPath(log4jdir, "worker.xml"); final String workerRoot = ContainerTest.asAbsPath(stormLocal, "workers", workerId); final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp"); - + final StormTopology st = new StormTopology(); st.set_spouts(new HashMap<>()); st.set_bolts(new HashMap<>()); st.set_state_spouts(new HashMap<>()); - byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st)); - + byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st)); + final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir); superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true"); - + LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - + AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); when(ops.slurp(stormcode)).thenReturn(serializedState); - + LocalState ls = mock(LocalState.class); - + checkpoint(() -> { - MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); - - mc.launch(); - - assertEquals(1, mc.workerCmds.size()); - CommandRun cmd = mc.workerCmds.get(0); - mc.workerCmds.clear(); - assertListEquals(Arrays.asList( - "java", - "-cp", - "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), - "-Dlogging.sensitivity=S3", - "-Dlogfile.name=worker.log", - "-Dstorm.home=" + stormHome, - "-Dworkers.artifacts=" + stormLocal, - "-Dstorm.id=" + topoId, - "-Dworker.id=" + workerId, - "-Dworker.port=" + port, - "-Dstorm.log.dir=" + stormLogDir, - "-Dlog4j.configurationFile=" + workerConf, - "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", - "-Dstorm.local.dir=" + stormLocal, - "-Dworker.memory_limit_mb=768", - "org.apache.storm.LogWriter", - "java", - "-server", - "-Dlogging.sensitivity=S3", - "-Dlogfile.name=worker.log", - "-Dstorm.home=" + stormHome, - "-Dworkers.artifacts=" + stormLocal, - "-Dstorm.id=" + topoId, - "-Dworker.id=" + workerId, - "-Dworker.port=" + port, - "-Dstorm.log.dir=" + stormLogDir, - "-Dlog4j.configurationFile=" + workerConf, - "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", - "-Dstorm.local.dir=" + stormLocal, - "-Dworker.memory_limit_mb=768", - "-Dtesting=true", - "-Djava.library.path=JLP", - "-Dstorm.conf.file=", - "-Dstorm.options=", - "-Djava.io.tmpdir="+workerTmpDir, - "-cp", - "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), - "org.apache.storm.daemon.worker.Worker", - topoId, - "SUPERVISOR", - String.valueOf(supervisorPort), - String.valueOf(port), - workerId - ), cmd.cmd); - assertEquals(new File(workerRoot), cmd.pwd); - }, - "storm.home", stormHome, - "storm.log.dir", stormLogDir); + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new + HashMap<>(), ops, + "profile"); + + mc.launch(); + + assertEquals(1, mc.workerCmds.size()); + CommandRun cmd = mc.workerCmds.get(0); + mc.workerCmds.clear(); + assertListEquals(Arrays.asList( + "java", + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "-Dworker.memory_limit_mb=768", + "org.apache.storm.LogWriter", + "java", + "-server", + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "-Dworker.memory_limit_mb=768", + "-Dtesting=true", + "-Djava.library.path=JLP", + "-Dstorm.conf.file=", + "-Dstorm.options=", + "-Djava.io.tmpdir=" + workerTmpDir, + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "org.apache.storm.daemon.worker.Worker", + topoId, + "SUPERVISOR", + String.valueOf(supervisorPort), + String.valueOf(port), + workerId + ), cmd.cmd); + assertEquals(new File(workerRoot), cmd.pwd); + }, + "storm.home", stormHome, + "storm.log.dir", stormLogDir); } @Test @@ -482,7 +411,7 @@ public class BasicContainerTest { // minimum 1.x version of supporting STORM-2448 would be 1.0.4 st.set_storm_version("1.0.4"); - byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st)); + byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st)); final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); @@ -500,62 +429,64 @@ public class BasicContainerTest { LocalState ls = mock(LocalState.class); checkpoint(() -> { - MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); - - mc.launch(); - - assertEquals(1, mc.workerCmds.size()); - CommandRun cmd = mc.workerCmds.get(0); - mc.workerCmds.clear(); - assertListEquals(Arrays.asList( - "java", - "-cp", - "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), - "-Dlogging.sensitivity=S3", - "-Dlogfile.name=worker.log", - "-Dstorm.home=" + stormHome, - "-Dworkers.artifacts=" + stormLocal, - "-Dstorm.id=" + topoId, - "-Dworker.id=" + workerId, - "-Dworker.port=" + port, - "-Dstorm.log.dir=" + stormLogDir, - "-Dlog4j.configurationFile=" + workerConf, - "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", - "-Dstorm.local.dir=" + stormLocal, - "-Dworker.memory_limit_mb=768", - "org.apache.storm.LogWriter", - "java", - "-server", - "-Dlogging.sensitivity=S3", - "-Dlogfile.name=worker.log", - "-Dstorm.home=" + stormHome, - "-Dworkers.artifacts=" + stormLocal, - "-Dstorm.id=" + topoId, - "-Dworker.id=" + workerId, - "-Dworker.port=" + port, - "-Dstorm.log.dir=" + stormLogDir, - "-Dlog4j.configurationFile=" + workerConf, - "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", - "-Dstorm.local.dir=" + stormLocal, - "-Dworker.memory_limit_mb=768", - "-Dtesting=true", - "-Djava.library.path=JLP", - "-Dstorm.conf.file=", - "-Dstorm.options=", - "-Djava.io.tmpdir="+workerTmpDir, - "-cp", - "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), - "org.apache.storm.daemon.worker", - topoId, - "SUPERVISOR", - String.valueOf(port), - workerId - ), cmd.cmd); - assertEquals(new File(workerRoot), cmd.pwd); - }, - "storm.home", stormHome, - "storm.log.dir", stormLogDir); + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new + HashMap<>(), ops, + "profile"); + + mc.launch(); + + assertEquals(1, mc.workerCmds.size()); + CommandRun cmd = mc.workerCmds.get(0); + mc.workerCmds.clear(); + assertListEquals(Arrays.asList( + "java", + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "-Dworker.memory_limit_mb=768", + "org.apache.storm.LogWriter", + "java", + "-server", + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "-Dworker.memory_limit_mb=768", + "-Dtesting=true", + "-Djava.library.path=JLP", + "-Dstorm.conf.file=", + "-Dstorm.options=", + "-Djava.io.tmpdir=" + workerTmpDir, + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "org.apache.storm.daemon.worker", + topoId, + "SUPERVISOR", + String.valueOf(port), + workerId + ), cmd.cmd); + assertEquals(new File(workerRoot), cmd.pwd); + }, + "storm.home", stormHome, + "storm.log.dir", stormLogDir); } @Test @@ -582,7 +513,7 @@ public class BasicContainerTest { // minimum 0.x version of supporting STORM-2448 would be 0.10.3 st.set_storm_version("0.10.3"); - byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st)); + byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st)); final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); @@ -600,62 +531,64 @@ public class BasicContainerTest { LocalState ls = mock(LocalState.class); checkpoint(() -> { - MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); - - mc.launch(); - - assertEquals(1, mc.workerCmds.size()); - CommandRun cmd = mc.workerCmds.get(0); - mc.workerCmds.clear(); - assertListEquals(Arrays.asList( - "java", - "-cp", - "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), - "-Dlogging.sensitivity=S3", - "-Dlogfile.name=worker.log", - "-Dstorm.home=" + stormHome, - "-Dworkers.artifacts=" + stormLocal, - "-Dstorm.id=" + topoId, - "-Dworker.id=" + workerId, - "-Dworker.port=" + port, - "-Dstorm.log.dir=" + stormLogDir, - "-Dlog4j.configurationFile=" + workerConf, - "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", - "-Dstorm.local.dir=" + stormLocal, - "-Dworker.memory_limit_mb=768", - "backtype.storm.LogWriter", - "java", - "-server", - "-Dlogging.sensitivity=S3", - "-Dlogfile.name=worker.log", - "-Dstorm.home=" + stormHome, - "-Dworkers.artifacts=" + stormLocal, - "-Dstorm.id=" + topoId, - "-Dworker.id=" + workerId, - "-Dworker.port=" + port, - "-Dstorm.log.dir=" + stormLogDir, - "-Dlog4j.configurationFile=" + workerConf, - "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", - "-Dstorm.local.dir=" + stormLocal, - "-Dworker.memory_limit_mb=768", - "-Dtesting=true", - "-Djava.library.path=JLP", - "-Dstorm.conf.file=", - "-Dstorm.options=", - "-Djava.io.tmpdir="+workerTmpDir, - "-cp", - "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), - "backtype.storm.daemon.worker", - topoId, - "SUPERVISOR", - String.valueOf(port), - workerId - ), cmd.cmd); - assertEquals(new File(workerRoot), cmd.pwd); - }, - "storm.home", stormHome, - "storm.log.dir", stormLogDir); + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new + HashMap<>(), ops, + "profile"); + + mc.launch(); + + assertEquals(1, mc.workerCmds.size()); + CommandRun cmd = mc.workerCmds.get(0); + mc.workerCmds.clear(); + assertListEquals(Arrays.asList( + "java", + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "-Dworker.memory_limit_mb=768", + "backtype.storm.LogWriter", + "java", + "-server", + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "-Dworker.memory_limit_mb=768", + "-Dtesting=true", + "-Djava.library.path=JLP", + "-Dstorm.conf.file=", + "-Dstorm.options=", + "-Djava.io.tmpdir=" + workerTmpDir, + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "backtype.storm.daemon.worker", + topoId, + "SUPERVISOR", + String.valueOf(port), + workerId + ), cmd.cmd); + assertEquals(new File(workerRoot), cmd.pwd); + }, + "storm.home", stormHome, + "storm.log.dir", stormLogDir); } @Test @@ -665,33 +598,112 @@ public class BasicContainerTest { int supervisorPort = 6628; int port = 9999; int memOnheap = 512; - + LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - + Map<String, Object> superConf = new HashMap<>(); - + AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); - + LocalState ls = mock(LocalState.class); - - MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); - + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new HashMap<>(), ops, + "profile"); + assertListEquals(Arrays.asList( - "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log", - "-Xms256m", - "-Xmx512m"), - mc.substituteChildopts("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m", memOnheap)); - + "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log", + "-Xms256m", + "-Xmx512m"), + mc.substituteChildopts( + "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m", + memOnheap)); + assertListEquals(Arrays.asList( - "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log", - "-Xms256m", - "-Xmx512m"), - mc.substituteChildopts(Arrays.asList("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log","-Xms256m","-Xmx%HEAP-MEM%m"), memOnheap)); - - assertListEquals(Collections.emptyList(), - mc.substituteChildopts(null)); + "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log", + "-Xms256m", + "-Xmx512m"), + mc.substituteChildopts(Arrays.asList( + "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m", + "-Xmx%HEAP-MEM%m"), memOnheap)); + + assertListEquals(Collections.emptyList(), + mc.substituteChildopts(null)); + } + + private static interface Run { + public void run() throws Exception; + } + + public static class CommandRun { + final List<String> cmd; + final Map<String, String> env; + final File pwd; + + public CommandRun(List<String> cmd, Map<String, String> env, File pwd) { + this.cmd = cmd; + this.env = env; + this.pwd = pwd; + } + } + + public static class MockBasicContainer extends BasicContainer { + public final List<CommandRun> profileCmds = new ArrayList<>(); + public final List<CommandRun> workerCmds = new ArrayList<>(); + public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, + int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + LocalState localState, String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, + String profileCmd) throws IOException { + super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState, + workerId, topoConf, ops, profileCmd); + } + + @Override + protected Map<String, Object> readTopoConf() throws IOException { + return new HashMap<>(); + } + + @Override + public void createNewWorkerId() { + super.createNewWorkerId(); + } + + @Override + public List<String> substituteChildopts(Object value, int memOnheap) { + return super.substituteChildopts(value, memOnheap); + } + + @Override + protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, + File targetDir) throws IOException, InterruptedException { + profileCmds.add(new CommandRun(command, env, targetDir)); + return true; + } + + @Override + protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix, + ExitCodeCallback processExitCallback, File targetDir) throws IOException { + workerCmds.add(new CommandRun(command, env, targetDir)); + } + + @Override + protected String javaCmd(String cmd) { + //avoid system dependent things + return cmd; + } + + @Override + protected List<String> frameworkClasspath(SimpleVersion version) { + //We are not really running anything so make this + // simple to check for + return Arrays.asList("FRAMEWORK_CP"); + } + + @Override + protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) { + return "JLP"; + } } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java ---------------------------------------------------------------------- diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java index c020244..b510838 100644 --- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java +++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/ContainerTest.java @@ -1,25 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.daemon.supervisor; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +package org.apache.storm.daemon.supervisor; +import com.google.common.base.Joiner; import java.io.File; import java.io.IOException; import java.io.StringWriter; @@ -31,7 +24,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.container.ResourceIsolationInterface; @@ -41,58 +33,33 @@ import org.apache.storm.generated.ProfileRequest; import org.junit.Test; import org.yaml.snakeyaml.Yaml; -import com.google.common.base.Joiner; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ContainerTest { - public static class MockContainer extends Container { - - protected MockContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, - int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, - String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException { - super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, workerId, - topoConf, ops); - } - - public final List<Long> killedPids = new ArrayList<>(); - public final List<Long> forceKilledPids = new ArrayList<>(); - public final Set<Long> allPids = new HashSet<>(); + private static final Joiner PATH_JOIN = Joiner.on(File.separator).skipNulls(); + private static final String DOUBLE_SEP = File.separator + File.separator; - @Override - protected void kill(long pid) { - killedPids.add(pid); - } - - @Override - protected void forceKill(long pid) { - forceKilledPids.add(pid); - } - - @Override - protected Set<Long> getAllPids() throws IOException { - return allPids; - } - - @Override - public void launch() throws IOException { - fail("THIS IS NOT UNDER TEST"); - } + static String asAbsPath(String... parts) { + return (File.separator + PATH_JOIN.join(parts)).replace(DOUBLE_SEP, File.separator); + } - @Override - public void relaunch() throws IOException { - fail("THIS IS NOT UNDER TEST"); - } + static File asAbsFile(String... parts) { + return new File(asAbsPath(parts)); + } - @Override - public boolean didMainProcessExit() { - fail("THIS IS NOT UNDER TEST"); - return false; - } + static String asPath(String... parts) { + return PATH_JOIN.join(parts); + } - @Override - public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException { - fail("THIS IS NOT UNDER TEST"); - return false; - } + public static File asFile(String... parts) { + return new File(asPath(parts)); } @Test @@ -103,46 +70,28 @@ public class ContainerTest { when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); - MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), ops); + MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", 6628, 8080, la, null, "worker", new HashMap<>(), ops); mc.kill(); assertEquals(Collections.EMPTY_LIST, mc.killedPids); assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids); mc.forceKill(); assertEquals(Collections.EMPTY_LIST, mc.killedPids); assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids); - + long pid = 987654321; mc.allPids.add(pid); - + mc.kill(); assertEquals(mc.allPids, new HashSet<>(mc.killedPids)); assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids); mc.killedPids.clear(); - + mc.forceKill(); assertEquals(Collections.EMPTY_LIST, mc.killedPids); assertEquals(mc.allPids, new HashSet<>(mc.forceKilledPids)); } - - private static final Joiner PATH_JOIN = Joiner.on(File.separator).skipNulls(); - private static final String DOUBLE_SEP = File.separator + File.separator; - static String asAbsPath(String ... parts) { - return (File.separator + PATH_JOIN.join(parts)).replace(DOUBLE_SEP, File.separator); - } - - static File asAbsFile(String ... parts) { - return new File(asAbsPath(parts)); - } - - static String asPath(String ... parts) { - return PATH_JOIN.join(parts); - } - - public static File asFile(String ... parts) { - return new File(asPath(parts)); - } - + @SuppressWarnings("unchecked") @Test public void testSetup() throws Exception { @@ -156,11 +105,11 @@ public class ContainerTest { final File workerUserFile = asAbsFile(stormLocal, "workers-users", workerId); final File workerRoot = asAbsFile(stormLocal, "workers", workerId); final File distRoot = asAbsFile(stormLocal, "supervisor", "stormdist", topoId); - + final Map<String, Object> topoConf = new HashMap<>(); final List<String> topoUsers = Arrays.asList("t-user-a", "t-user-b"); final List<String> logUsers = Arrays.asList("l-user-a", "l-user-b"); - + final List<String> topoGroups = Arrays.asList("t-group-a", "t-group-b"); final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b"); @@ -168,36 +117,36 @@ public class ContainerTest { topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups); topoConf.put(DaemonConfig.LOGS_USERS, logUsers); topoConf.put(Config.TOPOLOGY_USERS, topoUsers); - + final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); - + final StringWriter yamlDump = new StringWriter(); - + AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); when(ops.fileExists(workerArtifacts)).thenReturn(true); when(ops.fileExists(workerRoot)).thenReturn(true); when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump); - + LocalAssignment la = new LocalAssignment(); la.set_topology_id(topoId); la.set_owner(user); - MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops); - + MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", 6628, 8080, la, null, workerId, topoConf, ops); + mc.setup(); - + //Initial Setup verify(ops).forceMkdir(new File(workerRoot, "pids")); verify(ops).forceMkdir(new File(workerRoot, "tmp")); verify(ops).forceMkdir(new File(workerRoot, "heartbeats")); verify(ops).fileExists(workerArtifacts); - + //Log file permissions verify(ops).getWriter(logMetadataFile); - + String yamlResult = yamlDump.toString(); Yaml yaml = new Yaml(); Map<String, Object> result = (Map<String, Object>) yaml.load(yamlResult); @@ -205,22 +154,22 @@ public class ContainerTest { assertEquals(user, result.get(Config.TOPOLOGY_SUBMITTER_USER)); HashSet<String> allowedUsers = new HashSet<>(topoUsers); allowedUsers.addAll(logUsers); - assertEquals(allowedUsers, new HashSet<String>((List<String>)result.get(DaemonConfig.LOGS_USERS))); - + assertEquals(allowedUsers, new HashSet<String>((List<String>) result.get(DaemonConfig.LOGS_USERS))); + HashSet<String> allowedGroups = new HashSet<>(topoGroups); allowedGroups.addAll(logGroups); - assertEquals(allowedGroups, new HashSet<String>((List<String>)result.get(DaemonConfig.LOGS_GROUPS))); - + assertEquals(allowedGroups, new HashSet<String>((List<String>) result.get(DaemonConfig.LOGS_GROUPS))); + //Save the current user to help with recovery verify(ops).dump(workerUserFile, user); - + //Create links to artifacts dir verify(ops).createSymlink(new File(workerRoot, "artifacts"), workerArtifacts); - - //Create links to blobs + + //Create links to blobs verify(ops, never()).createSymlink(new File(workerRoot, "resources"), new File(distRoot, "resources")); } - + @Test public void testCleanup() throws Exception { final int supervisorPort = 6628; @@ -235,38 +184,88 @@ public class ContainerTest { final File workerUserFile = asAbsFile(stormLocal, "workers-users", workerId); final File workerRoot = asAbsFile(stormLocal, "workers", workerId); final File workerPidsRoot = new File(workerRoot, "pids"); - + final Map<String, Object> topoConf = new HashMap<>(); - + final Map<String, Object> superConf = new HashMap<>(); superConf.put(Config.STORM_LOCAL_DIR, stormLocal); superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); - + final StringWriter yamlDump = new StringWriter(); - + AdvancedFSOps ops = mock(AdvancedFSOps.class); when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); when(ops.fileExists(workerArtifacts)).thenReturn(true); when(ops.fileExists(workerRoot)).thenReturn(true); when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump); - + ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class); - + LocalAssignment la = new LocalAssignment(); la.set_owner(user); la.set_topology_id(topoId); - MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, - "SUPERVISOR", supervisorPort, port, la, iso, workerId, topoConf, ops); + MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", supervisorPort, port, la, iso, workerId, topoConf, ops); mc.allPids.add(pid); - + mc.cleanUp(); verify(ops).deleteIfExists(eq(new File(workerPidsRoot, String.valueOf(pid))), eq(user), any(String.class)); verify(iso).releaseResourcesForWorker(workerId); - + verify(ops).deleteIfExists(eq(new File(workerRoot, "pids")), eq(user), any(String.class)); verify(ops).deleteIfExists(eq(new File(workerRoot, "tmp")), eq(user), any(String.class)); verify(ops).deleteIfExists(eq(new File(workerRoot, "heartbeats")), eq(user), any(String.class)); verify(ops).deleteIfExists(eq(workerRoot), eq(user), any(String.class)); verify(ops).deleteIfExists(workerUserFile); } + + public static class MockContainer extends Container { + + public final List<Long> killedPids = new ArrayList<>(); + public final List<Long> forceKilledPids = new ArrayList<>(); + public final Set<Long> allPids = new HashSet<>(); + protected MockContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort, + int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, + String workerId, Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException { + super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, workerId, + topoConf, ops); + } + + @Override + protected void kill(long pid) { + killedPids.add(pid); + } + + @Override + protected void forceKill(long pid) { + forceKilledPids.add(pid); + } + + @Override + protected Set<Long> getAllPids() throws IOException { + return allPids; + } + + @Override + public void launch() throws IOException { + fail("THIS IS NOT UNDER TEST"); + } + + @Override + public void relaunch() throws IOException { + fail("THIS IS NOT UNDER TEST"); + } + + @Override + public boolean didMainProcessExit() { + fail("THIS IS NOT UNDER TEST"); + return false; + } + + @Override + public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException { + fail("THIS IS NOT UNDER TEST"); + return false; + } + } }
