sorted import statements after the namespace change, the import statements were no longer sorted.
Added line wrapping for long lines. Set the line length to 132. This closes: #26 Review: https://github.com/apache/incubator-myriad/pull/26 Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/df7d05c8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/df7d05c8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/df7d05c8 Branch: refs/heads/master Commit: df7d05c8639b371b94a1e94406e2f2446d10eaaf Parents: c47dc23 Author: klucar <klu...@gmail.com> Authored: Tue Nov 3 14:26:49 2015 -0800 Committer: Santosh Marella <mare...@gmail.com> Committed: Tue Nov 3 15:57:08 2015 -0800 ---------------------------------------------------------------------- .../myriad/executor/MyriadExecutorDefaults.java | 6 +- .../apache/myriad/executor/MyriadExecutor.java | 9 +- .../executor/MyriadExecutorAuxService.java | 7 +- .../recovery/MyriadFileSystemRMStateStore.java | 6 +- .../org/apache/myriad/DisruptorManager.java | 78 +++++++++-------- .../src/main/java/org/apache/myriad/Main.java | 91 ++++++++++++-------- .../java/org/apache/myriad/MesosModule.java | 19 ++-- .../java/org/apache/myriad/MyriadModule.java | 33 ++++--- .../org/apache/myriad/api/ClustersResource.java | 43 ++++----- .../myriad/api/ConfigurationResource.java | 3 +- .../myriad/api/SchedulerStateResource.java | 18 ++-- .../api/model/GetSchedulerStateResponse.java | 3 +- .../configuration/MyriadConfiguration.java | 10 +-- .../MyriadExecutorConfiguration.java | 5 +- .../configuration/NodeManagerConfiguration.java | 7 +- .../configuration/OptionalSerializer.java | 22 +++-- .../configuration/ServiceConfiguration.java | 20 ++--- .../apache/myriad/health/HealthCheckUtils.java | 5 +- .../myriad/health/MesosDriverHealthCheck.java | 8 +- .../myriad/health/MesosMasterHealthCheck.java | 10 +-- .../myriad/health/ZookeeperHealthCheck.java | 3 +- .../myriad/policy/LeastAMNodesFirstPolicy.java | 27 +++--- .../myriad/policy/NodeScaleDownPolicy.java | 3 +- .../scheduler/DownloadNMExecutorCLGenImpl.java | 7 +- .../apache/myriad/scheduler/MyriadDriver.java | 3 +- .../myriad/scheduler/MyriadDriverManager.java | 7 +- .../myriad/scheduler/MyriadOperations.java | 56 +++++++----- .../myriad/scheduler/MyriadScheduler.java | 34 ++++---- .../myriad/scheduler/NMExecutorCLGenImpl.java | 14 ++- .../org/apache/myriad/scheduler/NMPorts.java | 4 +- .../myriad/scheduler/NMProfileManager.java | 5 +- .../scheduler/NMTaskFactoryAnnotation.java | 6 +- .../org/apache/myriad/scheduler/Rebalancer.java | 13 +-- .../myriad/scheduler/ReconcileService.java | 11 ++- .../apache/myriad/scheduler/SchedulerUtils.java | 10 +-- .../myriad/scheduler/ServiceProfileManager.java | 4 +- .../scheduler/ServiceResourceProfile.java | 11 ++- .../scheduler/ServiceTaskConstraints.java | 1 - .../scheduler/ServiceTaskFactoryImpl.java | 24 +++--- .../apache/myriad/scheduler/TaskFactory.java | 61 ++++++++----- .../apache/myriad/scheduler/TaskTerminator.java | 12 ++- .../org/apache/myriad/scheduler/TaskUtils.java | 44 +++++----- .../scheduler/constraints/Constraint.java | 4 +- .../scheduler/constraints/LikeConstraint.java | 3 +- .../scheduler/event/ResourceOffersEvent.java | 3 +- .../handlers/DisconnectedEventHandler.java | 5 +- .../event/handlers/ErrorEventHandler.java | 5 +- .../handlers/ExecutorLostEventHandler.java | 5 +- .../handlers/FrameworkMessageEventHandler.java | 5 +- .../handlers/OfferRescindedEventHandler.java | 5 +- .../handlers/ReRegisteredEventHandler.java | 4 +- .../event/handlers/RegisteredEventHandler.java | 8 +- .../handlers/ResourceOffersEventHandler.java | 81 +++++++++-------- .../event/handlers/SlaveLostEventHandler.java | 2 +- .../handlers/StatusUpdateEventHandler.java | 10 +-- .../myriad/scheduler/fgs/ConsumedOffer.java | 1 - .../scheduler/fgs/NMHeartBeatHandler.java | 27 +++--- .../org/apache/myriad/scheduler/fgs/Node.java | 1 - .../apache/myriad/scheduler/fgs/NodeStore.java | 1 - .../apache/myriad/scheduler/fgs/OfferFeed.java | 1 - .../scheduler/fgs/OfferLifecycleManager.java | 7 +- .../scheduler/fgs/YarnNodeCapacityManager.java | 58 ++++++++----- .../scheduler/yarn/MyriadCapacityScheduler.java | 4 +- .../scheduler/yarn/MyriadFairScheduler.java | 4 +- .../scheduler/yarn/MyriadFifoScheduler.java | 5 +- .../scheduler/yarn/RMNodeEventHandler.java | 2 +- .../yarn/interceptor/BaseInterceptor.java | 3 +- .../yarn/interceptor/CompositeInterceptor.java | 6 +- .../MyriadInitializationInterceptor.java | 6 +- .../interceptor/YarnSchedulerInterceptor.java | 3 +- .../java/org/apache/myriad/state/Cluster.java | 1 - .../org/apache/myriad/state/MyriadState.java | 4 +- .../java/org/apache/myriad/state/NodeTask.java | 15 ++-- .../org/apache/myriad/state/SchedulerState.java | 23 ++--- .../myriad/state/utils/ByteBufferSupport.java | 17 ++-- .../apache/myriad/state/utils/StoreContext.java | 26 +++--- .../myriad/webapp/HttpConnectorProvider.java | 5 +- .../myriad/webapp/MyriadServletModule.java | 6 +- .../apache/myriad/webapp/MyriadWebServer.java | 9 +- .../java/org/apache/myriad/MesosModule.java | 15 ++-- .../org/apache/myriad/MultiBindingsTest.java | 12 ++- .../org/apache/myriad/MultiBindingsUsage.java | 2 - .../org/apache/myriad/MyriadTestModule.java | 41 ++++----- .../MyriadBadConfigurationExceptionTest.java | 3 +- .../configuration/MyriadConfigurationTest.java | 13 ++- .../myriad/scheduler/SchedulerUtilsSpec.groovy | 2 +- .../myriad/scheduler/TMSTaskFactoryImpl.java | 2 - .../myriad/scheduler/TestMyriadScheduler.java | 8 -- .../myriad/scheduler/TestRandomPorts.java | 11 ++- .../scheduler/TestServiceCommandLine.java | 27 +++--- .../apache/myriad/scheduler/TestTaskUtils.java | 21 ++--- .../myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 2 +- 92 files changed, 666 insertions(+), 631 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java index eee4a60..bda7ff0 100644 --- a/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java +++ b/myriad-commons/src/main/java/org/apache/myriad/executor/MyriadExecutorDefaults.java @@ -29,9 +29,11 @@ public class MyriadExecutorDefaults { */ public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class"; - public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; + public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; - public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; + public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = + "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; /** * YARN class to help handle LCE resources http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java index cc9b9af..8aa580c 100644 --- a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java +++ b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutor.java @@ -18,6 +18,8 @@ */ package org.apache.myriad.executor; +import java.nio.charset.Charset; +import java.util.Set; import org.apache.mesos.Executor; import org.apache.mesos.ExecutorDriver; import org.apache.mesos.Protos; @@ -28,13 +30,9 @@ import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskInfo; import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.Charset; -import java.util.Set; - /** * Myriad's Executor */ @@ -80,7 +78,8 @@ public class MyriadExecutor implements Executor { // currently running synchronized (containerIds) { for (String containerId : containerIds) { - Protos.TaskID containerTaskId = Protos.TaskID.newBuilder().setValue(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX + containerId).build(); + Protos.TaskID containerTaskId = Protos.TaskID.newBuilder().setValue( + MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX + containerId).build(); status = TaskStatus.newBuilder().setTaskId(containerTaskId).setState(TaskState.TASK_KILLED).build(); driver.sendStatusUpdate(status); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java index 322f124..cca81b9 100644 --- a/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java +++ b/myriad-executor/src/main/java/org/apache/myriad/executor/MyriadExecutorAuxService.java @@ -22,25 +22,22 @@ package org.apache.myriad.executor; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; - import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; - import org.apache.mesos.MesosExecutorDriver; +import org.apache.mesos.Protos; import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; -import org.apache.mesos.Protos; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Auxillary service wrapper for MyriadExecutor + * Auxillary service wrapper for MyriadExecutor */ public class MyriadExecutorAuxService extends AuxiliaryService { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java index 4991df2..85fed08 100644 --- a/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java +++ b/myriad-scheduler/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MyriadFileSystemRMStateStore.java @@ -20,16 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.io.IOException; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.myriad.state.MyriadStateStore; import org.apache.myriad.state.utils.StoreContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * StateStore that stores Myriad state in addition to RM state to DFS. http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java index 698f615..e471102 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/DisruptorManager.java @@ -18,6 +18,11 @@ */ package org.apache.myriad; +import com.google.inject.Injector; +import com.lmax.disruptor.dsl.Disruptor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.myriad.scheduler.event.*; import org.apache.myriad.scheduler.event.handlers.DisconnectedEventHandler; import org.apache.myriad.scheduler.event.handlers.ErrorEventHandler; import org.apache.myriad.scheduler.event.handlers.ExecutorLostEventHandler; @@ -28,11 +33,6 @@ import org.apache.myriad.scheduler.event.handlers.RegisteredEventHandler; import org.apache.myriad.scheduler.event.handlers.ResourceOffersEventHandler; import org.apache.myriad.scheduler.event.handlers.SlaveLostEventHandler; import org.apache.myriad.scheduler.event.handlers.StatusUpdateEventHandler; -import com.google.inject.Injector; -import com.lmax.disruptor.dsl.Disruptor; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * Disruptor class is an event bus used in high speed financial systems. http://martinfowler.com/articles/lmax.html @@ -44,16 +44,16 @@ public class DisruptorManager { private static final int DEFAULT_SMALL_RINGBUFFER_SIZE = 64; private static final int DEFAULT_LARGE_RINGBUFFER_SIZE = 1024; - private Disruptor<org.apache.myriad.scheduler.event.RegisteredEvent> registeredEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.ReRegisteredEvent> reRegisteredEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.ResourceOffersEvent> resourceOffersEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.OfferRescindedEvent> offerRescindedEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.StatusUpdateEvent> statusUpdateEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.FrameworkMessageEvent> frameworkMessageEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.DisconnectedEvent> disconnectedEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.SlaveLostEvent> slaveLostEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.ExecutorLostEvent> executorLostEventDisruptor; - private Disruptor<org.apache.myriad.scheduler.event.ErrorEvent> errorEventDisruptor; + private Disruptor<RegisteredEvent> registeredEventDisruptor; + private Disruptor<ReRegisteredEvent> reRegisteredEventDisruptor; + private Disruptor<ResourceOffersEvent> resourceOffersEventDisruptor; + private Disruptor<OfferRescindedEvent> offerRescindedEventDisruptor; + private Disruptor<StatusUpdateEvent> statusUpdateEventDisruptor; + private Disruptor<FrameworkMessageEvent> frameworkMessageEventDisruptor; + private Disruptor<DisconnectedEvent> disconnectedEventDisruptor; + private Disruptor<SlaveLostEvent> slaveLostEventDisruptor; + private Disruptor<ExecutorLostEvent> executorLostEventDisruptor; + private Disruptor<ErrorEvent> errorEventDisruptor; @SuppressWarnings("unchecked") public void init(Injector injector) { @@ -62,85 +62,93 @@ public class DisruptorManager { // todo: (kensipe) need to make ringsize configurable (overriding the defaults) - this.registeredEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); + this.registeredEventDisruptor = new Disruptor<>(new RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, + disruptorExecutors); this.registeredEventDisruptor.handleEventsWith(injector.getInstance(RegisteredEventHandler.class)); this.registeredEventDisruptor.start(); - this.reRegisteredEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); + this.reRegisteredEventDisruptor = new Disruptor<>(new ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, + disruptorExecutors); this.reRegisteredEventDisruptor.handleEventsWith(injector.getInstance(ReRegisteredEventHandler.class)); this.reRegisteredEventDisruptor.start(); - this.resourceOffersEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.resourceOffersEventDisruptor = new Disruptor<>(new ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, + disruptorExecutors); this.resourceOffersEventDisruptor.handleEventsWith(injector.getInstance(ResourceOffersEventHandler.class)); this.resourceOffersEventDisruptor.start(); - this.offerRescindedEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.offerRescindedEventDisruptor = new Disruptor<>(new OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, + disruptorExecutors); this.offerRescindedEventDisruptor.handleEventsWith(injector.getInstance(OfferRescindedEventHandler.class)); this.offerRescindedEventDisruptor.start(); - this.statusUpdateEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.statusUpdateEventDisruptor = new Disruptor<>(new StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, + disruptorExecutors); this.statusUpdateEventDisruptor.handleEventsWith(injector.getInstance(StatusUpdateEventHandler.class)); this.statusUpdateEventDisruptor.start(); - this.frameworkMessageEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.frameworkMessageEventDisruptor = new Disruptor<>(new FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, + disruptorExecutors); this.frameworkMessageEventDisruptor.handleEventsWith(injector.getInstance(FrameworkMessageEventHandler.class)); this.frameworkMessageEventDisruptor.start(); - this.disconnectedEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.disconnectedEventDisruptor = new Disruptor<>(new DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, + disruptorExecutors); this.disconnectedEventDisruptor.handleEventsWith(injector.getInstance(DisconnectedEventHandler.class)); this.disconnectedEventDisruptor.start(); - this.slaveLostEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.slaveLostEventDisruptor = new Disruptor<>(new SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); this.slaveLostEventDisruptor.handleEventsWith(injector.getInstance(SlaveLostEventHandler.class)); this.slaveLostEventDisruptor.start(); - this.executorLostEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.executorLostEventDisruptor = new Disruptor<>(new ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, + disruptorExecutors); this.executorLostEventDisruptor.handleEventsWith(injector.getInstance(ExecutorLostEventHandler.class)); this.executorLostEventDisruptor.start(); - this.errorEventDisruptor = new Disruptor<>(new org.apache.myriad.scheduler.event.ErrorEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.errorEventDisruptor = new Disruptor<>(new ErrorEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); this.errorEventDisruptor.handleEventsWith(injector.getInstance(ErrorEventHandler.class)); this.errorEventDisruptor.start(); } - public Disruptor<org.apache.myriad.scheduler.event.RegisteredEvent> getRegisteredEventDisruptor() { + public Disruptor<RegisteredEvent> getRegisteredEventDisruptor() { return registeredEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.ReRegisteredEvent> getReRegisteredEventDisruptor() { + public Disruptor<ReRegisteredEvent> getReRegisteredEventDisruptor() { return reRegisteredEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.ResourceOffersEvent> getResourceOffersEventDisruptor() { + public Disruptor<ResourceOffersEvent> getResourceOffersEventDisruptor() { return resourceOffersEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.OfferRescindedEvent> getOfferRescindedEventDisruptor() { + public Disruptor<OfferRescindedEvent> getOfferRescindedEventDisruptor() { return offerRescindedEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.StatusUpdateEvent> getStatusUpdateEventDisruptor() { + public Disruptor<StatusUpdateEvent> getStatusUpdateEventDisruptor() { return statusUpdateEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.FrameworkMessageEvent> getFrameworkMessageEventDisruptor() { + public Disruptor<FrameworkMessageEvent> getFrameworkMessageEventDisruptor() { return frameworkMessageEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.DisconnectedEvent> getDisconnectedEventDisruptor() { + public Disruptor<DisconnectedEvent> getDisconnectedEventDisruptor() { return disconnectedEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.SlaveLostEvent> getSlaveLostEventDisruptor() { + public Disruptor<SlaveLostEvent> getSlaveLostEventDisruptor() { return slaveLostEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.ExecutorLostEvent> getExecutorLostEventDisruptor() { + public Disruptor<ExecutorLostEvent> getExecutorLostEventDisruptor() { return executorLostEventDisruptor; } - public Disruptor<org.apache.myriad.scheduler.event.ErrorEvent> getErrorEventDisruptor() { + public Disruptor<ErrorEvent> getErrorEventDisruptor() { return errorEventDisruptor; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java index 0f305e8..0f0703e 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/Main.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/Main.java @@ -21,43 +21,53 @@ package org.apache.myriad; import com.codahale.metrics.JmxReporter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.health.HealthCheckRegistry; -import org.apache.myriad.configuration.ServiceConfiguration; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.inject.Guice; +import com.google.inject.Injector; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.commons.collections.MapUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.myriad.configuration.MyriadBadConfigurationException; import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; import org.apache.myriad.health.MesosDriverHealthCheck; import org.apache.myriad.health.MesosMasterHealthCheck; import org.apache.myriad.health.ZookeeperHealthCheck; +import org.apache.myriad.scheduler.ExtendedResourceProfile; +import org.apache.myriad.scheduler.MyriadDriverManager; +import org.apache.myriad.scheduler.MyriadOperations; import org.apache.myriad.scheduler.NMProfile; +import org.apache.myriad.scheduler.Rebalancer; import org.apache.myriad.scheduler.ServiceProfileManager; import org.apache.myriad.scheduler.ServiceResourceProfile; +import org.apache.myriad.scheduler.ServiceTaskConstraints; +import org.apache.myriad.scheduler.TaskConstraintsManager; import org.apache.myriad.scheduler.TaskFactory; +import org.apache.myriad.scheduler.TaskTerminator; +import org.apache.myriad.scheduler.TaskUtils; +import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; +import org.apache.myriad.state.SchedulerState; +import org.apache.myriad.webapp.MyriadWebServer; import org.apache.myriad.webapp.WebAppGuiceModule; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.google.inject.Guice; -import com.google.inject.Injector; - -import org.apache.commons.collections.MapUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.HashSet; - /** * Main entry point for myriad scheduler */ public class Main { private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); - private org.apache.myriad.webapp.MyriadWebServer webServer; + private MyriadWebServer webServer; private ScheduledExecutorService terminatorService; private ScheduledExecutorService rebalancerService; @@ -65,9 +75,11 @@ public class Main { private static Injector injector; - public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry) throws Exception { + public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, + InterceptorRegistry registry) throws Exception { ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - MyriadConfiguration cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"), MyriadConfiguration.class); + MyriadConfiguration cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource( + "myriad-config-default.yml"), MyriadConfiguration.class); MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, rmContext, registry); MesosModule mesosModule = new MesosModule(); @@ -105,7 +117,7 @@ public class Main { private void startMesosDriver(Injector injector) { LOGGER.info("starting mesosDriver.."); - injector.getInstance(org.apache.myriad.scheduler.MyriadDriverManager.class).startDriver(); + injector.getInstance(MyriadDriverManager.class).startDriver(); LOGGER.info("started mesosDriver.."); } @@ -115,7 +127,7 @@ public class Main { * @param injector */ private void initWebApp(Injector injector) throws Exception { - webServer = injector.getInstance(org.apache.myriad.webapp.MyriadWebServer.class); + webServer = injector.getInstance(MyriadWebServer.class); webServer.start(); } @@ -135,10 +147,10 @@ public class Main { private void initProfiles(Injector injector) { LOGGER.info("Initializing Profiles"); ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - org.apache.myriad.scheduler.TaskConstraintsManager taskConstraintsManager = injector.getInstance(org.apache.myriad.scheduler.TaskConstraintsManager.class); - taskConstraintsManager.addTaskConstraints(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints()); + TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); + taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints()); Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles(); - org.apache.myriad.scheduler.TaskUtils taskUtils = injector.getInstance(org.apache.myriad.scheduler.TaskUtils.class); + TaskUtils taskUtils = injector.getInstance(TaskUtils.class); if (MapUtils.isNotEmpty(profiles)) { for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) { Map<String, String> profileResourceMap = profile.getValue(); @@ -146,7 +158,8 @@ public class Main { Long cpu = Long.parseLong(profileResourceMap.get("cpu")); Long mem = Long.parseLong(profileResourceMap.get("mem")); - ServiceResourceProfile serviceProfile = new org.apache.myriad.scheduler.ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory()); + ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), + taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory()); serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus()); serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory()); @@ -179,30 +192,31 @@ public class Main { } } if (maxCpu <= 0 || maxMem <= 0) { - throw new RuntimeException("Please configure 'nmInstances' with at least one instance/profile " + "with non-zero cpu/mem resources."); + throw new RuntimeException( + "Please configure 'nmInstances' with at least one instance/profile " + "with non-zero cpu/mem resources."); } } private void startNMInstances(Injector injector) { Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); - org.apache.myriad.scheduler.MyriadOperations myriadOperations = injector.getInstance(org.apache.myriad.scheduler.MyriadOperations.class); + MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - org.apache.myriad.state.SchedulerState schedulerState = injector.getInstance(org.apache.myriad.state.SchedulerState.class); + SchedulerState schedulerState = injector.getInstance(SchedulerState.class); Set<org.apache.myriad.state.NodeTask> launchedNMTasks = new HashSet<>(); - launchedNMTasks.addAll(schedulerState.getPendingTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX)); + launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); if (!launchedNMTasks.isEmpty()) { LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size()); return; } - launchedNMTasks.addAll(schedulerState.getStagingTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX)); + launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); if (!launchedNMTasks.isEmpty()) { LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size()); return; } - launchedNMTasks.addAll(schedulerState.getActiveTasksByType(org.apache.myriad.configuration.NodeManagerConfiguration.NM_TASK_PREFIX)); + launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); if (!launchedNMTasks.isEmpty()) { LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size()); return; @@ -223,7 +237,7 @@ public class Main { private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) { LOGGER.info("Initializing initServiceConfigurations"); ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - org.apache.myriad.scheduler.TaskConstraintsManager taskConstraintsManager = injector.getInstance(org.apache.myriad.scheduler.TaskConstraintsManager.class); + TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); Map<String, ServiceConfiguration> servicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); if (servicesConfigs != null) { @@ -234,7 +248,7 @@ public class Main { final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY); profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem)); - taskConstraintsManager.addTaskConstraints(taskPrefix, new org.apache.myriad.scheduler.ServiceTaskConstraints(cfg, taskPrefix)); + taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix)); } } } @@ -244,7 +258,7 @@ public class Main { terminatorService = Executors.newScheduledThreadPool(1); final int initialDelay = 100; final int period = 2000; - terminatorService.scheduleAtFixedRate(injector.getInstance(org.apache.myriad.scheduler.TaskTerminator.class), initialDelay, period, TimeUnit.MILLISECONDS); + terminatorService.scheduleAtFixedRate(injector.getInstance(TaskTerminator.class), initialDelay, period, TimeUnit.MILLISECONDS); } private void initRebalancerService(MyriadConfiguration cfg, Injector injector) { @@ -253,7 +267,7 @@ public class Main { rebalancerService = Executors.newScheduledThreadPool(1); final int initialDelay = 100; final int period = 5000; - rebalancerService.scheduleAtFixedRate(injector.getInstance(org.apache.myriad.scheduler.Rebalancer.class), initialDelay, period, TimeUnit.MILLISECONDS); + rebalancerService.scheduleAtFixedRate(injector.getInstance(Rebalancer.class), initialDelay, period, TimeUnit.MILLISECONDS); } else { LOGGER.info("Rebalancer is not turned on"); } @@ -271,9 +285,10 @@ public class Main { * @param injector */ private void startJavaBasedTaskInstance(Injector injector) { - Map<String, ServiceConfiguration> auxServicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); + Map<String, ServiceConfiguration> auxServicesConfigs = injector.getInstance(MyriadConfiguration.class) + .getServiceConfigurations(); if (auxServicesConfigs != null) { - org.apache.myriad.scheduler.MyriadOperations myriadOperations = injector.getInstance(org.apache.myriad.scheduler.MyriadOperations.class); + MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { try { myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), entry.getKey()); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java index 2131178..de0f99f 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/MesosModule.java @@ -18,11 +18,15 @@ */ package org.apache.myriad; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import com.google.protobuf.ByteString; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.concurrent.TimeUnit; - import org.apache.commons.lang.StringUtils; import org.apache.mesos.MesosSchedulerDriver; import org.apache.mesos.Protos.Credential; @@ -32,18 +36,12 @@ import org.apache.mesos.Protos.FrameworkInfo.Builder; import org.apache.mesos.SchedulerDriver; import org.apache.mesos.state.State; import org.apache.mesos.state.ZooKeeperState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.scheduler.MyriadDriver; import org.apache.myriad.scheduler.MyriadScheduler; import org.apache.myriad.state.SchedulerState; -import com.google.inject.AbstractModule; -import com.google.inject.Provides; -import com.google.inject.Scopes; -import com.google.inject.Singleton; -import com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Guice Module for Mesos objects. @@ -63,7 +61,8 @@ public class MesosModule extends AbstractModule { @Singleton SchedulerDriver providesSchedulerDriver(MyriadScheduler scheduler, MyriadConfiguration cfg, SchedulerState schedulerState) { - Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("").setName(cfg.getFrameworkName()).setCheckpoint(cfg.isCheckpoint()).setFailoverTimeout(cfg.getFrameworkFailoverTimeout()); + Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("").setName(cfg.getFrameworkName()).setCheckpoint( + cfg.isCheckpoint()).setFailoverTimeout(cfg.getFrameworkFailoverTimeout()); if (StringUtils.isNotEmpty(cfg.getFrameworkRole())) { frameworkInfoBuilder.setRole(cfg.getFrameworkRole()); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java index 30773c8..41abb9a 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/MyriadModule.java @@ -18,18 +18,24 @@ */ package org.apache.myriad; -import org.apache.myriad.configuration.ServiceConfiguration; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import com.google.inject.multibindings.MapBinder; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.configuration.MyriadExecutorConfiguration; import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; import org.apache.myriad.policy.LeastAMNodesFirstPolicy; import org.apache.myriad.policy.NodeScaleDownPolicy; -import org.apache.myriad.scheduler.MyriadDriverManager; -import org.apache.myriad.scheduler.fgs.NMHeartBeatHandler; -import org.apache.myriad.scheduler.fgs.NodeStore; -import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; import org.apache.myriad.scheduler.DownloadNMExecutorCLGenImpl; import org.apache.myriad.scheduler.ExecutorCommandLineGenerator; +import org.apache.myriad.scheduler.MyriadDriverManager; import org.apache.myriad.scheduler.NMExecutorCLGenImpl; import org.apache.myriad.scheduler.NMTaskFactoryAnnotation; import org.apache.myriad.scheduler.ReconcileService; @@ -38,26 +44,18 @@ import org.apache.myriad.scheduler.ServiceTaskFactoryImpl; import org.apache.myriad.scheduler.TaskConstraintsManager; import org.apache.myriad.scheduler.TaskFactory; import org.apache.myriad.scheduler.TaskFactory.NMTaskFactoryImpl; +import org.apache.myriad.scheduler.fgs.NMHeartBeatHandler; +import org.apache.myriad.scheduler.fgs.NodeStore; +import org.apache.myriad.scheduler.fgs.OfferLifecycleManager; import org.apache.myriad.scheduler.fgs.YarnNodeCapacityManager; import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; import org.apache.myriad.state.MyriadStateStore; import org.apache.myriad.state.SchedulerState; import org.apache.myriad.webapp.HttpConnectorProvider; -import com.google.inject.AbstractModule; -import com.google.inject.Provides; -import com.google.inject.Scopes; -import com.google.inject.Singleton; -import com.google.inject.multibindings.MapBinder; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.myriad.webapp.MyriadWebServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - /** * Guice Module for Myriad */ @@ -70,7 +68,8 @@ public class MyriadModule extends AbstractModule { private final RMContext rmContext; private InterceptorRegistry interceptorRegistry; - public MyriadModule(MyriadConfiguration cfg, Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry interceptorRegistry) { + public MyriadModule(MyriadConfiguration cfg, Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, + InterceptorRegistry interceptorRegistry) { this.cfg = cfg; this.hadoopConf = hadoopConf; this.yarnScheduler = yarnScheduler; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java index e1bee58..3a4c197 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/ClustersResource.java @@ -19,26 +19,10 @@ package org.apache.myriad.api; import com.codahale.metrics.annotation.Timed; -import org.apache.myriad.api.model.FlexDownClusterRequest; -import org.apache.myriad.api.model.FlexDownServiceRequest; -import org.apache.myriad.api.model.FlexUpClusterRequest; -import org.apache.myriad.scheduler.ServiceResourceProfile; -import org.apache.myriad.scheduler.constraints.ConstraintFactory; -import org.apache.myriad.state.SchedulerState; import com.google.common.base.Preconditions; - import java.util.List; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; - -import org.apache.myriad.api.model.FlexUpServiceRequest; -import org.apache.myriad.configuration.MyriadBadConfigurationException; -import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.myriad.scheduler.ServiceProfileManager; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.PUT; @@ -48,6 +32,19 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; +import org.apache.myriad.api.model.FlexDownClusterRequest; +import org.apache.myriad.api.model.FlexDownServiceRequest; +import org.apache.myriad.api.model.FlexUpClusterRequest; +import org.apache.myriad.api.model.FlexUpServiceRequest; +import org.apache.myriad.configuration.MyriadBadConfigurationException; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.apache.myriad.scheduler.MyriadOperations; +import org.apache.myriad.scheduler.ServiceProfileManager; +import org.apache.myriad.scheduler.ServiceResourceProfile; +import org.apache.myriad.scheduler.constraints.ConstraintFactory; +import org.apache.myriad.state.SchedulerState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RESTful API to resource manager @@ -60,10 +57,11 @@ public class ClustersResource { private MyriadConfiguration cfg; private SchedulerState schedulerState; private ServiceProfileManager profileManager; - private org.apache.myriad.scheduler.MyriadOperations myriadOperations; + private MyriadOperations myriadOperations; @Inject - public ClustersResource(MyriadConfiguration cfg, SchedulerState state, ServiceProfileManager profileManager, org.apache.myriad.scheduler.MyriadOperations myriadOperations) { + public ClustersResource(MyriadConfiguration cfg, SchedulerState state, ServiceProfileManager profileManager, + MyriadOperations myriadOperations) { this.cfg = cfg; this.schedulerState = state; this.profileManager = profileManager; @@ -91,7 +89,8 @@ public class ClustersResource { Response returnResponse = response.build(); if (returnResponse.getStatus() == Response.Status.ACCEPTED.getStatusCode()) { String constraint = constraints != null && !constraints.isEmpty() ? constraints.get(0) : null; - this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances, ConstraintFactory.createConstraint(constraint)); + this.myriadOperations.flexUpCluster(this.profileManager.get(profile), instances, ConstraintFactory.createConstraint( + constraint)); } return returnResponse; @@ -228,7 +227,8 @@ public class ClustersResource { String[] splits = constraint.split(" LIKE "); // "<key> LIKE <val_regex>" if (splits.length != 2) { - String message = String.format("Invalid format for LIKE operator in constraint: %s. Format: %s", constraint, LIKE_CONSTRAINT_FORMAT); + String message = String.format("Invalid format for LIKE operator in constraint: %s. Format: %s", constraint, + LIKE_CONSTRAINT_FORMAT); response.status(Status.BAD_REQUEST).entity(message); LOGGER.error(message); return false; @@ -247,7 +247,8 @@ public class ClustersResource { private Integer getNumFlexedupNMs(String profile) { ServiceResourceProfile serviceProfile = profileManager.get(profile); - return this.schedulerState.getActiveTaskIDsForProfile(serviceProfile).size() + this.schedulerState.getStagingTaskIDsForProfile(serviceProfile).size() + this.schedulerState.getPendingTaskIDsForProfile(serviceProfile).size(); + return this.schedulerState.getActiveTaskIDsForProfile(serviceProfile).size() + this.schedulerState.getStagingTaskIDsForProfile( + serviceProfile).size() + this.schedulerState.getPendingTaskIDsForProfile(serviceProfile).size(); } @Timed http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java index 336b22f..11f46d0 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/ConfigurationResource.java @@ -19,13 +19,12 @@ package org.apache.myriad.api; import com.codahale.metrics.annotation.Timed; -import org.apache.myriad.configuration.MyriadConfiguration; - import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import org.apache.myriad.configuration.MyriadConfiguration; /** * Defines the REST API to the Myriad configuration. http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java index a3bee85..ae21c69 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/SchedulerStateResource.java @@ -19,19 +19,18 @@ package org.apache.myriad.api; import com.codahale.metrics.annotation.Timed; -import org.apache.myriad.api.model.GetSchedulerStateResponse; -import org.apache.myriad.configuration.MyriadConfiguration; -import org.apache.commons.collections.CollectionUtils; -import org.apache.mesos.Protos; - +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; +import org.apache.commons.collections.CollectionUtils; +import org.apache.mesos.Protos; +import org.apache.myriad.api.model.GetSchedulerStateResponse; +import org.apache.myriad.configuration.MyriadConfiguration; /** * Defines the REST API for the current state of Myriad @@ -51,7 +50,8 @@ public class SchedulerStateResource { @Timed @GET public GetSchedulerStateResponse getState() { - return new GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), toStringCollection(state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTasks())); + return new GetSchedulerStateResponse(toStringCollection(state.getPendingTaskIds()), toStringCollection( + state.getStagingTaskIds()), toStringCollection(state.getActiveTaskIds()), toStringCollection(state.getKillableTasks())); } private Collection<String> toStringCollection(Collection<Protos.TaskID> collection) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java index 9ae5cc7..5e76b27 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/api/model/GetSchedulerStateResponse.java @@ -33,7 +33,8 @@ public class GetSchedulerStateResponse { } - public GetSchedulerStateResponse(Collection<String> pendingTasks, Collection<String> stagingTasks, Collection<String> activeTasks, Collection<String> killableTasks) { + public GetSchedulerStateResponse(Collection<String> pendingTasks, Collection<String> stagingTasks, Collection<String> activeTasks, + Collection<String> killableTasks) { this.pendingTasks = pendingTasks; this.stagingTasks = stagingTasks; this.activeTasks = activeTasks; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java index efb185f..78df3b6 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadConfiguration.java @@ -19,13 +19,11 @@ package org.apache.myriad.configuration; import com.fasterxml.jackson.annotation.JsonProperty; -import org.codehaus.jackson.map.annotate.JsonSerialize; import com.google.common.base.Optional; import com.google.common.base.Strings; - -import org.hibernate.validator.constraints.NotEmpty; - import java.util.Map; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.hibernate.validator.constraints.NotEmpty; /** * Myriad Configuration commonly defined in the YML file @@ -104,11 +102,11 @@ public class MyriadConfiguration { private String frameworkRole; @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerString.class) private String frameworkUser; @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerString.class) private String frameworkSuperUser; @JsonProperty http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java index d68b6fb..4479725 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/MyriadExecutorConfiguration.java @@ -18,11 +18,10 @@ */ package org.apache.myriad.configuration; -import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble; -import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; - +import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble; +import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString; import org.codehaus.jackson.map.annotate.JsonSerialize; import org.hibernate.validator.constraints.NotEmpty; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java index 1c6c096..de709ca 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/NodeManagerConfiguration.java @@ -18,13 +18,12 @@ */ package org.apache.myriad.configuration; -import org.codehaus.jackson.map.annotate.JsonSerialize; - +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerBoolean; import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble; import org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; +import org.codehaus.jackson.map.annotate.JsonSerialize; /** * Node Manager Configuration http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java index e0cba43..2e4eb8e 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/OptionalSerializer.java @@ -18,9 +18,9 @@ */ package org.apache.myriad.configuration; +import com.google.common.base.Optional; import java.io.IOException; import java.util.Map; - import org.codehaus.jackson.JsonFactory; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonProcessingException; @@ -28,8 +28,6 @@ import org.codehaus.jackson.map.JsonSerializer; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializerProvider; -import com.google.common.base.Optional; - /** * Custom Serializer that allows to serialize Optional * today Optional does not serialize value, but just state: "present: true/false" @@ -49,7 +47,8 @@ public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> { } @Override - public void serialize(Optional<T> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { + public void serialize(Optional<T> value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { if (value.isPresent()) { objMapper.writeValue(jgen, value.get()); } else { @@ -62,7 +61,8 @@ public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> { */ public static class OptionalSerializerString extends OptionalSerializer<String> { @Override - public void serialize(Optional<String> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { + public void serialize(Optional<String> value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { super.serialize(value, jgen, provider); } } @@ -72,7 +72,8 @@ public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> { */ public static class OptionalSerializerDouble extends OptionalSerializer<Double> { @Override - public void serialize(Optional<Double> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { + public void serialize(Optional<Double> value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { super.serialize(value, jgen, provider); } } @@ -82,7 +83,8 @@ public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> { */ public static class OptionalSerializerInt extends OptionalSerializer<Integer> { @Override - public void serialize(Optional<Integer> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { + public void serialize(Optional<Integer> value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { super.serialize(value, jgen, provider); } } @@ -92,7 +94,8 @@ public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> { */ public static class OptionalSerializerBoolean extends OptionalSerializer<Boolean> { @Override - public void serialize(Optional<Boolean> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { + public void serialize(Optional<Boolean> value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { super.serialize(value, jgen, provider); } } @@ -102,7 +105,8 @@ public class OptionalSerializer<T> extends JsonSerializer<Optional<T>> { */ public static class OptionalSerializerMap extends OptionalSerializer<Map<?, ?>> { @Override - public void serialize(Optional<Map<?, ?>> value, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException { + public void serialize(Optional<Map<?, ?>> value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { super.serialize(value, jgen, provider); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java index f0b913c..146df42 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/configuration/ServiceConfiguration.java @@ -18,16 +18,14 @@ package org.apache.myriad.configuration; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; import java.util.Map; - import org.codehaus.jackson.map.annotate.JsonSerialize; import org.hibernate.validator.constraints.NotEmpty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; - /** * Configuration for any service/task to be started from Myriad Scheduler */ @@ -43,25 +41,25 @@ public class ServiceConfiguration { * Translates to -Xmx for the JVM. */ @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerDouble.class) protected Double jvmMaxMemoryMB; /** * Amount of CPU share given to JVM. */ @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerDouble.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerDouble.class) protected Double cpus; /** * Translates to jvm opts for the JVM. */ @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerString.class) protected String jvmOpts; @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerMap.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerMap.class) protected Map<String, Long> ports; /** @@ -70,7 +68,7 @@ public class ServiceConfiguration { * we can use this one to have a specific implementation */ @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerString.class) protected String taskFactoryImplName; @JsonProperty @@ -81,11 +79,11 @@ public class ServiceConfiguration { protected String taskName; @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerInt.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerInt.class) protected Integer maxInstances; @JsonProperty - @JsonSerialize(using = org.apache.myriad.configuration.OptionalSerializer.OptionalSerializerString.class) + @JsonSerialize(using = OptionalSerializer.OptionalSerializerString.class) protected String command; @JsonProperty http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java index e1b6a17..8abbb57 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/HealthCheckUtils.java @@ -18,11 +18,10 @@ */ package org.apache.myriad.health; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.net.Socket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Health Check Utilities http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java index 96dcfe2..dc216ed 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosDriverHealthCheck.java @@ -19,9 +19,9 @@ package org.apache.myriad.health; import com.codahale.metrics.health.HealthCheck; -import org.apache.mesos.Protos.Status; - import javax.inject.Inject; +import org.apache.mesos.Protos.Status; +import org.apache.myriad.scheduler.MyriadDriverManager; /** * Health Check that Mesos Master is running @@ -29,10 +29,10 @@ import javax.inject.Inject; public class MesosDriverHealthCheck extends HealthCheck { public static final String NAME = "mesos-driver"; - private org.apache.myriad.scheduler.MyriadDriverManager driverManager; + private MyriadDriverManager driverManager; @Inject - public MesosDriverHealthCheck(org.apache.myriad.scheduler.MyriadDriverManager driverManager) { + public MesosDriverHealthCheck(MyriadDriverManager driverManager) { this.driverManager = driverManager; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java index a394c30..0500e7d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/MesosMasterHealthCheck.java @@ -19,16 +19,15 @@ package org.apache.myriad.health; import com.codahale.metrics.health.HealthCheck; -import org.apache.myriad.configuration.MyriadConfiguration; +import java.util.concurrent.TimeUnit; +import javax.inject.Inject; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.myriad.configuration.MyriadConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import java.util.concurrent.TimeUnit; - /** * Health check for Mesos master */ @@ -58,7 +57,8 @@ public class MesosMasterHealthCheck extends HealthCheck { for (String hostPort : hostPorts) { final int maxRetries = 3; final int baseSleepTimeMs = 1000; - CuratorFramework client = CuratorFrameworkFactory.newClient(hostPort, new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)); + CuratorFramework client = CuratorFrameworkFactory.newClient(hostPort, new ExponentialBackoffRetry(baseSleepTimeMs, + maxRetries)); client.start(); final int blockTime = 5; client.blockUntilConnected(blockTime, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java b/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java index 46b86a1..afbb8e8 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/health/ZookeeperHealthCheck.java @@ -19,9 +19,8 @@ package org.apache.myriad.health; import com.codahale.metrics.health.HealthCheck; -import org.apache.myriad.configuration.MyriadConfiguration; - import javax.inject.Inject; +import org.apache.myriad.configuration.MyriadConfiguration; /** * Health Check on ZK http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java b/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java index 916bdf0..eb2062d 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/policy/LeastAMNodesFirstPolicy.java @@ -18,7 +18,12 @@ */ package org.apache.myriad.policy; -import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.inject.Inject; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -27,16 +32,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.mesos.Protos; +import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor; +import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry; +import org.apache.myriad.state.SchedulerState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * A scale down policy that maintains returns a list of nodes running least number of AMs. */ @@ -44,7 +45,7 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScal private static final Logger LOGGER = LoggerFactory.getLogger(LeastAMNodesFirstPolicy.class); private final AbstractYarnScheduler yarnScheduler; - private final org.apache.myriad.state.SchedulerState schedulerState; + private final SchedulerState schedulerState; //TODO(Santosh): Should figure out the right values for the hashmap properties. // currently it's tuned for 200 nodes and 50 RM RPC threads (Yarn's default). @@ -52,10 +53,11 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScal private static final int EXPECTED_CONCURRENT_ACCCESS_COUNT = 50; private static final float LOAD_FACTOR_DEFAULT = 0.75f; - private Map<String, SchedulerNode> schedulerNodes = new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, EXPECTED_CONCURRENT_ACCCESS_COUNT); + private Map<String, SchedulerNode> schedulerNodes = new ConcurrentHashMap<>(INITIAL_NODE_SIZE, LOAD_FACTOR_DEFAULT, + EXPECTED_CONCURRENT_ACCCESS_COUNT); @Inject - public LeastAMNodesFirstPolicy(org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, org.apache.myriad.state.SchedulerState schedulerState) { + public LeastAMNodesFirstPolicy(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, SchedulerState schedulerState) { registry.register(this); this.yarnScheduler = yarnScheduler; this.schedulerState = schedulerState; @@ -70,7 +72,8 @@ public class LeastAMNodesFirstPolicy extends BaseInterceptor implements NodeScal public void apply(List<Protos.TaskID> taskIDs) { if (LOGGER.isDebugEnabled()) { for (SchedulerNode node : schedulerNodes.values()) { - LOGGER.debug("Host {} is running {} containers including {} App Masters", node.getNodeID().getHost(), node.getRunningContainers().size(), getNumAMContainers(node.getRunningContainers())); + LOGGER.debug("Host {} is running {} containers including {} App Masters", node.getNodeID().getHost(), + node.getRunningContainers().size(), getNumAMContainers(node.getRunningContainers())); } } // We need to lock the YARN scheduler here. If we don't do that, then the YARN scheduler can http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java b/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java index 3d70294..4be96ad 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/policy/NodeScaleDownPolicy.java @@ -18,9 +18,8 @@ */ package org.apache.myriad.policy; -import org.apache.mesos.Protos; - import java.util.List; +import org.apache.mesos.Protos; /** * Policy for scaling down the node managers. http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java index 23e9798..67fd1ae 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/DownloadNMExecutorCLGenImpl.java @@ -19,12 +19,11 @@ package org.apache.myriad.scheduler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.myriad.configuration.MyriadConfiguration; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import org.apache.myriad.configuration.MyriadConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation assumes NM binaries will be downloaded http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java index b8da0d3..8ff10e3 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriver.java @@ -19,7 +19,6 @@ package org.apache.myriad.scheduler; import javax.inject.Inject; - import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.SchedulerDriver; @@ -43,7 +42,7 @@ public class MyriadDriver { LOGGER.info("Stopping driver"); Status status = driver.stop(failover); LOGGER.info("Driver stopped with status: {}", status); - return status; + return status; } public Status start() { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java index f5ff5da..28232f0 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadDriverManager.java @@ -19,16 +19,15 @@ package org.apache.myriad.scheduler; import com.google.common.base.Preconditions; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.inject.Inject; import org.apache.mesos.Protos; import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.inject.Inject; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - /** * Manager for the myriad scheduler driver */ http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/df7d05c8/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java index a6449b1..34367ef 100644 --- a/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java +++ b/myriad-scheduler/src/main/java/org/apache/myriad/scheduler/MyriadOperations.java @@ -18,29 +18,27 @@ */ package org.apache.myriad.scheduler; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.mesos.Protos; import org.apache.mesos.Protos.Status; import org.apache.myriad.configuration.MyriadBadConfigurationException; -import org.apache.myriad.configuration.ServiceConfiguration; import org.apache.myriad.configuration.MyriadConfiguration; import org.apache.myriad.configuration.NodeManagerConfiguration; +import org.apache.myriad.configuration.ServiceConfiguration; import org.apache.myriad.policy.NodeScaleDownPolicy; import org.apache.myriad.scheduler.constraints.Constraint; import org.apache.myriad.scheduler.constraints.LikeConstraint; import org.apache.myriad.state.NodeTask; import org.apache.myriad.state.SchedulerState; -import com.google.common.collect.Lists; -import com.google.inject.Inject; - -import org.apache.mesos.Protos; import org.apache.myriad.webapp.MyriadWebServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - /** * Myriad scheduler operations */ @@ -54,7 +52,8 @@ public class MyriadOperations { private MyriadWebServer myriadWebServer; @Inject - public MyriadOperations(MyriadConfiguration cfg, SchedulerState schedulerState, NodeScaleDownPolicy nodeScaleDownPolicy, MyriadDriverManager driverManager, MyriadWebServer myriadWebServer) { + public MyriadOperations(MyriadConfiguration cfg, SchedulerState schedulerState, NodeScaleDownPolicy nodeScaleDownPolicy, + MyriadDriverManager driverManager, MyriadWebServer myriadWebServer) { this.cfg = cfg; this.schedulerState = schedulerState; this.nodeScaleDownPolicy = nodeScaleDownPolicy; @@ -79,15 +78,19 @@ public class MyriadOperations { int numPendingTasksScaledDown = flexDownPendingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown); // Flex down Staging tasks, if any - int numStagingTasksScaledDown = flexDownStagingTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown); + int numStagingTasksScaledDown = flexDownStagingTasks(serviceResourceProfile, constraint, + numInstancesToScaleDown - numPendingTasksScaledDown); // Flex down Active tasks, if any - int numActiveTasksScaledDown = flexDownActiveTasks(serviceResourceProfile, constraint, numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown); + int numActiveTasksScaledDown = flexDownActiveTasks(serviceResourceProfile, constraint, + numInstancesToScaleDown - numPendingTasksScaledDown - numStagingTasksScaledDown); if (numActiveTasksScaledDown + numStagingTasksScaledDown + numPendingTasksScaledDown == 0) { - LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.", serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString()); + LOGGER.info("No Node Managers with profile '{}' and constraint '{}' found for scaling down.", + serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString()); } else { - LOGGER.info("Flexed down {} active, {} staging and {} pending Node Managers with " + "'{}' profile and constraint '{}'.", numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceResourceProfile.getName(), + LOGGER.info("Flexed down {} active, {} staging and {} pending Node Managers with " + "'{}' profile and constraint '{}'.", + numActiveTasksScaledDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceResourceProfile.getName(), constraint == null ? "null" : constraint.toString()); } } @@ -107,8 +110,11 @@ public class MyriadOperations { // check number of instances // sum of active, staging, pending should be < maxInstances if (totalflexInstances > maxInstances) { - LOGGER.error("Current number of active, staging, pending and requested instances: {}" + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances); - throw new MyriadBadConfigurationException("Current number of active, staging, pending instances and requested: " + totalflexInstances + ", while it is greater then max instances allowed: " + maxInstances); + LOGGER.error("Current number of active, staging, pending and requested instances: {}" + + ", while it is greater then max instances allowed: {}", totalflexInstances, maxInstances); + throw new MyriadBadConfigurationException( + "Current number of active, staging, pending instances and requested: " + totalflexInstances + + ", while it is greater then max instances allowed: " + maxInstances); } } @@ -179,15 +185,18 @@ public class MyriadOperations { } } - LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances of {}", numScaledDown, numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceName); + LOGGER.info("Flexed down {} of {} instances including {} staging instances, and {} pending instances of {}", numScaledDown, + numInstancesToScaleDown, numStagingTasksScaledDown, numPendingTasksScaledDown, serviceName); } private int flexDownPendingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) { - return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0; + return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getPendingTaskIDsForProfile(profile), profile, constraint, + numInstancesToScaleDown) : 0; } private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) { - return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0; + return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), profile, constraint, + numInstancesToScaleDown) : 0; } private int flexDownActiveTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) { @@ -199,7 +208,8 @@ public class MyriadOperations { return 0; } - private int flexDownTasks(Collection<Protos.TaskID> taskIDs, ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) { + private int flexDownTasks(Collection<Protos.TaskID> taskIDs, ServiceResourceProfile profile, Constraint constraint, + int numInstancesToScaleDown) { int numInstancesScaledDown = 0; for (Protos.TaskID taskID : taskIDs) { NodeTask nodeTask = schedulerState.getTask(taskID); @@ -236,7 +246,9 @@ public class MyriadOperations { } public Integer getFlexibleInstances(String taskPrefix) { - return this.schedulerState.getActiveTaskIds(taskPrefix).size() + this.schedulerState.getStagingTaskIds(taskPrefix).size() + this.schedulerState.getPendingTaskIds(taskPrefix).size(); + return this.schedulerState.getActiveTaskIds(taskPrefix).size() + + this.schedulerState.getStagingTaskIds(taskPrefix).size() + + this.schedulerState.getPendingTaskIds(taskPrefix).size(); } /**