spacing changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/67ecf063 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/67ecf063 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/67ecf063 Branch: refs/heads/master Commit: 67ecf0639382d7fad386a392bd0f83bc82ae7167 Parents: e0ca4da Author: klucar <klu...@gmail.com> Authored: Wed Oct 28 09:07:44 2015 -0400 Committer: Santosh Marella <mare...@gmail.com> Committed: Wed Oct 28 08:55:17 2015 -0700 ---------------------------------------------------------------------- config/checkstyle/checkstyle.xml | 6 + myriad-commons/config/checkstyle/checkstyle.xml | 6 + .../executor/ContainerTaskStatusRequest.java | 36 +- .../myriad/executor/MyriadExecutorDefaults.java | 70 +- .../com/ebay/myriad/executor/NMTaskConfig.java | 188 ++--- .../config/checkstyle/checkstyle.xml | 6 + .../ebay/myriad/executor/MyriadExecutor.java | 38 +- .../executor/MyriadExecutorAuxService.java | 25 +- .../config/checkstyle/checkstyle.xml | 6 + .../java/com/ebay/myriad/DisruptorManager.java | 234 +++--- .../src/main/java/com/ebay/myriad/Main.java | 432 ++++++----- .../main/java/com/ebay/myriad/MesosModule.java | 36 +- .../main/java/com/ebay/myriad/MyriadModule.java | 189 +++-- .../com/ebay/myriad/api/ClustersResource.java | 427 ++++++----- .../ebay/myriad/api/ConfigurationResource.java | 26 +- .../ebay/myriad/api/SchedulerStateResource.java | 53 +- .../api/model/FlexDownClusterRequest.java | 74 +- .../api/model/FlexDownServiceRequest.java | 86 +-- .../myriad/api/model/FlexUpClusterRequest.java | 76 +- .../myriad/api/model/FlexUpServiceRequest.java | 100 +-- .../api/model/GetSchedulerStateResponse.java | 70 +- .../MyriadBadConfigurationException.java | 35 +- .../configuration/MyriadConfiguration.java | 18 +- .../MyriadExecutorConfiguration.java | 58 +- .../configuration/NodeManagerConfiguration.java | 108 +-- .../configuration/OptionalSerializer.java | 48 +- .../configuration/ServiceConfiguration.java | 67 +- .../ebay/myriad/health/HealthCheckUtils.java | 30 +- .../myriad/health/MesosDriverHealthCheck.java | 34 +- .../myriad/health/MesosMasterHealthCheck.java | 94 ++- .../myriad/health/ZookeeperHealthCheck.java | 28 +- .../myriad/policy/LeastAMNodesFirstPolicy.java | 228 +++--- .../ebay/myriad/policy/NodeScaleDownPolicy.java | 17 +- .../scheduler/DownloadNMExecutorCLGenImpl.java | 30 +- .../scheduler/ExecutorCommandLineGenerator.java | 14 +- .../scheduler/ExtendedResourceProfile.java | 19 +- .../com/ebay/myriad/scheduler/MyriadDriver.java | 60 +- .../myriad/scheduler/MyriadDriverManager.java | 123 ++-- .../ebay/myriad/scheduler/MyriadOperations.java | 309 ++++---- .../ebay/myriad/scheduler/MyriadScheduler.java | 255 +++---- .../myriad/scheduler/NMExecutorCLGenImpl.java | 135 ++-- .../java/com/ebay/myriad/scheduler/NMPorts.java | 85 +-- .../com/ebay/myriad/scheduler/NMProfile.java | 76 +- .../ebay/myriad/scheduler/NMProfileManager.java | 39 +- .../scheduler/NMTaskFactoryAnnotation.java | 40 +- .../java/com/ebay/myriad/scheduler/Ports.java | 7 +- .../com/ebay/myriad/scheduler/Rebalancer.java | 136 ++-- .../ebay/myriad/scheduler/ReconcileService.java | 80 +- .../ebay/myriad/scheduler/SchedulerUtils.java | 63 +- .../scheduler/ServiceCommandLineGenerator.java | 38 +- .../myriad/scheduler/ServiceProfileManager.java | 20 +- .../scheduler/ServiceResourceProfile.java | 40 +- .../scheduler/ServiceTaskConstraints.java | 11 +- .../scheduler/ServiceTaskFactoryImpl.java | 125 ++-- .../ebay/myriad/scheduler/TaskConstraints.java | 35 +- .../scheduler/TaskConstraintsManager.java | 13 +- .../com/ebay/myriad/scheduler/TaskFactory.java | 113 +-- .../ebay/myriad/scheduler/TaskTerminator.java | 85 +-- .../com/ebay/myriad/scheduler/TaskUtils.java | 266 ++++--- .../scheduler/constraints/Constraint.java | 6 +- .../constraints/ConstraintFactory.java | 6 +- .../scheduler/constraints/LikeConstraint.java | 9 +- .../scheduler/event/DisconnectedEvent.java | 20 +- .../event/DisconnectedEventFactory.java | 14 +- .../ebay/myriad/scheduler/event/ErrorEvent.java | 34 +- .../scheduler/event/ErrorEventFactory.java | 14 +- .../scheduler/event/ExecutorLostEvent.java | 62 +- .../event/ExecutorLostEventFactory.java | 17 +- .../scheduler/event/FrameworkMessageEvent.java | 62 +- .../event/FrameworkMessageEventFactory.java | 17 +- .../scheduler/event/OfferRescindedEvent.java | 36 +- .../event/OfferRescindedEventFactory.java | 17 +- .../scheduler/event/ReRegisteredEvent.java | 34 +- .../event/ReRegisteredEventFactory.java | 17 +- .../myriad/scheduler/event/RegisteredEvent.java | 48 +- .../scheduler/event/RegisteredEventFactory.java | 14 +- .../scheduler/event/ResourceOffersEvent.java | 34 +- .../event/ResourceOffersEventFactory.java | 17 +- .../myriad/scheduler/event/SlaveLostEvent.java | 34 +- .../scheduler/event/SlaveLostEventFactory.java | 14 +- .../scheduler/event/StatusUpdateEvent.java | 34 +- .../event/StatusUpdateEventFactory.java | 17 +- .../handlers/DisconnectedEventHandler.java | 16 +- .../event/handlers/ErrorEventHandler.java | 18 +- .../handlers/ExecutorLostEventHandler.java | 23 +- .../handlers/FrameworkMessageEventHandler.java | 21 +- .../handlers/OfferRescindedEventHandler.java | 16 +- .../handlers/ReRegisteredEventHandler.java | 26 +- .../event/handlers/RegisteredEventHandler.java | 28 +- .../handlers/ResourceOffersEventHandler.java | 82 +-- .../event/handlers/SlaveLostEventHandler.java | 18 +- .../handlers/StatusUpdateEventHandler.java | 112 ++- .../myriad/scheduler/fgs/ConsumedOffer.java | 6 +- .../scheduler/fgs/NMHeartBeatHandler.java | 39 +- .../com/ebay/myriad/scheduler/fgs/Node.java | 6 +- .../ebay/myriad/scheduler/fgs/NodeStore.java | 6 +- .../ebay/myriad/scheduler/fgs/OfferFeed.java | 8 +- .../scheduler/fgs/OfferLifecycleManager.java | 15 +- .../ebay/myriad/scheduler/fgs/OfferUtils.java | 6 +- .../scheduler/fgs/YarnNodeCapacityManager.java | 266 +++---- .../scheduler/yarn/MyriadCapacityScheduler.java | 6 +- .../scheduler/yarn/MyriadFairScheduler.java | 90 +-- .../scheduler/yarn/MyriadFifoScheduler.java | 6 +- .../scheduler/yarn/RMNodeEventHandler.java | 26 +- .../yarn/interceptor/BaseInterceptor.java | 36 +- .../yarn/interceptor/CompositeInterceptor.java | 159 ++-- .../yarn/interceptor/InterceptorRegistry.java | 8 +- .../MyriadInitializationInterceptor.java | 44 +- .../interceptor/YarnSchedulerInterceptor.java | 106 +-- .../java/com/ebay/myriad/state/Cluster.java | 142 ++-- .../java/com/ebay/myriad/state/MyriadState.java | 40 +- .../com/ebay/myriad/state/MyriadStateStore.java | 8 +- .../java/com/ebay/myriad/state/NodeTask.java | 178 ++--- .../com/ebay/myriad/state/SchedulerState.java | 725 +++++++++---------- .../myriad/state/utils/ByteBufferSupport.java | 54 +- .../ebay/myriad/state/utils/StoreContext.java | 42 +- .../myriad/webapp/HttpConnectorProvider.java | 32 +- .../ebay/myriad/webapp/MyriadServletModule.java | 24 +- .../com/ebay/myriad/webapp/MyriadWebServer.java | 63 +- .../ebay/myriad/webapp/WebAppGuiceModule.java | 16 +- .../recovery/MyriadFileSystemRMStateStore.java | 28 +- .../test/java/com/ebay/myriad/MesosModule.java | 17 +- .../java/com/ebay/myriad/MultiBindingsTest.java | 50 +- .../com/ebay/myriad/MultiBindingsUsage.java | 33 +- .../java/com/ebay/myriad/MyriadTestModule.java | 68 +- .../MyriadBadConfigurationExceptionTest.java | 35 +- .../configuration/MyriadConfigurationTest.java | 45 +- .../myriad/scheduler/SchedulerUtilsSpec.groovy | 16 +- .../myriad/scheduler/TMSTaskFactoryImpl.java | 43 +- .../myriad/scheduler/TestMyriadScheduler.java | 12 +- .../scheduler/TestServiceCommandLine.java | 31 +- .../ebay/myriad/scheduler/TestTaskUtils.java | 61 +- .../constraints/LikeConstraintSpec.groovy | 96 +-- .../myriad/scheduler/fgs/FGSTestBaseSpec.groovy | 216 +++--- .../scheduler/fgs/NMHeartBeatHandlerSpec.groovy | 148 ++-- .../fgs/YarnNodeCapacityManagerSpec.groovy | 190 ++--- 136 files changed, 4450 insertions(+), 4899 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/config/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml index 0ba6803..6eb5178 100644 --- a/config/checkstyle/checkstyle.xml +++ b/config/checkstyle/checkstyle.xml @@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html --> <property name="severity" value="warning"/> </module> + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="braceAdjustment" value="0"/> + <property name="caseIndent" value="2"/> + <property name="throwsIndent" value="2"/> + </module> </module> </module> http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/config/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/myriad-commons/config/checkstyle/checkstyle.xml b/myriad-commons/config/checkstyle/checkstyle.xml index 0ba6803..6eb5178 100644 --- a/myriad-commons/config/checkstyle/checkstyle.xml +++ b/myriad-commons/config/checkstyle/checkstyle.xml @@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html --> <property name="severity" value="warning"/> </module> + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="braceAdjustment" value="0"/> + <property name="caseIndent" value="2"/> + <property name="throwsIndent" value="2"/> + </module> </module> </module> http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java b/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java index c37deeb..a93e3a8 100644 --- a/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java +++ b/myriad-commons/src/main/java/com/ebay/myriad/executor/ContainerTaskStatusRequest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,23 +23,23 @@ package com.ebay.myriad.executor; * the mesos task id (placeholder task). */ public class ContainerTaskStatusRequest { - public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_"; - private String mesosTaskId; // YARN_CONTAINER_TASK_ID_PREFIX + <container_id> - private String state; // Protos.TaskState.name() + public static final String YARN_CONTAINER_TASK_ID_PREFIX = "yarn_"; + private String mesosTaskId; // YARN_CONTAINER_TASK_ID_PREFIX + <container_id> + private String state; // Protos.TaskState.name() - public String getMesosTaskId() { - return mesosTaskId; - } + public String getMesosTaskId() { + return mesosTaskId; + } - public void setMesosTaskId(String mesosTaskId) { - this.mesosTaskId = mesosTaskId; - } + public void setMesosTaskId(String mesosTaskId) { + this.mesosTaskId = mesosTaskId; + } - public String getState() { - return state; - } + public String getState() { + return state; + } - public void setState(String state) { - this.state = state; - } + public void setState(String state) { + this.state = state; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java b/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java index 5479f80..32925e5 100644 --- a/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java +++ b/myriad-commons/src/main/java/com/ebay/myriad/executor/MyriadExecutorDefaults.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,54 +22,54 @@ package com.ebay.myriad.executor; * Myriad's Executor Defaults */ public class MyriadExecutorDefaults { - public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS"; + public static final String ENV_YARN_NODEMANAGER_OPTS = "YARN_NODEMANAGER_OPTS"; - /** - * YARN container executor class. - */ - public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class"; + /** + * YARN container executor class. + */ + public static final String KEY_YARN_NM_CONTAINER_EXECUTOR_CLASS = "yarn.nodemanager.container-executor.class"; - public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; + public static final String VAL_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor"; - public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; + public static final String DEFAULT_YARN_NM_CONTAINER_EXECUTOR_CLASS = "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor"; - /** - * YARN class to help handle LCE resources - */ - public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class"; + /** + * YARN class to help handle LCE resources + */ + public static final String KEY_YARN_NM_LCE_RH_CLASS = "yarn.nodemanager.linux-container-executor.resources-handler.class"; - public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler"; + public static final String VAL_YARN_NM_LCE_RH_CLASS = "org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler"; - public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; + public static final String KEY_YARN_NM_LCE_CGROUPS_HIERARCHY = "yarn.nodemanager.linux-container-executor.cgroups.hierarchy"; - public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount"; + public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT = "yarn.nodemanager.linux-container-executor.cgroups.mount"; - public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path"; + public static final String KEY_YARN_NM_LCE_CGROUPS_MOUNT_PATH = "yarn.nodemanager.linux-container-executor.cgroups.mount-path"; - public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group"; + public static final String KEY_YARN_NM_LCE_GROUP = "yarn.nodemanager.linux-container-executor.group"; - public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path"; + public static final String KEY_YARN_NM_LCE_PATH = "yarn.nodemanager.linux-container-executor.path"; - public static final String KEY_YARN_HOME = "yarn.home"; + public static final String KEY_YARN_HOME = "yarn.home"; - public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores"; + public static final String KEY_NM_RESOURCE_CPU_VCORES = "nodemanager.resource.cpu-vcores"; - public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb"; + public static final String KEY_NM_RESOURCE_MEM_MB = "nodemanager.resource.memory-mb"; - /** - * Allot 10% more memory to account for JVM overhead. - */ - public static final double JVM_OVERHEAD = 0.1; + /** + * Allot 10% more memory to account for JVM overhead. + */ + public static final double JVM_OVERHEAD = 0.1; - /** - * Default -Xmx for executor JVM. - */ + /** + * Default -Xmx for executor JVM. + */ - public static final double DEFAULT_JVM_MAX_MEMORY_MB = 256; - /** - * Default cpus for executor JVM. - */ - public static final double DEFAULT_CPUS = 0.2; + public static final double DEFAULT_JVM_MAX_MEMORY_MB = 256; + /** + * Default cpus for executor JVM. + */ + public static final double DEFAULT_CPUS = 0.2; } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java b/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java index b5d1b23..92626b3 100644 --- a/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java +++ b/myriad-commons/src/main/java/com/ebay/myriad/executor/NMTaskConfig.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,96 +24,96 @@ import java.util.Map; * Node Manger Task Configuraiton */ public class NMTaskConfig { - private String yarnHome; - private Long advertisableCpus; - private Long advertisableMem; - private String jvmOpts; - private Boolean cgroups; - private Long rpcPort; - private Long localizerPort; - private Long webAppHttpPort; - private Long shufflePort; - - private Map<String, String> yarnEnvironment; - - public String getYarnHome() { - return yarnHome; - } - - public void setYarnHome(String yarnHome) { - this.yarnHome = yarnHome; - } - - public Long getAdvertisableCpus() { - return advertisableCpus; - } - - public void setAdvertisableCpus(Long advertisableCpus) { - this.advertisableCpus = advertisableCpus; - } - - public Long getAdvertisableMem() { - return advertisableMem; - } - - public void setAdvertisableMem(Long advertisableMem) { - this.advertisableMem = advertisableMem; - } - - public String getJvmOpts() { - return jvmOpts; - } - - public void setJvmOpts(String jvmOpts) { - this.jvmOpts = jvmOpts; - } - - public Boolean getCgroups() { - return cgroups; - } - - public void setCgroups(Boolean cgroups) { - this.cgroups = cgroups; - } - - public Map<String, String> getYarnEnvironment() { - return yarnEnvironment; - } - - public void setYarnEnvironment(Map<String, String> yarnEnvironment) { - this.yarnEnvironment = yarnEnvironment; - } - - public Long getRpcPort() { - return rpcPort; - } - - public void setRpcPort(long port) { - rpcPort = port; - } - - public Long gettWebAppHttpPort() { - return webAppHttpPort; - } - - public void setWebAppHttpPort(Long port) { - webAppHttpPort = port; - } - - public Long getLocalizerPort() { - return localizerPort; - } - - public void setLocalizerPort(Long localizerPort) { - this.localizerPort = localizerPort; - } - - public Long getShufflePort() { - return shufflePort; - } - - public void setShufflePort(Long shufflePort) { - this.shufflePort = shufflePort; - } + private String yarnHome; + private Long advertisableCpus; + private Long advertisableMem; + private String jvmOpts; + private Boolean cgroups; + private Long rpcPort; + private Long localizerPort; + private Long webAppHttpPort; + private Long shufflePort; + + private Map<String, String> yarnEnvironment; + + public String getYarnHome() { + return yarnHome; + } + + public void setYarnHome(String yarnHome) { + this.yarnHome = yarnHome; + } + + public Long getAdvertisableCpus() { + return advertisableCpus; + } + + public void setAdvertisableCpus(Long advertisableCpus) { + this.advertisableCpus = advertisableCpus; + } + + public Long getAdvertisableMem() { + return advertisableMem; + } + + public void setAdvertisableMem(Long advertisableMem) { + this.advertisableMem = advertisableMem; + } + + public String getJvmOpts() { + return jvmOpts; + } + + public void setJvmOpts(String jvmOpts) { + this.jvmOpts = jvmOpts; + } + + public Boolean getCgroups() { + return cgroups; + } + + public void setCgroups(Boolean cgroups) { + this.cgroups = cgroups; + } + + public Map<String, String> getYarnEnvironment() { + return yarnEnvironment; + } + + public void setYarnEnvironment(Map<String, String> yarnEnvironment) { + this.yarnEnvironment = yarnEnvironment; + } + + public Long getRpcPort() { + return rpcPort; + } + + public void setRpcPort(long port) { + rpcPort = port; + } + + public Long gettWebAppHttpPort() { + return webAppHttpPort; + } + + public void setWebAppHttpPort(Long port) { + webAppHttpPort = port; + } + + public Long getLocalizerPort() { + return localizerPort; + } + + public void setLocalizerPort(Long localizerPort) { + this.localizerPort = localizerPort; + } + + public Long getShufflePort() { + return shufflePort; + } + + public void setShufflePort(Long shufflePort) { + this.shufflePort = shufflePort; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-executor/config/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/myriad-executor/config/checkstyle/checkstyle.xml b/myriad-executor/config/checkstyle/checkstyle.xml index 0ba6803..6eb5178 100644 --- a/myriad-executor/config/checkstyle/checkstyle.xml +++ b/myriad-executor/config/checkstyle/checkstyle.xml @@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html --> <property name="severity" value="warning"/> </module> + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="braceAdjustment" value="0"/> + <property name="caseIndent" value="2"/> + <property name="throwsIndent" value="2"/> + </module> </module> </module> http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java index 47463a4..37926d4 100644 --- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,9 +49,8 @@ public class MyriadExecutor implements Executor { } @Override - public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, - FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) { - LOGGER.debug("Registered ", executorInfo, " for framework ", frameworkInfo, " on mesos slave ", slaveInfo); + public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) { + LOGGER.debug("Registered ", executorInfo, " for framework ", frameworkInfo, " on mesos slave ", slaveInfo); } @Override @@ -67,11 +66,8 @@ public class MyriadExecutor implements Executor { @Override public void launchTask(final ExecutorDriver driver, final TaskInfo task) { LOGGER.debug("launchTask received for taskId: " + task.getTaskId()); - TaskStatus status = TaskStatus.newBuilder() - .setTaskId(task.getTaskId()) - .setState(TaskState.TASK_RUNNING) - .build(); - driver.sendStatusUpdate(status); + TaskStatus status = TaskStatus.newBuilder().setTaskId(task.getTaskId()).setState(TaskState.TASK_RUNNING).build(); + driver.sendStatusUpdate(status); } @Override @@ -79,30 +75,22 @@ public class MyriadExecutor implements Executor { LOGGER.debug("killTask received for taskId: " + taskId.getValue()); TaskStatus status; - if (!taskId.toString().contains( - MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) { + if (!taskId.toString().contains(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) { // Inform mesos of killing all tasks corresponding to yarn containers that are // currently running synchronized (containerIds) { for (String containerId : containerIds) { - Protos.TaskID containerTaskId = Protos.TaskID.newBuilder() - .setValue(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX - + containerId).build(); - status = TaskStatus.newBuilder().setTaskId(containerTaskId) - .setState(TaskState.TASK_KILLED) - .build(); - driver.sendStatusUpdate(status); + Protos.TaskID containerTaskId = Protos.TaskID.newBuilder().setValue(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX + containerId).build(); + status = TaskStatus.newBuilder().setTaskId(containerTaskId).setState(TaskState.TASK_KILLED).build(); + driver.sendStatusUpdate(status); } } // Now kill the node manager task - status = TaskStatus.newBuilder() - .setTaskId(taskId) - .setState(TaskState.TASK_KILLED) - .build(); + status = TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build(); driver.sendStatusUpdate(status); LOGGER.info("NodeManager shutdown after receiving" + - " KillTask for taskId " + taskId.getValue()); + " KillTask for taskId " + taskId.getValue()); Runtime.getRuntime().exit(0); } else { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java ---------------------------------------------------------------------- diff --git a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java index 361fc05..86ea60e 100644 --- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java +++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutorAuxService.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; /** * Auxillary service wrapper for MyriadExecutor */ -public class MyriadExecutorAuxService extends AuxiliaryService { +public class MyriadExecutorAuxService extends AuxiliaryService { private static final Logger LOGGER = LoggerFactory.getLogger(MyriadExecutor.class); private static final String SERVICE_NAME = "myriad_service"; @@ -65,16 +65,14 @@ public class MyriadExecutorAuxService extends AuxiliaryService { myriadExecutorThread = new Thread(new Runnable() { public void run() { driver = new MesosExecutorDriver(new MyriadExecutor(containerIds)); - LOGGER.error("MyriadExecutor exit with status " + - Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1)); + LOGGER.error("MyriadExecutor exit with status " + Integer.toString(driver.run() == Status.DRIVER_STOPPED ? 0 : 1)); } }); myriadExecutorThread.start(); } @Override - public void initializeApplication( - ApplicationInitializationContext initAppContext) { + public void initializeApplication(ApplicationInitializationContext initAppContext) { LOGGER.debug("initializeApplication"); } @@ -108,14 +106,9 @@ public class MyriadExecutorAuxService extends AuxiliaryService { } private void sendStatus(ContainerId containerId, TaskState taskState) { - Protos.TaskID taskId = Protos.TaskID.newBuilder() - .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString()) - .build(); - - TaskStatus status = TaskStatus.newBuilder() - .setTaskId(taskId) - .setState(taskState) - .build(); + Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString()).build(); + + TaskStatus status = TaskStatus.newBuilder().setTaskId(taskId).setState(taskState).build(); driver.sendStatusUpdate(status); LOGGER.debug("Sent status " + taskState + " for taskId " + taskId); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/config/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/myriad-scheduler/config/checkstyle/checkstyle.xml b/myriad-scheduler/config/checkstyle/checkstyle.xml index 0ba6803..6eb5178 100644 --- a/myriad-scheduler/config/checkstyle/checkstyle.xml +++ b/myriad-scheduler/config/checkstyle/checkstyle.xml @@ -315,5 +315,11 @@ page at http://checkstyle.sourceforge.net/config.html --> <property name="severity" value="warning"/> </module> + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="braceAdjustment" value="0"/> + <property name="caseIndent" value="2"/> + <property name="throwsIndent" value="2"/> + </module> </module> </module> http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java index a38b183..e3fb399 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/DisruptorManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -40,129 +40,109 @@ import java.util.concurrent.Executors; * Here it is used to abstract incoming events. */ public class DisruptorManager { - private ExecutorService disruptorExecutors; - - private static final int DEFAULT_SMALL_RINGBUFFER_SIZE = 64; - private static final int DEFAULT_LARGE_RINGBUFFER_SIZE = 1024; - - private Disruptor<RegisteredEvent> registeredEventDisruptor; - private Disruptor<ReRegisteredEvent> reRegisteredEventDisruptor; - private Disruptor<ResourceOffersEvent> resourceOffersEventDisruptor; - private Disruptor<OfferRescindedEvent> offerRescindedEventDisruptor; - private Disruptor<StatusUpdateEvent> statusUpdateEventDisruptor; - private Disruptor<FrameworkMessageEvent> frameworkMessageEventDisruptor; - private Disruptor<DisconnectedEvent> disconnectedEventDisruptor; - private Disruptor<SlaveLostEvent> slaveLostEventDisruptor; - private Disruptor<ExecutorLostEvent> executorLostEventDisruptor; - private Disruptor<ErrorEvent> errorEventDisruptor; - - @SuppressWarnings("unchecked") - public void init(Injector injector) { - this.disruptorExecutors = Executors.newCachedThreadPool(); - - // todo: (kensipe) need to make ringsize configurable (overriding the defaults) - - - this.registeredEventDisruptor = new Disruptor<>( - new RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); - this.registeredEventDisruptor.handleEventsWith(injector - .getInstance(RegisteredEventHandler.class)); - this.registeredEventDisruptor.start(); - - this.reRegisteredEventDisruptor = new Disruptor<>( - new ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); - this.reRegisteredEventDisruptor.handleEventsWith(injector - .getInstance(ReRegisteredEventHandler.class)); - this.reRegisteredEventDisruptor.start(); - - - this.resourceOffersEventDisruptor = new Disruptor<>( - new ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.resourceOffersEventDisruptor.handleEventsWith(injector - .getInstance(ResourceOffersEventHandler.class)); - this.resourceOffersEventDisruptor.start(); - - this.offerRescindedEventDisruptor = new Disruptor<>( - new OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.offerRescindedEventDisruptor.handleEventsWith(injector - .getInstance(OfferRescindedEventHandler.class)); - this.offerRescindedEventDisruptor.start(); - - this.statusUpdateEventDisruptor = new Disruptor<>( - new StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.statusUpdateEventDisruptor.handleEventsWith(injector - .getInstance(StatusUpdateEventHandler.class)); - this.statusUpdateEventDisruptor.start(); - - this.frameworkMessageEventDisruptor = new Disruptor<>( - new FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.frameworkMessageEventDisruptor.handleEventsWith(injector - .getInstance(FrameworkMessageEventHandler.class)); - this.frameworkMessageEventDisruptor.start(); - - this.disconnectedEventDisruptor = new Disruptor<>( - new DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.disconnectedEventDisruptor.handleEventsWith(injector - .getInstance(DisconnectedEventHandler.class)); - this.disconnectedEventDisruptor.start(); - - this.slaveLostEventDisruptor = new Disruptor<>( - new SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.slaveLostEventDisruptor.handleEventsWith(injector - .getInstance(SlaveLostEventHandler.class)); - this.slaveLostEventDisruptor.start(); - - this.executorLostEventDisruptor = new Disruptor<>( - new ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.executorLostEventDisruptor.handleEventsWith(injector - .getInstance(ExecutorLostEventHandler.class)); - this.executorLostEventDisruptor.start(); - - this.errorEventDisruptor = new Disruptor<>(new ErrorEventFactory(), - DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); - this.errorEventDisruptor.handleEventsWith(injector - .getInstance(ErrorEventHandler.class)); - this.errorEventDisruptor.start(); - } - - public Disruptor<RegisteredEvent> getRegisteredEventDisruptor() { - return registeredEventDisruptor; - } - - public Disruptor<ReRegisteredEvent> getReRegisteredEventDisruptor() { - return reRegisteredEventDisruptor; - } - - public Disruptor<ResourceOffersEvent> getResourceOffersEventDisruptor() { - return resourceOffersEventDisruptor; - } - - public Disruptor<OfferRescindedEvent> getOfferRescindedEventDisruptor() { - return offerRescindedEventDisruptor; - } - - public Disruptor<StatusUpdateEvent> getStatusUpdateEventDisruptor() { - return statusUpdateEventDisruptor; - } - - public Disruptor<FrameworkMessageEvent> getFrameworkMessageEventDisruptor() { - return frameworkMessageEventDisruptor; - } - - public Disruptor<DisconnectedEvent> getDisconnectedEventDisruptor() { - return disconnectedEventDisruptor; - } - - public Disruptor<SlaveLostEvent> getSlaveLostEventDisruptor() { - return slaveLostEventDisruptor; - } - - public Disruptor<ExecutorLostEvent> getExecutorLostEventDisruptor() { - return executorLostEventDisruptor; - } - - public Disruptor<ErrorEvent> getErrorEventDisruptor() { - return errorEventDisruptor; - } + private ExecutorService disruptorExecutors; + + private static final int DEFAULT_SMALL_RINGBUFFER_SIZE = 64; + private static final int DEFAULT_LARGE_RINGBUFFER_SIZE = 1024; + + private Disruptor<RegisteredEvent> registeredEventDisruptor; + private Disruptor<ReRegisteredEvent> reRegisteredEventDisruptor; + private Disruptor<ResourceOffersEvent> resourceOffersEventDisruptor; + private Disruptor<OfferRescindedEvent> offerRescindedEventDisruptor; + private Disruptor<StatusUpdateEvent> statusUpdateEventDisruptor; + private Disruptor<FrameworkMessageEvent> frameworkMessageEventDisruptor; + private Disruptor<DisconnectedEvent> disconnectedEventDisruptor; + private Disruptor<SlaveLostEvent> slaveLostEventDisruptor; + private Disruptor<ExecutorLostEvent> executorLostEventDisruptor; + private Disruptor<ErrorEvent> errorEventDisruptor; + + @SuppressWarnings("unchecked") + public void init(Injector injector) { + this.disruptorExecutors = Executors.newCachedThreadPool(); + + // todo: (kensipe) need to make ringsize configurable (overriding the defaults) + + + this.registeredEventDisruptor = new Disruptor<>(new RegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); + this.registeredEventDisruptor.handleEventsWith(injector.getInstance(RegisteredEventHandler.class)); + this.registeredEventDisruptor.start(); + + this.reRegisteredEventDisruptor = new Disruptor<>(new ReRegisteredEventFactory(), DEFAULT_SMALL_RINGBUFFER_SIZE, disruptorExecutors); + this.reRegisteredEventDisruptor.handleEventsWith(injector.getInstance(ReRegisteredEventHandler.class)); + this.reRegisteredEventDisruptor.start(); + + + this.resourceOffersEventDisruptor = new Disruptor<>(new ResourceOffersEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.resourceOffersEventDisruptor.handleEventsWith(injector.getInstance(ResourceOffersEventHandler.class)); + this.resourceOffersEventDisruptor.start(); + + this.offerRescindedEventDisruptor = new Disruptor<>(new OfferRescindedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.offerRescindedEventDisruptor.handleEventsWith(injector.getInstance(OfferRescindedEventHandler.class)); + this.offerRescindedEventDisruptor.start(); + + this.statusUpdateEventDisruptor = new Disruptor<>(new StatusUpdateEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.statusUpdateEventDisruptor.handleEventsWith(injector.getInstance(StatusUpdateEventHandler.class)); + this.statusUpdateEventDisruptor.start(); + + this.frameworkMessageEventDisruptor = new Disruptor<>(new FrameworkMessageEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.frameworkMessageEventDisruptor.handleEventsWith(injector.getInstance(FrameworkMessageEventHandler.class)); + this.frameworkMessageEventDisruptor.start(); + + this.disconnectedEventDisruptor = new Disruptor<>(new DisconnectedEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.disconnectedEventDisruptor.handleEventsWith(injector.getInstance(DisconnectedEventHandler.class)); + this.disconnectedEventDisruptor.start(); + + this.slaveLostEventDisruptor = new Disruptor<>(new SlaveLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.slaveLostEventDisruptor.handleEventsWith(injector.getInstance(SlaveLostEventHandler.class)); + this.slaveLostEventDisruptor.start(); + + this.executorLostEventDisruptor = new Disruptor<>(new ExecutorLostEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.executorLostEventDisruptor.handleEventsWith(injector.getInstance(ExecutorLostEventHandler.class)); + this.executorLostEventDisruptor.start(); + + this.errorEventDisruptor = new Disruptor<>(new ErrorEventFactory(), DEFAULT_LARGE_RINGBUFFER_SIZE, disruptorExecutors); + this.errorEventDisruptor.handleEventsWith(injector.getInstance(ErrorEventHandler.class)); + this.errorEventDisruptor.start(); + } + + public Disruptor<RegisteredEvent> getRegisteredEventDisruptor() { + return registeredEventDisruptor; + } + + public Disruptor<ReRegisteredEvent> getReRegisteredEventDisruptor() { + return reRegisteredEventDisruptor; + } + + public Disruptor<ResourceOffersEvent> getResourceOffersEventDisruptor() { + return resourceOffersEventDisruptor; + } + + public Disruptor<OfferRescindedEvent> getOfferRescindedEventDisruptor() { + return offerRescindedEventDisruptor; + } + + public Disruptor<StatusUpdateEvent> getStatusUpdateEventDisruptor() { + return statusUpdateEventDisruptor; + } + + public Disruptor<FrameworkMessageEvent> getFrameworkMessageEventDisruptor() { + return frameworkMessageEventDisruptor; + } + + public Disruptor<DisconnectedEvent> getDisconnectedEventDisruptor() { + return disconnectedEventDisruptor; + } + + public Disruptor<SlaveLostEvent> getSlaveLostEventDisruptor() { + return slaveLostEventDisruptor; + } + + public Disruptor<ExecutorLostEvent> getExecutorLostEventDisruptor() { + return executorLostEventDisruptor; + } + + public Disruptor<ErrorEvent> getErrorEventDisruptor() { + return errorEventDisruptor; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java index a0ad9cb..8865640 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/Main.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -66,256 +66,234 @@ import java.util.HashSet; /** * Main entry point for myriad scheduler - * */ public class Main { - private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Main.class); - private MyriadWebServer webServer; - private ScheduledExecutorService terminatorService; + private MyriadWebServer webServer; + private ScheduledExecutorService terminatorService; - private ScheduledExecutorService rebalancerService; - private HealthCheckRegistry healthCheckRegistry; + private ScheduledExecutorService rebalancerService; + private HealthCheckRegistry healthCheckRegistry; - private static Injector injector; + private static Injector injector; - public static void initialize(Configuration hadoopConf, - AbstractYarnScheduler yarnScheduler, - RMContext rmContext, - InterceptorRegistry registry) throws Exception { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - MyriadConfiguration cfg = mapper.readValue( - Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"), - MyriadConfiguration.class); + public static void initialize(Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry registry) throws Exception { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + MyriadConfiguration cfg = mapper.readValue(Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"), MyriadConfiguration.class); - MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, rmContext, registry); - MesosModule mesosModule = new MesosModule(); - injector = Guice.createInjector( - myriadModule, mesosModule, - new WebAppGuiceModule()); + MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, rmContext, registry); + MesosModule mesosModule = new MesosModule(); + injector = Guice.createInjector(myriadModule, mesosModule, new WebAppGuiceModule()); - new Main().run(cfg); - } + new Main().run(cfg); + } + + // TODO (Kannan Rajah) Hack to get injector in unit test. + public static Injector getInjector() { + return injector; + } - // TODO (Kannan Rajah) Hack to get injector in unit test. - public static Injector getInjector() { - return injector; + public void run(MyriadConfiguration cfg) throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Bindings: " + injector.getAllBindings()); } - public void run(MyriadConfiguration cfg) throws Exception { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Bindings: " + injector.getAllBindings()); + JmxReporter.forRegistry(new MetricRegistry()).build().start(); + + initWebApp(injector); + initHealthChecks(injector); + initProfiles(injector); + validateNMInstances(injector); + initServiceConfigurations(cfg, injector); + initDisruptors(injector); + + initRebalancerService(cfg, injector); + initTerminatorService(injector); + startMesosDriver(injector); + startNMInstances(injector); + startJavaBasedTaskInstance(injector); + } + + + private void startMesosDriver(Injector injector) { + LOGGER.info("starting mesosDriver.."); + injector.getInstance(MyriadDriverManager.class).startDriver(); + LOGGER.info("started mesosDriver.."); + } + + /** + * Brings up the embedded jetty webserver for serving REST APIs. + * + * @param injector + */ + private void initWebApp(Injector injector) throws Exception { + webServer = injector.getInstance(MyriadWebServer.class); + webServer.start(); + } + + /** + * Initializes health checks. + * + * @param injector + */ + private void initHealthChecks(Injector injector) { + LOGGER.info("Initializing HealthChecks"); + healthCheckRegistry = new HealthCheckRegistry(); + healthCheckRegistry.register(MesosMasterHealthCheck.NAME, injector.getInstance(MesosMasterHealthCheck.class)); + healthCheckRegistry.register(ZookeeperHealthCheck.NAME, injector.getInstance(ZookeeperHealthCheck.class)); + healthCheckRegistry.register(MesosDriverHealthCheck.NAME, injector.getInstance(MesosDriverHealthCheck.class)); + } + + private void initProfiles(Injector injector) { + LOGGER.info("Initializing Profiles"); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); + taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints()); + Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles(); + TaskUtils taskUtils = injector.getInstance(TaskUtils.class); + if (MapUtils.isNotEmpty(profiles)) { + for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) { + Map<String, String> profileResourceMap = profile.getValue(); + if (MapUtils.isNotEmpty(profiles) && profileResourceMap.containsKey("cpu") && profileResourceMap.containsKey("mem")) { + Long cpu = Long.parseLong(profileResourceMap.get("cpu")); + Long mem = Long.parseLong(profileResourceMap.get("mem")); + + ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory()); + serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus()); + serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory()); + + profileManager.add(serviceProfile); + } else { + LOGGER.error("Invalid definition for profile: " + profile.getKey()); } - - JmxReporter.forRegistry(new MetricRegistry()).build().start(); - - initWebApp(injector); - initHealthChecks(injector); - initProfiles(injector); - validateNMInstances(injector); - initServiceConfigurations(cfg, injector); - initDisruptors(injector); - - initRebalancerService(cfg, injector); - initTerminatorService(injector); - startMesosDriver(injector); - startNMInstances(injector); - startJavaBasedTaskInstance(injector); + } } - - - private void startMesosDriver(Injector injector) { - LOGGER.info("starting mesosDriver.."); - injector.getInstance(MyriadDriverManager.class).startDriver(); - LOGGER.info("started mesosDriver.."); + } + + private void validateNMInstances(Injector injector) { + LOGGER.info("Validating nmInstances.."); + Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + + long maxCpu = Long.MIN_VALUE; + long maxMem = Long.MIN_VALUE; + for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { + String profile = entry.getKey(); + ServiceResourceProfile nodeManager = profileManager.get(profile); + if (nodeManager == null) { + throw new RuntimeException("Invalid profile name '" + profile + "' specified in 'nmInstances'"); + } + if (entry.getValue() > 0) { + if (nodeManager.getCpus() > maxCpu) { // find the profile with largest number of cpus + maxCpu = nodeManager.getCpus().longValue(); + maxMem = nodeManager.getMemory().longValue(); // use the memory from the same profile + } + } } - - /** - * Brings up the embedded jetty webserver for serving REST APIs. - * - * @param injector - */ - private void initWebApp(Injector injector) throws Exception { - webServer = injector.getInstance(MyriadWebServer.class); - webServer.start(); + if (maxCpu <= 0 || maxMem <= 0) { + throw new RuntimeException("Please configure 'nmInstances' with at least one instance/profile " + "with non-zero cpu/mem resources."); } - - /** - * Initializes health checks. - * - * @param injector - */ - private void initHealthChecks(Injector injector) { - LOGGER.info("Initializing HealthChecks"); - healthCheckRegistry = new HealthCheckRegistry(); - healthCheckRegistry.register(MesosMasterHealthCheck.NAME, - injector.getInstance(MesosMasterHealthCheck.class)); - healthCheckRegistry.register(ZookeeperHealthCheck.NAME, - injector.getInstance(ZookeeperHealthCheck.class)); - healthCheckRegistry.register(MesosDriverHealthCheck.NAME, - injector.getInstance(MesosDriverHealthCheck.class)); + } + + private void startNMInstances(Injector injector) { + Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); + MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + SchedulerState schedulerState = injector.getInstance(SchedulerState.class); + + Set<NodeTask> launchedNMTasks = new HashSet<>(); + launchedNMTasks.addAll(schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); + if (!launchedNMTasks.isEmpty()) { + LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size()); + return; } - private void initProfiles(Injector injector) { - LOGGER.info("Initializing Profiles"); - ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); - taskConstraintsManager.addTaskConstraints(NodeManagerConfiguration.NM_TASK_PREFIX, new TaskFactory.NMTaskConstraints()); - Map<String, Map<String, String>> profiles = injector.getInstance(MyriadConfiguration.class).getProfiles(); - TaskUtils taskUtils = injector.getInstance(TaskUtils.class); - if (MapUtils.isNotEmpty(profiles)) { - for (Map.Entry<String, Map<String, String>> profile : profiles.entrySet()) { - Map<String, String> profileResourceMap = profile.getValue(); - if (MapUtils.isNotEmpty(profiles) - && profileResourceMap.containsKey("cpu") - && profileResourceMap.containsKey("mem")) { - Long cpu = Long.parseLong(profileResourceMap.get("cpu")); - Long mem = Long.parseLong(profileResourceMap.get("mem")); - - ServiceResourceProfile serviceProfile = new ExtendedResourceProfile(new NMProfile(profile.getKey(), cpu, mem), - taskUtils.getNodeManagerCpus(), taskUtils.getNodeManagerMemory()); - serviceProfile.setExecutorCpu(taskUtils.getExecutorCpus()); - serviceProfile.setExecutorMemory(taskUtils.getExecutorMemory()); - - profileManager.add(serviceProfile); - } else { - LOGGER.error("Invalid definition for profile: " + profile.getKey()); - } - } - } + launchedNMTasks.addAll(schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); + if (!launchedNMTasks.isEmpty()) { + LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size()); + return; } - private void validateNMInstances(Injector injector) { - LOGGER.info("Validating nmInstances.."); - Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); - ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - - long maxCpu = Long.MIN_VALUE; - long maxMem = Long.MIN_VALUE; - for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { - String profile = entry.getKey(); - ServiceResourceProfile nodeManager = profileManager.get(profile); - if (nodeManager == null) { - throw new RuntimeException("Invalid profile name '" + profile + "' specified in 'nmInstances'"); - } - if (entry.getValue() > 0) { - if (nodeManager.getCpus() > maxCpu) { // find the profile with largest number of cpus - maxCpu = nodeManager.getCpus().longValue(); - maxMem = nodeManager.getMemory().longValue(); // use the memory from the same profile - } - } - } - if (maxCpu <= 0 || maxMem <= 0) { - throw new RuntimeException("Please configure 'nmInstances' with at least one instance/profile " - + "with non-zero cpu/mem resources."); - } + launchedNMTasks.addAll(schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); + if (!launchedNMTasks.isEmpty()) { + LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size()); + return; } - private void startNMInstances(Injector injector) { - Map<String, Integer> nmInstances = injector.getInstance(MyriadConfiguration.class).getNmInstances(); - MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); - ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - SchedulerState schedulerState = injector.getInstance(SchedulerState.class); - - Set<NodeTask> launchedNMTasks = new HashSet<>(); - launchedNMTasks.addAll( - schedulerState.getPendingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); - if (!launchedNMTasks.isEmpty()) { - LOGGER.info("{} NM(s) in pending state. Not launching additional NMs", launchedNMTasks.size()); - return; - } - - launchedNMTasks.addAll( - schedulerState.getStagingTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); - if (!launchedNMTasks.isEmpty()) { - LOGGER.info("{} NM(s) in staging state. Not launching additional NMs", launchedNMTasks.size()); - return; - } - - launchedNMTasks.addAll( - schedulerState.getActiveTasksByType(NodeManagerConfiguration.NM_TASK_PREFIX)); - if (!launchedNMTasks.isEmpty()) { - LOGGER.info("{} NM(s) in active state. Not launching additional NMs", launchedNMTasks.size()); - return; - } - - for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { - LOGGER.info("Launching {} NM(s) with profile {}", entry.getValue(), entry.getKey()); - myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue(), null); - } + for (Map.Entry<String, Integer> entry : nmInstances.entrySet()) { + LOGGER.info("Launching {} NM(s) with profile {}", entry.getValue(), entry.getKey()); + myriadOperations.flexUpCluster(profileManager.get(entry.getKey()), entry.getValue(), null); } - - /** - * Create ServiceProfile for any configured service - * @param cfg - * @param injector - */ - private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) { - LOGGER.info("Initializing initServiceConfigurations"); - ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); - TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); - - Map<String, ServiceConfiguration> servicesConfigs = - injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); - if (servicesConfigs != null) { - for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) { - final String taskPrefix = entry.getKey(); - ServiceConfiguration config = entry.getValue(); - final Double cpu = config.getCpus().or(ServiceConfiguration.DEFAULT_CPU); - final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY); - - profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem)); - taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix)); - } + } + + /** + * Create ServiceProfile for any configured service + * + * @param cfg + * @param injector + */ + private void initServiceConfigurations(MyriadConfiguration cfg, Injector injector) { + LOGGER.info("Initializing initServiceConfigurations"); + ServiceProfileManager profileManager = injector.getInstance(ServiceProfileManager.class); + TaskConstraintsManager taskConstraintsManager = injector.getInstance(TaskConstraintsManager.class); + + Map<String, ServiceConfiguration> servicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); + if (servicesConfigs != null) { + for (Map.Entry<String, ServiceConfiguration> entry : servicesConfigs.entrySet()) { + final String taskPrefix = entry.getKey(); + ServiceConfiguration config = entry.getValue(); + final Double cpu = config.getCpus().or(ServiceConfiguration.DEFAULT_CPU); + final Double mem = config.getJvmMaxMemoryMB().or(ServiceConfiguration.DEFAULT_MEMORY); + + profileManager.add(new ServiceResourceProfile(taskPrefix, cpu, mem)); + taskConstraintsManager.addTaskConstraints(taskPrefix, new ServiceTaskConstraints(cfg, taskPrefix)); } } - - private void initTerminatorService(Injector injector) { - LOGGER.info("Initializing Terminator"); - terminatorService = Executors.newScheduledThreadPool(1); - final int initialDelay = 100; - final int period = 2000; - terminatorService.scheduleAtFixedRate( - injector.getInstance(TaskTerminator.class), initialDelay, period, TimeUnit.MILLISECONDS); + } + + private void initTerminatorService(Injector injector) { + LOGGER.info("Initializing Terminator"); + terminatorService = Executors.newScheduledThreadPool(1); + final int initialDelay = 100; + final int period = 2000; + terminatorService.scheduleAtFixedRate(injector.getInstance(TaskTerminator.class), initialDelay, period, TimeUnit.MILLISECONDS); + } + + private void initRebalancerService(MyriadConfiguration cfg, Injector injector) { + if (cfg.isRebalancer()) { + LOGGER.info("Initializing Rebalancer"); + rebalancerService = Executors.newScheduledThreadPool(1); + final int initialDelay = 100; + final int period = 5000; + rebalancerService.scheduleAtFixedRate(injector.getInstance(Rebalancer.class), initialDelay, period, TimeUnit.MILLISECONDS); + } else { + LOGGER.info("Rebalancer is not turned on"); } - - private void initRebalancerService(MyriadConfiguration cfg, - Injector injector) { - if (cfg.isRebalancer()) { - LOGGER.info("Initializing Rebalancer"); - rebalancerService = Executors.newScheduledThreadPool(1); - final int initialDelay = 100; - final int period = 5000; - rebalancerService.scheduleAtFixedRate( - injector.getInstance(Rebalancer.class), initialDelay, period, TimeUnit.MILLISECONDS); - } else { - LOGGER.info("Rebalancer is not turned on"); + } + + private void initDisruptors(Injector injector) { + LOGGER.info("Initializing Disruptors"); + DisruptorManager disruptorManager = injector.getInstance(DisruptorManager.class); + disruptorManager.init(injector); + } + + /** + * Start tasks for configured services + * + * @param injector + */ + private void startJavaBasedTaskInstance(Injector injector) { + Map<String, ServiceConfiguration> auxServicesConfigs = injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); + if (auxServicesConfigs != null) { + MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); + for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { + try { + myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), entry.getKey()); + } catch (MyriadBadConfigurationException e) { + LOGGER.warn("Exception while trying to flexup service: {}", entry.getKey(), e); } - } - - private void initDisruptors(Injector injector) { - LOGGER.info("Initializing Disruptors"); - DisruptorManager disruptorManager = injector - .getInstance(DisruptorManager.class); - disruptorManager.init(injector); - } - - /** - * Start tasks for configured services - * @param injector - */ - private void startJavaBasedTaskInstance(Injector injector) { - Map<String, ServiceConfiguration> auxServicesConfigs = - injector.getInstance(MyriadConfiguration.class).getServiceConfigurations(); - if (auxServicesConfigs != null) { - MyriadOperations myriadOperations = injector.getInstance(MyriadOperations.class); - for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { - try { - myriadOperations.flexUpAService(entry.getValue().getMaxInstances().or(1), entry.getKey()); - } catch (MyriadBadConfigurationException e) { - LOGGER.warn("Exception while trying to flexup service: {}", entry.getKey(), e); - } - } } } + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java index dc81276..d8f28c5 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MesosModule.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -49,8 +49,7 @@ import com.google.protobuf.ByteString; * Guice Module for Mesos objects. */ public class MesosModule extends AbstractModule { - private static final Logger LOGGER = LoggerFactory.getLogger( - MesosModule.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MesosModule.class); public MesosModule() { } @@ -62,15 +61,9 @@ public class MesosModule extends AbstractModule { @Provides @Singleton - SchedulerDriver providesSchedulerDriver( - MyriadScheduler scheduler, - MyriadConfiguration cfg, - SchedulerState schedulerState) { + SchedulerDriver providesSchedulerDriver(MyriadScheduler scheduler, MyriadConfiguration cfg, SchedulerState schedulerState) { - Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("") - .setName(cfg.getFrameworkName()) - .setCheckpoint(cfg.isCheckpoint()) - .setFailoverTimeout(cfg.getFrameworkFailoverTimeout()); + Builder frameworkInfoBuilder = FrameworkInfo.newBuilder().setUser("").setName(cfg.getFrameworkName()).setCheckpoint(cfg.isCheckpoint()).setFailoverTimeout(cfg.getFrameworkFailoverTimeout()); if (StringUtils.isNotEmpty(cfg.getFrameworkRole())) { frameworkInfoBuilder.setRole(cfg.getFrameworkRole()); @@ -90,9 +83,8 @@ public class MesosModule extends AbstractModule { Credential.Builder credentialBuilder = Credential.newBuilder(); credentialBuilder.setPrincipal(mesosAuthenticationPrincipal); if (StringUtils.isNotEmpty(mesosAuthenticationSecretFilename)) { - try { - credentialBuilder.setSecret(ByteString.readFrom( - new FileInputStream(mesosAuthenticationSecretFilename))); + try { + credentialBuilder.setSecret(ByteString.readFrom(new FileInputStream(mesosAuthenticationSecretFilename))); } catch (FileNotFoundException ex) { LOGGER.error("Mesos authentication secret file was not found", ex); throw new RuntimeException(ex); @@ -101,21 +93,15 @@ public class MesosModule extends AbstractModule { throw new RuntimeException(ex); } } - return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), - cfg.getMesosMaster(), credentialBuilder.build()); + return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster(), credentialBuilder.build()); } else { - return new MesosSchedulerDriver(scheduler, - frameworkInfoBuilder.build(), cfg.getMesosMaster()); + return new MesosSchedulerDriver(scheduler, frameworkInfoBuilder.build(), cfg.getMesosMaster()); } } @Provides @Singleton State providesStateStore(MyriadConfiguration cfg) { - return new ZooKeeperState( - cfg.getZkServers(), - cfg.getZkTimeout(), - TimeUnit.MILLISECONDS, - "/myriad/" + cfg.getFrameworkName()); + return new ZooKeeperState(cfg.getZkServers(), cfg.getZkTimeout(), TimeUnit.MILLISECONDS, "/myriad/" + cfg.getFrameworkName()); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java index 3632334..b28164c 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/MyriadModule.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -62,111 +62,104 @@ import java.util.Map; * Guice Module for Myriad */ public class MyriadModule extends AbstractModule { - private static final Logger LOGGER = LoggerFactory.getLogger(MyriadModule.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MyriadModule.class); - private MyriadConfiguration cfg; - private Configuration hadoopConf; - private AbstractYarnScheduler yarnScheduler; - private final RMContext rmContext; - private InterceptorRegistry interceptorRegistry; + private MyriadConfiguration cfg; + private Configuration hadoopConf; + private AbstractYarnScheduler yarnScheduler; + private final RMContext rmContext; + private InterceptorRegistry interceptorRegistry; - public MyriadModule(MyriadConfiguration cfg, - Configuration hadoopConf, - AbstractYarnScheduler yarnScheduler, - RMContext rmContext, - InterceptorRegistry interceptorRegistry) { - this.cfg = cfg; - this.hadoopConf = hadoopConf; - this.yarnScheduler = yarnScheduler; - this.rmContext = rmContext; - this.interceptorRegistry = interceptorRegistry; - } + public MyriadModule(MyriadConfiguration cfg, Configuration hadoopConf, AbstractYarnScheduler yarnScheduler, RMContext rmContext, InterceptorRegistry interceptorRegistry) { + this.cfg = cfg; + this.hadoopConf = hadoopConf; + this.yarnScheduler = yarnScheduler; + this.rmContext = rmContext; + this.interceptorRegistry = interceptorRegistry; + } - @Override - protected void configure() { - LOGGER.debug("Configuring guice"); - bind(MyriadConfiguration.class).toInstance(cfg); - bind(Configuration.class).toInstance(hadoopConf); - bind(RMContext.class).toInstance(rmContext); - bind(AbstractYarnScheduler.class).toInstance(yarnScheduler); - bind(InterceptorRegistry.class).toInstance(interceptorRegistry); - bind(MyriadDriverManager.class).in(Scopes.SINGLETON); - bind(MyriadScheduler.class).in(Scopes.SINGLETON); - bind(ServiceProfileManager.class).in(Scopes.SINGLETON); - bind(DisruptorManager.class).in(Scopes.SINGLETON); - bind(ReconcileService.class).in(Scopes.SINGLETON); - bind(HttpConnectorProvider.class).in(Scopes.SINGLETON); - bind(TaskConstraintsManager.class).in(Scopes.SINGLETON); - // add special binding between TaskFactory and NMTaskFactoryImpl to ease up - // usage of TaskFactory - bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class); - bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON); - bind(NodeStore.class).in(Scopes.SINGLETON); - bind(OfferLifecycleManager.class).in(Scopes.SINGLETON); - bind(NMHeartBeatHandler.class).asEagerSingleton(); + @Override + protected void configure() { + LOGGER.debug("Configuring guice"); + bind(MyriadConfiguration.class).toInstance(cfg); + bind(Configuration.class).toInstance(hadoopConf); + bind(RMContext.class).toInstance(rmContext); + bind(AbstractYarnScheduler.class).toInstance(yarnScheduler); + bind(InterceptorRegistry.class).toInstance(interceptorRegistry); + bind(MyriadDriverManager.class).in(Scopes.SINGLETON); + bind(MyriadScheduler.class).in(Scopes.SINGLETON); + bind(ServiceProfileManager.class).in(Scopes.SINGLETON); + bind(DisruptorManager.class).in(Scopes.SINGLETON); + bind(ReconcileService.class).in(Scopes.SINGLETON); + bind(HttpConnectorProvider.class).in(Scopes.SINGLETON); + bind(TaskConstraintsManager.class).in(Scopes.SINGLETON); + // add special binding between TaskFactory and NMTaskFactoryImpl to ease up + // usage of TaskFactory + bind(TaskFactory.class).annotatedWith(NMTaskFactoryAnnotation.class).to(NMTaskFactoryImpl.class); + bind(YarnNodeCapacityManager.class).in(Scopes.SINGLETON); + bind(NodeStore.class).in(Scopes.SINGLETON); + bind(OfferLifecycleManager.class).in(Scopes.SINGLETON); + bind(NMHeartBeatHandler.class).asEagerSingleton(); - MapBinder<String, TaskFactory> mapBinder - = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class); - mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON); - Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations(); - if (auxServicesConfigs != null) { - for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { - String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull(); - if (taskFactoryClass != null) { - try { - Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass); - mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON); - } catch (ClassNotFoundException e) { - LOGGER.error("ClassNotFoundException", e); - } - } else { - mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON); - } + MapBinder<String, TaskFactory> mapBinder = MapBinder.newMapBinder(binder(), String.class, TaskFactory.class); + mapBinder.addBinding(NodeManagerConfiguration.NM_TASK_PREFIX).to(NMTaskFactoryImpl.class).in(Scopes.SINGLETON); + Map<String, ServiceConfiguration> auxServicesConfigs = cfg.getServiceConfigurations(); + if (auxServicesConfigs != null) { + for (Map.Entry<String, ServiceConfiguration> entry : auxServicesConfigs.entrySet()) { + String taskFactoryClass = entry.getValue().getTaskFactoryImplName().orNull(); + if (taskFactoryClass != null) { + try { + Class<? extends TaskFactory> implClass = (Class<? extends TaskFactory>) Class.forName(taskFactoryClass); + mapBinder.addBinding(entry.getKey()).to(implClass).in(Scopes.SINGLETON); + } catch (ClassNotFoundException e) { + LOGGER.error("ClassNotFoundException", e); } + } else { + mapBinder.addBinding(entry.getKey()).to(ServiceTaskFactoryImpl.class).in(Scopes.SINGLETON); } - //TODO(Santosh): Should be configurable as well - bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON); + } } + //TODO(Santosh): Should be configurable as well + bind(NodeScaleDownPolicy.class).to(LeastAMNodesFirstPolicy.class).in(Scopes.SINGLETON); + } - @Provides - @Singleton - SchedulerState providesSchedulerState(MyriadConfiguration cfg) { - LOGGER.debug("Configuring SchedulerState provider"); - MyriadStateStore myriadStateStore = null; - if (cfg.isHAEnabled()) { - myriadStateStore = providesMyriadStateStore(); - if (myriadStateStore == null) { - throw new RuntimeException("Could not find a state store" + - " implementation for Myriad. The 'yarn.resourcemanager.store.class'" + - " property should be set to a class implementing the" + - " MyriadStateStore interface. For e.g." + - " org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore"); - } - } - return new SchedulerState(myriadStateStore); + @Provides + @Singleton + SchedulerState providesSchedulerState(MyriadConfiguration cfg) { + LOGGER.debug("Configuring SchedulerState provider"); + MyriadStateStore myriadStateStore = null; + if (cfg.isHAEnabled()) { + myriadStateStore = providesMyriadStateStore(); + if (myriadStateStore == null) { + throw new RuntimeException("Could not find a state store" + + " implementation for Myriad. The 'yarn.resourcemanager.store.class'" + + " property should be set to a class implementing the" + + " MyriadStateStore interface. For e.g." + + " org.apache.hadoop.yarn.server.resourcemanager.recovery.MyriadFileSystemRMStateStore"); + } } + return new SchedulerState(myriadStateStore); + } - private MyriadStateStore providesMyriadStateStore() { - // TODO (sdaingade) Read the implementation class from yml - // once multiple implementations are available. - if (rmContext.getStateStore() instanceof MyriadStateStore) { - return (MyriadStateStore) rmContext.getStateStore(); - } - return null; + private MyriadStateStore providesMyriadStateStore() { + // TODO (sdaingade) Read the implementation class from yml + // once multiple implementations are available. + if (rmContext.getStateStore() instanceof MyriadStateStore) { + return (MyriadStateStore) rmContext.getStateStore(); } + return null; + } - @Provides - @Singleton - ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) { - ExecutorCommandLineGenerator cliGenerator = null; - MyriadExecutorConfiguration myriadExecutorConfiguration = - cfg.getMyriadExecutorConfiguration(); - if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { - cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, - myriadExecutorConfiguration.getNodeManagerUri().get()); - } else { - cliGenerator = new NMExecutorCLGenImpl(cfg); - } - return cliGenerator; - } + @Provides + @Singleton + ExecutorCommandLineGenerator providesCLIGenerator(MyriadConfiguration cfg) { + ExecutorCommandLineGenerator cliGenerator = null; + MyriadExecutorConfiguration myriadExecutorConfiguration = cfg.getMyriadExecutorConfiguration(); + if (myriadExecutorConfiguration.getNodeManagerUri().isPresent()) { + cliGenerator = new DownloadNMExecutorCLGenImpl(cfg, myriadExecutorConfiguration.getNodeManagerUri().get()); + } else { + cliGenerator = new NMExecutorCLGenImpl(cfg); + } + return cliGenerator; + } }