Implementation of MYRIAD-229, MYRIAD-237, MYRIAD-238, MYRIAD-225 JIRA: [MYRIAD-225] https://issues.apache.org/jira/browse/MYRIAD-225 [MYRIAD-229] https://issues.apache.org/jira/browse/MYRIAD-239 [MYRIAD-237] https://issues.apache.org/jira/browse/MYRIAD-237 [MYRIAD-237] https://issues.apache.org/jira/browse/MYRIAD-237 Pull Request: Closes #91
Author: hokiegeek2 <hokiege...@gmail.com> Date: Wed Aug 17 15:21:56 2016 -0400 Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/577c30b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/577c30b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/577c30b1 Branch: refs/heads/master Commit: 577c30b1abdaa30313251c887c014aebac3fd93c Parents: 7aea259 Author: hokiegeek2 <hokiege...@gmail.com> Authored: Wed Aug 17 15:21:56 2016 -0400 Committer: darinj <dar...@apache.org> Committed: Tue Aug 30 13:08:36 2016 -0400 ---------------------------------------------------------------------- build.gradle | 1 - gradle/spock.gradle | 46 --- .../src/main/java/org/apache/myriad/Main.java | 18 +- .../java/org/apache/myriad/MyriadModule.java | 2 +- .../myriad/api/SchedulerStateResource.java | 2 +- .../configuration/NodeManagerConfiguration.java | 18 +- .../myriad/scheduler/MyriadOperations.java | 9 +- .../org/apache/myriad/scheduler/Rebalancer.java | 4 +- .../apache/myriad/scheduler/SchedulerUtils.java | 2 +- .../apache/myriad/scheduler/TaskFactory.java | 2 +- .../apache/myriad/scheduler/TaskTerminator.java | 4 +- .../org/apache/myriad/scheduler/TaskUtils.java | 8 +- .../handlers/StatusUpdateEventHandler.java | 2 +- .../scheduler/fgs/NMHeartBeatHandler.java | 67 +++-- .../scheduler/fgs/OfferLifecycleManager.java | 67 +++-- .../scheduler/fgs/YarnNodeCapacityManager.java | 10 +- .../scheduler/yarn/MyriadFairScheduler.java | 5 + .../org/apache/myriad/state/SchedulerState.java | 14 +- .../MyriadFileSystemRMStateStoreTest.java | 22 +- .../org/apache/myriad/BaseConfigurableTest.java | 77 ++++- .../org/apache/myriad/MyriadTestModule.java | 32 +- .../org/apache/myriad/TestObjectFactory.java | 290 +++++++++++++------ .../myriad/api/ArtifactsResourceTest.java | 17 ++ .../myriad/api/SchedulerStateResourceTest.java | 17 ++ .../configuration/MyriadConfigurationTest.java | 5 +- .../myriad/health/HealthCheckUtilsTest.java | 17 ++ .../health/MesosDriverHealthCheckTest.java | 17 ++ .../myriad/scheduler/MyriadOperationsTest.java | 97 ++++--- .../myriad/scheduler/SchedulerUtilsSpec.groovy | 90 ------ .../myriad/scheduler/SchedulerUtilsTest.java | 89 ++++++ .../apache/myriad/scheduler/TestTaskUtils.java | 3 +- .../constraints/LikeConstraintSpec.groovy | 93 ------ .../constraints/LikeConstraintTest.java | 86 ++++++ .../myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 175 ----------- .../scheduler/fgs/NMHeartBeatHandlerSpec.groovy | 114 -------- .../scheduler/fgs/NMHeartBeatHandlerTest.java | 240 +++++++++++++++ .../myriad/scheduler/fgs/NodeStoreTest.java | 23 +- .../apache/myriad/scheduler/fgs/NodeTest.java | 89 ++++++ .../fgs/OfferLifeCycleManagerTest.java | 22 +- .../fgs/YarnNodeCapacityManagerSpec.groovy | 143 --------- .../fgs/YarnNodeCapacityManagerTest.java | 149 ++++++---- .../org/apache/myriad/state/ClusterTest.java | 17 ++ .../org/apache/myriad/state/MockDispatcher.java | 28 +- .../org/apache/myriad/state/MockFuture.java | 17 ++ .../java/org/apache/myriad/state/MockRMApp.java | 17 ++ .../org/apache/myriad/state/MockRMContext.java | 25 +- .../org/apache/myriad/state/MockRMNode.java | 17 ++ .../java/org/apache/myriad/state/MockState.java | 17 ++ .../org/apache/myriad/state/MockVariable.java | 17 ++ .../apache/myriad/state/MyriadStateTest.java | 17 ++ .../org/apache/myriad/state/NodeTaskTest.java | 17 ++ .../apache/myriad/state/SchedulerStateTest.java | 159 +++++++--- .../state/utils/ByteBufferSupportTest.java | 17 ++ .../webapp/HttpConnectorProviderTest.java | 17 ++ .../myriad/webapp/MyriadWebServerTest.java | 17 ++ .../resources/myriad-config-test-default.yml | 39 +-- 56 files changed, 1590 insertions(+), 1036 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index b48cfc9..a551ba0 100644 --- a/build.gradle +++ b/build.gradle @@ -57,7 +57,6 @@ subprojects { apply plugin: 'java' apply plugin: 'application' - apply from: "$rootDir/gradle/spock.gradle" apply from: "$rootDir/gradle/quality.gradle" sourceCompatibility = '1.7' http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/gradle/spock.gradle ---------------------------------------------------------------------- diff --git a/gradle/spock.gradle b/gradle/spock.gradle deleted file mode 100644 index fc974ec..0000000 --- a/gradle/spock.gradle +++ /dev/null @@ -1,46 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -// used for unit tests -apply plugin: 'groovy' - -def spockVersion = '1.0-groovy-2.4' -def powermockVersion = "1.6.1" - -dependencies { - - testCompile "org.codehaus.groovy:groovy-all:2.4.1" - testCompile "org.spockframework:spock-core:$spockVersion" - - testCompile 'cglib:cglib-nodep:2.2.2' // need to mock classes - - // useful to mock out statics and final classes in Java. - testCompile "org.powermock:powermock-module-junit4:$powermockVersion" - testCompile "org.powermock:powermock-module-junit4-rule:$powermockVersion" - testCompile "org.powermock:powermock-classloading-xstream:$powermockVersion" - testCompile "org.powermock:powermock-api-mockito:$powermockVersion" -} - -// for spock to live in test java tree -sourceSets { - test { - groovy { srcDir 'src/test/java' } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java index 8c028f1..463c4c5 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java @@ -79,18 +79,6 @@ public class Main { private static Injector injector; - /** - * Main is the bootstrap class for the Myriad scheduler, managing the lifecycles of - * the following components: - * - * 1. MyriadDriverManager - * 2. MyriadWebServer - * 3. TaskTerminator - * 4. HealthCheckRegistry - * - * Main uses the Guice Injector framework to manage the Myriad object graph and is - * configured by myriad-config-default.yml - */ public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry registry) throws Exception { MyriadModule myriadModule = new MyriadModule("myriad-config-default.yml", hadoopConf, yarnScheduler, rmContext, registry); @@ -217,19 +205,19 @@ public class Main { SchedulerState schedulerState = injector.getInstance(SchedulerState.class); Set<org.apache.myriad.state.NodeTask> launchedNMTasks = new HashSet<>(); - launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); + launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)); if (!launchedNMTasks.isEmpty()) { LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size()); return; } - launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); + launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)); if (!launchedNMTasks.isEmpty()) { LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size()); return; } - launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); + launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)); if (!launchedNMTasks.isEmpty()) { LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size()); return; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java index bb560a4..4bcb628 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java @@ -102,7 +102,7 @@ public class MyriadModule extends AbstractModule { bind(NMHeartBeatHandler.class).asEagerSingleton(); MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class); - mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON); + mapBinder.addBinding(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON); Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations(); for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java index ae21c69..2050199 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java @@ -51,7 +51,7 @@ public class SchedulerStateResource { @GET public GetSchedulerStateResponse getState() { return new GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), toStringCollection( - state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTasks())); + state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTaskIds())); } private Collection<String> toStringCollection(Collection<Protos.TaskID> collection) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java index 56ea43d..79a8301 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java @@ -56,8 +56,13 @@ public class NodeManagerConfiguration { /** * Default NodeManager Mesos task prefix */ - public static final String NM_TASK_PREFIX = "nm"; - + public static final String DEFAULT_NM_TASK_PREFIX = "nm"; + + /** + * Default max CPU cores for NodeManager JVM + */ + public static final double DEFAULT_NM_MAX_CPUS = 24; + /** * Translates to -Xmx for the NodeManager JVM. */ @@ -85,7 +90,10 @@ public class NodeManagerConfiguration { */ @JsonProperty private Boolean cgroups; - + + @JsonProperty + private Double maxCpus; + private Double generateNodeManagerMemory() { return (NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + NodeManagerConfiguration.JVM_OVERHEAD); } @@ -117,4 +125,8 @@ public class NodeManagerConfiguration { public boolean getCgroups() { return Optional.fromNullable(cgroups).or(DEFAULT_NM_CGROUPS); } + + public Double getMaxCpus() { + return Optional.fromNullable(maxCpus).or(DEFAULT_NM_MAX_CPUS); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java index fb1f1bb..13e83cd 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java @@ -40,6 +40,7 @@ import org.apache.myriad.webapp.MyriadWebServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.inject.Inject; @@ -49,7 +50,6 @@ import com.google.inject.Inject; public class MyriadOperations { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadOperations.class); private final SchedulerState schedulerState; - private MyriadConfiguration cfg; private NodeScaleDownPolicy nodeScaleDownPolicy; private MyriadDriverManager driverManager; @@ -69,12 +69,17 @@ public class MyriadOperations { myriadStateStore = (MyriadStateStore) rmContext.getStateStore(); } } + + @VisibleForTesting + protected SchedulerState getSchedulerState() { + return schedulerState; + } public void flexUpCluster(ServiceResourceProfile serviceResourceProfile, int instances, Constraint constraint) { Collection<NodeTask> nodes = new HashSet<>(); for (int i = 0; i < instances; i++) { NodeTask nodeTask = new NodeTask(serviceResourceProfile, constraint); - nodeTask.setTaskPrefix(NodeManagerConfiguration.NM_TASK_PREFIX); + nodeTask.setTaskPrefix(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX); nodes.add(nodeTask); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java index 3c3ce79..aa09f89 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java @@ -46,8 +46,8 @@ public class Rebalancer implements Runnable { @Override public void run() { - final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX); - final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX); + final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX); + final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX); LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size()); if (activeIds.size() < 1 && pendingIds.size() < 1) { myriadOperations.flexUpCluster(profileManager.get("small"), 1, null); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java index 693ae63..583be2f 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java @@ -59,7 +59,7 @@ public class SchedulerUtils { * @return */ public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) { - for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)) { + for (NodeTask activeNMTask : state.getActiveTasksByType(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX)) { if (activeNMTask.getProfile().getCpus() == 0 && activeNMTask.getProfile().getMemory() == 0 && activeNMTask.getHostname().equals(hostName)) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java index 7e63e0d..8c806c1 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java @@ -150,7 +150,7 @@ public abstract class TaskFactory { * Simple helper to convert Mesos Range Resource to a list of longs. */ protected List<Long> rangesConverter(List<Protos.Resource> rangeResources) { - List<Long> ret = new ArrayList(); + List<Long> ret = new ArrayList<Long>(); for (Protos.Resource range : rangeResources) { ret.add(range.getRanges().getRange(0).getBegin()); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java index 4110b37..e695b40 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java @@ -62,12 +62,12 @@ public class TaskTerminator implements Runnable { @Override public void run() { //If there are 1..n killable tasks, proceed; otherwise, simply return - if (CollectionUtils.isNotEmpty(schedulerState.getKillableTasks())) { + if (CollectionUtils.isNotEmpty(schedulerState.getKillableTaskIds())) { /* * Clone the killable task collection, iterate through all tasks, and * process any pending and/or non-pending tasks */ - Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTasks()); + Set<TaskID> killableTasks = Sets.newHashSet(schedulerState.getKillableTaskIds()); Status driverStatus = driverManager.getDriverStatus(); //TODO (hokiegeek2) Can the DriverManager be restarted? If not, should the ResourceManager stop? http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java index 4bd60bc..b26bdaf 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java @@ -46,7 +46,11 @@ public class TaskUtils { public double getNodeManagerMemory() { return cfg.getNodeManagerConfiguration().getJvmMaxMemoryMB(); } - + + public double getNodeManagerMaxCpus() { + return cfg.getNodeManagerConfiguration().getMaxCpus(); + } + public double getNodeManagerCpus() { return cfg.getNodeManagerConfiguration().getCpus(); } @@ -80,7 +84,7 @@ public class TaskUtils { */ public Iterable<Protos.Resource> getScalarResource(Protos.Offer offer, String name, Double value, Double used) { String role = cfg.getFrameworkRole(); - List<Protos.Resource> resources = new ArrayList<>(); + List<Protos.Resource> resources = new ArrayList<Protos.Resource>(); double resourceDifference = 0; //used to determine the resource difference of value and the resources requested from role * //Find role by name, must loop through resources http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java index 079df4b..b787b48 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java @@ -124,6 +124,6 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> LOGGER.info("Removed {} task with id {}", stopReason, taskId); } private boolean taskIsKillable(TaskID taskId) { - return schedulerState.getKillableTasks().contains(taskId); + return schedulerState.getKillableTaskIds().contains(taskId); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java index 86bbc8c..e16e8b4 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.mesos.Protos; import org.apache.mesos.Protos.Offer; +import org.apache.myriad.configuration.NodeManagerConfiguration; import org.apache.myriad.scheduler.MyriadDriver; import org.apache.myriad.scheduler.SchedulerUtils; import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; @@ -56,11 +57,12 @@ public class NMHeartBeatHandler extends BaseInterceptor { private final OfferLifecycleManager offerLifecycleMgr; private final NodeStore nodeStore; private final SchedulerState state; + private final NodeManagerConfiguration conf; @Inject public NMHeartBeatHandler(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, MyriadDriver myriadDriver, YarnNodeCapacityManager yarnNodeCapacityMgr, OfferLifecycleManager offerLifecycleMgr, - NodeStore nodeStore, SchedulerState state) { + NodeStore nodeStore, SchedulerState state, NodeManagerConfiguration conf) { if (registry != null) { registry.register(this); @@ -72,6 +74,7 @@ public class NMHeartBeatHandler extends BaseInterceptor { this.offerLifecycleMgr = offerLifecycleMgr; this.nodeStore = nodeStore; this.state = state; + this.conf = conf; } @Override @@ -88,9 +91,11 @@ public class NMHeartBeatHandler extends BaseInterceptor { public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) { switch (event.getType()) { case STARTED: + // Since the RMNode was just started, it should not have a non-zero capacity RMNode rmNode = context.getRMNodes().get(event.getNodeId()); - Resource totalCapability = rmNode.getTotalCapability(); - if (totalCapability.getMemory() != 0 || totalCapability.getVirtualCores() != 0) { + + if (isNonZeroCapacityNode(rmNode)) { + Resource totalCapability = rmNode.getTotalCapability(); logger.warn( "FineGrainedScaling feature got invoked for a NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the " + "NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), totalCapability.getVirtualCores()); @@ -109,6 +114,12 @@ public class NMHeartBeatHandler extends BaseInterceptor { } @VisibleForTesting + protected boolean isNonZeroCapacityNode(RMNode node) { + Resource resource = node.getTotalCapability(); + return (resource.getMemory() != 0 || resource.getVirtualCores() != 0); + } + + @VisibleForTesting protected void handleStatusUpdate(RMNodeEvent event, RMContext context) { if (!(event instanceof RMNodeStatusEvent)) { logger.error("{} not an instance of {}", event.getClass().getName(), RMNodeStatusEvent.class.getName()); @@ -124,25 +135,42 @@ public class NMHeartBeatHandler extends BaseInterceptor { host.snapshotRunningContainers(); } - // New capacity of the node = - // resources under use on the node (due to previous offers) + - // new resources offered by mesos for the node - yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos( - hostName))); + /* + * Set the new node capacity which is the sum of the current node resources plus those offered by Mesos. + * If the sum is greater than the max capacity of the node, reject the offer. + */ + Resource offeredResources = getNewResourcesOfferedByMesos(hostName); + Resource currentResources = getResourcesUnderUse(statusEvent); + + if (offerWithinResourceLimits(currentResources, offeredResources)) { + yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(currentResources, offeredResources)); + logger.info("Updated resources for {} with {} cores and {} memory", rmNode.getNode().getName(), + offeredResources.getVirtualCores(), offeredResources.getMemory()); + } else { + logger.info("Did not update {} with {} cores and {} memory, over max cpu cores and/or max memory", + rmNode.getNode().getName(), offeredResources.getVirtualCores(), offeredResources.getMemory()); + } } - - private Resource getNewResourcesOfferedByMesos(String hostname) { + + @VisibleForTesting + protected boolean offerWithinResourceLimits(Resource currentResources, Resource offeredResources) { + int newMemory = currentResources.getMemory() + offeredResources.getMemory(); + int newCores = currentResources.getVirtualCores() + offeredResources.getVirtualCores(); + + return (newMemory <= conf.getJvmMaxMemoryMB() && newCores <= conf.getMaxCpus()); + } + + @VisibleForTesting + protected Resource getNewResourcesOfferedByMesos(String hostname) { OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname); - if (feed == null) { - logger.debug("No offer feed for: {}", hostname); - return Resource.newInstance(0, 0); - } List<Offer> offers = new ArrayList<>(); Protos.Offer offer; + while ((offer = feed.poll()) != null) { - offers.add(offer); + offers.add(offer); offerLifecycleMgr.markAsConsumed(offer); } + Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers); if (logger.isDebugEnabled()) { @@ -153,10 +181,11 @@ public class NMHeartBeatHandler extends BaseInterceptor { return fromMesosOffers; } - private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) { + @VisibleForTesting + protected Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) { Resource usedResources = Resource.newInstance(0, 0); for (ContainerStatus status : statusEvent.getContainers()) { - if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING) { + if (containerInUse(status)) { RMContainer rmContainer = yarnScheduler.getRMContainer(status.getContainerId()); // (sdaingade) This check is needed as RMContainer information may not be populated // immediately after a RM restart. @@ -167,4 +196,8 @@ public class NMHeartBeatHandler extends BaseInterceptor { } return usedResources; } + + private boolean containerInUse(ContainerStatus status) { + return (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java index e4cec83..9698fe7 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; /** * Manages the Mesos offers tracked by Myriad. @@ -57,10 +58,31 @@ public class OfferLifecycleManager { this.myriadDriver = myriadDriver; } - public OfferFeed getOfferFeed(String hostname) { - return offerFeedMap.get(hostname); + /** + * Retrieves the OfferFeed for a host and, if null, creates a new OfferFeed + * for a host that has either been added to, or rejoined, the Mesos cluster. + * + * @param hostname + * @return feed + */ + @VisibleForTesting + protected OfferFeed getOfferFeed(String hostname) { + OfferFeed feed = offerFeedMap.get(hostname); + if (feed == null) { + feed = new OfferFeed(); + offerFeedMap.put(hostname, feed); + } + return feed; } - + + protected Optional<Node> getOfferNode(String host) { + return Optional.fromNullable(nodeStore.getNode(host)); + } + + protected Optional<Offer> getOffer(OfferFeed feed) { + return Optional.fromNullable(feed.poll()); + } + public void declineOffer(Protos.Offer offer) { myriadDriver.getDriver().declineOffer(offer.getId()); LOGGER.debug("Declined offer {}", offer.getId()); @@ -69,16 +91,12 @@ public class OfferLifecycleManager { public void addOffers(Protos.Offer... offers) { for (Protos.Offer offer : offers) { String hostname = offer.getHostname(); - Node node = nodeStore.getNode(hostname); - if (node != null) { - OfferFeed feed = offerFeedMap.get(hostname); - if (feed == null) { - feed = new OfferFeed(); - offerFeedMap.put(hostname, feed); - } + + Optional<Node> optNode = getOfferNode(hostname); + if (optNode.isPresent()) { + OfferFeed feed = getOfferFeed(hostname); feed.add(offer); - - node.setSlaveId(offer.getSlaveId()); + optNode.get().setSlaveId(offer.getSlaveId()); LOGGER.debug("addResourceOffers: caching offer for host {}, offer id {}", hostname, offer.getId().getValue()); } else { @@ -98,6 +116,17 @@ public class OfferLifecycleManager { consumedOffer.add(offer); } + @VisibleForTesting + protected ConsumedOffer getConsumedOffer(String hostname) { + ConsumedOffer cOffer = consumedOfferMap.get(hostname); + if (cOffer == null) { + cOffer = new ConsumedOffer(); + consumedOfferMap.put(hostname, cOffer); + } + + return cOffer; + } + public ConsumedOffer drainConsumedOffer(String hostname) { return consumedOfferMap.remove(hostname); } @@ -105,18 +134,14 @@ public class OfferLifecycleManager { public void declineOutstandingOffers(String hostname) { int numOutStandingOffers = 0; OfferFeed offerFeed = getOfferFeed(hostname); - Offer offer; - while (offerFeed != null && (offer = offerFeed.poll()) != null) { - declineOffer(offer); + Optional<Offer> optOffer; + + while ((optOffer = getOffer(offerFeed)).isPresent()) { + declineOffer(optOffer.get()); numOutStandingOffers++; } if (numOutStandingOffers > 0) { LOGGER.info("Declined {} outstanding offers for host {}", numOutStandingOffers, hostname); } } - - @VisibleForTesting - public ConsumedOffer getConsumedOffer(String hostname) { - return consumedOfferMap.get(hostname); - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java index 8f7c6f5..52cdfeb 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -18,14 +18,12 @@ */ package org.apache.myriad.scheduler.fgs; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; + import javax.inject.Inject; import org.apache.hadoop.yarn.api.records.Container; @@ -59,6 +57,10 @@ import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + /** * Manages the capacity exposed by NodeManager. It uses the offers available * from Mesos to inflate the node capacity and lets ResourceManager make the @@ -316,7 +318,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { // as this is now cached in the NodeTask object in scheduler state. Protos.ExecutorInfo executorInfo = node.getExecInfo(); if (executorInfo == null) { - executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX) + executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX) .getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build(); node.setExecInfo(executorInfo); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java index 5251f19..b81b881 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; @@ -101,5 +102,9 @@ public class MyriadFairScheduler extends FairScheduler { super.handle(event); this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event); } + + public void addNode(FSSchedulerNode node) { + this.nodes.put(node.getNodeID(), node); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java index 8b5fb51..7ee2b62 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/state/SchedulerState.java @@ -169,10 +169,10 @@ public class SchedulerState { * Return a list of TaskIDs corresponding to all killable tasks * @return */ - public synchronized Set<Protos.TaskID> getKillableTasks() { + public synchronized Set<Protos.TaskID> getKillableTaskIds() { Set<Protos.TaskID> returnSet = new HashSet<>(); for (Map.Entry<String, SchedulerStateForType> entry : statesForTaskType.entrySet()) { - returnSet.addAll(entry.getValue().getKillableTasks()); + returnSet.addAll(entry.getValue().getKillableTaskIds()); } return returnSet; } @@ -183,9 +183,9 @@ public class SchedulerState { * @param taskPrefix * @return */ - public synchronized Set<Protos.TaskID> getKillableTasks(String taskPrefix) { + public synchronized Set<Protos.TaskID> getKillableTaskIds(String taskPrefix) { SchedulerStateForType stateTask = statesForTaskType.get(taskPrefix); - return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTasks()); + return (stateTask == null ? new HashSet<Protos.TaskID>() : stateTask.getKillableTaskIds()); } public synchronized void removeTask(Protos.TaskID taskId) { @@ -384,7 +384,7 @@ public class SchedulerState { try { StoreContext sc = new StoreContext(frameworkId, tasks, getPendingTaskIds(), getStagingTaskIds(), getActiveTaskIds(), - getLostTaskIds(), getKillableTasks()); + getLostTaskIds(), getKillableTaskIds()); stateStore.storeMyriadState(sc); } catch (Exception e) { LOGGER.error("Failed to update scheduler state to state store", e); @@ -411,7 +411,7 @@ public class SchedulerState { LOGGER.debug("State Store state includes frameworkId: {}, pending tasks count: {}, staging tasks count: {} " + "active tasks count: {}, lost tasks count: {}, and killable tasks count: {}", frameworkId.getValue(), this.getPendingTaskIds().size(), this.getStagingTaskIds().size(), this.getActiveTaskIds().size(), - this.getLostTaskIds().size(), this.getKillableTasks().size()); + this.getLostTaskIds().size(), this.getKillableTaskIds().size()); } } catch (Exception e) { LOGGER.error("Failed to read scheduler state from state store", e); @@ -545,7 +545,7 @@ public class SchedulerState { return Collections.unmodifiableSet(this.lostTasks); } - public synchronized Set<Protos.TaskID> getKillableTasks() { + public synchronized Set<Protos.TaskID> getKillableTaskIds() { return Collections.unmodifiableSet(this.killableTasks); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java b/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java index a0a9ed1..df3e35b 100644 --- a/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java +++ b/myriad-scheduler/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStoreTest.java @@ -6,7 +6,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.myriad.state.MockDispatcher; +import org.apache.myriad.TestObjectFactory; import org.apache.myriad.state.MockRMApp; import org.junit.Test; @@ -17,13 +17,16 @@ public class MyriadFileSystemRMStateStoreTest { @Test public void testInit() throws Exception { - Configuration conf = getConfiguration(); + Configuration conf = new Configuration(); + conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///" + "/tmp/myriad-file-system-rm-state-store-test"); MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore(); assertTrue(store.isInState(STATE.NOTINITED)); store.init(conf); assertTrue(store.isInState(STATE.INITED)); - store.startInternal(); + store.start(); + assertTrue(store.isInState(STATE.STARTED)); store.close(); + assertTrue(store.isInState(STATE.STOPPED)); } @Test @@ -51,19 +54,8 @@ public class MyriadFileSystemRMStateStoreTest { } private MyriadFileSystemRMStateStore getInitializedStore() throws Exception { - Configuration conf = getConfiguration(); - MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore(); - store.init(conf); - store.startInternal(); - store.loadState(); + MyriadFileSystemRMStateStore store = TestObjectFactory.getStateStore(new Configuration(), "/tmp/myriad-file-system-rm-state-store-test"); store.loadMyriadState(); - store.setRMDispatcher(new MockDispatcher()); return store; } - - private Configuration getConfiguration() { - Configuration conf = new Configuration(); - conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/"); - return conf; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java index e5c3f57..13b97de 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/BaseConfigurableTest.java @@ -1,6 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.myriad; +import java.io.File; +import java.net.URL; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; import org.apache.myriad.configuration.MyriadConfiguration; +import org.junit.After; import org.junit.Before; import com.fasterxml.jackson.databind.ObjectMapper; @@ -15,15 +38,63 @@ public class BaseConfigurableTest { protected MyriadConfiguration cfg; protected MyriadConfiguration cfgWithRole; protected MyriadConfiguration cfgWithDocker; + protected String baseStateStoreDirectory = StringUtils.EMPTY; + /** + * This is normally overridden in derived classes. Be sure to invoke this implementation; + * otherwise, cfg, cfgWithRole, and cfgWithDocker will all be null. + * + * @throws Exception + */ @Before public void setUp() throws Exception { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default.yml"), + cfg = mapper.readValue(getConfURL("myriad-config-test-default.yml"), MyriadConfiguration.class); - cfgWithRole = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default-with-framework-role.yml"), + cfgWithRole = mapper.readValue(getConfURL("myriad-config-test-default-with-framework-role.yml"), MyriadConfiguration.class); - cfgWithDocker = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-test-default-with-docker-info.yml"), + cfgWithDocker = mapper.readValue(getConfURL("myriad-config-test-default-with-docker-info.yml"), MyriadConfiguration.class); } + + /** + * Deletes the directories and files that back the MyriadFileSystemRMStateStore to ensure there + * is no stale state within the MyriadFileSystemRMStateStore that could result in race conditions + * depending upon how the unit tests are executed. + * + * @throws Exception + */ + protected void resetStoreState() throws Exception { + checkConfiguration(); + File rootFile = new File(baseStateStoreDirectory + "/FSRMStateRoot/RMMyriadRoot"); + //Delete directory if present and recursively create directory path + FileUtils.deleteDirectory(rootFile); + FileUtils.forceMkdir(rootFile); + + File storeFile = new File(rootFile.getAbsolutePath() + "/MyriadState"); + + if (!storeFile.createNewFile()) { + throw new IllegalStateException(rootFile.getAbsolutePath() + "/MyriadState could not be created"); + } + } + + /** + * Confirms the configuration of object graph is correct + * + * @throws IllegalStateException + */ + protected void checkConfiguration() throws IllegalStateException { + if (StringUtils.isEmpty(baseStateStoreDirectory)) { + throw new IllegalStateException("The baseStateStoreDirectory must be set, preferably in overridden setUp method"); + } + } + + private URL getConfURL(String file) { + return Thread.currentThread().getContextClassLoader().getResource(file); + } + + @After + public void cleanUp() throws Exception { + FileUtils.deleteDirectory(new File(baseStateStoreDirectory)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java index dc7a00b..4652f1c 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/MyriadTestModule.java @@ -18,15 +18,27 @@ package org.apache.myriad; -import com.fasterxml.jackson.databind.*; -import com.fasterxml.jackson.dataformat.yaml.*; -import com.google.inject.*; -import com.google.inject.multibindings.*; -import java.io.*; -import java.util.*; -import org.apache.myriad.configuration.*; -import org.apache.myriad.scheduler.*; -import org.slf4j.*; +import java.io.IOException; +import java.util.Map; + +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; +import org.apache.myriad.scheduler.ExecutorCommandLineGenerator; +import org.apache.myriad.scheduler.NMExecutorCommandLineGenerator; +import org.apache.myriad.scheduler.NMTaskFactory; +import org.apache.myriad.scheduler.ServiceTaskFactory; +import org.apache.myriad.scheduler.TaskFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import com.google.inject.multibindings.MapBinder; /** * AbstractModule extension for UnitTests @@ -57,7 +69,7 @@ public class MyriadTestModule extends AbstractModule { bind(MyriadConfiguration.class).toInstance(cfg); MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class); - mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON); + mapBinder.addBinding(NodeManagerConfiguration.DEFAULT_NM_TASK_PREFIX).to(NMTaskFactory.class).in(Scopes.SINGLETON); Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations(); for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull(); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java index 9117e3b..43014fa 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/TestObjectFactory.java @@ -1,40 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.myriad; +import java.util.HashMap; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.net.NodeBase; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeHealthStatusPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.mesos.Protos; +import org.apache.mesos.Protos.ExecutorID; +import org.apache.mesos.Protos.ExecutorInfo; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.SlaveID; +import org.apache.mesos.Protos.Value.Type; +import org.apache.mesos.SchedulerDriver; import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.policy.LeastAMNodesFirstPolicy; +import org.apache.myriad.scheduler.ExtendedResourceProfile; import org.apache.myriad.scheduler.MockSchedulerDriver; import org.apache.myriad.scheduler.MyriadDriver; import org.apache.myriad.scheduler.MyriadDriverManager; -import org.apache.myriad.scheduler.MyriadOperations; +import org.apache.myriad.scheduler.NMProfile; +import org.apache.myriad.scheduler.ServiceResourceProfile; +import org.apache.myriad.scheduler.constraints.LikeConstraint; import org.apache.myriad.scheduler.yarn.MyriadCapacityScheduler; +import org.apache.myriad.scheduler.yarn.MyriadFairScheduler; import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; import org.apache.myriad.state.MockDispatcher; import org.apache.myriad.state.MockRMContext; -import org.apache.myriad.state.MockRMNode; +import org.apache.myriad.state.MyriadStateStore; +import org.apache.myriad.state.NodeTask; import org.apache.myriad.state.SchedulerState; import org.apache.myriad.webapp.HttpConnectorProvider; import org.apache.myriad.webapp.MyriadWebServer; @@ -44,28 +81,165 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.ServletHandler; import org.mortbay.jetty.servlet.ServletHolder; +import com.google.common.collect.Lists; import com.google.inject.servlet.GuiceFilter; /** - * Factory for common standard and mock objects utilized for JUnit tests + * Factory for common objects utilized over 1..n JUnit tests */ public class TestObjectFactory { - public static SchedulerState getSchedulerState(MyriadConfiguration cfg) throws Exception { - Configuration conf = new Configuration(); - SchedulerState state = new SchedulerState(TestObjectFactory.getStateStore(conf, false)); + + /** + * Returns a new RMContainer corresponding to the RMNode and RMContext. The RMContainer is the + * ResourceManager's view of an application container per the Hadoop docs + * + * @param node + * @param context + * @param appId + * @param cores + * @param memory + * @return RMContainer + */ + public static RMContainer getRMContainer(RMNode node, RMContext context, int appId, int cores, int memory) { + ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId.newInstance( + ApplicationId.newInstance(123456789, 1), 1), appId); + + Container container = Container.newInstance(containerId, node.getNodeID(), node.getHttpAddress(), + Resources.createResource(memory, cores), null, null); + return new RMContainerImpl(container, containerId.getApplicationAttemptId(), node.getNodeID(), "user1", context); + } + + public static RMNodeStatusEvent getRMStatusEvent(RMNode node) { + NodeId id = node.getNodeID(); + NodeHealthStatus hStatus = NodeHealthStatusPBImpl.newInstance(true, "HEALTHY", System.currentTimeMillis()); + List<ContainerStatus> cStatus = Lists.newArrayList(getContainerStatus(node)); + List<ApplicationId> keepAliveIds = Lists.newArrayList(getApplicationId(node.getHttpPort())); + NodeHeartbeatResponse response = new NodeHeartbeatResponsePBImpl(); + + return new RMNodeStatusEvent(id, hStatus, cStatus, keepAliveIds, response); + } + + private static ContainerStatus getContainerStatus(RMNode node) { + ContainerStatus status = new ContainerStatusPBImpl(); + return status; + } + + private static ApplicationId getApplicationId(int id) { + return ApplicationId.newInstance(System.currentTimeMillis(), id); + } + /** + * Returns a ServiceResourceProfile or ExtendedResourceProfile object depending upon + * whether the execCores and execMemory parameters are null or non-null, respectively + * + * @param profileName + * @param cores + * @param memory + * @param execCores + * @param execMemory + * @return ServiceResourceProfile if execCores and execMemory are null, ExtendedResourceProfile otherwise + */ + public static ServiceResourceProfile getServiceResourceProfile(String profileName, Double cores, Double memory, + Long execCores, Long execMemory) { + if (isExtendedResource(execCores, execMemory)) { + NMProfile nmProfile = new NMProfile(profileName, execCores, execMemory); + return new ExtendedResourceProfile(nmProfile, cores, memory, new HashMap<String, Long>()); + } + return new ServiceResourceProfile(profileName, cores, memory, new HashMap<String, Long>()); + } + + private static boolean isExtendedResource(Long execCores, Long execMemory) { + return execCores != null && execMemory != null; + } + + /** + * Returns a NodeTask with either a ServiceResourceProfile or an ExtendedResourceProfile, + * depending upon whether execCores and execMemory are null or non-null, respectively + * + * @param profileName + * @param hostName + * @param cores + * @param memory + * @param execCores + * @param execMemory + * @return NodeTask + */ + public static NodeTask getNodeTask(String profileName, String hostName, Double cores, Double memory, + Long execCores, Long execMemory) { + NodeTask task = new NodeTask(getServiceResourceProfile(profileName, cores, memory, execCores, execMemory), + new LikeConstraint(hostName, "host-[0-9]*.example.com")); + task.setHostname(hostName); + task.setTaskPrefix("nm"); + task.setSlaveId(SlaveID.newBuilder().setValue(profileName + "-" + hostName).build()); + task.setExecutorInfo(ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("exec")). + setCommand(org.apache.mesos.Protos.CommandInfo.newBuilder().setValue("command")).build()); + return task; + } + + /** + * Returns a NodeTask given a ServiceResourceProfile and hostname + * + * @param hostName + * @param profile + * @return + */ + public static NodeTask getNodeTask(String hostName, ServiceResourceProfile profile) { + NodeTask task = new NodeTask(profile, new LikeConstraint(hostName, "host-[0-9]*.example.com")); + task.setHostname(hostName); + task.setTaskPrefix("nm"); + task.setSlaveId(SlaveID.newBuilder().setValue(profile.getName() + "-" + hostName).build()); + task.setExecutorInfo(ExecutorInfo.newBuilder().setExecutorId(ExecutorID.newBuilder().setValue("exec")). + setCommand(org.apache.mesos.Protos.CommandInfo.newBuilder().setValue("command")).build()); + return task; + } + + public static RMNode getRMNode(String host, int port, Resource resource) { + NodeId id = NodeId.newInstance(host, port); + RMContext context = new MockRMContext(); + return new RMNodeImpl(id, context, id.getHost(), id.getPort(), id.getPort(), new NodeBase(host, "/tmp"), resource, "version-one"); + } + + public static RMNode getRMNode(String host, int port, int memory, int cores) { + Resource resource = Resource.newInstance(memory, cores); + return getRMNode(host, port, resource); + } + + public static Dispatcher getMockDispatcher() { + return new MockDispatcher(); + } + + public static SchedulerState getSchedulerState(MyriadConfiguration cfg, String baseDir) throws Exception { + MyriadStateStore store = TestObjectFactory.getStateStore(new Configuration(), baseDir); + SchedulerState state = new SchedulerState(store); state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); return state; } - public static FileSystemRMStateStore getRMStateStore(Configuration conf) throws Exception { - FileSystemRMStateStore store = new MyriadFileSystemRMStateStore(); - conf.set("yarn.resourcemanager.fs.state-store.uri", "/tmp"); - store.initInternal(conf); - return store; + public static MyriadFairScheduler getMyriadFairScheduler(RMContext context) { + MyriadFairScheduler scheduler = new MyriadFairScheduler(); + scheduler.setRMContext(context); + return scheduler; } - + + public static SchedulerNode getSchedulerNode(String host, int port, int cores, int memory) { + RMNode node = TestObjectFactory.getRMNode(host, port, cores, memory); + return new FiCaSchedulerNode(node, false); + } + + public static MyriadFairScheduler getYarnFairScheduler() { + RMContext context = new MockRMContext(); + return getMyriadFairScheduler(context); + } + + public static MyriadDriverManager getMyriadDriverManager(MyriadDriver driver) { + return new MyriadDriverManager(driver); + } + public static MyriadDriverManager getMyriadDriverManager() { - return new MyriadDriverManager(new MyriadDriver(new MockSchedulerDriver())); + return getMyriadDriverManager(new MyriadDriver(new MockSchedulerDriver())); + } + + public static MyriadDriver getMyriadDriver(SchedulerDriver driver) { + return new MyriadDriver(driver); } public static InterceptorRegistry getInterceptorRegistry() { @@ -77,7 +251,7 @@ public class TestObjectFactory { return scheduler; } - private static Server getJettyServer() { + public static Server getJettyServer() { Server server = new Server(); ServletHandler context = new ServletHandler(); ServletHolder holder = new ServletHolder(DefaultServlet.class); @@ -97,76 +271,24 @@ public class TestObjectFactory { return new MyriadWebServer(server, connector, new GuiceFilter()); } - public static MyriadFileSystemRMStateStore getStateStore(Configuration conf, boolean loadState) throws Exception { - conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/"); + public static MyriadFileSystemRMStateStore getStateStore(Configuration conf, String baseDir) throws Exception { + conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///" + baseDir); MyriadFileSystemRMStateStore store = new MyriadFileSystemRMStateStore(); store.init(conf); store.start(); - if (loadState) { - store.loadState(); - } + store.loadState(); store.setRMDispatcher(new MockDispatcher()); return store; } - - public static Offer getOffer(String host, String slaveId, String frameworkId, String offerId) { + + public static Offer getOffer(String host, String slaveId, String frameworkId, String offerId, double cpuCores, double memory) { Protos.SlaveID sid = SlaveID.newBuilder().setValue(slaveId).build(); Protos.FrameworkID fid = FrameworkID.newBuilder().setValue(frameworkId).build(); - return Protos.Offer.newBuilder().setHostname(host).setId(OfferID.newBuilder().setValue(offerId)).setSlaveId(sid).setFrameworkId(fid).build(); - } - - public static RMContext getRMContext(Configuration conf) throws Exception { - conf.set("yarn.resourcemanager.fs.state-store.uri", "file:///tmp/"); - MockRMContext context = null; - Dispatcher dispatcher = new MockDispatcher(); - - RMApplicationHistoryWriter rmApplicationHistoryWriter = new RMApplicationHistoryWriter(); - AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(dispatcher); - AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(dispatcher); - RMDelegationTokenSecretManager delegationTokenSecretManager = new RMDelegationTokenSecretManager(1, 1, 1, 1, context); - - context = new MockRMContext(); - context.setStateStore(TestObjectFactory.getStateStore(conf, false)); - context.setAmLivelinessMonitor(amLivelinessMonitor); - context.setAmFinishingMonitor(amFinishingMonitor); - context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); - context.setRMDelegationTokenSecretManager(delegationTokenSecretManager); - return context; - } - - public static MyriadOperations getMyriadOperations(MyriadConfiguration cfg) throws Exception { - AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler(); - SchedulerState sState = TestObjectFactory.getSchedulerState(cfg); - sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); - - MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager(); - MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg); - CompositeInterceptor registry = new CompositeInterceptor(); - LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, sState); - return new MyriadOperations(cfg, sState, policy, manager, webServer, TestObjectFactory.getRMContext(new Configuration())); - } - - public static SchedulerNode getSchedulerNode(NodeId nodeId, int vCores, int memory) { - RMNode node = getMockRMNode(nodeId, vCores, memory); - return new FiCaSchedulerNode(node, true); - } - - public static RMNode getMockRMNode(NodeId nodeId, int vCores, int memory) { - MockRMNode node = new MockRMNode(nodeId, NodeState.NEW, new NodeBase("/tmp")); - node.setCommandPort(8041); - node.setHostName("0.0.0.0"); - node.setHttpPort(8042); - node.setRackName("r01n07"); - node.setHttpAddress("localhost:8042"); - node.setTotalCapability(getResource(vCores, memory)); - - return node; - } - - public static Resource getResource(int vCores, int memory) { - Resource resource = new ResourcePBImpl(); - resource.setVirtualCores(vCores); - resource.setMemory(memory); - return resource; + Protos.Value.Scalar cores = Protos.Value.Scalar.newBuilder().setValue(cpuCores).build(); + Protos.Value.Scalar mem = Protos.Value.Scalar.newBuilder().setValue(memory).build(); + Protos.Resource cpuResource = Protos.Resource.newBuilder().setName("cpus").setScalar(cores).setType(Type.SCALAR).build(); + Protos.Resource memResource = Protos.Resource.newBuilder().setName("mem").setScalar(mem).setType(Type.SCALAR).build(); + return Protos.Offer.newBuilder().setHostname(host).setId(OfferID.newBuilder().setValue(offerId)). + setSlaveId(sid).setFrameworkId(fid).addResources(cpuResource).addResources(memResource).build(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java index 5d7bb75..b30e2a7 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/api/ArtifactsResourceTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.myriad.api; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java index e0eda0f..7965f02 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/api/SchedulerStateResourceTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.myriad.api; import static org.junit.Assert.assertNotNull; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java index 562d128..9aef9a9 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/configuration/MyriadConfigurationTest.java @@ -89,8 +89,9 @@ public class MyriadConfigurationTest extends BaseConfigurableTest { NodeManagerConfiguration config = cfg.getNodeManagerConfiguration(); assertFalse(config.getCgroups()); - assertEquals(new Double(0.2), config.getCpus()); - assertEquals(new Double(1024.0), config.getJvmMaxMemoryMB()); + assertEquals(new Double(0.8), config.getCpus()); + assertEquals(new Double(2048.0), config.getJvmMaxMemoryMB()); + assertEquals(new Double(4.0), config.getMaxCpus()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java index e403f90..4adc6e5 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/health/HealthCheckUtilsTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.myriad.health; import java.net.ServerSocket; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java index cebf2c7..7d1a786 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/health/MesosDriverHealthCheckTest.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.myriad.health; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java index 29087e7..cd4683a 100644 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java +++ b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/MyriadOperationsTest.java @@ -19,10 +19,17 @@ package org.apache.myriad.scheduler; import static org.junit.Assert.assertEquals; +import java.util.HashMap; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager; import org.apache.mesos.Protos.FrameworkID; import org.apache.myriad.BaseConfigurableTest; import org.apache.myriad.TestObjectFactory; @@ -31,95 +38,99 @@ import org.apache.myriad.policy.LeastAMNodesFirstPolicy; import org.apache.myriad.scheduler.constraints.Constraint; import org.apache.myriad.scheduler.constraints.LikeConstraint; import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; +import org.apache.myriad.state.MockDispatcher; +import org.apache.myriad.state.MockRMContext; import org.apache.myriad.state.SchedulerState; import org.apache.myriad.webapp.MyriadWebServer; import org.junit.Before; import org.junit.Test; -import java.util.TreeMap; - /** * Unit tests for MyriadOperations class */ public class MyriadOperationsTest extends BaseConfigurableTest { ServiceResourceProfile small; Constraint constraint = new LikeConstraint("localhost", "host-[0-9]*.example.com"); - MyriadWebServer webServer; - private SchedulerState getSchedulerState() throws Exception { - SchedulerState state = TestObjectFactory.getSchedulerState(this.cfg); - state.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); - return state; + @Before + public void setUp() throws Exception { + super.setUp(); + this.baseStateStoreDirectory = "/tmp/myriad-operations-test"; + generateProfiles(); } - - private MyriadOperations getMyriadOperations(SchedulerState state) throws Exception { - MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager(); + private MyriadOperations initialize() throws Exception { + resetStoreState(); + SchedulerState sState = TestObjectFactory.getSchedulerState(cfg, "tmp/myriad-operations-test"); + sState.setFrameworkId(FrameworkID.newBuilder().setValue("mock-framework").build()); AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler = TestObjectFactory.getYarnScheduler(); + MyriadDriverManager manager = TestObjectFactory.getMyriadDriverManager(); + MyriadWebServer webServer = TestObjectFactory.getMyriadWebServer(cfg); CompositeInterceptor registry = new CompositeInterceptor(); - LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, state); + LeastAMNodesFirstPolicy policy = new LeastAMNodesFirstPolicy(registry, scheduler, sState); + manager.startDriver(); - return new MyriadOperations(cfg, state, policy, manager, webServer, TestObjectFactory.getRMContext(new Configuration())); + return new MyriadOperations(cfg, sState, policy, manager, webServer, generateRMContext(scheduler)); } - - @Before - public void setUp() throws Exception { - super.setUp(); - webServer = TestObjectFactory.getMyriadWebServer(cfg); - generateProfiles(); + private void generateProfiles() { + small = new ServiceResourceProfile("small", new Double(0.1), new Double(512.0), new HashMap<String, Long>()); } - private void generateProfiles() { - TreeMap<String, Long> ports = new TreeMap<>(); - small = new ServiceResourceProfile("small", 0.1, 512.0, ports); + private RMContext generateRMContext(AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> scheduler) throws Exception { + Configuration conf = new Configuration(); + MockRMContext context = null; + Dispatcher dispatcher = new MockDispatcher(); + + RMApplicationHistoryWriter rmApplicationHistoryWriter = new RMApplicationHistoryWriter(); + AMLivelinessMonitor amLivelinessMonitor = new AMLivelinessMonitor(dispatcher); + AMLivelinessMonitor amFinishingMonitor = new AMLivelinessMonitor(dispatcher); + RMDelegationTokenSecretManager delegationTokenSecretManager = new RMDelegationTokenSecretManager(1, 1, 1, 1, context); + + context = new MockRMContext(); + context.setStateStore(TestObjectFactory.getStateStore(conf, "tmp/myriad-operations-test")); + context.setAmLivelinessMonitor(amLivelinessMonitor); + context.setAmFinishingMonitor(amFinishingMonitor); + context.setRMApplicationHistoryWriter(rmApplicationHistoryWriter); + context.setRMDelegationTokenSecretManager(delegationTokenSecretManager); + return context; } @Test public void testFlexUpAndFlexDownCluster() throws Exception { - SchedulerState sState = this.getSchedulerState(); - MyriadOperations ops = this.getMyriadOperations(sState); - assertEquals(0, sState.getPendingTaskIds().size()); + MyriadOperations ops = initialize(); + assertEquals(0, ops.getSchedulerState().getPendingTaskIds().size()); ops.flexUpCluster(small, 1, constraint); - assertEquals(1, sState.getPendingTaskIds().size()); + assertEquals(1, ops.getSchedulerState().getPendingTaskIds().size()); ops.flexDownCluster(small, constraint, 1); - assertEquals(0, sState.getPendingTaskIds().size()); + assertEquals(0, ops.getSchedulerState().getPendingTaskIds().size()); } - + @Test public void testFlexUpAndFlexDownService() throws Exception { - SchedulerState sState = this.getSchedulerState(); - MyriadOperations ops = this.getMyriadOperations(sState); + MyriadOperations ops = initialize(); ops.flexUpAService(1, "jobhistory"); - assertEquals(1, sState.getPendingTasksByType("jobhistory").size()); + assertEquals(1, ops.getSchedulerState().getPendingTasksByType("jobhistory").size()); ops.flexDownAService(1, "jobhistory"); - assertEquals(0, sState.getPendingTasksByType("jobhistory").size()); + assertEquals(0, ops.getSchedulerState().getPendingTasksByType("jobhistory").size()); } @Test(expected = MyriadBadConfigurationException.class) public void testFlexUpAServiceOverMaxInstances() throws Exception { - SchedulerState sState = this.getSchedulerState(); - MyriadOperations ops = this.getMyriadOperations(sState); - /* - * There is 1 jobhhistory task loaded from configuration file, so flexing up - * by two should result in MyriadBadConfigurationException - */ - ops.flexUpAService(2, "jobhistory"); + MyriadOperations ops = initialize(); + ops.flexUpAService(3, "jobhistory"); } @Test public void testGetFlexibleInstances() throws Exception { - SchedulerState sState = this.getSchedulerState(); - MyriadOperations ops = this.getMyriadOperations(sState); - assertEquals(0, ops.getFlexibleInstances("jobhistory").intValue()); + MyriadOperations ops = initialize(); ops.flexUpAService(1, "jobhistory"); assertEquals(1, ops.getFlexibleInstances("jobhistory").intValue()); } @Test public void testShutdownCluster() throws Exception { - SchedulerState sState = this.getSchedulerState(); - MyriadOperations ops = this.getMyriadOperations(sState); + MyriadOperations ops = initialize(); ops.shutdownFramework(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/577c30b1/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy b/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy deleted file mode 100644 index 6555f04..0000000 --- a/myriad-scheduler/src/test/java/org/apache/myriad/scheduler/SchedulerUtilsSpec.groovy +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.myriad.scheduler - -import org.apache.mesos.Protos -import org.apache.myriad.configuration.NodeManagerConfiguration -import org.apache.myriad.state.NodeTask -import org.apache.myriad.state.SchedulerState -import spock.lang.Specification - -/** - * - * @author kensipe - */ -class SchedulerUtilsSpec extends Specification { - - def "is unique host name"() { - given: - def offer = Mock(Protos.OfferOrBuilder) - offer.getHostname() >> "hostname" - - expect: - returnValue == SchedulerUtils.isUniqueHostname(offer, launchTask, tasks) - - where: - tasks | launchTask | returnValue - [] | null | true - null | null | true - createNodeTaskList("hostname") | createNodeTask("hostname") | false - createNodeTaskList("missinghost") | createNodeTask("hostname") | true - createNodeTaskList("missinghost1", "missinghost2") | createNodeTask("missinghost3") | true - createNodeTaskList("missinghost1", "hostname") | createNodeTask("hostname") | false - - } - - def "is eligible for Fine Grained Scaling"() { - given: - def state = Mock(SchedulerState) - def tasks = [] - def fgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("zero", 0, 0), 1.0, 2.0, new HashMap<String, Long>()), null) - def cgsNMTask = new NodeTask(new ExtendedResourceProfile(new NMProfile("low", 2, 4096), 1.0, 2.0, new HashMap<String, Long>()), null) - fgsNMTask.setHostname("test_fgs_hostname") - cgsNMTask.setHostname("test_cgs_hostname") - tasks << fgsNMTask << cgsNMTask - state.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX) >> tasks - - expect: - returnValue == SchedulerUtils.isEligibleForFineGrainedScaling(hostName, state) - - where: - hostName | returnValue - "test_fgs_hostname" | true - "test_cgs_hostname" | false - "blah" | false - "" | false - null | false - } - - ArrayList<NodeTask> createNodeTaskList(String... hostnames) { - def list = [] - hostnames.each { hostname -> - list << createNodeTask(hostname) - } - return list - } - - - NodeTask createNodeTask(String hostname) { - def node = new NodeTask(new ExtendedResourceProfile(new NMProfile("", 1, 1), 1.0, 1.0, new HashMap<String, Long>()), null) - node.hostname = hostname - node.taskPrefix = "nm" - node - } -}