http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java index 235d01b..cb850ab 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadScheduler.java @@ -18,21 +18,23 @@ */ package org.apache.myriad.scheduler; +import com.lmax.disruptor.EventTranslator; +import java.util.List; +import javax.inject.Inject; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.scheduler.event.DisconnectedEvent; import org.apache.myriad.scheduler.event.ErrorEvent; +import org.apache.myriad.scheduler.event.ExecutorLostEvent; import org.apache.myriad.scheduler.event.FrameworkMessageEvent; import org.apache.myriad.scheduler.event.OfferRescindedEvent; import org.apache.myriad.scheduler.event.ReRegisteredEvent; +import org.apache.myriad.scheduler.event.RegisteredEvent; import org.apache.myriad.scheduler.event.ResourceOffersEvent; import org.apache.myriad.scheduler.event.SlaveLostEvent; import org.apache.myriad.scheduler.event.StatusUpdateEvent; -import com.lmax.disruptor.EventTranslator; -import org.apache.mesos.Protos; -import org.apache.mesos.Scheduler; -import org.apache.mesos.SchedulerDriver; - -import javax.inject.Inject; -import java.util.List; /** * Myriad Scheduler @@ -47,9 +49,9 @@ public class MyriadScheduler implements Scheduler { @Override public void registered(final SchedulerDriver driver, final Protos.FrameworkID frameworkId, final Protos.MasterInfo masterInfo) { - disruptorManager.getRegisteredEventDisruptor().publishEvent(new EventTranslator<org.apache.myriad.scheduler.event.RegisteredEvent>() { + disruptorManager.getRegisteredEventDisruptor().publishEvent(new EventTranslator<RegisteredEvent>() { @Override - public void translateTo(org.apache.myriad.scheduler.event.RegisteredEvent event, long sequence) { + public void translateTo(RegisteredEvent event, long sequence) { event.setDriver(driver); event.setFrameworkId(frameworkId); event.setMasterInfo(masterInfo); @@ -102,7 +104,8 @@ public class MyriadScheduler implements Scheduler { } @Override - public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final byte[] bytes) { + public void frameworkMessage(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, + final byte[] bytes) { disruptorManager.getFrameworkMessageEventDisruptor().publishEvent(new EventTranslator<FrameworkMessageEvent>() { @Override public void translateTo(FrameworkMessageEvent event, long sequence) { @@ -116,9 +119,9 @@ public class MyriadScheduler implements Scheduler { @Override public void disconnected(final SchedulerDriver driver) { - disruptorManager.getDisconnectedEventDisruptor().publishEvent(new EventTranslator<org.apache.myriad.scheduler.event.DisconnectedEvent>() { + disruptorManager.getDisconnectedEventDisruptor().publishEvent(new EventTranslator<DisconnectedEvent>() { @Override - public void translateTo(org.apache.myriad.scheduler.event.DisconnectedEvent event, long sequence) { + public void translateTo(DisconnectedEvent event, long sequence) { event.setDriver(driver); } }); @@ -136,10 +139,11 @@ public class MyriadScheduler implements Scheduler { } @Override - public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, final int exitStatus) { - disruptorManager.getExecutorLostEventDisruptor().publishEvent(new EventTranslator<org.apache.myriad.scheduler.event.ExecutorLostEvent>() { + public void executorLost(final SchedulerDriver driver, final Protos.ExecutorID executorId, final Protos.SlaveID slaveId, + final int exitStatus) { + disruptorManager.getExecutorLostEventDisruptor().publishEvent(new EventTranslator<ExecutorLostEvent>() { @Override - public void translateTo(org.apache.myriad.scheduler.event.ExecutorLostEvent event, long sequence) { + public void translateTo(ExecutorLostEvent event, long sequence) { event.setDriver(driver); event.setExecutorId(executorId); event.setSlaveId(slaveId);
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java index 411518f..19eda34 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMExecutorCLGenImpl.java @@ -19,15 +19,13 @@ package org.apache.myriad.scheduler; -import java.util.Map; import java.util.HashMap; - +import java.util.Map; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.myriad.configuration.MyriadConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.myriad.configuration.MyriadConfiguration; - /** * Implementation assumes NM binaries already deployed */ @@ -44,14 +42,14 @@ public class NMExecutorCLGenImpl implements ExecutorCommandLineGenerator { */ public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class"; // TODO (mohit): Should it be configurable ? - public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; - public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; - + public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; + public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; /** * YARN class to help handle LCE resources */ public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class"; - // TODO (mohit): Should it be configurable ? public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler"; public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java index 0944a2a..12490a7 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMPorts.java @@ -19,7 +19,6 @@ package org.apache.myriad.scheduler; import com.google.common.base.Preconditions; - import java.util.HashMap; import java.util.Map; @@ -32,7 +31,8 @@ public class NMPorts implements Ports { private static final String NM_WEBAPP_HTTP_PORT_KEY = "nm.webapp.http.port"; private static final String NM_HTTP_SHUFFLE_PORT_KEY = "nm.http.shuffle.port"; - private static final String[] NM_PORT_KEYS = {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY}; + private static final String[] NM_PORT_KEYS = + {NM_RPC_PORT_KEY, NM_LOCALIZER_PORT_KEY, NM_WEBAPP_HTTP_PORT_KEY, NM_HTTP_SHUFFLE_PORT_KEY}; private Map<String, Long> portsMap = new HashMap<>(NM_PORT_KEYS.length); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java index 38fad1d..4256f40 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMProfileManager.java @@ -19,11 +19,10 @@ package org.apache.myriad.scheduler; import com.google.gson.Gson; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Node Manager Profile Manager http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java index 62d9f52..8ddd9a0 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/NMTaskFactoryAnnotation.java @@ -19,12 +19,12 @@ package org.apache.myriad.scheduler; import com.google.inject.BindingAnnotation; -import java.lang.annotation.Target; import java.lang.annotation.Retention; -import static java.lang.annotation.RetentionPolicy.RUNTIME; -import static java.lang.annotation.ElementType.PARAMETER; +import java.lang.annotation.Target; import static java.lang.annotation.ElementType.FIELD; import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; /** * NMTaskFactory annotation that allows to bind TaskFactory to NM specific implementation http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java index 955bc77..3c3ce79 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/Rebalancer.java @@ -18,10 +18,11 @@ */ package org.apache.myriad.scheduler; -import javax.inject.Inject; import java.util.Set; - +import javax.inject.Inject; import org.apache.mesos.Protos; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,12 +33,12 @@ import org.slf4j.LoggerFactory; public class Rebalancer implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(Rebalancer.class); - private final org.apache.myriad.state.SchedulerState schedulerState; + private final SchedulerState schedulerState; private final MyriadOperations myriadOperations; private final ServiceProfileManager profileManager; @Inject - public Rebalancer(org.apache.myriad.state.SchedulerState schedulerState, MyriadOperations myriadOperations, ServiceProfileManager profileManager) { + public Rebalancer(SchedulerState schedulerState, MyriadOperations myriadOperations, ServiceProfileManager profileManager) { this.schedulerState = schedulerState; this.myriadOperations = myriadOperations; this.profileManager = profileManager; @@ -45,8 +46,8 @@ public class Rebalancer implements Runnable { @Override public void run() { - final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX); - final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX); + final Set<Protos.TaskID> activeIds = schedulerState.getActiveTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX); + final Set<Protos.TaskID> pendingIds = schedulerState.getPendingTaskIds(NodeManagerConfiguration.NM_TASK_PREFIX); LOGGER.info("Active {}, Pending {}", activeIds.size(), pendingIds.size()); if (activeIds.size() < 1 && pendingIds.size() < 1) { myriadOperations.flexUpCluster(profileManager.get("small"), 1, null); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java index d0c74de..2ca375f 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ReconcileService.java @@ -18,17 +18,16 @@ */ package org.apache.myriad.scheduler; -import org.apache.myriad.configuration.MyriadConfiguration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import javax.inject.Inject; import org.apache.mesos.Protos; import org.apache.mesos.SchedulerDriver; +import org.apache.myriad.configuration.MyriadConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; - /** * {@link ReconcileService} is responsible for reconciling tasks with the mesos master */ http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java index 0cbca37..693ae63 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/SchedulerUtils.java @@ -18,18 +18,16 @@ */ package org.apache.myriad.scheduler; -import org.apache.myriad.configuration.NodeManagerConfiguration; -import org.apache.myriad.state.NodeTask; -import org.apache.myriad.state.SchedulerState; import com.google.common.base.Preconditions; - +import java.util.Collection; import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.state.NodeTask; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; - /** * Provides utilities for scheduling with the mesos offers */ http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java index 5319593..42011f9 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceProfileManager.java @@ -18,14 +18,12 @@ */ package org.apache.myriad.scheduler; +import com.google.gson.Gson; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; - /** * Class to keep all the ServiceResourceProfiles together */ http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java index 021007b..4c033c9 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceResourceProfile.java @@ -18,16 +18,14 @@ */ package org.apache.myriad.scheduler; -import java.lang.reflect.Type; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.gson.Gson; import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; import com.google.gson.JsonParseException; +import java.lang.reflect.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Resource Profile for any service @@ -112,7 +110,8 @@ public class ServiceResourceProfile { private static final Logger LOGGER = LoggerFactory.getLogger(CustomDeserializer.class); @Override - public ServiceResourceProfile deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException { + public ServiceResourceProfile deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { String type = json.getAsJsonObject().get("className").getAsString(); try { @SuppressWarnings("rawtypes") Class c = Class.forName(type); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java index 544c47f..b7bccb7 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskConstraints.java @@ -19,7 +19,6 @@ package org.apache.myriad.scheduler; import java.util.Map; - import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.configuration.ServiceConfiguration; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java index d7ca31d..725e405 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/ServiceTaskFactoryImpl.java @@ -17,15 +17,16 @@ */ package org.apache.myriad.scheduler; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; - import javax.inject.Inject; - +import org.apache.mesos.Protos; import org.apache.mesos.Protos.CommandInfo; +import org.apache.mesos.Protos.CommandInfo.URI; import org.apache.mesos.Protos.ExecutorInfo; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.Offer; @@ -33,16 +34,13 @@ import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.Value; -import org.apache.mesos.Protos.CommandInfo.URI; import org.apache.mesos.Protos.Value.Scalar; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.configuration.MyriadExecutorConfiguration; import org.apache.myriad.configuration.ServiceConfiguration; import org.apache.myriad.state.NodeTask; -import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Generic Service Class that allows to create a service solely base don the configuration @@ -104,7 +102,8 @@ public class ServiceTaskFactoryImpl implements TaskFactory { } // use provided ports additionalPortsNumbers = getAvailablePorts(offer, neededPortsCount); - LOGGER.info("No specified ports found or number of specified ports is not enough. Using ports from Mesos Offers: {}", additionalPortsNumbers); + LOGGER.info("No specified ports found or number of specified ports is not enough. Using ports from Mesos Offers: {}", + additionalPortsNumbers); int index = 0; for (Map.Entry<String, Long> portEntry : ports.entrySet()) { String portProperty = portEntry.getKey(); @@ -131,8 +130,9 @@ public class ServiceTaskFactoryImpl implements TaskFactory { TaskInfo.Builder taskBuilder = TaskInfo.newBuilder(); - taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build()).addResources(Resource.newBuilder() - .setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build()); + taskBuilder.setName(nodeTask.getTaskPrefix()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources( + Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build()).addResources( + Resource.newBuilder().setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build()); if (additionalPortsNumbers != null && !additionalPortsNumbers.isEmpty()) { // set ports @@ -153,9 +153,9 @@ public class ServiceTaskFactoryImpl implements TaskFactory { CommandInfo.Builder commandInfo = CommandInfo.newBuilder(); Map<String, String> envVars = cfg.getYarnEnvironment(); if (envVars != null && !envVars.isEmpty()) { - org.apache.mesos.Protos.Environment.Builder yarnHomeB = org.apache.mesos.Protos.Environment.newBuilder(); + Protos.Environment.Builder yarnHomeB = Protos.Environment.newBuilder(); for (Map.Entry<String, String> envEntry : envVars.entrySet()) { - org.apache.mesos.Protos.Environment.Variable.Builder yarnEnvB = org.apache.mesos.Protos.Environment.Variable.newBuilder(); + Protos.Environment.Variable.Builder yarnEnvB = Protos.Environment.Variable.newBuilder(); yarnEnvB.setName(envEntry.getKey()).setValue(envEntry.getValue()); yarnHomeB.addVariables(yarnEnvB.build()); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java index 46cd12b..391cb32 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskFactory.java @@ -18,20 +18,14 @@ */ package org.apache.myriad.scheduler; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Random; - import javax.inject.Inject; - -import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.configuration.MyriadExecutorConfiguration; -import org.apache.myriad.state.NodeTask; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - import org.apache.mesos.Protos.CommandInfo; import org.apache.mesos.Protos.CommandInfo.URI; import org.apache.mesos.Protos.ExecutorID; @@ -39,11 +33,14 @@ import org.apache.mesos.Protos.ExecutorInfo; import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.TaskID; +import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.Value; import org.apache.mesos.Protos.Value.Range; import org.apache.mesos.Protos.Value.Scalar; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.MyriadExecutorConfiguration; +import org.apache.myriad.state.NodeTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -110,16 +107,16 @@ public interface TaskFactory { } Preconditions.checkState(allAvailablePorts.size() >= NMPorts.expectedNumPorts(), "Not enough ports in offer"); - + while (ports.size() < NMPorts.expectedNumPorts()) { int portIndex = rand.nextInt(allAvailablePorts.size()); ports.add(allAvailablePorts.get(portIndex)); allAvailablePorts.remove(portIndex); - } + } } return ports; } - + //Utility function to get the first NMPorts.expectedNumPorts number of ports of an offer @VisibleForTesting protected static NMPorts getPorts(Offer offer) { @@ -144,8 +141,7 @@ public interface TaskFactory { if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { //Both FrameworkUser and FrameworkSuperuser to get all of the directory permissions correct. if (!(cfg.getFrameworkUser().isPresent() && cfg.getFrameworkSuperUser().isPresent())) { - throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + - "and/or frameworkSuperUser not set!"); + throw new RuntimeException("Trying to use remote distribution, but frameworkUser" + "and/or frameworkSuperUser not set!"); } String nodeManagerUri = myriadExecutorConfiguration.getNodeManagerUri().get(); cmd = clGenerator.generateCommandLine(profile, ports); @@ -189,12 +185,27 @@ public interface TaskFactory { CommandInfo commandInfo = getCommandInfo(serviceProfile, ports); ExecutorInfo executorInfo = getExecutorInfoForSlave(frameworkId, offer, commandInfo); - TaskInfo.Builder taskBuilder = TaskInfo.newBuilder().setName("task-" + taskId.getValue()).setTaskId(taskId).setSlaveId(offer.getSlaveId()); - - return taskBuilder.addResources(Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build()).addResources(Resource.newBuilder().setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build()) - .addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges(Value.Ranges.newBuilder().addRange(Value.Range.newBuilder().setBegin(ports.getRpcPort()).setEnd(ports.getRpcPort()).build()).addRange(Value.Range - .newBuilder().setBegin(ports.getLocalizerPort()).setEnd(ports.getLocalizerPort()).build()).addRange(Value.Range.newBuilder().setBegin(ports.getWebAppHttpPort()).setEnd(ports.getWebAppHttpPort()).build()).addRange(Value.Range - .newBuilder().setBegin(ports.getShufflePort()).setEnd(ports.getShufflePort()).build()))).setExecutor(executorInfo).build(); + TaskInfo.Builder taskBuilder = TaskInfo.newBuilder().setName("task-" + taskId.getValue()).setTaskId(taskId).setSlaveId( + offer.getSlaveId()); + + return taskBuilder.addResources(Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(taskCpus).build()) + .addResources(Resource.newBuilder().setName("mem").setType(Value.Type.SCALAR).setScalar(taskMemory).build()) + .addResources(Resource.newBuilder().setName("ports").setType(Value.Type.RANGES).setRanges( + Value.Ranges.newBuilder().addRange(Value.Range.newBuilder() + .setBegin(ports.getRpcPort()) + .setEnd(ports.getRpcPort()) + .build()).addRange(Value.Range.newBuilder() + .setBegin(ports.getLocalizerPort()) + .setEnd(ports.getLocalizerPort()) + .build()).addRange(Value.Range.newBuilder() + .setBegin(ports.getWebAppHttpPort()) + .setEnd(ports.getWebAppHttpPort()) + .build()).addRange(Value.Range.newBuilder() + .setBegin(ports.getShufflePort()) + .setEnd(ports.getShufflePort()) + .build()))) + .setExecutor(executorInfo) + .build(); } @Override @@ -203,9 +214,13 @@ public interface TaskFactory { Scalar executorCpus = Scalar.newBuilder().setValue(taskUtils.getExecutorCpus()).build(); ExecutorID executorId = ExecutorID.newBuilder().setValue(EXECUTOR_PREFIX + frameworkId.getValue() + - offer.getId().getValue() + offer.getSlaveId().getValue()).build(); - return ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).addResources(Resource.newBuilder().setName("cpus").setType(Value.Type.SCALAR).setScalar(executorCpus).build()).addResources(Resource.newBuilder().setName("mem") - .setType(Value.Type.SCALAR).setScalar(executorMemory).build()).setExecutorId(executorId).build(); + offer.getId().getValue() + offer.getSlaveId().getValue()).build(); + return ExecutorInfo.newBuilder().setCommand(commandInfo).setName(EXECUTOR_NAME).addResources(Resource.newBuilder().setName( + "cpus").setType(Value.Type.SCALAR).setScalar(executorCpus).build()).addResources(Resource.newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(executorMemory) + .build()).setExecutorId(executorId).build(); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java index f0b3fa6..6be653b 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskTerminator.java @@ -18,19 +18,16 @@ */ package org.apache.myriad.scheduler; -import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; -import org.apache.myriad.state.NodeTask; -import org.apache.myriad.state.SchedulerState; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; - import java.util.Set; - import javax.inject.Inject; - import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskID; +import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; +import org.apache.myriad.state.NodeTask; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +42,8 @@ public class TaskTerminator implements Runnable { private final OfferLifecycleManager offerLifeCycleManager; @Inject - public TaskTerminator(SchedulerState schedulerState, MyriadDriverManager driverManager, OfferLifecycleManager offerLifecycleManager) { + public TaskTerminator(SchedulerState schedulerState, MyriadDriverManager driverManager, + OfferLifecycleManager offerLifecycleManager) { this.schedulerState = schedulerState; this.driverManager = driverManager; this.offerLifeCycleManager = offerLifecycleManager; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java index 375ff3c..f632d84 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/TaskUtils.java @@ -18,21 +18,10 @@ */ package org.apache.myriad.scheduler; -import org.apache.myriad.configuration.ServiceConfiguration; -import org.apache.myriad.configuration.MyriadBadConfigurationException; -import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.configuration.MyriadExecutorConfiguration; -import org.apache.myriad.configuration.NodeManagerConfiguration; import com.google.common.base.Optional; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.w3c.dom.Document; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; -import org.xml.sax.InputSource; -import org.xml.sax.SAXException; - +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; import javax.inject.Inject; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; @@ -49,10 +38,19 @@ import javax.xml.xpath.XPathConstants; import javax.xml.xpath.XPathExpression; import javax.xml.xpath.XPathExpressionException; import javax.xml.xpath.XPathFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.StringWriter; +import org.apache.myriad.configuration.MyriadBadConfigurationException; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.MyriadExecutorConfiguration; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; +import org.apache.myriad.executor.MyriadExecutorDefaults; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; /** * utility class for working with tasks and node manager profiles @@ -144,12 +142,13 @@ public class TaskUtils { } public double getAggregateCpus(NMProfile profile) { - return getNodeManagerCpus() + org.apache.myriad.executor.MyriadExecutorDefaults.DEFAULT_CPUS + profile.getCpus(); + return getNodeManagerCpus() + MyriadExecutorDefaults.DEFAULT_CPUS + profile.getCpus(); } public double getNodeManagerMemory() { NodeManagerConfiguration nmCfg = this.cfg.getNodeManagerConfiguration(); - return (nmCfg.getJvmMaxMemoryMB().isPresent() ? nmCfg.getJvmMaxMemoryMB().get() : NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + NodeManagerConfiguration.JVM_OVERHEAD); + return (nmCfg.getJvmMaxMemoryMB().isPresent() ? nmCfg.getJvmMaxMemoryMB() + .get() : NodeManagerConfiguration.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + NodeManagerConfiguration.JVM_OVERHEAD); } public double getNodeManagerCpus() { @@ -159,12 +158,13 @@ public class TaskUtils { public double getExecutorCpus() { - return org.apache.myriad.executor.MyriadExecutorDefaults.DEFAULT_CPUS; + return MyriadExecutorDefaults.DEFAULT_CPUS; } public double getExecutorMemory() { MyriadExecutorConfiguration executorCfg = this.cfg.getMyriadExecutorConfiguration(); - return (executorCfg.getJvmMaxMemoryMB().isPresent() ? executorCfg.getJvmMaxMemoryMB().get() : org.apache.myriad.executor.MyriadExecutorDefaults.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + org.apache.myriad.executor.MyriadExecutorDefaults.JVM_OVERHEAD); + return (executorCfg.getJvmMaxMemoryMB().isPresent() ? executorCfg.getJvmMaxMemoryMB() + .get() : MyriadExecutorDefaults.DEFAULT_JVM_MAX_MEMORY_MB) * (1 + MyriadExecutorDefaults.JVM_OVERHEAD); } public double getTaskCpus(NMProfile profile) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/Constraint.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/Constraint.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/Constraint.java index 6d66e5a..39ee3ed 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/Constraint.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/Constraint.java @@ -22,6 +22,8 @@ package org.apache.myriad.scheduler.constraints; * Interface for Constraint. */ public interface Constraint { + public Type getType(); + /** * Type of Constraint */ @@ -30,6 +32,4 @@ public interface Constraint { LIKE } - public Type getType(); - } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/LikeConstraint.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/LikeConstraint.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/LikeConstraint.java index 6ccda7b..dc87bc2 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/LikeConstraint.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/constraints/LikeConstraint.java @@ -62,7 +62,8 @@ public class LikeConstraint implements Constraint { return this.pattern.matcher(String.valueOf(attr.getScalar().getValue())).matches(); default: - LOGGER.warn("LIKE constraint currently doesn't support Mesos slave attributes " + "of type {}. Attribute Name: {}", attr.getType(), attr.getName()); + LOGGER.warn("LIKE constraint currently doesn't support Mesos slave attributes " + "of type {}. Attribute Name: {}", + attr.getType(), attr.getName()); return false; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/ResourceOffersEvent.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/ResourceOffersEvent.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/ResourceOffersEvent.java index 1f52faa..b5bffdf 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/ResourceOffersEvent.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/ResourceOffersEvent.java @@ -18,11 +18,10 @@ */ package org.apache.myriad.scheduler.event; +import java.util.List; import org.apache.mesos.Protos; import org.apache.mesos.SchedulerDriver; -import java.util.List; - /** * resource offer event */ http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/DisconnectedEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/DisconnectedEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/DisconnectedEventHandler.java index 293676e..fca2760 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/DisconnectedEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/DisconnectedEventHandler.java @@ -19,17 +19,18 @@ package org.apache.myriad.scheduler.event.handlers; import com.lmax.disruptor.EventHandler; +import org.apache.myriad.scheduler.event.DisconnectedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * handles and logs disconnected events */ -public class DisconnectedEventHandler implements EventHandler<org.apache.myriad.scheduler.event.DisconnectedEvent> { +public class DisconnectedEventHandler implements EventHandler<DisconnectedEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(DisconnectedEventHandler.class); @Override - public void onEvent(org.apache.myriad.scheduler.event.DisconnectedEvent event, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(DisconnectedEvent event, long sequence, boolean endOfBatch) throws Exception { LOGGER.info("Framework disconnected!"); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ErrorEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ErrorEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ErrorEventHandler.java index 5c58c20..6e432c3 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ErrorEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ErrorEventHandler.java @@ -19,17 +19,18 @@ package org.apache.myriad.scheduler.event.handlers; import com.lmax.disruptor.EventHandler; +import org.apache.myriad.scheduler.event.ErrorEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * handles and logs error events */ -public class ErrorEventHandler implements EventHandler<org.apache.myriad.scheduler.event.ErrorEvent> { +public class ErrorEventHandler implements EventHandler<ErrorEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(ErrorEventHandler.class); @Override - public void onEvent(org.apache.myriad.scheduler.event.ErrorEvent event, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(ErrorEvent event, long sequence, boolean endOfBatch) throws Exception { String message = event.getMessage(); LOGGER.error(message); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java index 053baaa..efe0359 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java @@ -21,17 +21,18 @@ package org.apache.myriad.scheduler.event.handlers; import com.lmax.disruptor.EventHandler; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.SlaveID; +import org.apache.myriad.scheduler.event.ExecutorLostEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * handles and logs executor lost events */ -public class ExecutorLostEventHandler implements EventHandler<org.apache.myriad.scheduler.event.ExecutorLostEvent> { +public class ExecutorLostEventHandler implements EventHandler<ExecutorLostEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorLostEventHandler.class); @Override - public void onEvent(org.apache.myriad.scheduler.event.ExecutorLostEvent event, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(ExecutorLostEvent event, long sequence, boolean endOfBatch) throws Exception { ExecutorID executorId = event.getExecutorId(); SlaveID slaveId = event.getSlaveId(); int exitStatus = event.getExitStatus(); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java index 0140f93..5bffec1 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java @@ -21,17 +21,18 @@ package org.apache.myriad.scheduler.event.handlers; import com.lmax.disruptor.EventHandler; import org.apache.mesos.Protos.ExecutorID; import org.apache.mesos.Protos.SlaveID; +import org.apache.myriad.scheduler.event.FrameworkMessageEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * handles and logs mesos framework messages */ -public class FrameworkMessageEventHandler implements EventHandler<org.apache.myriad.scheduler.event.FrameworkMessageEvent> { +public class FrameworkMessageEventHandler implements EventHandler<FrameworkMessageEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(FrameworkMessageEventHandler.class); @Override - public void onEvent(org.apache.myriad.scheduler.event.FrameworkMessageEvent event, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(FrameworkMessageEvent event, long sequence, boolean endOfBatch) throws Exception { ExecutorID executorId = event.getExecutorId(); SlaveID slaveId = event.getSlaveId(); LOGGER.info("Received framework message from executor {} of slave {}", executorId, slaveId); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java index 9d37ff1..85e8043 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java @@ -19,17 +19,18 @@ package org.apache.myriad.scheduler.event.handlers; import com.lmax.disruptor.EventHandler; +import org.apache.myriad.scheduler.event.OfferRescindedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * handles and logs offer rescinded events */ -public class OfferRescindedEventHandler implements EventHandler<org.apache.myriad.scheduler.event.OfferRescindedEvent> { +public class OfferRescindedEventHandler implements EventHandler<OfferRescindedEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(OfferRescindedEventHandler.class); @Override - public void onEvent(org.apache.myriad.scheduler.event.OfferRescindedEvent event, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(OfferRescindedEvent event, long sequence, boolean endOfBatch) throws Exception { LOGGER.info("OfferRescinded event: {}", event); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java index 57f9630..6b3e94f 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java @@ -18,11 +18,11 @@ */ package org.apache.myriad.scheduler.event.handlers; +import com.google.inject.Inject; +import com.lmax.disruptor.EventHandler; import org.apache.myriad.scheduler.ReconcileService; import org.apache.myriad.scheduler.event.ReRegisteredEvent; import org.apache.myriad.state.SchedulerState; -import com.google.inject.Inject; -import com.lmax.disruptor.EventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/RegisteredEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/RegisteredEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/RegisteredEventHandler.java index 0678bf0..f4fbf11 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/RegisteredEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/RegisteredEventHandler.java @@ -19,15 +19,15 @@ package org.apache.myriad.scheduler.event.handlers; import com.lmax.disruptor.EventHandler; +import javax.inject.Inject; +import org.apache.myriad.scheduler.event.RegisteredEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; - /** * handles and logs mesos registered events */ -public class RegisteredEventHandler implements EventHandler<org.apache.myriad.scheduler.event.RegisteredEvent> { +public class RegisteredEventHandler implements EventHandler<RegisteredEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(RegisteredEventHandler.class); @Inject @@ -37,7 +37,7 @@ public class RegisteredEventHandler implements EventHandler<org.apache.myriad.sc private org.apache.myriad.scheduler.ReconcileService reconcileService; @Override - public void onEvent(org.apache.myriad.scheduler.event.RegisteredEvent event, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(RegisteredEvent event, long sequence, boolean endOfBatch) throws Exception { LOGGER.info("Received event: {} with frameworkId: {}", event, event.getFrameworkId()); schedulerState.setFrameworkId(event.getFrameworkId()); reconcileService.reconcile(event.getDriver()); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index b61496f..1df68a5 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -20,9 +20,16 @@ package org.apache.myriad.scheduler.event.handlers; import com.google.common.collect.Sets; import com.lmax.disruptor.EventHandler; - +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; - +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.inject.Inject; import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos; import org.apache.mesos.Protos.Offer; @@ -31,24 +38,25 @@ import org.apache.mesos.Protos.Resource; import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.Value; import org.apache.mesos.SchedulerDriver; +import org.apache.myriad.scheduler.SchedulerUtils; +import org.apache.myriad.scheduler.ServiceResourceProfile; +import org.apache.myriad.scheduler.TaskConstraints; +import org.apache.myriad.scheduler.TaskConstraintsManager; +import org.apache.myriad.scheduler.TaskFactory; +import org.apache.myriad.scheduler.TaskUtils; +import org.apache.myriad.scheduler.constraints.Constraint; +import org.apache.myriad.scheduler.constraints.LikeConstraint; +import org.apache.myriad.scheduler.event.ResourceOffersEvent; +import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; +import org.apache.myriad.state.NodeTask; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - /** * handles and logs resource offers events */ -public class ResourceOffersEventHandler implements EventHandler<org.apache.myriad.scheduler.event.ResourceOffersEvent> { +public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOffersEventHandler.class); private static final Lock driverOperationLock = new ReentrantLock(); @@ -60,22 +68,22 @@ public class ResourceOffersEventHandler implements EventHandler<org.apache.myria @Inject - private org.apache.myriad.state.SchedulerState schedulerState; + private SchedulerState schedulerState; @Inject - private org.apache.myriad.scheduler.TaskUtils taskUtils; + private TaskUtils taskUtils; @Inject - private Map<String, org.apache.myriad.scheduler.TaskFactory> taskFactoryMap; + private Map<String, TaskFactory> taskFactoryMap; @Inject - private org.apache.myriad.scheduler.fgs.OfferLifecycleManager offerLifecycleMgr; + private OfferLifecycleManager offerLifecycleMgr; @Inject - private org.apache.myriad.scheduler.TaskConstraintsManager taskConstraintsManager; + private TaskConstraintsManager taskConstraintsManager; @Override - public void onEvent(org.apache.myriad.scheduler.event.ResourceOffersEvent event, long sequence, boolean endOfBatch) throws Exception { + public void onEvent(ResourceOffersEvent event, long sequence, boolean endOfBatch) throws Exception { SchedulerDriver driver = event.getDriver(); List<Offer> offers = event.getOffers(); @@ -95,34 +103,35 @@ public class ResourceOffersEventHandler implements EventHandler<org.apache.myria try { for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext(); ) { Offer offer = iterator.next(); - Set<org.apache.myriad.state.NodeTask> nodeTasks = schedulerState.getNodeTasks(offer.getSlaveId()); - for (org.apache.myriad.state.NodeTask nodeTask : nodeTasks) { + Set<NodeTask> nodeTasks = schedulerState.getNodeTasks(offer.getSlaveId()); + for (NodeTask nodeTask : nodeTasks) { nodeTask.setSlaveAttributes(offer.getAttributesList()); } - // keep this in case SchedulerState gets out of sync. This should not happen with + // keep this in case SchedulerState gets out of sync. This should not happen with // synchronizing addNodes method in SchedulerState // but to keep it safe final Set<Protos.TaskID> missingTasks = Sets.newHashSet(); Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); if (CollectionUtils.isNotEmpty(pendingTasks)) { for (Protos.TaskID pendingTaskId : pendingTasks) { - org.apache.myriad.state.NodeTask taskToLaunch = schedulerState.getTask(pendingTaskId); + NodeTask taskToLaunch = schedulerState.getTask(pendingTaskId); if (taskToLaunch == null) { missingTasks.add(pendingTaskId); LOGGER.warn("Node task for TaskID: {} does not exist", pendingTaskId); continue; } String taskPrefix = taskToLaunch.getTaskPrefix(); - org.apache.myriad.scheduler.ServiceResourceProfile profile = taskToLaunch.getProfile(); - org.apache.myriad.scheduler.constraints.Constraint constraint = taskToLaunch.getConstraint(); + ServiceResourceProfile profile = taskToLaunch.getProfile(); + Constraint constraint = taskToLaunch.getConstraint(); - Set<org.apache.myriad.state.NodeTask> launchedTasks = new HashSet<>(); + Set<NodeTask> launchedTasks = new HashSet<>(); launchedTasks.addAll(schedulerState.getActiveTasksByType(taskPrefix)); launchedTasks.addAll(schedulerState.getStagingTasksByType(taskPrefix)); - if (matches(offer, taskToLaunch, constraint) && org.apache.myriad.scheduler.SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) { + if (matches(offer, taskToLaunch, constraint) && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) { try { - final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), pendingTaskId, taskToLaunch); + final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), + pendingTaskId, taskToLaunch); List<OfferID> offerIds = new ArrayList<>(); offerIds.add(offer.getId()); List<TaskInfo> tasks = new ArrayList<>(); @@ -154,7 +163,7 @@ public class ResourceOffersEventHandler implements EventHandler<org.apache.myria } for (Offer offer : offers) { - if (org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) { + if (SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", offer.getHostname()); } @@ -171,7 +180,7 @@ public class ResourceOffersEventHandler implements EventHandler<org.apache.myria } } - private boolean matches(Offer offer, org.apache.myriad.state.NodeTask taskToLaunch, org.apache.myriad.scheduler.constraints.Constraint constraint) { + private boolean matches(Offer offer, NodeTask taskToLaunch, Constraint constraint) { if (!meetsConstraint(offer, constraint)) { return false; } @@ -200,12 +209,12 @@ public class ResourceOffersEventHandler implements EventHandler<org.apache.myria return checkAggregates(offer, taskToLaunch, ports, cpus, mem); } - private boolean checkAggregates(Offer offer, org.apache.myriad.state.NodeTask taskToLaunch, int ports, double cpus, double mem) { - final org.apache.myriad.scheduler.ServiceResourceProfile profile = taskToLaunch.getProfile(); + private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) { + final ServiceResourceProfile profile = taskToLaunch.getProfile(); final String taskPrefix = taskToLaunch.getTaskPrefix(); final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu(); final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory(); - final org.apache.myriad.scheduler.TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix); + final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix); if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) { return true; } else { @@ -214,11 +223,11 @@ public class ResourceOffersEventHandler implements EventHandler<org.apache.myria } } - private boolean meetsConstraint(Offer offer, org.apache.myriad.scheduler.constraints.Constraint constraint) { + private boolean meetsConstraint(Offer offer, Constraint constraint) { if (constraint != null) { switch (constraint.getType()) { case LIKE: { - org.apache.myriad.scheduler.constraints.LikeConstraint likeConstraint = (org.apache.myriad.scheduler.constraints.LikeConstraint) constraint; + LikeConstraint likeConstraint = (LikeConstraint) constraint; if (likeConstraint.isConstraintOnHostName()) { return likeConstraint.matchesHostName(offer.getHostname()); } else { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java index 6feebe3..b1f37bb 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/SlaveLostEventHandler.java @@ -18,9 +18,9 @@ */ package org.apache.myriad.scheduler.event.handlers; -import org.apache.myriad.scheduler.event.SlaveLostEvent; import com.lmax.disruptor.EventHandler; import org.apache.mesos.Protos.SlaveID; +import org.apache.myriad.scheduler.event.SlaveLostEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java index 29c89c3..25d0440 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java @@ -18,17 +18,15 @@ */ package org.apache.myriad.scheduler.event.handlers; -import org.apache.myriad.scheduler.event.StatusUpdateEvent; -import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; -import org.apache.myriad.state.NodeTask; -import org.apache.myriad.state.SchedulerState; import com.lmax.disruptor.EventHandler; - import javax.inject.Inject; - import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; +import org.apache.myriad.scheduler.event.StatusUpdateEvent; +import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; +import org.apache.myriad.state.NodeTask; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java index ff49496..7f46941 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/ConsumedOffer.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; - import org.apache.mesos.Protos; /** http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java index b8d8326..86bbc8c 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NMHeartBeatHandler.java @@ -18,8 +18,6 @@ */ package org.apache.myriad.scheduler.fgs; -import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; -import org.apache.myriad.state.SchedulerState; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; @@ -37,6 +35,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.mesos.Protos; import org.apache.mesos.Protos.Offer; +import org.apache.myriad.scheduler.MyriadDriver; +import org.apache.myriad.scheduler.SchedulerUtils; +import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,15 +51,16 @@ public class NMHeartBeatHandler extends BaseInterceptor { Logger logger = LoggerFactory.getLogger(NMHeartBeatHandler.class); private final AbstractYarnScheduler yarnScheduler; - private final org.apache.myriad.scheduler.MyriadDriver myriadDriver; + private final MyriadDriver myriadDriver; private final YarnNodeCapacityManager yarnNodeCapacityMgr; private final OfferLifecycleManager offerLifecycleMgr; private final NodeStore nodeStore; private final SchedulerState state; @Inject - public NMHeartBeatHandler(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, org.apache.myriad.scheduler.MyriadDriver myriadDriver, YarnNodeCapacityManager yarnNodeCapacityMgr, - OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, SchedulerState state) { + public NMHeartBeatHandler(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, MyriadDriver myriadDriver, + YarnNodeCapacityManager yarnNodeCapacityMgr, OfferLifecycleManager offerLifecycleMgr, + NodeStore nodeStore, SchedulerState state) { if (registry != null) { registry.register(this); @@ -75,7 +79,7 @@ public class NMHeartBeatHandler extends BaseInterceptor { return new CallBackFilter() { @Override public boolean allowCallBacksForNode(NodeId nodeManager) { - return org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state); + return SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state); } }; } @@ -87,8 +91,9 @@ public class NMHeartBeatHandler extends BaseInterceptor { RMNode rmNode = context.getRMNodes().get(event.getNodeId()); Resource totalCapability = rmNode.getTotalCapability(); if (totalCapability.getMemory() != 0 || totalCapability.getVirtualCores() != 0) { - logger.warn("FineGrainedScaling feature got invoked for a " + "NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), totalCapability - .getVirtualCores()); + logger.warn( + "FineGrainedScaling feature got invoked for a NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the " + + "NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), totalCapability.getVirtualCores()); totalCapability.setMemory(0); totalCapability.setVirtualCores(0); } @@ -122,7 +127,8 @@ public class NMHeartBeatHandler extends BaseInterceptor { // New capacity of the node = // resources under use on the node (due to previous offers) + // new resources offered by mesos for the node - yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos(hostName))); + yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos( + hostName))); } private Resource getNewResourcesOfferedByMesos(String hostname) { @@ -140,7 +146,8 @@ public class NMHeartBeatHandler extends BaseInterceptor { Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers); if (logger.isDebugEnabled()) { - logger.debug("NM on host {} got {} CPUs and {} memory from mesos", hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory()); + logger.debug("NM on host {} got {} CPUs and {} memory from mesos", hostname, fromMesosOffers.getVirtualCores(), + fromMesosOffers.getMemory()); } return fromMesosOffers; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java index 035953e..f9df6af 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/Node.java @@ -20,7 +20,6 @@ package org.apache.myriad.scheduler.fgs; import java.util.HashSet; import java.util.Set; - import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.mesos.Protos; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java index a940a0e..c481b17 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/NodeStore.java @@ -19,7 +19,6 @@ package org.apache.myriad.scheduler.fgs; import java.util.concurrent.ConcurrentHashMap; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; /** http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java index 521ea57..3caece7 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferFeed.java @@ -19,7 +19,6 @@ package org.apache.myriad.scheduler.fgs; import java.util.concurrent.ConcurrentLinkedQueue; - import org.apache.mesos.Protos; /** http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java index 0e283cb..135158f 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/OfferLifecycleManager.java @@ -21,11 +21,10 @@ package org.apache.myriad.scheduler.fgs; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import javax.inject.Inject; - import org.apache.mesos.Protos; import org.apache.mesos.Protos.Offer; +import org.apache.myriad.scheduler.MyriadDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +42,10 @@ public class OfferLifecycleManager { private final Map<String, ConsumedOffer> consumedOfferMap; private final NodeStore nodeStore; - private final org.apache.myriad.scheduler.MyriadDriver myriadDriver; + private final MyriadDriver myriadDriver; @Inject - public OfferLifecycleManager(NodeStore nodeStore, org.apache.myriad.scheduler.MyriadDriver myriadDriver) { + public OfferLifecycleManager(NodeStore nodeStore, MyriadDriver myriadDriver) { this.offerFeedMap = new ConcurrentHashMap<>(200, 0.75f, 50); this.consumedOfferMap = new HashMap<>(200, 0.75f); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java index ca094cc..f058d42 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -18,18 +18,13 @@ */ package org.apache.myriad.scheduler.fgs; -import org.apache.myriad.executor.ContainerTaskStatusRequest; -import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import java.util.HashSet; import java.util.List; import java.util.Set; - import javax.inject.Inject; - import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -45,6 +40,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.mesos.Protos; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.executor.ContainerTaskStatusRequest; +import org.apache.myriad.scheduler.MyriadDriver; +import org.apache.myriad.scheduler.SchedulerUtils; +import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,14 +66,15 @@ public class YarnNodeCapacityManager extends BaseInterceptor { private final AbstractYarnScheduler yarnScheduler; private final RMContext rmContext; - private final org.apache.myriad.scheduler.MyriadDriver myriadDriver; + private final MyriadDriver myriadDriver; private final OfferLifecycleManager offerLifecycleMgr; private final NodeStore nodeStore; - private final org.apache.myriad.state.SchedulerState state; + private final SchedulerState state; @Inject - public YarnNodeCapacityManager(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext, org.apache.myriad.scheduler.MyriadDriver myriadDriver, OfferLifecycleManager - offerLifecycleMgr, NodeStore nodeStore, org.apache.myriad.state.SchedulerState state) { + public YarnNodeCapacityManager(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext, + MyriadDriver myriadDriver, OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, + SchedulerState state) { if (registry != null) { registry.register(this); } @@ -88,7 +91,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { return new CallBackFilter() { @Override public boolean allowCallBacksForNode(NodeId nodeManager) { - return org.apache.myriad.scheduler.SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state); + return SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state); } }; } @@ -96,7 +99,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { @Override public void afterSchedulerEventHandled(SchedulerEvent event) { switch (event.getType()) { - case NODE_ADDED: { + case NODE_ADDED: if (!(event instanceof NodeAddedSchedulerEvent)) { LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeAddedSchedulerEvent.class.getName()); return; @@ -109,10 +112,9 @@ public class YarnNodeCapacityManager extends BaseInterceptor { SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId); nodeStore.add(node); LOGGER.info("afterSchedulerEventHandled: NM registration from node {}", host); - } break; - case NODE_UPDATE: { + case NODE_UPDATE: if (!(event instanceof NodeUpdateSchedulerEvent)) { LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeUpdateSchedulerEvent.class.getName()); return; @@ -120,7 +122,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor { RMNode rmNode = ((NodeUpdateSchedulerEvent) event).getRMNode(); handleContainerAllocation(rmNode); - } + break; default: @@ -148,14 +150,16 @@ public class YarnNodeCapacityManager extends BaseInterceptor { Set<RMContainer> containersBeforeSched = node.getContainerSnapshot(); Set<RMContainer> containersAfterSched = new HashSet<>(node.getNode().getRunningContainers()); - Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? containersAfterSched : Sets.difference(containersAfterSched, containersBeforeSched); + Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? containersAfterSched : Sets.difference( + containersAfterSched, containersBeforeSched); if (containersAllocatedByMesosOffer.isEmpty()) { LOGGER.debug("No containers allocated using Mesos offers for host: {}", host); for (Protos.Offer offer : consumedOffer.getOffers()) { offerLifecycleMgr.declineOffer(offer); } - setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()))); + setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers( + consumedOffer.getOffers()))); } else { LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size()); @@ -195,25 +199,35 @@ public class YarnNodeCapacityManager extends BaseInterceptor { LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); // updates the scheduler with the new capacity for the NM. // the event is handled by the scheduler asynchronously - rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance( + rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); } private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, ConsumedOffer consumedOffer, Node node) { Protos.Offer offer = consumedOffer.getOffers().get(0); Container container = rmContainer.getContainer(); - Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build(); + Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue( + ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build(); // TODO (sdaingade) Remove ExecutorInfo from the Node object // as this is now cached in the NodeTask object in scheduler state. Protos.ExecutorInfo executorInfo = node.getExecInfo(); if (executorInfo == null) { - executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX).getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build(); + executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX) + .getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build(); node.setExecInfo(executorInfo); } - return Protos.TaskInfo.newBuilder().setName("task_" + taskId.getValue()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar - .newBuilder().setValue(container.getResource().getVirtualCores()))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(container.getResource() - .getMemory()))).setExecutor(executorInfo).build(); + return Protos.TaskInfo.newBuilder() + .setName("task_" + taskId.getValue()) + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId()) + .addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar( + Protos.Value.Scalar.newBuilder().setValue(container.getResource().getVirtualCores()))) + .addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar( + Protos.Value.Scalar.newBuilder().setValue(container.getResource().getMemory()))) + .setExecutor(executorInfo) + .build(); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java index f6d24e9..b46eeac 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadCapacityScheduler.java @@ -18,14 +18,14 @@ */ package org.apache.myriad.scheduler.yarn; -import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; -import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; /** * {@link MyriadCapacityScheduler} just extends YARN's {@link CapacityScheduler} and http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java index 35d6aaf..a4b2056 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFairScheduler.java @@ -18,14 +18,14 @@ */ package org.apache.myriad.scheduler.yarn; -import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; -import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; /** * {@link MyriadFairScheduler} just extends YARN's {@link FairScheduler} and http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java index 1fd3b87..5526a83 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/yarn/MyriadFifoScheduler.java @@ -18,13 +18,14 @@ */ package org.apache.myriad.scheduler.yarn; -import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.YarnSchedulerInterceptor; /** * {@link MyriadFifoScheduler} just extends YARN's {@link FifoScheduler} and @@ -53,7 +54,7 @@ public class MyriadFifoScheduler extends FifoScheduler { @Override public synchronized void setRMContext(RMContext rmContext) { this.rmContext = rmContext; - this.yarnSchedulerInterceptor = new org.apache.myriad.scheduler.yarn.interceptor.CompositeInterceptor(); + this.yarnSchedulerInterceptor = new CompositeInterceptor(); rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext); rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler); super.setRMContext(rmContext);