Repository: incubator-myriad Updated Branches: refs/heads/master 7207e2b04 -> dcccf2e3b
MYRIAD-234 JUnit test additions, updates, minor code cleanup, and additional javadoc comments JIRA: [MYRIAD-234] https://issues.apache.org/jira/browse/MYRIAD-234 Pull Request: Closes #86 Author: hokiegeek2 <hokiege...@gmail.com> Date: Fri Jul 15 09:12:24 2016 -0400 Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/dcccf2e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/dcccf2e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/dcccf2e3 Branch: refs/heads/master Commit: dcccf2e3b4686d33c099a281f9001b43be4106f9 Parents: 7207e2b Author: hokiegeek2 <hokiege...@gmail.com> Authored: Fri Jul 15 09:12:24 2016 -0400 Committer: darinj <dar...@apache.org> Committed: Wed Jul 20 14:44:02 2016 -0400 ---------------------------------------------------------------------- .../src/main/java/org/apache/myriad/Main.java | 34 +++- .../apache/myriad/scheduler/MyriadDriver.java | 8 +- .../myriad/scheduler/MyriadScheduler.java | 2 +- .../org/apache/myriad/scheduler/fgs/Node.java | 2 +- .../MyriadInitializationInterceptor.java | 3 +- .../java/org/apache/myriad/state/NodeTask.java | 2 +- .../org/apache/myriad/state/SchedulerState.java | 13 +- .../org/apache/myriad/BaseConfigurableTest.java | 6 - .../org/apache/myriad/TestObjectFactory.java | 94 ++++++++- .../myriad/scheduler/MyriadOperationsTest.java | 75 ++++--- .../myriad/scheduler/fgs/NodeStoreTest.java | 36 ++++ .../fgs/YarnNodeCapacityManagerTest.java | 91 +++++++++ .../org/apache/myriad/state/MockRMNode.java | 203 +++++++++++++++++++ .../apache/myriad/state/SchedulerStateTest.java | 2 +- 14 files changed, 508 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/main/java/org/apache/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java index 14ab806..0615ebd 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java @@ -62,7 +62,16 @@ import com.google.inject.Guice; import com.google.inject.Injector; /** - * Main entry point for myriad scheduler + * Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of + * the following components: + * + * 1. MyriadDriverManager + * 2. MyriadWebServer + * 3. TaskTerminator + * 4. HealthCheckRegistry + * + * Main uses the Guice Injector framework to manage the Myriad object graph and is + * configured by myriad-config-default.yml */ public class Main { private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); @@ -75,6 +84,18 @@ public class Main { private static Injector injector; + /** + * Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of + * the following components: + * + * 1. MyriadDriverManager + * 2. MyriadWebServer + * 3. TaskTerminator + * 4. HealthCheckRegistry + * + * Main uses the Guice Injector framework to manage the Myriad object graph and is + * configured by myriad-config-default.yml + */ public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry registry) throws Exception { MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", hadoopConf, yarnScheduler, rmContext, registry); @@ -89,6 +110,13 @@ public class Main { return injector; } + /** + * Initializes the Myriad object graph via MyriadConfiguration and starts + * the Mesos interface (MyriadDriverManager) as well as all Myriad services + * + *@param cfg MyriadConfiguration + * @throws Exception + */ public void run(MyriadConfiguration cfg) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Bindings: " + injector.getAllBindings()); @@ -176,7 +204,7 @@ public class Main { String profile = entry.getKey(); ServiceResourceProfile nodeManager = profileManager.get(profile); if (nodeManager == null) { - throw new RuntimeException("Invalid profile name '" + profile + "' specified in 'nmInstances'"); + throw new IllegalArgumentException("Invalid profile name '" + profile + "' specified in 'nmInstances'"); } if (entry.getValue() > 0) { if (nodeManager.getCpus() > maxCpu) { // find the profile with largest number of cpus @@ -186,7 +214,7 @@ public class Main { } } if (maxCpu <= 0 || maxMem <= 0) { - throw new RuntimeException( + throw new IllegalStateException( "Please configure 'nmInstances' with at least one instance/profile " + "with non-zero cpu/mem resources."); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java index 31656fb..071982b 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java @@ -90,7 +90,7 @@ public class MyriadDriver { */ public Status kill(final TaskID taskId) { Status status = driver.killTask(taskId); - LOGGER.info("Task {} killed with status: {}", taskId, status); + LOGGER.info("Task {} kill initiated with Driver status {}", taskId, status); return status; } @@ -112,6 +112,12 @@ public class MyriadDriver { return status; } + /** + * Returns reference to the underlying Mesos SchedulerDriver + * to which all method invocations are delegated to. + * + * @return the underlying Mesos SchedulerDriver + */ public SchedulerDriver getDriver() { return driver; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java index 561d36e..798c68f 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java @@ -196,4 +196,4 @@ public class MyriadScheduler implements Scheduler { } }); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java index f9df6af..c481ca3 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.mesos.Protos; /** - * Abstraction that encapsulates YARN and Mesos view of a node. + * Abstraction that encapsulates the combined YARN and Mesos views of a node. */ public class Node { /** http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java index 150f9fd..94af40e 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/interceptor/MyriadInitializationInterceptor.java @@ -27,7 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Responsible for intializing myriad. + * Responsible for initializing Myriad by invoking initialize upon the + * Myriad driver {@link org.apache.myriad.Main} */ public class MyriadInitializationInterceptor extends BaseInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadInitializationInterceptor.class); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java index 5acd7cb..6c48007 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/NodeTask.java @@ -30,7 +30,7 @@ import org.apache.myriad.scheduler.TaskUtils; import org.apache.myriad.scheduler.constraints.Constraint; /** - * Represents a task to be launched by the executor + * Represents a Mesos task to be launched by the Mesos executor */ public class NodeTask { @JsonProperty http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java index 8a531ea..8b5fb51 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java @@ -42,7 +42,8 @@ import com.google.common.base.Optional; import com.google.common.collect.Sets; /** - * Represents the state of the Myriad scheduler + * Encapsulates the state of the all {@link NodeTask} objects managed + * by a {@link MyriadScheduler} as well as corresponding state update methods. */ public class SchedulerState { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerState.class); @@ -164,6 +165,10 @@ public class SchedulerState { return this.tasks.get(taskId); } + /** + * Return a list of TaskIDs corresponding to all killable tasks + * @return + */ public synchronized Set<Protos.TaskID> getKillableTasks() { Set<Protos.TaskID> returnSet = new HashSet<>(); for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { @@ -172,6 +177,12 @@ public class SchedulerState { return returnSet; } + /** + * Retrieve set of TaskIDs corresponding to killable tasks for a given prefix + * + * @param taskPrefix + * @return + */ public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) { SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks()); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java index 45443fe..e5c3f57 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java @@ -2,7 +2,6 @@ package org.apache.myriad; import org.apache.myriad.configuration.MyriadConfiguration; import org.junit.Before; -import org.junit.Test; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; @@ -27,9 +26,4 @@ public class BaseConfigurableTest { cfgWithDocker = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default-with-docker-info.yml"), MyriadConfiguration.class); } - - @Test - public void testMyriadConfiguration() throws Exception { - cfg.getFrameworkName(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java index c0cf187..9117e3b 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java @@ -1,23 +1,40 @@ package org.apache.myriad; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.mesos.Protos; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.SlaveID; import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.policy.LeastAMNodesFirstPolicy; import org.apache.myriad.scheduler.MockSchedulerDriver; import org.apache.myriad.scheduler.MyriadDriver; import org.apache.myriad.scheduler.MyriadDriverManager; +import org.apache.myriad.scheduler.MyriadOperations; import org.apache.myriad.scheduler.yarn.MyriadCapacityScheduler; import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; import org.apache.myriad.state.MockDispatcher; +import org.apache.myriad.state.MockRMContext; +import org.apache.myriad.state.MockRMNode; import org.apache.myriad.state.SchedulerState; import org.apache.myriad.webapp.HttpConnectorProvider; import org.apache.myriad.webapp.MyriadWebServer; @@ -30,15 +47,23 @@ import org.mortbay.jetty.servlet.ServletHolder; import com.google.inject.servlet.GuiceFilter; /** - * Factory for common objects utilized over 1..n Junit tests + * Factory for common standard and mock objects utilized for JUnit tests */ public class TestObjectFactory { - public static SchedulerState getSchedulerState(MyriadConfiguration cfg) { - SchedulerState state = new SchedulerState(new MyriadFileSystemRMStateStore()); + public static SchedulerState getSchedulerState(MyriadConfiguration cfg) throws Exception { + Configuration conf = new Configuration(); + SchedulerState state = new SchedulerState(TestObjectFactory.getStateStore(conf, false)); state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); return state; } + public static FileSystemRMStateStore getRMStateStore(Configuration conf) throws Exception { + FileSystemRMStateStore store = new MyriadFileSystemRMStateStore(); + conf.set("yarn.resourcemanager.fs.state-store.uri", "/tmp"); + store.initInternal(conf); + return store; + } + public static MyriadDriverManager getMyriadDriverManager() { return new MyriadDriverManager(new MyriadDriver(new MockSchedulerDriver())); } @@ -52,7 +77,7 @@ public class TestObjectFactory { return scheduler; } - public static Server getJettyServer() { + private static Server getJettyServer() { Server server = new Server(); ServletHandler context = new ServletHandler(); ServletHolder holder = new ServletHolder(DefaultServlet.class); @@ -72,12 +97,14 @@ public class TestObjectFactory { return new MyriadWebServer(server, connector, new GuiceFilter()); } - public static MyriadFileSystemRMStateStore getStateStore(Configuration conf) throws Exception { + public static MyriadFileSystemRMStateStore getStateStore(Configuration conf, boolean loadState) throws Exception { conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/"); MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore(); store.init(conf); store.start(); - store.loadState(); + if (loadState) { + store.loadState(); + } store.setRMDispatcher(new MockDispatcher()); return store; } @@ -87,4 +114,59 @@ public class TestObjectFactory { Protos.FrameworkID fid = FrameworkID.newBuilder().setValue(frameworkId).build(); return Protos.Offer.newBuilder().setHostname(host).setId(OfferID.newBuilder().setValue(offerId)).setSlaveId(sid).setFrameworkId(fid).build(); } + + public static RMContext getRMContext(Configuration conf) throws Exception { + conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/"); + MockRMContext context = null; + Dispatcher dispatcher = new MockDispatcher(); + + RMApplicationHistoryWriter rmApplicationHistoryWriter = new RMApplicationHistoryWriter(); + AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(dispatcher); + AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(dispatcher); + RMDelegationTokenSecretManager delegationTokenSecretManager = new RMDelegationTokenSecretManager(1, 1, 1, 1, context); + + context = new MockRMContext(); + context.setStateStore(TestObjectFactory.getStateStore(conf, false)); + context.setAmLivelinessMonitor(amLivelinessMonitor); + context.setAmFinishingMonitor(amFinishingMonitor); + context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + context.setRMDelegationTokenSecretManager(delegationTokenSecretManager); + return context; + } + + public static MyriadOperations getMyriadOperations(MyriadConfiguration cfg) throws Exception { + AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler(); + SchedulerState sState = TestObjectFactory.getSchedulerState(cfg); + sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); + + MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager(); + MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg); + CompositeInterceptor registry = new CompositeInterceptor(); + LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, sState); + return new MyriadOperations(cfg, sState, policy, manager, webServer, TestObjectFactory.getRMContext(new Configuration())); + } + + public static SchedulerNode getSchedulerNode(NodeId nodeId, int vCores, int memory) { + RMNode node = getMockRMNode(nodeId, vCores, memory); + return new FiCaSchedulerNode(node, true); + } + + public static RMNode getMockRMNode(NodeId nodeId, int vCores, int memory) { + MockRMNode node = new MockRMNode(nodeId, NodeState.NEW, new NodeBase("/tmp")); + node.setCommandPort(8041); + node.setHostName("0.0.0.0"); + node.setHttpPort(8042); + node.setRackName("r01n07"); + node.setHttpAddress("localhost:8042"); + node.setTotalCapability(getResource(vCores, memory)); + + return node; + } + + public static Resource getResource(int vCores, int memory) { + Resource resource = new ResourcePBImpl(); + resource.setVirtualCores(vCores); + resource.setMemory(memory); + return resource; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java index 6c039dd..78f3627 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java @@ -3,14 +3,9 @@ package org.apache.myriad.scheduler; import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.mesos.Protos.FrameworkID; import org.apache.myriad.BaseConfigurableTest; import org.apache.myriad.TestObjectFactory; @@ -19,8 +14,6 @@ import org.apache.myriad.policy.LeastAMNodesFirstPolicy; import org.apache.myriad.scheduler.constraints.Constraint; import org.apache.myriad.scheduler.constraints.LikeConstraint; import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; -import org.apache.myriad.state.MockDispatcher; -import org.apache.myriad.state.MockRMContext; import org.apache.myriad.state.SchedulerState; import org.apache.myriad.webapp.MyriadWebServer; import org.junit.Before; @@ -30,27 +23,31 @@ import org.junit.Test; * Unit tests for MyriadOperations class */ public class MyriadOperationsTest extends BaseConfigurableTest { - MyriadOperations ops; ServiceResourceProfile small; Constraint constraint = new LikeConstraint("localhost", "host-[0-9]*.example.com"); - SchedulerState sState; - - @Before - public void setUp() throws Exception { - super.setUp(); - AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler(); - //sState = new SchedulerState(new MyriadFileSystemRMStateStore()); - sState = TestObjectFactory.getSchedulerState(cfg); - sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); + MyriadWebServer webServer; + private SchedulerState getSchedulerState() throws Exception { + SchedulerState state = TestObjectFactory.getSchedulerState(this.cfg); + state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); + return state; + } + + private MyriadOperations getMyriadOperations(SchedulerState state) throws Exception { MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager(); - MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg); - CompositeInterceptor registry = new CompositeInterceptor(); - LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, sState); + AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler(); + CompositeInterceptor registry = new CompositeInterceptor(); + LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, state); manager.startDriver(); - ops = new MyriadOperations(cfg, sState, policy, manager, webServer, generateRMContext(scheduler)); + return new MyriadOperations(cfg, state, policy, manager, webServer, TestObjectFactory.getRMContext(new Configuration())); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + webServer = TestObjectFactory.getMyriadWebServer(cfg); generateProfiles(); } @@ -58,36 +55,21 @@ public class MyriadOperationsTest extends BaseConfigurableTest { small = new ServiceResourceProfile("small", 0.1, 512.0); } - private RMContext generateRMContext(AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler) throws Exception { - Configuration conf = new Configuration(); - MockRMContext context = null; - Dispatcher dispatcher = new MockDispatcher(); - - RMApplicationHistoryWriter rmApplicationHistoryWriter = new RMApplicationHistoryWriter(); - AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(dispatcher); - AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(dispatcher); - RMDelegationTokenSecretManager delegationTokenSecretManager = new RMDelegationTokenSecretManager(1, 1, 1, 1, context); - - context = new MockRMContext(); - context.setStateStore(TestObjectFactory.getStateStore(conf)); - context.setAmLivelinessMonitor(amLivelinessMonitor); - context.setAmFinishingMonitor(amFinishingMonitor); - context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - context.setRMDelegationTokenSecretManager(delegationTokenSecretManager); - return context; - } - @Test public void testFlexUpAndFlexDownCluster() throws Exception { + SchedulerState sState = this.getSchedulerState(); + MyriadOperations ops = this.getMyriadOperations(sState); assertEquals(0, sState.getPendingTaskIds().size()); ops.flexUpCluster(small, 1, constraint); assertEquals(1, sState.getPendingTaskIds().size()); ops.flexDownCluster(small, constraint, 1); assertEquals(0, sState.getPendingTaskIds().size()); } - + @Test public void testFlexUpAndFlexDownService() throws Exception { + SchedulerState sState = this.getSchedulerState(); + MyriadOperations ops = this.getMyriadOperations(sState); ops.flexUpAService(1, "jobhistory"); assertEquals(1, sState.getPendingTasksByType("jobhistory").size()); ops.flexDownAService(1, "jobhistory"); @@ -96,17 +78,28 @@ public class MyriadOperationsTest extends BaseConfigurableTest { @Test(expected = MyriadBadConfigurationException.class) public void testFlexUpAServiceOverMaxInstances() throws Exception { + SchedulerState sState = this.getSchedulerState(); + MyriadOperations ops = this.getMyriadOperations(sState); + /* + * There is 1 jobhhistory task loaded from configuration file, so flexing up + * by two should result in MyriadBadConfigurationException + */ ops.flexUpAService(2, "jobhistory"); } @Test public void testGetFlexibleInstances() throws Exception { + SchedulerState sState = this.getSchedulerState(); + MyriadOperations ops = this.getMyriadOperations(sState); + assertEquals(0, ops.getFlexibleInstances("jobhistory").intValue()); ops.flexUpAService(1, "jobhistory"); assertEquals(1, ops.getFlexibleInstances("jobhistory").intValue()); } @Test public void testShutdownCluster() throws Exception { + SchedulerState sState = this.getSchedulerState(); + MyriadOperations ops = this.getMyriadOperations(sState); ops.shutdownFramework(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java new file mode 100644 index 0000000..f20e603 --- /dev/null +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/NodeStoreTest.java @@ -0,0 +1,36 @@ +package org.apache.myriad.scheduler.fgs; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.myriad.TestObjectFactory; +import org.junit.Test; + +/** + * Unit tests for NodeStore class + */ +public class NodeStoreTest { + NodeStore store = new NodeStore(); + SchedulerNode sNode = TestObjectFactory.getSchedulerNode(NodeId.newInstance("0.0.0.0", 8888), 2, 4096); + + @Test + public void testAddNode() throws Exception { + store.add(sNode); + assertTrue(store.isPresent("0.0.0.0")); + assertNotNull(store.getNode("0.0.0.0")); + } + + @Test + public void testRemoveNode() throws Exception { + if (!store.isPresent("0.0.0.0")) { + store.add(sNode); + } + store.remove("0.0.0.0"); + assertFalse(store.isPresent("0.0.0.0")); + assertNull(store.getNode("0.0.0.0")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java new file mode 100644 index 0000000..e1f4eb5 --- /dev/null +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManagerTest.java @@ -0,0 +1,91 @@ +package org.apache.myriad.scheduler.fgs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.mesos.Protos.TaskID; +import org.apache.myriad.BaseConfigurableTest; +import org.apache.myriad.TestObjectFactory; +import org.apache.myriad.scheduler.MockSchedulerDriver; +import org.apache.myriad.scheduler.MyriadDriver; +import org.apache.myriad.scheduler.ServiceResourceProfile; +import org.apache.myriad.scheduler.TaskUtils; +import org.apache.myriad.scheduler.constraints.Constraint; +import org.apache.myriad.scheduler.constraints.LikeConstraint; +import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; +import org.apache.myriad.state.NodeTask; +import org.apache.myriad.state.SchedulerState; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests for YarnNodeCapacityManager + */ +public class YarnNodeCapacityManagerTest extends BaseConfigurableTest { + YarnNodeCapacityManager manager; + NodeStore store; + NodeId zNodeId = NodeId.newInstance("0.0.0.1", 8041); + TaskID zTaskId = TaskID.newBuilder().setValue("nm").build(); + NodeTask ntZero; + SchedulerNode zNode; + SchedulerState sState; + + @Before + public void setUp() throws Exception { + super.setUp(); + + AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler(); + InterceptorRegistry registry = TestObjectFactory.getInterceptorRegistry(); + sState = TestObjectFactory.getSchedulerState(cfg); + + RMContext context = TestObjectFactory.getRMContext(new Configuration()); + MyriadDriver driver = new MyriadDriver(new MockSchedulerDriver()); + store = new NodeStore(); + OfferLifecycleManager oManager = new OfferLifecycleManager(store, driver); + + zNode = TestObjectFactory.getSchedulerNode(zNodeId, 0, 0); + + manager = new YarnNodeCapacityManager(registry, scheduler, context, driver, oManager, store, sState, new TaskUtils(cfg)); + } + + private Set<NodeTask> getNodeTasks() { + Constraint cZero = new LikeConstraint("0.0.0.1", "host-[0-9]*.example.com"); + ServiceResourceProfile zProfile = new ServiceResourceProfile("zProfile", 0.0, 0.0, 0.0, 0.0); + + ntZero = new NodeTask(zProfile, cZero); + ntZero.setTaskPrefix("nm"); + ntZero.setHostname("0.0.0.1"); + + return Sets.newHashSet(ntZero); + } + + @Test + public void testAllowCallBacksForNode() throws Exception { + store.add(zNode); + sState.addNodes(getNodeTasks()); + sState.addTask(zTaskId, ntZero); + + sState.makeTaskActive(zTaskId); + assertEquals(1, sState.getActiveTasks().size()); + + YarnSchedulerInterceptor.CallBackFilter filter = manager.getCallBackFilter(); + assertTrue(filter.allowCallBacksForNode(zNodeId)); + } + + public void testIncrementNodeCapacity() throws Exception { + manager.incrementNodeCapacity(zNode.getRMNode(), TestObjectFactory.getResource(2, 2048)); + assertEquals(6, zNode.getTotalResource().getVirtualCores()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java new file mode 100644 index 0000000..f8cb700 --- /dev/null +++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/MockRMNode.java @@ -0,0 +1,203 @@ +package org.apache.myriad.state; + +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.net.Node; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; + +/** + * Mock implementation of RMNode interface for unit test cases + */ +public class MockRMNode implements RMNode { + String hostName; + NodeId nodeId; + String nodeAddress; + String httpAddress; + int commandPort; + int httpPort; + String healthReport; + long lastHealthReportTime; + Set<String> nodeLabels; + String nodeManagerVersion; + Resource totalCapability; + String rackName; + Node node; + NodeState nodeState; + List<ContainerId> containersToCleanUp; + List<ApplicationId> appsToCleanup; + List<UpdatedContainerInfo> containerUpdates; + NodeHeartbeatResponse heartbeatResponse; + + public MockRMNode(NodeId nodeId, NodeState nodeState, Node node) { + this.nodeId = nodeId; + this.nodeState = nodeState; + this.node = node; + } + + public NodeState getNodeState() { + return nodeState; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public void setNodeAddress(String nodeAddress) { + this.nodeAddress = nodeAddress; + } + + public void setHttpAddress(String httpAddress) { + this.httpAddress = httpAddress; + } + + public void setCommandPort(int commandPort) { + this.commandPort = commandPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + public void setHealthReport(String healthReport) { + this.healthReport = healthReport; + } + + public void setNodeLabels(Set<String> nodeLabels) { + this.nodeLabels = nodeLabels; + } + + public void setNodeManagerVersion(String nodeManagerVersion) { + this.nodeManagerVersion = nodeManagerVersion; + } + + public void setTotalCapability(Resource totalCapability) { + this.totalCapability = totalCapability; + } + + public void setRackName(String rackName) { + this.rackName = rackName; + } + + public void setContainersToCleanUp(List<ContainerId> containersToCleanUp) { + this.containersToCleanUp = containersToCleanUp; + } + + public void setAppsToCleanup(List<ApplicationId> appsToCleanup) { + this.appsToCleanup = appsToCleanup; + } + + public void setLastHealthReportTime(long lastHealthReportTime) { + this.lastHealthReportTime = lastHealthReportTime; + } + + public void setContainerUpdates(List<UpdatedContainerInfo> containerUpdates) { + this.containerUpdates = containerUpdates; + } + + public void setHeartbeatResponse(NodeHeartbeatResponse heartbeatResponse) { + this.heartbeatResponse = heartbeatResponse; + } + + @Override + public NodeId getNodeID() { + return nodeId; + } + + @Override + public String getHostName() { + return hostName; + } + + @Override + public int getCommandPort() { + return commandPort; + } + + @Override + public int getHttpPort() { + return httpPort; + } + + @Override + public String getNodeAddress() { + return nodeAddress; + } + + @Override + public String getHttpAddress() { + return httpAddress; + } + + @Override + public String getHealthReport() { + return healthReport; + } + + @Override + public long getLastHealthReportTime() { + return lastHealthReportTime; + } + + @Override + public String getNodeManagerVersion() { + return nodeManagerVersion; + } + + @Override + public Resource getTotalCapability() { + return totalCapability; + } + + @Override + public String getRackName() { + return rackName; + } + + @Override + public Node getNode() { + return node; + } + + @Override + public NodeState getState() { + return nodeState; + } + + @Override + public List<ContainerId> getContainersToCleanUp() { + return containersToCleanUp; + } + + @Override + public List<ApplicationId> getAppsToCleanup() { + return appsToCleanup; + } + + @Override + public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { + //noop + } + + @Override + public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { + return heartbeatResponse; + } + + @Override + public List<UpdatedContainerInfo> pullContainerUpdates() { + return containerUpdates; + } + + @Override + public Set<String> getNodeLabels() { + return nodeLabels; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/dcccf2e3/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java index a3cdf4f..59e2d41 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/state/SchedulerStateTest.java @@ -26,7 +26,7 @@ public class SchedulerStateTest { @Before public void setUp() throws Exception { - MyriadFileSystemRMStateStore store = TestObjectFactory.getStateStore(getConfiguration()); + MyriadFileSystemRMStateStore store = TestObjectFactory.getStateStore(getConfiguration(), false); state = new SchedulerState(store); }