Repository: storm Updated Branches: refs/heads/master 4ce6f04e8 -> e0056c7d6
http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java new file mode 100644 index 0000000..5265cf6 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.daemon.supervisor.Container.ContainerType; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.junit.Test; + +public class BasicContainerTest { + public static class CommandRun { + final List<String> cmd; + final Map<String, String> env; + final File pwd; + + public CommandRun(List<String> cmd, Map<String, String> env, File pwd) { + this.cmd = cmd; + this.env = env; + this.pwd = pwd; + } + } + + public static class MockBasicContainer extends BasicContainer { + public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port, + LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, LocalState localState, + String workerId, Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) + throws IOException { + super(type, conf, supervisorId, port, assignment, resourceIsolationManager, localState, workerId, topoConf, ops, + profileCmd); + } + + public final List<CommandRun> profileCmds = new ArrayList<>(); + public final List<CommandRun> workerCmds = new ArrayList<>(); + + + @Override + protected Map<String, Object> readTopoConf() throws IOException { + return new HashMap<>(); + } + + @Override + public void createNewWorkerId() { + super.createNewWorkerId(); + } + + @Override + public List<String> substituteChildopts(Object value, int memOnheap) { + return super.substituteChildopts(value, memOnheap); + } + + @Override + protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix, + File targetDir) throws IOException, InterruptedException { + profileCmds.add(new CommandRun(command, env, targetDir)); + return true; + } + + @Override + protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix, + ExitCodeCallback processExitCallback, File targetDir) throws IOException { + workerCmds.add(new CommandRun(command, env, targetDir)); + } + + @Override + protected String javaCmd(String cmd) { + //avoid system dependent things + return cmd; + } + + @Override + protected List<String> frameworkClasspath() { + //We are not really running anything so make this + // simple to check for + return Arrays.asList("FRAMEWORK_CP"); + } + + @Override + protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) { + return "JLP"; + } + } + + @Test + public void testCreateNewWorkerId() throws Exception { + final String topoId = "test_topology"; + final int port = 8080; + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + + Map<String, Object> superConf = new HashMap<>(); + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + + LocalState ls = mock(LocalState.class); + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", port, la, null, ls, null, new HashMap<>(), ops, "profile"); + //null worker id means generate one... + + assertNotNull(mc._workerId); + verify(ls).getApprovedWorkers(); + Map<String, Integer> expectedNewState = new HashMap<String, Integer>(); + expectedNewState.put(mc._workerId, port); + verify(ls).setApprovedWorkers(expectedNewState); + } + + @Test + public void testRecovery() throws Exception { + final String topoId = "test_topology"; + final String workerId = "myWorker"; + final int port = 8080; + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + + Map<String, Integer> workerState = new HashMap<String, Integer>(); + workerState.put(workerId, port); + + LocalState ls = mock(LocalState.class); + when(ls.getApprovedWorkers()).thenReturn(workerState); + + Map<String, Object> superConf = new HashMap<>(); + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + + MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf, + "SUPERVISOR", port, la, null, ls, null, new HashMap<>(), ops, "profile"); + + assertEquals(workerId, mc._workerId); + } + + @Test + public void testRecoveryMiss() throws Exception { + final String topoId = "test_topology"; + final int port = 8080; + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + + Map<String, Integer> workerState = new HashMap<String, Integer>(); + workerState.put("somethingelse", port+1); + + LocalState ls = mock(LocalState.class); + when(ls.getApprovedWorkers()).thenReturn(workerState); + + try { + new MockBasicContainer(ContainerType.RECOVER_FULL, new HashMap<String, Object>(), + "SUPERVISOR", port, la, null, ls, null, new HashMap<>(), null, "profile"); + fail("Container recovered worker incorrectly"); + } catch (ContainerRecoveryException e) { + //Expected + } + } + + @Test + public void testCleanUp() throws Exception { + final String topoId = "test_topology"; + final int port = 8080; + final String workerId = "worker-id"; + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + + Map<String, Object> superConf = new HashMap<>(); + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + + Map<String, Integer> workerState = new HashMap<String, Integer>(); + workerState.put(workerId, port); + + LocalState ls = mock(LocalState.class); + when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState)); + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); + + mc.cleanUp(); + + assertNull(mc._workerId); + verify(ls).getApprovedWorkers(); + Map<String, Integer> expectedNewState = new HashMap<String, Integer>(); + verify(ls).setApprovedWorkers(expectedNewState); + } + + @Test + public void testRunProfiling() throws Exception { + final long pid = 100; + final String topoId = "test_topology"; + final int port = 8080; + final String workerId = "worker-id"; + final String stormLocal = ContainerTest.asAbsPath("tmp", "testing"); + final String topoRoot = ContainerTest.asAbsPath(stormLocal, topoId, String.valueOf(port)); + final File workerArtifactsPid = ContainerTest.asAbsFile(topoRoot, "worker.pid"); + + final Map<String, Object> superConf = new HashMap<>(); + superConf.put(Config.STORM_LOCAL_DIR, stormLocal); + superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); + + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + when(ops.slurpString(workerArtifactsPid)).thenReturn(String.valueOf(pid)); + + LocalState ls = mock(LocalState.class); + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); + + //HEAP DUMP + ProfileRequest req = new ProfileRequest(); + req.set_action(ProfileAction.JMAP_DUMP); + + mc.runProfiling(req, false); + + assertEquals(1, mc.profileCmds.size()); + CommandRun cmd = mc.profileCmds.get(0); + mc.profileCmds.clear(); + assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", topoRoot), cmd.cmd); + assertEquals(new File(topoRoot), cmd.pwd); + + //JSTACK DUMP + req.set_action(ProfileAction.JSTACK_DUMP); + + mc.runProfiling(req, false); + + assertEquals(1, mc.profileCmds.size()); + cmd = mc.profileCmds.get(0); + mc.profileCmds.clear(); + assertEquals(Arrays.asList("profile", String.valueOf(pid), "jstack", topoRoot), cmd.cmd); + assertEquals(new File(topoRoot), cmd.pwd); + + //RESTART + req.set_action(ProfileAction.JVM_RESTART); + + mc.runProfiling(req, false); + + assertEquals(1, mc.profileCmds.size()); + cmd = mc.profileCmds.get(0); + mc.profileCmds.clear(); + assertEquals(Arrays.asList("profile", String.valueOf(pid), "kill"), cmd.cmd); + assertEquals(new File(topoRoot), cmd.pwd); + + //JPROFILE DUMP + req.set_action(ProfileAction.JPROFILE_DUMP); + + mc.runProfiling(req, false); + + assertEquals(1, mc.profileCmds.size()); + cmd = mc.profileCmds.get(0); + mc.profileCmds.clear(); + assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", topoRoot), cmd.cmd); + assertEquals(new File(topoRoot), cmd.pwd); + + //JPROFILE START + req.set_action(ProfileAction.JPROFILE_STOP); + + mc.runProfiling(req, false); + + assertEquals(1, mc.profileCmds.size()); + cmd = mc.profileCmds.get(0); + mc.profileCmds.clear(); + assertEquals(Arrays.asList("profile", String.valueOf(pid), "start"), cmd.cmd); + assertEquals(new File(topoRoot), cmd.pwd); + + //JPROFILE STOP + req.set_action(ProfileAction.JPROFILE_STOP); + + mc.runProfiling(req, true); + + assertEquals(1, mc.profileCmds.size()); + cmd = mc.profileCmds.get(0); + mc.profileCmds.clear(); + assertEquals(Arrays.asList("profile", String.valueOf(pid), "stop", topoRoot), cmd.cmd); + assertEquals(new File(topoRoot), cmd.pwd); + } + + private static void setSystemProp(String key, String value) { + if (value == null) { + System.clearProperty(key); + } else { + System.setProperty(key, value); + } + } + + private static interface Run { + public void run() throws Exception; + } + + private static void checkpoint(Run r, String ... newValues) throws Exception { + if (newValues.length % 2 != 0) { + throw new IllegalArgumentException("Parameters are of the form system property name, new value"); + } + Map<String, String> orig = new HashMap<>(); + try { + for (int index = 0; index < newValues.length; index += 2) { + String key = newValues[index]; + String value = newValues[index + 1]; + orig.put(key, System.getProperty(key)); + setSystemProp(key, value); + } + r.run(); + } finally { + for (Map.Entry<String, String> entry: orig.entrySet()) { + setSystemProp(entry.getKey(), entry.getValue()); + } + } + } + + private static <T> void assertListEquals(List<T> a, List<T> b) { + if (a == null) { + assertNull(b); + } + if (b == null) { + assertNull(a); + } + int commonLen = Math.min(a.size(), b.size()); + for (int i = 0; i < commonLen; i++) { + assertEquals("at index "+i+"\n"+a+" !=\n"+b+"\n", a.get(i), b.get(i)); + } + + assertEquals("size of lists don't match \n"+a+" !=\n"+b, a.size(), b.size()); + } + + @Test + public void testLaunch() throws Exception { + final String topoId = "test_topology"; + final int port = 8080; + final String stormHome = ContainerTest.asAbsPath("tmp", "storm-home"); + final String stormLogDir = ContainerTest.asFile(".", "target").getCanonicalPath(); + final String workerId = "worker-id"; + final String stormLocal = ContainerTest.asAbsPath("tmp", "storm-local"); + final String distRoot = ContainerTest.asAbsPath(stormLocal, "supervisor", "stormdist", topoId); + final File stormcode = new File(distRoot, "stormcode.ser"); + final File stormjar = new File(distRoot, "stormjar.jar"); + final String log4jdir = ContainerTest.asAbsPath(stormHome, "conf"); + final String workerConf = ContainerTest.asAbsPath(log4jdir, "worker.xml"); + final String workerRoot = ContainerTest.asAbsPath(stormLocal, "workers", workerId); + final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp"); + + final StormTopology st = new StormTopology(); + st.set_spouts(new HashMap<>()); + st.set_bolts(new HashMap<>()); + st.set_state_spouts(new HashMap<>()); + byte [] serializedState = Utils.gzip(Utils.thriftSerialize(st)); + + final Map<String, Object> superConf = new HashMap<>(); + superConf.put(Config.STORM_LOCAL_DIR, stormLocal); + superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); + superConf.put(Config.STORM_LOG4J2_CONF_DIR, log4jdir); + + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + when(ops.slurp(stormcode)).thenReturn(serializedState); + + LocalState ls = mock(LocalState.class); + + + checkpoint(() -> { + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); + + mc.launch(); + + assertEquals(1, mc.workerCmds.size()); + CommandRun cmd = mc.workerCmds.get(0); + mc.workerCmds.clear(); + assertListEquals(Arrays.asList( + "java", + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "org.apache.storm.LogWriter", + "java", + "-server", + "-Dlogging.sensitivity=S3", + "-Dlogfile.name=worker.log", + "-Dstorm.home=" + stormHome, + "-Dworkers.artifacts=" + stormLocal, + "-Dstorm.id=" + topoId, + "-Dworker.id=" + workerId, + "-Dworker.port=" + port, + "-Dstorm.log.dir=" + stormLogDir, + "-Dlog4j.configurationFile=" + workerConf, + "-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector", + "-Dstorm.local.dir=" + stormLocal, + "-Djava.library.path=JLP", + "-Dstorm.conf.file=", + "-Dstorm.options=", + "-Djava.io.tmpdir="+workerTmpDir, + "-cp", + "FRAMEWORK_CP:" + stormjar.getAbsolutePath(), + "org.apache.storm.daemon.worker", + topoId, + "SUPERVISOR", + String.valueOf(port), + workerId + ), cmd.cmd); + assertEquals(new File(workerRoot), cmd.pwd); + }, + "storm.home", stormHome, + "storm.log.dir", stormLogDir); + } + + @Test + public void testSubstChildOpts() throws Exception { + String workerId = "w-01"; + String topoId = "s-01"; + int port = 9999; + int memOnheap = 512; + + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + + Map<String, Object> superConf = new HashMap<>(); + + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + + LocalState ls = mock(LocalState.class); + + MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", port, la, null, ls, workerId, new HashMap<>(), ops, "profile"); + + assertListEquals(Arrays.asList( + "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log", + "-Xms256m", + "-Xmx512m"), + mc.substituteChildopts("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m", memOnheap)); + + assertListEquals(Arrays.asList( + "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log", + "-Xms256m", + "-Xmx512m"), + mc.substituteChildopts(Arrays.asList("-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log","-Xms256m","-Xmx%HEAP-MEM%m"), memOnheap)); + + assertListEquals(Collections.emptyList(), + mc.substituteChildopts(null)); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java new file mode 100644 index 0000000..b1adcd8 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/ContainerTest.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.storm.Config; +import org.apache.storm.container.ResourceIsolationInterface; +import org.apache.storm.daemon.supervisor.Container.ContainerType; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.ProfileRequest; +import org.junit.Test; +import org.yaml.snakeyaml.Yaml; + +import com.google.common.base.Joiner; + +public class ContainerTest { + public static class MockContainer extends Container { + + protected MockContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int port, + LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager, String workerId, + Map<String, Object> topoConf, AdvancedFSOps ops) throws IOException { + super(type, conf, supervisorId, port, assignment, resourceIsolationManager, workerId, topoConf, ops); + } + + public final List<Long> killedPids = new ArrayList<>(); + public final List<Long> forceKilledPids = new ArrayList<>(); + public final Set<Long> allPids = new HashSet<>(); + + @Override + protected void kill(long pid) { + killedPids.add(pid); + } + + @Override + protected void forceKill(long pid) { + forceKilledPids.add(pid); + } + + @Override + protected Set<Long> getAllPids() throws IOException { + return allPids; + } + + @Override + public void launch() throws IOException { + fail("THIS IS NOT UNDER TEST"); + } + + @Override + public void relaunch() throws IOException { + fail("THIS IS NOT UNDER TEST"); + } + + @Override + public boolean didMainProcessExit() { + fail("THIS IS NOT UNDER TEST"); + return false; + } + + @Override + public boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException { + fail("THIS IS NOT UNDER TEST"); + return false; + } + } + + @Test + public void testKill() throws Exception { + final String topoId = "test_topology"; + final Map<String, Object> superConf = new HashMap<>(); + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", 8080, la, null, "worker", new HashMap<>(), ops); + mc.kill(); + assertEquals(Collections.EMPTY_LIST, mc.killedPids); + assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids); + mc.forceKill(); + assertEquals(Collections.EMPTY_LIST, mc.killedPids); + assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids); + + long pid = 987654321; + mc.allPids.add(pid); + + mc.kill(); + assertEquals(mc.allPids, new HashSet<>(mc.killedPids)); + assertEquals(Collections.EMPTY_LIST, mc.forceKilledPids); + mc.killedPids.clear(); + + mc.forceKill(); + assertEquals(Collections.EMPTY_LIST, mc.killedPids); + assertEquals(mc.allPids, new HashSet<>(mc.forceKilledPids)); + } + + private static final Joiner PATH_JOIN = Joiner.on(File.separator).skipNulls(); + private static final String DOUBLE_SEP = File.separator + File.separator; + static String asAbsPath(String ... parts) { + return (File.separator + PATH_JOIN.join(parts)).replace(DOUBLE_SEP, File.separator); + } + + static File asAbsFile(String ... parts) { + return new File(asAbsPath(parts)); + } + + static String asPath(String ... parts) { + return PATH_JOIN.join(parts); + } + + public static File asFile(String ... parts) { + return new File(asPath(parts)); + } + + @SuppressWarnings("unchecked") + @Test + public void testSetup() throws Exception { + final int port = 8080; + final String topoId = "test_topology"; + final String workerId = "worker_id"; + final String user = "me"; + final String stormLocal = asAbsPath("tmp", "testing"); + final File workerArtifacts = asAbsFile(stormLocal, topoId, String.valueOf(port)); + final File logMetadataFile = new File(workerArtifacts, "worker.yaml"); + final File workerUserFile = asAbsFile(stormLocal, "workers-users", workerId); + final File workerRoot = asAbsFile(stormLocal, "workers", workerId); + final File distRoot = asAbsFile(stormLocal, "supervisor", "stormdist", topoId); + + final Map<String, Object> topoConf = new HashMap<>(); + final List<String> topoUsers = Arrays.asList("t-user-a", "t-user-b"); + final List<String> logUsers = Arrays.asList("l-user-a", "l-user-b"); + + final List<String> topoGroups = Arrays.asList("t-group-a", "t-group-b"); + final List<String> logGroups = Arrays.asList("l-group-a", "l-group-b"); + + topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); + topoConf.put(Config.LOGS_GROUPS, logGroups); + topoConf.put(Config.TOPOLOGY_GROUPS, topoGroups); + topoConf.put(Config.LOGS_USERS, logUsers); + topoConf.put(Config.TOPOLOGY_USERS, topoUsers); + + final Map<String, Object> superConf = new HashMap<>(); + superConf.put(Config.STORM_LOCAL_DIR, stormLocal); + superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); + + final StringWriter yamlDump = new StringWriter(); + + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + when(ops.fileExists(workerArtifacts)).thenReturn(true); + when(ops.fileExists(workerRoot)).thenReturn(true); + when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump); + + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", 8080, la, null, workerId, topoConf, ops); + + mc.setup(); + + //Initial Setup + verify(ops).forceMkdir(new File(workerRoot, "pids")); + verify(ops).forceMkdir(new File(workerRoot, "tmp")); + verify(ops).forceMkdir(new File(workerRoot, "heartbeats")); + verify(ops).fileExists(workerArtifacts); + + //Log file permissions + verify(ops).getWriter(logMetadataFile); + + String yamlResult = yamlDump.toString(); + Yaml yaml = new Yaml(); + Map<String, Object> result = (Map<String, Object>) yaml.load(yamlResult); + assertEquals(workerId, result.get("worker-id")); + assertEquals(user, result.get(Config.TOPOLOGY_SUBMITTER_USER)); + HashSet<String> allowedUsers = new HashSet<>(topoUsers); + allowedUsers.addAll(logUsers); + assertEquals(allowedUsers, new HashSet<String>((List<String>)result.get(Config.LOGS_USERS))); + + HashSet<String> allowedGroups = new HashSet<>(topoGroups); + allowedGroups.addAll(logGroups); + assertEquals(allowedGroups, new HashSet<String>((List<String>)result.get(Config.LOGS_GROUPS))); + + //Save the current user to help with recovery + verify(ops).dump(workerUserFile, user); + + //Create links to artifacts dir + verify(ops).createSymlink(new File(workerRoot, "artifacts"), workerArtifacts); + + //Create links to blobs + verify(ops).createSymlink(new File(workerRoot, "resources"), new File(distRoot, "resources")); + } + + @Test + public void testCleanup() throws Exception { + final int port = 8080; + final long pid = 100; + final String topoId = "test_topology"; + final String workerId = "worker_id"; + final String user = "me"; + final String stormLocal = asAbsPath("tmp", "testing"); + final File workerArtifacts = asAbsFile(stormLocal, topoId, String.valueOf(port)); + final File logMetadataFile = new File(workerArtifacts, "worker.yaml"); + final File workerUserFile = asAbsFile(stormLocal, "workers-users", workerId); + final File workerRoot = asAbsFile(stormLocal, "workers", workerId); + final File workerPidsRoot = new File(workerRoot, "pids"); + + final Map<String, Object> topoConf = new HashMap<>(); + topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); + + final Map<String, Object> superConf = new HashMap<>(); + superConf.put(Config.STORM_LOCAL_DIR, stormLocal); + superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); + + final StringWriter yamlDump = new StringWriter(); + + AdvancedFSOps ops = mock(AdvancedFSOps.class); + when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true); + when(ops.fileExists(workerArtifacts)).thenReturn(true); + when(ops.fileExists(workerRoot)).thenReturn(true); + when(ops.getWriter(logMetadataFile)).thenReturn(yamlDump); + + ResourceIsolationInterface iso = mock(ResourceIsolationInterface.class); + + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + MockContainer mc = new MockContainer(ContainerType.LAUNCH, superConf, + "SUPERVISOR", port, la, iso, workerId, topoConf, ops); + mc.allPids.add(pid); + + mc.cleanUp(); + verify(ops).deleteIfExists(eq(new File(workerPidsRoot, String.valueOf(pid))), eq(user), any(String.class)); + verify(iso).releaseResourcesForWorker(workerId); + + verify(ops).deleteIfExists(eq(new File(workerRoot, "pids")), eq(user), any(String.class)); + verify(ops).deleteIfExists(eq(new File(workerRoot, "tmp")), eq(user), any(String.class)); + verify(ops).deleteIfExists(eq(new File(workerRoot, "heartbeats")), eq(user), any(String.class)); + verify(ops).deleteIfExists(eq(workerRoot), eq(user), any(String.class)); + verify(ops).deleteIfExists(workerUserFile); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java new file mode 100644 index 0000000..24ccda5 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/daemon/supervisor/SlotTest.java @@ -0,0 +1,515 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.storm.daemon.supervisor.Slot.StaticState; +import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction; +import org.apache.storm.daemon.supervisor.Slot.DynamicState; +import org.apache.storm.daemon.supervisor.Slot.MachineState; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.NodeInfo; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.ILocalizer; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.junit.Test; + +public class SlotTest { + static WorkerResources mkWorkerResources(Double cpu, Double mem_on_heap, Double mem_off_heap) { + WorkerResources resources = new WorkerResources(); + if (cpu != null) { + resources.set_cpu(cpu); + } + + if (mem_on_heap != null) { + resources.set_mem_on_heap(mem_on_heap); + } + + if (mem_off_heap != null) { + resources.set_mem_off_heap(mem_off_heap); + } + return resources; + } + + static LSWorkerHeartbeat mkWorkerHB(String id, int port, List<ExecutorInfo> exec, Integer timeSecs) { + LSWorkerHeartbeat ret = new LSWorkerHeartbeat(); + ret.set_topology_id(id); + ret.set_port(port); + ret.set_executors(exec); + ret.set_time_secs(timeSecs); + return ret; + } + + static List<ExecutorInfo> mkExecutorInfoList(int ... executors) { + ArrayList<ExecutorInfo> ret = new ArrayList<>(executors.length); + for (int exec : executors) { + ExecutorInfo execInfo = new ExecutorInfo(); + execInfo.set_task_start(exec); + execInfo.set_task_end(exec); + ret.add(execInfo); + } + return ret; + } + + static LocalAssignment mkLocalAssignment(String id, List<ExecutorInfo> exec, WorkerResources resources) { + LocalAssignment ret = new LocalAssignment(); + ret.set_topology_id(id); + ret.set_executors(exec); + if (resources != null) { + ret.set_resources(resources); + } + return ret; + } + + @Test + public void testEquivilant() { + LocalAssignment a = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment aResized = mkLocalAssignment("A", mkExecutorInfoList(1,2,3,4,5), mkWorkerResources(100.0, 200.0, 100.0)); + LocalAssignment b = mkLocalAssignment("B", mkExecutorInfoList(1,2,3,4,5,6), mkWorkerResources(100.0, 100.0, 100.0)); + LocalAssignment bReordered = mkLocalAssignment("B", mkExecutorInfoList(6,5,4,3,2,1), mkWorkerResources(100.0, 100.0, 100.0)); + + assertTrue(Slot.equivalent(null, null)); + assertTrue(Slot.equivalent(a, a)); + assertTrue(Slot.equivalent(b, bReordered)); + assertTrue(Slot.equivalent(bReordered, b)); + + assertFalse(Slot.equivalent(a, aResized)); + assertFalse(Slot.equivalent(aResized, a)); + assertFalse(Slot.equivalent(a, null)); + assertFalse(Slot.equivalent(null, b)); + assertFalse(Slot.equivalent(a, b)); + } + + @Test + public void testEmptyToEmpty() throws Exception { + Time.startSimulatingAutoAdvanceOnSleep(1010); + try { + ILocalizer localizer = mock(ILocalizer.class); + LocalState state = mock(LocalState.class); + ContainerLauncher containerLauncher = mock(ContainerLauncher.class); + ISupervisor iSuper = mock(ISupervisor.class); + StaticState staticState = new StaticState(localizer, 1000, 1000, 1000, 1000, + containerLauncher, "localhost", 8080, iSuper, state); + DynamicState dynamicState = new DynamicState(null, null, null); + DynamicState nextState = Slot.handleEmpty(dynamicState, staticState); + assertEquals(MachineState.EMPTY, nextState.state); + assertTrue(Time.currentTimeMillis() > 1000); + } finally { + Time.stopSimulating(); + } + } + + @Test + public void testLaunchContainerFromEmpty() throws Exception { + Time.startSimulatingAutoAdvanceOnSleep(1010); + try { + int port = 8080; + String topoId = "NEW"; + List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5); + LocalAssignment newAssignment = + mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); + + ILocalizer localizer = mock(ILocalizer.class); + Container container = mock(Container.class); + LocalState state = mock(LocalState.class); + ContainerLauncher containerLauncher = mock(ContainerLauncher.class); + when(containerLauncher.launchContainer(port, newAssignment, state)).thenReturn(container); + LSWorkerHeartbeat hb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()); + when(container.readHeartbeat()).thenReturn(hb, hb); + + @SuppressWarnings("unchecked") + Future<Void> baseFuture = mock(Future.class); + when(localizer.requestDownloadBaseTopologyBlobs(newAssignment, port)).thenReturn(baseFuture); + + @SuppressWarnings("unchecked") + Future<Void> blobFuture = mock(Future.class); + when(localizer.requestDownloadTopologyBlobs(newAssignment, port)).thenReturn(blobFuture); + + ISupervisor iSuper = mock(ISupervisor.class); + StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, + containerLauncher, "localhost", port, iSuper, state); + DynamicState dynamicState = new DynamicState(null, null, null) + .withNewAssignment(newAssignment); + + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); + verify(localizer).requestDownloadBaseTopologyBlobs(newAssignment, port); + assertEquals(MachineState.WAITING_FOR_BASIC_LOCALIZATION, nextState.state); + assertSame("pendingDownload not set properly", baseFuture, nextState.pendingDownload); + assertEquals(newAssignment, nextState.pendingLocalization); + assertEquals(0, Time.currentTimeMillis()); + + nextState = Slot.stateMachineStep(nextState, staticState); + verify(baseFuture).get(1000, TimeUnit.MILLISECONDS); + verify(localizer).requestDownloadTopologyBlobs(newAssignment, port); + assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state); + assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload); + assertEquals(newAssignment, nextState.pendingLocalization); + assertEquals(0, Time.currentTimeMillis()); + + nextState = Slot.stateMachineStep(nextState, staticState); + verify(blobFuture).get(1000, TimeUnit.MILLISECONDS); + verify(containerLauncher).launchContainer(port, newAssignment, state); + assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(newAssignment, nextState.currentAssignment); + assertSame(container, nextState.container); + assertEquals(0, Time.currentTimeMillis()); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(newAssignment, nextState.currentAssignment); + assertSame(container, nextState.container); + assertEquals(0, Time.currentTimeMillis()); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(newAssignment, nextState.currentAssignment); + assertSame(container, nextState.container); + assertTrue(Time.currentTimeMillis() > 1000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(newAssignment, nextState.currentAssignment); + assertSame(container, nextState.container); + assertTrue(Time.currentTimeMillis() > 2000); + } finally { + Time.stopSimulating(); + } + } + + + @Test + public void testRelaunch() throws Exception { + Time.startSimulatingAutoAdvanceOnSleep(1010); + try { + int port = 8080; + String topoId = "CURRENT"; + List<ExecutorInfo> execList = mkExecutorInfoList(1,2,3,4,5); + LocalAssignment assignment = + mkLocalAssignment(topoId, execList, mkWorkerResources(100.0, 100.0, 100.0)); + + ILocalizer localizer = mock(ILocalizer.class); + Container container = mock(Container.class); + ContainerLauncher containerLauncher = mock(ContainerLauncher.class); + LSWorkerHeartbeat oldhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()-10); + LSWorkerHeartbeat goodhb = mkWorkerHB(topoId, port, execList, Time.currentTimeSecs()); + when(container.readHeartbeat()).thenReturn(oldhb, oldhb, goodhb, goodhb); + when(container.areAllProcessesDead()).thenReturn(false, true); + + ISupervisor iSuper = mock(ISupervisor.class); + LocalState state = mock(LocalState.class); + StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, + containerLauncher, "localhost", port, iSuper, state); + DynamicState dynamicState = new DynamicState(assignment, container, assignment); + + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); + assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state); + verify(container).kill(); + assertTrue(Time.currentTimeMillis() > 1000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.KILL_AND_RELAUNCH, nextState.state); + verify(container).forceKill(); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state); + verify(container).relaunch(); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state); + assertTrue(Time.currentTimeMillis() > 3000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + } finally { + Time.stopSimulating(); + } + } + + @Test + public void testReschedule() throws Exception { + Time.startSimulatingAutoAdvanceOnSleep(1010); + try { + int port = 8080; + String cTopoId = "CURRENT"; + List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); + LocalAssignment cAssignment = + mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); + + Container cContainer = mock(Container.class); + LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()); + when(cContainer.readHeartbeat()).thenReturn(chb); + when(cContainer.areAllProcessesDead()).thenReturn(false, true); + + String nTopoId = "NEW"; + List<ExecutorInfo> nExecList = mkExecutorInfoList(1,2,3,4,5); + LocalAssignment nAssignment = + mkLocalAssignment(nTopoId, nExecList, mkWorkerResources(100.0, 100.0, 100.0)); + + ILocalizer localizer = mock(ILocalizer.class); + Container nContainer = mock(Container.class); + LocalState state = mock(LocalState.class); + ContainerLauncher containerLauncher = mock(ContainerLauncher.class); + when(containerLauncher.launchContainer(port, nAssignment, state)).thenReturn(nContainer); + LSWorkerHeartbeat nhb = mkWorkerHB(nTopoId, 100, nExecList, Time.currentTimeSecs()); + when(nContainer.readHeartbeat()).thenReturn(nhb, nhb); + + @SuppressWarnings("unchecked") + Future<Void> baseFuture = mock(Future.class); + when(localizer.requestDownloadBaseTopologyBlobs(nAssignment, port)).thenReturn(baseFuture); + + @SuppressWarnings("unchecked") + Future<Void> blobFuture = mock(Future.class); + when(localizer.requestDownloadTopologyBlobs(nAssignment, port)).thenReturn(blobFuture); + + ISupervisor iSuper = mock(ISupervisor.class); + StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, + containerLauncher, "localhost", port, iSuper, state); + DynamicState dynamicState = new DynamicState(cAssignment, cContainer, nAssignment); + + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); + assertEquals(MachineState.KILL, nextState.state); + verify(cContainer).kill(); + verify(localizer).requestDownloadBaseTopologyBlobs(nAssignment, port); + assertSame("pendingDownload not set properly", baseFuture, nextState.pendingDownload); + assertEquals(nAssignment, nextState.pendingLocalization); + assertTrue(Time.currentTimeMillis() > 1000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.KILL, nextState.state); + verify(cContainer).forceKill(); + assertSame("pendingDownload not set properly", baseFuture, nextState.pendingDownload); + assertEquals(nAssignment, nextState.pendingLocalization); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.WAITING_FOR_BASIC_LOCALIZATION, nextState.state); + verify(cContainer).cleanUp(); + verify(localizer).releaseSlotFor(cAssignment, port); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.WAITING_FOR_BLOB_LOCALIZATION, nextState.state); + verify(baseFuture).get(1000, TimeUnit.MILLISECONDS); + verify(localizer).requestDownloadTopologyBlobs(nAssignment, port); + assertSame("pendingDownload not set properly", blobFuture, nextState.pendingDownload); + assertEquals(nAssignment, nextState.pendingLocalization); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + verify(blobFuture).get(1000, TimeUnit.MILLISECONDS); + verify(containerLauncher).launchContainer(port, nAssignment, state); + assertEquals(MachineState.WAITING_FOR_WORKER_START, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(nAssignment, nextState.currentAssignment); + assertSame(nContainer, nextState.container); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(nAssignment, nextState.currentAssignment); + assertSame(nContainer, nextState.container); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(nAssignment, nextState.currentAssignment); + assertSame(nContainer, nextState.container); + assertTrue(Time.currentTimeMillis() > 3000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertSame("pendingDownload is not null", null, nextState.pendingDownload); + assertSame(null, nextState.pendingLocalization); + assertSame(nAssignment, nextState.currentAssignment); + assertSame(nContainer, nextState.container); + assertTrue(Time.currentTimeMillis() > 4000); + } finally { + Time.stopSimulating(); + } + } + + + @Test + public void testRunningToEmpty() throws Exception { + Time.startSimulatingAutoAdvanceOnSleep(1010); + try { + int port = 8080; + String cTopoId = "CURRENT"; + List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); + LocalAssignment cAssignment = + mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); + + Container cContainer = mock(Container.class); + LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()); + when(cContainer.readHeartbeat()).thenReturn(chb); + when(cContainer.areAllProcessesDead()).thenReturn(false, true); + + ILocalizer localizer = mock(ILocalizer.class); + ContainerLauncher containerLauncher = mock(ContainerLauncher.class); + + ISupervisor iSuper = mock(ISupervisor.class); + LocalState state = mock(LocalState.class); + StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, + containerLauncher, "localhost", port, iSuper, state); + DynamicState dynamicState = new DynamicState(cAssignment, cContainer, null); + + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); + assertEquals(MachineState.KILL, nextState.state); + verify(cContainer).kill(); + verify(localizer, never()).requestDownloadBaseTopologyBlobs(null, port); + assertSame("pendingDownload not set properly", null, nextState.pendingDownload); + assertEquals(null, nextState.pendingLocalization); + assertTrue(Time.currentTimeMillis() > 1000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.KILL, nextState.state); + verify(cContainer).forceKill(); + assertSame("pendingDownload not set properly", null, nextState.pendingDownload); + assertEquals(null, nextState.pendingLocalization); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.EMPTY, nextState.state); + verify(cContainer).cleanUp(); + verify(localizer).releaseSlotFor(cAssignment, port); + assertEquals(null, nextState.container); + assertEquals(null, nextState.currentAssignment); + assertTrue(Time.currentTimeMillis() > 2000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.EMPTY, nextState.state); + assertEquals(null, nextState.container); + assertEquals(null, nextState.currentAssignment); + assertTrue(Time.currentTimeMillis() > 3000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.EMPTY, nextState.state); + assertEquals(null, nextState.container); + assertEquals(null, nextState.currentAssignment); + assertTrue(Time.currentTimeMillis() > 3000); + } finally { + Time.stopSimulating(); + } + } + + @Test + public void testRunWithProfileActions() throws Exception { + Time.startSimulatingAutoAdvanceOnSleep(1010); + try { + int port = 8080; + String cTopoId = "CURRENT"; + List<ExecutorInfo> cExecList = mkExecutorInfoList(1,2,3,4,5); + LocalAssignment cAssignment = + mkLocalAssignment(cTopoId, cExecList, mkWorkerResources(100.0, 100.0, 100.0)); + + Container cContainer = mock(Container.class); + LSWorkerHeartbeat chb = mkWorkerHB(cTopoId, port, cExecList, Time.currentTimeSecs()+100); //NOT going to timeout for a while + when(cContainer.readHeartbeat()).thenReturn(chb, chb, chb, chb, chb, chb); + when(cContainer.runProfiling(any(ProfileRequest.class), anyBoolean())).thenReturn(true); + + ILocalizer localizer = mock(ILocalizer.class); + ContainerLauncher containerLauncher = mock(ContainerLauncher.class); + + ISupervisor iSuper = mock(ISupervisor.class); + LocalState state = mock(LocalState.class); + StaticState staticState = new StaticState(localizer, 5000, 120000, 1000, 1000, + containerLauncher, "localhost", port, iSuper, state); + Set<TopoProfileAction> profileActions = new HashSet<>(); + ProfileRequest request = new ProfileRequest(); + request.set_action(ProfileAction.JPROFILE_STOP); + NodeInfo info = new NodeInfo(); + info.set_node("localhost"); + info.add_to_port(port); + request.set_nodeInfo(info); + request.set_time_stamp(Time.currentTimeMillis() + 3000);//3 seconds from now + + TopoProfileAction profile = new TopoProfileAction(cTopoId, request); + profileActions.add(profile); + Set<TopoProfileAction> expectedPending = new HashSet<>(); + expectedPending.add(profile); + + + DynamicState dynamicState = new DynamicState(cAssignment, cContainer, cAssignment).withProfileActions(profileActions, Collections.<TopoProfileAction> emptySet()); + + DynamicState nextState = Slot.stateMachineStep(dynamicState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + verify(cContainer).runProfiling(request, false); + assertEquals(expectedPending, nextState.pendingStopProfileActions); + assertEquals(expectedPending, nextState.profileActions); + assertTrue(Time.currentTimeMillis() > 1000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertEquals(expectedPending, nextState.pendingStopProfileActions); + assertEquals(expectedPending, nextState.profileActions); + assertTrue(Time.currentTimeMillis() > 2000); + + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertEquals(expectedPending, nextState.pendingStopProfileActions); + assertEquals(expectedPending, nextState.profileActions); + assertTrue(Time.currentTimeMillis() > 3000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + verify(cContainer).runProfiling(request, true); + assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions); + assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions); + assertTrue(Time.currentTimeMillis() > 4000); + + nextState = Slot.stateMachineStep(nextState, staticState); + assertEquals(MachineState.RUNNING, nextState.state); + assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.pendingStopProfileActions); + assertEquals(Collections.<TopoProfileAction> emptySet(), nextState.profileActions); + assertTrue(Time.currentTimeMillis() > 5000); + } finally { + Time.stopSimulating(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java b/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java new file mode 100644 index 0000000..98a43e4 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/executor/error/ReportErrorTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.executor.error; + +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.Time; +import org.junit.Test; + +public class ReportErrorTest { + + @Test + public void testReport() { + final String topo = "topology"; + final String comp = "component"; + final Long port = new Long(8080); + final AtomicLong errorCount = new AtomicLong(0l); + + WorkerTopologyContext context = mock(WorkerTopologyContext.class); + when(context.getThisWorkerPort()).thenReturn(port.intValue()); + + IStormClusterState state = mock(IStormClusterState.class); + doAnswer((invocation) -> errorCount.incrementAndGet()) + .when(state).reportError(eq(topo), eq(comp), any(String.class), eq(port), any(Throwable.class)); + Map<String, Object> conf = new HashMap<>(); + conf.put(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS, 10); + conf.put(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL, 4); + + Time.startSimulating(); + try { + ReportError report = new ReportError(conf, state, topo, comp, context); + report.report(new RuntimeException("ERROR-1")); + assertEquals(1, errorCount.get()); + report.report(new RuntimeException("ERROR-2")); + assertEquals(2, errorCount.get()); + report.report(new RuntimeException("ERROR-3")); + assertEquals(3, errorCount.get()); + report.report(new RuntimeException("ERROR-4")); + assertEquals(4, errorCount.get()); + //Too fast not reported + report.report(new RuntimeException("ERROR-5")); + assertEquals(4, errorCount.get()); + Time.advanceTime(9000); + report.report(new RuntimeException("ERROR-6")); + assertEquals(4, errorCount.get()); + Time.advanceTime(2000); + report.report(new RuntimeException("ERROR-7")); + assertEquals(5, errorCount.get()); + } finally { + Time.stopSimulating(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java new file mode 100644 index 0000000..41c17c8 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/localizer/AsyncLocalizerTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.localizer; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.storm.daemon.supervisor.AdvancedFSOps; +import org.apache.storm.utils.ConfigUtils; +import org.junit.Test; + +import org.apache.storm.Config; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.security.auth.DefaultPrincipalToLocal; +import org.apache.storm.utils.Utils; + +public class AsyncLocalizerTest { + + @Test + public void testRequestDownloadBaseTopologyBlobs() throws Exception { + final String topoId = "TOPO"; + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + ExecutorInfo ei = new ExecutorInfo(); + ei.set_task_start(1); + ei.set_task_end(1); + la.add_to_executors(ei); + final int port = 8080; + final String jarKey = topoId + "-stormjar.jar"; + final String codeKey = topoId + "-stormcode.ser"; + final String confKey = topoId + "-stormconf.ser"; + final String stormLocal = "/tmp/storm-local/"; + final String stormRoot = stormLocal+topoId+"/"; + final File fStormRoot = new File(stormRoot); + ClientBlobStore blobStore = mock(ClientBlobStore.class); + Map<String, Object> conf = new HashMap<>(); + conf.put(Config.SUPERVISOR_BLOBSTORE, ClientBlobStore.class.getName()); + conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName()); + conf.put(Config.STORM_CLUSTER_MODE, "distributed"); + conf.put(Config.STORM_LOCAL_DIR, stormLocal); + Localizer localizer = mock(Localizer.class); + AdvancedFSOps ops = mock(AdvancedFSOps.class); + ConfigUtils mockedCU = mock(ConfigUtils.class); + Utils mockedU = mock(Utils.class); + + Map<String, Object> topoConf = new HashMap<>(conf); + + AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops); + ConfigUtils orig = ConfigUtils.setInstance(mockedCU); + Utils origUtils = Utils.setInstance(mockedU); + try { + when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot); + when(mockedCU.supervisorLocalDirImpl(conf)).thenReturn(stormLocal); + when(mockedU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore); + when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf); + + Future<Void> f = al.requestDownloadBaseTopologyBlobs(la, port); + f.get(20, TimeUnit.SECONDS); + // We should be done now... + + verify(blobStore).prepare(conf); + verify(mockedU).downloadResourcesAsSupervisorImpl(eq(jarKey), startsWith(stormLocal), eq(blobStore)); + verify(mockedU).downloadResourcesAsSupervisorImpl(eq(codeKey), startsWith(stormLocal), eq(blobStore)); + verify(mockedU).downloadResourcesAsSupervisorImpl(eq(confKey), startsWith(stormLocal), eq(blobStore)); + verify(blobStore).shutdown(); + //Extracting the dir from the jar + verify(mockedU).extractDirFromJarImpl(endsWith("stormjar.jar"), eq("resources"), any(File.class)); + verify(ops).moveDirectoryPreferAtomic(any(File.class), eq(fStormRoot)); + verify(ops).setupStormCodeDir(topoConf, fStormRoot); + + verify(ops, never()).deleteIfExists(any(File.class)); + } finally { + al.shutdown(); + ConfigUtils.setInstance(orig); + Utils.setInstance(origUtils); + } + } + + @Test + public void testRequestDownloadTopologyBlobs() throws Exception { + final String topoId = "TOPO-12345"; + LocalAssignment la = new LocalAssignment(); + la.set_topology_id(topoId); + ExecutorInfo ei = new ExecutorInfo(); + ei.set_task_start(1); + ei.set_task_end(1); + la.add_to_executors(ei); + final String topoName = "TOPO"; + final int port = 8080; + final String user = "user"; + final String simpleLocalName = "simple.txt"; + final String simpleKey = "simple"; + + final String stormLocal = "/tmp/storm-local/"; + final File userDir = new File(stormLocal, user); + final String stormRoot = stormLocal+topoId+"/"; + + final String localizerRoot = "/tmp/storm-localizer/"; + final String simpleLocalFile = localizerRoot + user + "/simple"; + final String simpleCurrentLocalFile = localizerRoot + user + "/simple.current"; + + final StormTopology st = new StormTopology(); + st.set_spouts(new HashMap<>()); + st.set_bolts(new HashMap<>()); + st.set_state_spouts(new HashMap<>()); + + Map<String, Map<String, Object>> topoBlobMap = new HashMap<>(); + Map<String, Object> simple = new HashMap<>(); + simple.put("localname", simpleLocalName); + simple.put("uncompress", false); + topoBlobMap.put(simpleKey, simple); + + Map<String, Object> conf = new HashMap<>(); + conf.put(Config.STORM_LOCAL_DIR, stormLocal); + Localizer localizer = mock(Localizer.class); + AdvancedFSOps ops = mock(AdvancedFSOps.class); + ConfigUtils mockedCU = mock(ConfigUtils.class); + Utils mockedU = mock(Utils.class); + + Map<String, Object> topoConf = new HashMap<>(conf); + topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap); + topoConf.put(Config.TOPOLOGY_SUBMITTER_USER, user); + topoConf.put(Config.TOPOLOGY_NAME, topoName); + + List<LocalizedResource> localizedList = new ArrayList<>(); + LocalizedResource simpleLocal = new LocalizedResource(simpleKey, simpleLocalFile, false); + localizedList.add(simpleLocal); + + AsyncLocalizer al = new AsyncLocalizer(conf, localizer, ops); + ConfigUtils orig = ConfigUtils.setInstance(mockedCU); + Utils origUtils = Utils.setInstance(mockedU); + try { + when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot); + when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf); + when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st); + + when(localizer.getLocalUserFileCacheDir(user)).thenReturn(userDir); + + when(localizer.getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir))).thenReturn(localizedList); + + Future<Void> f = al.requestDownloadTopologyBlobs(la, port); + f.get(20, TimeUnit.SECONDS); + // We should be done now... + + verify(localizer).getLocalUserFileCacheDir(user); + verify(ops).fileExists(userDir); + verify(ops).forceMkdir(userDir); + + verify(localizer).getBlobs(any(List.class), eq(user), eq(topoName), eq(userDir)); + + verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile)); + } finally { + al.shutdown(); + ConfigUtils.setInstance(orig); + Utils.setInstance(origUtils); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java index ab2c9af..04b5ab2 100644 --- a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java +++ b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceRetentionSetTest.java @@ -34,8 +34,8 @@ public class LocalizedResourceRetentionSetTest { // check adding reference to local resource with topology of same name localresource2.addReference(("topo2")); - lrset.addResource("key1", localresource1, false); - lrset.addResource("key2", localresource2, false); + lrset.add("key1", localresource1, false); + lrset.add("key2", localresource2, false); lrretset.addResources(lrset); assertEquals("number to clean is not 0", 0, lrretset.getSizeWithNoReferences()); localresource1.removeReference(("topo1")); @@ -64,9 +64,9 @@ public class LocalizedResourceRetentionSetTest { // check adding reference to local resource with topology of same name localresource2.addReference(("topo1")); localresource2.setSize(10); - lrset.addResource("key1", localresource1, false); - lrset.addResource("key2", localresource2, false); - lrset.addResource("archive1", archiveresource1, true); + lrset.add("key1", localresource1, false); + lrset.add("key2", localresource2, false); + lrset.add("archive1", archiveresource1, true); lrretset.addResources(lrset); assertEquals("number to clean is not 2", 2, lrretset.getSizeWithNoReferences()); http://git-wip-us.apache.org/repos/asf/storm/blob/5a320461/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java index 7f20d19..550d695 100644 --- a/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java +++ b/storm-core/test/jvm/org/apache/storm/localizer/LocalizedResourceSetTest.java @@ -37,9 +37,9 @@ public class LocalizedResourceSetTest { LocalizedResource localresource1 = new LocalizedResource("key1", "testfile1", false, "topo1"); LocalizedResource localresource2 = new LocalizedResource("key2", "testfile2", true, "topo1"); assertEquals("size is wrong", 0, lrset.getSize()); - lrset.addResource("key1", localresource1, false); + lrset.add("key1", localresource1, false); assertEquals("size is wrong", 1, lrset.getSize()); - lrset.addResource("key2", localresource2, true); + lrset.add("key2", localresource2, true); assertEquals("size is wrong", 2, lrset.getSize()); } @@ -48,8 +48,8 @@ public class LocalizedResourceSetTest { LocalizedResourceSet lrset = new LocalizedResourceSet(user1); LocalizedResource localresource1 = new LocalizedResource("key1", "testfile1", false, "topo1"); LocalizedResource localresource2 = new LocalizedResource("key2", "testfile2", true, "topo1"); - lrset.addResource("key1", localresource1, false); - lrset.addResource("key2", localresource2, true); + lrset.add("key1", localresource1, false); + lrset.add("key2", localresource2, true); assertEquals("get doesn't return same object", localresource1, lrset.get("key1", false)); assertEquals("get doesn't return same object", localresource2, lrset.get("key2", true)); @@ -60,8 +60,8 @@ public class LocalizedResourceSetTest { LocalizedResourceSet lrset = new LocalizedResourceSet(user1); LocalizedResource localresource1 = new LocalizedResource("key1", "testfile1", false, "topo1"); LocalizedResource localresource2 = new LocalizedResource("key2", "testfile2", true, "topo1"); - lrset.addResource("key1", localresource1, false); - lrset.addResource("key2", localresource2, true); + lrset.add("key1", localresource1, false); + lrset.add("key2", localresource2, true); assertEquals("doesn't exist", true, lrset.exists("key1", false)); assertEquals("doesn't exist", true, lrset.exists("key2", true)); boolean val = lrset.remove(localresource1);