http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEvent.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEvent.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEvent.java index f11912f..a7778dc 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEvent.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,32 +25,32 @@ import org.apache.mesos.SchedulerDriver; * mesos register event */ public class RegisteredEvent { - private SchedulerDriver driver; - private Protos.FrameworkID frameworkId; - private Protos.MasterInfo masterInfo; + private SchedulerDriver driver; + private Protos.FrameworkID frameworkId; + private Protos.MasterInfo masterInfo; - public SchedulerDriver getDriver() { - return driver; - } + public SchedulerDriver getDriver() { + return driver; + } - public void setDriver(SchedulerDriver driver) { - this.driver = driver; - } + public void setDriver(SchedulerDriver driver) { + this.driver = driver; + } - public Protos.FrameworkID getFrameworkId() { - return frameworkId; - } + public Protos.FrameworkID getFrameworkId() { + return frameworkId; + } - public void setFrameworkId(Protos.FrameworkID frameworkId) { - this.frameworkId = frameworkId; - } + public void setFrameworkId(Protos.FrameworkID frameworkId) { + this.frameworkId = frameworkId; + } - public Protos.MasterInfo getMasterInfo() { - return masterInfo; - } + public Protos.MasterInfo getMasterInfo() { + return masterInfo; + } - public void setMasterInfo(Protos.MasterInfo masterInfo) { - this.masterInfo = masterInfo; - } + public void setMasterInfo(Protos.MasterInfo masterInfo) { + this.masterInfo = masterInfo; + } }
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEventFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEventFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEventFactory.java index 6ca6a32..6fd5e05 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEventFactory.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/RegisteredEventFactory.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,9 +25,9 @@ import com.lmax.disruptor.EventFactory; */ public class RegisteredEventFactory implements EventFactory<RegisteredEvent> { - @Override - public RegisteredEvent newInstance() { - return new RegisteredEvent(); - } + @Override + public RegisteredEvent newInstance() { + return new RegisteredEvent(); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEvent.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEvent.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEvent.java index 957c68b..bbe8764 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEvent.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,22 +27,22 @@ import java.util.List; * resource offer event */ public class ResourceOffersEvent { - private SchedulerDriver driver; - private List<Protos.Offer> offers; + private SchedulerDriver driver; + private List<Protos.Offer> offers; - public SchedulerDriver getDriver() { - return driver; - } + public SchedulerDriver getDriver() { + return driver; + } - public void setDriver(SchedulerDriver driver) { - this.driver = driver; - } + public void setDriver(SchedulerDriver driver) { + this.driver = driver; + } - public List<Protos.Offer> getOffers() { - return offers; - } + public List<Protos.Offer> getOffers() { + return offers; + } - public void setOffers(List<Protos.Offer> offers) { - this.offers = offers; - } + public void setOffers(List<Protos.Offer> offers) { + this.offers = offers; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEventFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEventFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEventFactory.java index 255742f..ea259a2 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEventFactory.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/ResourceOffersEventFactory.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,11 +23,10 @@ import com.lmax.disruptor.EventFactory; /** * resource offer event factory */ -public class ResourceOffersEventFactory implements - EventFactory<ResourceOffersEvent> { +public class ResourceOffersEventFactory implements EventFactory<ResourceOffersEvent> { - @Override - public ResourceOffersEvent newInstance() { - return new ResourceOffersEvent(); - } + @Override + public ResourceOffersEvent newInstance() { + return new ResourceOffersEvent(); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEvent.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEvent.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEvent.java index 11721a5..dce19e9 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEvent.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,22 +25,22 @@ import org.apache.mesos.SchedulerDriver; * mesos slave lost event */ public class SlaveLostEvent { - private SchedulerDriver driver; - private Protos.SlaveID slaveId; + private SchedulerDriver driver; + private Protos.SlaveID slaveId; - public SchedulerDriver getDriver() { - return driver; - } + public SchedulerDriver getDriver() { + return driver; + } - public void setDriver(SchedulerDriver driver) { - this.driver = driver; - } + public void setDriver(SchedulerDriver driver) { + this.driver = driver; + } - public Protos.SlaveID getSlaveId() { - return slaveId; - } + public Protos.SlaveID getSlaveId() { + return slaveId; + } - public void setSlaveId(Protos.SlaveID slaveId) { - this.slaveId = slaveId; - } + public void setSlaveId(Protos.SlaveID slaveId) { + this.slaveId = slaveId; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEventFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEventFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEventFactory.java index 948bd38..fa17341 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEventFactory.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/SlaveLostEventFactory.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,9 +25,9 @@ import com.lmax.disruptor.EventFactory; */ public class SlaveLostEventFactory implements EventFactory<SlaveLostEvent> { - @Override - public SlaveLostEvent newInstance() { - return new SlaveLostEvent(); - } + @Override + public SlaveLostEvent newInstance() { + return new SlaveLostEvent(); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEvent.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEvent.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEvent.java index 8c2a6a8..bc6a790 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEvent.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,23 +25,23 @@ import org.apache.mesos.SchedulerDriver; * mesos status update event */ public class StatusUpdateEvent { - private SchedulerDriver driver; - private Protos.TaskStatus status; + private SchedulerDriver driver; + private Protos.TaskStatus status; - public SchedulerDriver getDriver() { - return driver; - } + public SchedulerDriver getDriver() { + return driver; + } - public void setDriver(SchedulerDriver driver) { - this.driver = driver; - } + public void setDriver(SchedulerDriver driver) { + this.driver = driver; + } - public Protos.TaskStatus getStatus() { - return status; - } + public Protos.TaskStatus getStatus() { + return status; + } - public void setStatus(Protos.TaskStatus status) { - this.status = status; - } + public void setStatus(Protos.TaskStatus status) { + this.status = status; + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEventFactory.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEventFactory.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEventFactory.java index 672ab0b..6540795 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEventFactory.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/StatusUpdateEventFactory.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,12 +23,11 @@ import com.lmax.disruptor.EventFactory; /** * mesos status update event */ -public class StatusUpdateEventFactory implements - EventFactory<StatusUpdateEvent> { +public class StatusUpdateEventFactory implements EventFactory<StatusUpdateEvent> { - @Override - public StatusUpdateEvent newInstance() { - return new StatusUpdateEvent(); - } + @Override + public StatusUpdateEvent newInstance() { + return new StatusUpdateEvent(); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/DisconnectedEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/DisconnectedEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/DisconnectedEventHandler.java index cbd501e..195532d 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/DisconnectedEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/DisconnectedEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,11 +27,11 @@ import org.slf4j.LoggerFactory; * handles and logs disconnected events */ public class DisconnectedEventHandler implements EventHandler<DisconnectedEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(DisconnectedEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DisconnectedEventHandler.class); - @Override - public void onEvent(DisconnectedEvent event, long sequence, boolean endOfBatch) throws Exception { - LOGGER.info("Framework disconnected!"); - } + @Override + public void onEvent(DisconnectedEvent event, long sequence, boolean endOfBatch) throws Exception { + LOGGER.info("Framework disconnected!"); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ErrorEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ErrorEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ErrorEventHandler.java index 07bc212..14ef526 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ErrorEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ErrorEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,12 +27,12 @@ import org.slf4j.LoggerFactory; * handles and logs error events */ public class ErrorEventHandler implements EventHandler<ErrorEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(ErrorEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ErrorEventHandler.class); - @Override - public void onEvent(ErrorEvent event, long sequence, boolean endOfBatch) throws Exception { - String message = event.getMessage(); - LOGGER.error(message); - } + @Override + public void onEvent(ErrorEvent event, long sequence, boolean endOfBatch) throws Exception { + String message = event.getMessage(); + LOGGER.error(message); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java index 1ed311c..125a953 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ExecutorLostEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,15 +29,14 @@ import org.slf4j.LoggerFactory; * handles and logs executor lost events */ public class ExecutorLostEventHandler implements EventHandler<ExecutorLostEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorLostEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorLostEventHandler.class); - @Override - public void onEvent(ExecutorLostEvent event, long sequence, boolean endOfBatch) throws Exception { - ExecutorID executorId = event.getExecutorId(); - SlaveID slaveId = event.getSlaveId(); - int exitStatus = event.getExitStatus(); - LOGGER.info("Executor {} of slave {} lost with exit status: {}", - executorId, slaveId, exitStatus); - } + @Override + public void onEvent(ExecutorLostEvent event, long sequence, boolean endOfBatch) throws Exception { + ExecutorID executorId = event.getExecutorId(); + SlaveID slaveId = event.getSlaveId(); + int exitStatus = event.getExitStatus(); + LOGGER.info("Executor {} of slave {} lost with exit status: {}", executorId, slaveId, exitStatus); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java index d3c693e..4ae2e1b 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/FrameworkMessageEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -29,14 +29,13 @@ import org.slf4j.LoggerFactory; * handles and logs mesos framework messages */ public class FrameworkMessageEventHandler implements EventHandler<FrameworkMessageEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(FrameworkMessageEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FrameworkMessageEventHandler.class); - @Override - public void onEvent(FrameworkMessageEvent event, long sequence, boolean endOfBatch) throws Exception { - ExecutorID executorId = event.getExecutorId(); - SlaveID slaveId = event.getSlaveId(); - LOGGER.info("Received framework message from executor {} of slave {}", - executorId, slaveId); - } + @Override + public void onEvent(FrameworkMessageEvent event, long sequence, boolean endOfBatch) throws Exception { + ExecutorID executorId = event.getExecutorId(); + SlaveID slaveId = event.getSlaveId(); + LOGGER.info("Received framework message from executor {} of slave {}", executorId, slaveId); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java index 2f93d41..fc95a9c 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/OfferRescindedEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,11 +27,11 @@ import org.slf4j.LoggerFactory; * handles and logs offer rescinded events */ public class OfferRescindedEventHandler implements EventHandler<OfferRescindedEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(OfferRescindedEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(OfferRescindedEventHandler.class); - @Override - public void onEvent(OfferRescindedEvent event, long sequence, boolean endOfBatch) throws Exception { - LOGGER.info("OfferRescinded event: {}", event); - } + @Override + public void onEvent(OfferRescindedEvent event, long sequence, boolean endOfBatch) throws Exception { + LOGGER.info("OfferRescinded event: {}", event); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java index 3d79b6d..34c3f7f 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ReRegisteredEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,17 +30,17 @@ import org.slf4j.LoggerFactory; * handles and logs mesos re-register events */ public class ReRegisteredEventHandler implements EventHandler<ReRegisteredEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(ReRegisteredEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ReRegisteredEventHandler.class); - @Inject - private SchedulerState state; + @Inject + private SchedulerState state; - @Inject - private ReconcileService reconcileService; + @Inject + private ReconcileService reconcileService; - @Override - public void onEvent(ReRegisteredEvent event, long sequence, boolean endOfBatch) throws Exception { - LOGGER.info("Framework re-registered: {}", event); - reconcileService.reconcile(event.getDriver()); - } + @Override + public void onEvent(ReRegisteredEvent event, long sequence, boolean endOfBatch) throws Exception { + LOGGER.info("Framework re-registered: {}", event); + reconcileService.reconcile(event.getDriver()); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java index 066e065..594d7d4 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/RegisteredEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -31,19 +31,19 @@ import javax.inject.Inject; * handles and logs mesos registered events */ public class RegisteredEventHandler implements EventHandler<RegisteredEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(RegisteredEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RegisteredEventHandler.class); - @Inject - private SchedulerState schedulerState; + @Inject + private SchedulerState schedulerState; - @Inject - private ReconcileService reconcileService; + @Inject + private ReconcileService reconcileService; - @Override - public void onEvent(RegisteredEvent event, long sequence, boolean endOfBatch) throws Exception { - LOGGER.info("Received event: {} with frameworkId: {}", event, event.getFrameworkId()); - schedulerState.setFrameworkId(event.getFrameworkId()); - reconcileService.reconcile(event.getDriver()); - } + @Override + public void onEvent(RegisteredEvent event, long sequence, boolean endOfBatch) throws Exception { + LOGGER.info("Received event: {} with frameworkId: {}", event, event.getFrameworkId()); + schedulerState.setFrameworkId(event.getFrameworkId()); + reconcileService.reconcile(event.getDriver()); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 3abf9ab..a961cfe 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -82,13 +82,12 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv @Inject private OfferLifecycleManager offerLifecycleMgr; - + @Inject private TaskConstraintsManager taskConstraintsManager; @Override - public void onEvent(ResourceOffersEvent event, long sequence, - boolean endOfBatch) throws Exception { + public void onEvent(ResourceOffersEvent event, long sequence, boolean endOfBatch) throws Exception { SchedulerDriver driver = event.getDriver(); List<Offer> offers = event.getOffers(); @@ -106,7 +105,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds()); driverOperationLock.lock(); try { - for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext();) { + for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext(); ) { Offer offer = iterator.next(); Set<NodeTask> nodeTasks = schedulerState.getNodeTasks(offer.getSlaveId()); for (NodeTask nodeTask : nodeTasks) { @@ -119,8 +118,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); if (CollectionUtils.isNotEmpty(pendingTasks)) { for (Protos.TaskID pendingTaskId : pendingTasks) { - NodeTask taskToLaunch = schedulerState - .getTask(pendingTaskId); + NodeTask taskToLaunch = schedulerState.getTask(pendingTaskId); if (taskToLaunch == null) { missingTasks.add(pendingTaskId); LOGGER.warn("Node task for TaskID: {} does not exist", pendingTaskId); @@ -134,12 +132,9 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv launchedTasks.addAll(schedulerState.getActiveTasksByType(taskPrefix)); launchedTasks.addAll(schedulerState.getStagingTasksByType(taskPrefix)); - if (matches(offer, taskToLaunch, constraint) - && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) { + if (matches(offer, taskToLaunch, constraint) && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) { try { - final TaskInfo task = - taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), pendingTaskId, - taskToLaunch); + final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID(), pendingTaskId, taskToLaunch); List<OfferID> offerIds = new ArrayList<>(); offerIds.add(offer.getId()); List<TaskInfo> tasks = new ArrayList<>(); @@ -148,7 +143,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer); driver.launchTasks(offerIds, tasks); schedulerState.makeTaskStaging(pendingTaskId); - + // For every NM Task that we launch, we currently // need to backup the ExecutorInfo for that NM Task in the State Store. // Without this, we will not be able to launch tasks corresponding to yarn @@ -173,8 +168,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv for (Offer offer : offers) { if (SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", - offer.getHostname()); + LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", offer.getHostname()); } offerLifecycleMgr.addOffers(offer); } else { @@ -204,8 +198,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv if (resourceEvaluators.containsKey(resource.getName())) { resourceEvaluators.get(resource.getName()).eval(resource, results); } else { - LOGGER.warn("Ignoring unknown resource type: {}", - resource.getName()); + LOGGER.warn("Ignoring unknown resource type: {}", resource.getName()); } } double cpus = (Double) results.get(RESOURCES_CPU_KEY); @@ -219,28 +212,24 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv return checkAggregates(offer, taskToLaunch, ports, cpus, mem); } - private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) { - final ServiceResourceProfile profile = taskToLaunch.getProfile(); - final String taskPrefix = taskToLaunch.getTaskPrefix(); - final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu(); - final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory(); - final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix); - if (aggrCpu <= cpus - && aggrMem <= mem - && taskConstraints.portsCount() <= ports) { - return true; - } else { - LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", - aggrCpu, aggrMem, ports); - return false; - } + private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) { + final ServiceResourceProfile profile = taskToLaunch.getProfile(); + final String taskPrefix = taskToLaunch.getTaskPrefix(); + final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu(); + final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory(); + final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix); + if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) { + return true; + } else { + LOGGER.info("Offer not sufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports); + return false; } + } private boolean meetsConstraint(Offer offer, Constraint constraint) { if (constraint != null) { switch (constraint.getType()) { - case LIKE: - { + case LIKE: { LikeConstraint likeConstraint = (LikeConstraint) constraint; if (likeConstraint.isConstraintOnHostName()) { return likeConstraint.matchesHostName(offer.getHostname()); @@ -248,8 +237,8 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv return likeConstraint.matchesSlaveAttributes(offer.getAttributesList()); } } - default: - return false; + default: + return false; } } return true; @@ -266,14 +255,13 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv if (resource.getType().equals(Value.Type.SCALAR)) { value = new Double(resource.getScalar().getValue()); } else { - LOGGER.error(id + " resource was not a scalar: {}", resource - .getType().toString()); + LOGGER.error(id + " resource was not a scalar: {}", resource.getType().toString()); } return value; } private interface EvalResources { - public void eval(Resource resource, Map<String, Object>results); + public void eval(Resource resource, Map<String, Object> results); } private static Map<String, EvalResources> resourceEvaluators; @@ -282,14 +270,12 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv resourceEvaluators = new HashMap<String, EvalResources>(4); resourceEvaluators.put(RESOURCES_CPU_KEY, new EvalResources() { public void eval(Resource resource, Map<String, Object> results) { - results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) + - scalarToDouble(resource, RESOURCES_CPU_KEY)); + results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) + scalarToDouble(resource, RESOURCES_CPU_KEY)); } }); resourceEvaluators.put(RESOURCES_MEM_KEY, new EvalResources() { public void eval(Resource resource, Map<String, Object> results) { - results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) + - scalarToDouble(resource, RESOURCES_MEM_KEY)); + results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) + scalarToDouble(resource, RESOURCES_MEM_KEY)); } }); resourceEvaluators.put(RESOURCES_DISK_KEY, new EvalResources() { @@ -307,12 +293,10 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv } } } else { - LOGGER.error("ports resource was not Ranges: {}", resource - .getType().toString()); + LOGGER.error("ports resource was not Ranges: {}", resource.getType().toString()); } - results.put(RESOURCES_PORTS_KEY, (Integer) results.get(RESOURCES_PORTS_KEY) + - Integer.valueOf(ports)); + results.put(RESOURCES_PORTS_KEY, (Integer) results.get(RESOURCES_PORTS_KEY) + Integer.valueOf(ports)); } }); } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java index 099547f..bbd1292 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/SlaveLostEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -28,12 +28,12 @@ import org.slf4j.LoggerFactory; * handles and logs mesos slave lost events */ public class SlaveLostEventHandler implements EventHandler<SlaveLostEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(SlaveLostEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SlaveLostEventHandler.class); - @Override - public void onEvent(SlaveLostEvent event, long sequence, boolean endOfBatch) throws Exception { - SlaveID slaveId = event.getSlaveId(); - LOGGER.info("Slave {} lost!", slaveId); - } + @Override + public void onEvent(SlaveLostEvent event, long sequence, boolean endOfBatch) throws Exception { + SlaveID slaveId = event.getSlaveId(); + LOGGER.info("Slave {} lost!", slaveId); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java index 628f09e..34b8712 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -37,63 +37,61 @@ import org.slf4j.LoggerFactory; */ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> { - private static final Logger LOGGER = LoggerFactory.getLogger(StatusUpdateEventHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(StatusUpdateEventHandler.class); - private final SchedulerState schedulerState; - private final OfferLifecycleManager offerLifecycleManager; + private final SchedulerState schedulerState; + private final OfferLifecycleManager offerLifecycleManager; - @Inject - public StatusUpdateEventHandler(SchedulerState schedulerState, OfferLifecycleManager offerLifecycleManager) { - this.schedulerState = schedulerState; - this.offerLifecycleManager = offerLifecycleManager; - } + @Inject + public StatusUpdateEventHandler(SchedulerState schedulerState, OfferLifecycleManager offerLifecycleManager) { + this.schedulerState = schedulerState; + this.offerLifecycleManager = offerLifecycleManager; + } - @Override - public void onEvent(StatusUpdateEvent event, - long sequence, - boolean endOfBatch) throws Exception { - TaskStatus status = event.getStatus(); - this.schedulerState.updateTask(status); - TaskID taskId = status.getTaskId(); - NodeTask task = schedulerState.getTask(taskId); - if (task == null) { - LOGGER.warn("Task: {} not found, status: {}", taskId.getValue(), status.getState()); - schedulerState.removeTask(taskId); - return; - } - LOGGER.info("Status Update for task: {} | state: {}", taskId.getValue(), status.getState()); - TaskState state = status.getState(); + @Override + public void onEvent(StatusUpdateEvent event, long sequence, boolean endOfBatch) throws Exception { + TaskStatus status = event.getStatus(); + this.schedulerState.updateTask(status); + TaskID taskId = status.getTaskId(); + NodeTask task = schedulerState.getTask(taskId); + if (task == null) { + LOGGER.warn("Task: {} not found, status: {}", taskId.getValue(), status.getState()); + schedulerState.removeTask(taskId); + return; + } + LOGGER.info("Status Update for task: {} | state: {}", taskId.getValue(), status.getState()); + TaskState state = status.getState(); - switch (state) { - case TASK_STAGING: - schedulerState.makeTaskStaging(taskId); - break; - case TASK_STARTING: - schedulerState.makeTaskStaging(taskId); - break; - case TASK_RUNNING: - schedulerState.makeTaskActive(taskId); - break; - case TASK_FINISHED: - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.removeTask(taskId); - break; - case TASK_FAILED: - // Add to pending tasks - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.makeTaskPending(taskId); - break; - case TASK_KILLED: - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.removeTask(taskId); - break; - case TASK_LOST: - offerLifecycleManager.declineOutstandingOffers(task.getHostname()); - schedulerState.makeTaskPending(taskId); - break; - default: - LOGGER.error("Invalid state: {}", state); - break; - } + switch (state) { + case TASK_STAGING: + schedulerState.makeTaskStaging(taskId); + break; + case TASK_STARTING: + schedulerState.makeTaskStaging(taskId); + break; + case TASK_RUNNING: + schedulerState.makeTaskActive(taskId); + break; + case TASK_FINISHED: + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); + schedulerState.removeTask(taskId); + break; + case TASK_FAILED: + // Add to pending tasks + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); + schedulerState.makeTaskPending(taskId); + break; + case TASK_KILLED: + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); + schedulerState.removeTask(taskId); + break; + case TASK_LOST: + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); + schedulerState.makeTaskPending(taskId); + break; + default: + LOGGER.error("Invalid state: {}", state); + break; } + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java index 4e8313c..a1e8c33 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/ConsumedOffer.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java index 44c4c39..87a727f 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -58,14 +58,8 @@ public class NMHeartBeatHandler extends BaseInterceptor { private final SchedulerState state; @Inject - public NMHeartBeatHandler( - InterceptorRegistry registry, - AbstractYarnScheduler yarnScheduler, - MyriadDriver myriadDriver, - YarnNodeCapacityManager yarnNodeCapacityMgr, - OfferLifecycleManager offerLifecycleMgr, - NodeStore nodeStore, - SchedulerState state) { + public NMHeartBeatHandler(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, MyriadDriver myriadDriver, YarnNodeCapacityManager yarnNodeCapacityMgr, OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, SchedulerState + state) { if (registry != null) { registry.register(this); @@ -95,22 +89,19 @@ public class NMHeartBeatHandler extends BaseInterceptor { case STARTED: { RMNode rmNode = context.getRMNodes().get(event.getNodeId()); Resource totalCapability = rmNode.getTotalCapability(); - if (totalCapability.getMemory() != 0 || - totalCapability.getVirtualCores() != 0) { - logger.warn("FineGrainedScaling feature got invoked for a " + - "NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity to (0G,0CPU)", - rmNode.getHostName(), - totalCapability.getMemory(), totalCapability.getVirtualCores()); + if (totalCapability.getMemory() != 0 || totalCapability.getVirtualCores() != 0) { + logger.warn("FineGrainedScaling feature got invoked for a " + "NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity to (0G,0CPU)", rmNode.getHostName(), totalCapability.getMemory(), totalCapability + .getVirtualCores()); totalCapability.setMemory(0); totalCapability.setVirtualCores(0); } } - break; + break; case STATUS_UPDATE: { handleStatusUpdate(event, context); } - break; + break; default: break; @@ -120,8 +111,7 @@ public class NMHeartBeatHandler extends BaseInterceptor { @VisibleForTesting protected void handleStatusUpdate(RMNodeEvent event, RMContext context) { if (!(event instanceof RMNodeStatusEvent)) { - logger.error("{} not an instance of {}", event.getClass().getName(), - RMNodeStatusEvent.class.getName()); + logger.error("{} not an instance of {}", event.getClass().getName(), RMNodeStatusEvent.class.getName()); return; } @@ -137,9 +127,7 @@ public class NMHeartBeatHandler extends BaseInterceptor { // New capacity of the node = // resources under use on the node (due to previous offers) + // new resources offered by mesos for the node - yarnNodeCapacityMgr.setNodeCapacity(rmNode, - Resources.add(getResourcesUnderUse(statusEvent), - getNewResourcesOfferedByMesos(hostName))); + yarnNodeCapacityMgr.setNodeCapacity(rmNode, Resources.add(getResourcesUnderUse(statusEvent), getNewResourcesOfferedByMesos(hostName))); } private Resource getNewResourcesOfferedByMesos(String hostname) { @@ -157,8 +145,7 @@ public class NMHeartBeatHandler extends BaseInterceptor { Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers); if (logger.isDebugEnabled()) { - logger.debug("NM on host {} got {} CPUs and {} memory from mesos", - hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory()); + logger.debug("NM on host {} got {} CPUs and {} memory from mesos", hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory()); } return fromMesosOffers; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java index 182241f..5d9e1e1 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/Node.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java index bf93b26..fd97e04 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NodeStore.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java index ad93d74..487c571 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferFeed.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.mesos.Protos; /** - * Feed of Mesos offers for a node. + * Feed of Mesos offers for a node. */ public class OfferFeed { private ConcurrentLinkedQueue<Protos.Offer> queue; http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java index cf89be4..387f532 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferLifecycleManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,8 +34,7 @@ import org.slf4j.LoggerFactory; * Manages the Mesos offers tracked by Myriad. */ public class OfferLifecycleManager { - private static final Logger LOGGER = LoggerFactory.getLogger( - OfferLifecycleManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(OfferLifecycleManager.class); private Map<String, OfferFeed> offerFeedMap; @@ -48,8 +47,7 @@ public class OfferLifecycleManager { private final MyriadDriver myriadDriver; @Inject - public OfferLifecycleManager(NodeStore nodeStore, - MyriadDriver myriadDriver) { + public OfferLifecycleManager(NodeStore nodeStore, MyriadDriver myriadDriver) { this.offerFeedMap = new ConcurrentHashMap<>(200, 0.75f, 50); this.consumedOfferMap = new HashMap<>(200, 0.75f); @@ -80,8 +78,7 @@ public class OfferLifecycleManager { node.setSlaveId(offer.getSlaveId()); - LOGGER.debug("addResourceOffers: caching offer for host {}, offer id {}", - hostname, offer.getId().getValue()); + LOGGER.debug("addResourceOffers: caching offer for host {}, offer id {}", hostname, offer.getId().getValue()); } else { myriadDriver.getDriver().declineOffer(offer.getId()); LOGGER.debug("Declined offer for unregistered host {}", hostname); http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java index 4ae4d0d..ff36c06 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/OfferUtils.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java index ae28bda..936f381 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory; * Manages the capacity exposed by NodeManager. It uses the offers available * from Mesos to inflate the node capacity and lets ResourceManager make the * scheduling decision. After the scheduling decision is done, there are 2 cases: - * + * <p/> * 1. If ResourceManager did not use the expanded capacity, then the node's * capacity is reverted back to original value and the offer is declined. * 2. If ResourceManager ended up using the expanded capacity, then the node's @@ -65,145 +65,129 @@ import org.slf4j.LoggerFactory; * Mesos. */ public class YarnNodeCapacityManager extends BaseInterceptor { - private static final Logger LOGGER = LoggerFactory.getLogger( - YarnNodeCapacityManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(YarnNodeCapacityManager.class); - private final AbstractYarnScheduler yarnScheduler; - private final RMContext rmContext; - private final MyriadDriver myriadDriver; - private final OfferLifecycleManager offerLifecycleMgr; - private final NodeStore nodeStore; + private final AbstractYarnScheduler yarnScheduler; + private final RMContext rmContext; + private final MyriadDriver myriadDriver; + private final OfferLifecycleManager offerLifecycleMgr; + private final NodeStore nodeStore; private final SchedulerState state; - @Inject - public YarnNodeCapacityManager(InterceptorRegistry registry, - AbstractYarnScheduler yarnScheduler, - RMContext rmContext, - MyriadDriver myriadDriver, - OfferLifecycleManager offerLifecycleMgr, - NodeStore nodeStore, - SchedulerState state) { - if (registry != null) { - registry.register(this); - } - this.yarnScheduler = yarnScheduler; - this.rmContext = rmContext; - this.myriadDriver = myriadDriver; - this.offerLifecycleMgr = offerLifecycleMgr; - this.nodeStore = nodeStore; - this.state = state; - } - - @Override - public CallBackFilter getCallBackFilter() { - return new CallBackFilter() { - @Override - public boolean allowCallBacksForNode(NodeId nodeManager) { - return SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state); - } - }; + @Inject + public YarnNodeCapacityManager(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext, MyriadDriver myriadDriver, OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore, SchedulerState state) { + if (registry != null) { + registry.register(this); } + this.yarnScheduler = yarnScheduler; + this.rmContext = rmContext; + this.myriadDriver = myriadDriver; + this.offerLifecycleMgr = offerLifecycleMgr; + this.nodeStore = nodeStore; + this.state = state; + } @Override - public void afterSchedulerEventHandled(SchedulerEvent event) { - switch (event.getType()) { - case NODE_ADDED: { - if (!(event instanceof NodeAddedSchedulerEvent)) { - LOGGER.error("{} not an instance of {}", - event.getClass().getName(), - NodeAddedSchedulerEvent.class.getName()); - return; - } - - NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; - NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID(); - String host = nodeId.getHost(); + public CallBackFilter getCallBackFilter() { + return new CallBackFilter() { + @Override + public boolean allowCallBacksForNode(NodeId nodeManager) { + return SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state); + } + }; + } - SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId); - nodeStore.add(node); - LOGGER.info("afterSchedulerEventHandled: NM registration from node {}", host); - } - break; + @Override + public void afterSchedulerEventHandled(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: { + if (!(event instanceof NodeAddedSchedulerEvent)) { + LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeAddedSchedulerEvent.class.getName()); + return; + } - case NODE_UPDATE: { - if (!(event instanceof NodeUpdateSchedulerEvent)) { - LOGGER.error("{} not an instance of {}", event.getClass().getName(), - NodeUpdateSchedulerEvent.class.getName()); - return; - } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event; + NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID(); + String host = nodeId.getHost(); - RMNode rmNode = ((NodeUpdateSchedulerEvent) event).getRMNode(); - handleContainerAllocation(rmNode); - } - break; + SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId); + nodeStore.add(node); + LOGGER.info("afterSchedulerEventHandled: NM registration from node {}", host); + } + break; - default: - break; + case NODE_UPDATE: { + if (!(event instanceof NodeUpdateSchedulerEvent)) { + LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeUpdateSchedulerEvent.class.getName()); + return; } - } - /** - * Checks if any containers were allocated in the current scheduler run and - * launches the corresponding Mesos tasks. It also udpates the node - * capacity depending on what portion of the consumed offers were actually - * used. - */ - @VisibleForTesting - protected void handleContainerAllocation(RMNode rmNode) { - String host = rmNode.getNodeID().getHost(); - - ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host); - if (consumedOffer == null) { - LOGGER.debug("No offer consumed for {}", host); - return; + RMNode rmNode = ((NodeUpdateSchedulerEvent) event).getRMNode(); + handleContainerAllocation(rmNode); } + break; - Node node = nodeStore.getNode(host); - Set<RMContainer> containersBeforeSched = node.getContainerSnapshot(); - Set<RMContainer> containersAfterSched = new HashSet<>( - node.getNode().getRunningContainers()); + default: + break; + } + } - Set<RMContainer> containersAllocatedByMesosOffer = - (containersBeforeSched == null) - ? containersAfterSched - : Sets.difference(containersAfterSched, containersBeforeSched); + /** + * Checks if any containers were allocated in the current scheduler run and + * launches the corresponding Mesos tasks. It also udpates the node + * capacity depending on what portion of the consumed offers were actually + * used. + */ + @VisibleForTesting + protected void handleContainerAllocation(RMNode rmNode) { + String host = rmNode.getNodeID().getHost(); + + ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host); + if (consumedOffer == null) { + LOGGER.debug("No offer consumed for {}", host); + return; + } - if (containersAllocatedByMesosOffer.isEmpty()) { - LOGGER.debug("No containers allocated using Mesos offers for host: {}", host); - for (Protos.Offer offer : consumedOffer.getOffers()) { - offerLifecycleMgr.declineOffer(offer); - } - setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), - OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()))); - } else { - LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", - host, containersAllocatedByMesosOffer.size()); + Node node = nodeStore.getNode(host); + Set<RMContainer> containersBeforeSched = node.getContainerSnapshot(); + Set<RMContainer> containersAfterSched = new HashSet<>(node.getNode().getRunningContainers()); - // Identify the Mesos tasks that need to be launched - List<Protos.TaskInfo> tasks = Lists.newArrayList(); - Resource resUsed = Resource.newInstance(0, 0); + Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? containersAfterSched : Sets.difference(containersAfterSched, containersBeforeSched); - for (RMContainer newContainer : containersAllocatedByMesosOffer) { - tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node)); - resUsed = Resources.add(resUsed, newContainer.getAllocatedResource()); - } + if (containersAllocatedByMesosOffer.isEmpty()) { + LOGGER.debug("No containers allocated using Mesos offers for host: {}", host); + for (Protos.Offer offer : consumedOffer.getOffers()) { + offerLifecycleMgr.declineOffer(offer); + } + setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()))); + } else { + LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size()); - // Reduce node capacity to account for unused offers - Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()); - Resource resUnused = Resources.subtract(resOffered, resUsed); - setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused)); + // Identify the Mesos tasks that need to be launched + List<Protos.TaskInfo> tasks = Lists.newArrayList(); + Resource resUsed = Resource.newInstance(0, 0); - myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks); + for (RMContainer newContainer : containersAllocatedByMesosOffer) { + tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node)); + resUsed = Resources.add(resUsed, newContainer.getAllocatedResource()); } - // No need to hold on to the snapshot anymore - node.removeContainerSnapshot(); + // Reduce node capacity to account for unused offers + Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()); + Resource resUnused = Resources.subtract(resOffered, resUsed); + setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), resUnused)); + + myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks); } + // No need to hold on to the snapshot anymore + node.removeContainerSnapshot(); + } + /** * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity. * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler. - * The scheduler updates the corresponding {@link SchedulerNode} with the newCapacity. + * The scheduler updates the corresponding {@link SchedulerNode} with the newCapacity. * * @param rmNode * @param newCapacity @@ -215,43 +199,25 @@ public class YarnNodeCapacityManager extends BaseInterceptor { LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity); // updates the scheduler with the new capacity for the NM. // the event is handled by the scheduler asynchronously - rmContext.getDispatcher().getEventHandler().handle( - new NodeResourceUpdateSchedulerEvent(rmNode, - ResourceOption.newInstance(rmNode.getTotalCapability(), - RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); + rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT))); } - private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, - ConsumedOffer consumedOffer, Node node) { - - Protos.Offer offer = consumedOffer.getOffers().get(0); - Container container = rmContainer.getContainer(); - Protos.TaskID taskId = Protos.TaskID.newBuilder() - .setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build(); + private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, ConsumedOffer consumedOffer, Node node) { - // TODO (sdaingade) Remove ExecutorInfo from the Node object - // as this is now cached in the NodeTask object in scheduler state. - Protos.ExecutorInfo executorInfo = node.getExecInfo(); - if (executorInfo == null) { - executorInfo = Protos.ExecutorInfo.newBuilder( - state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX).getExecutorInfo()) - .setFrameworkId(offer.getFrameworkId()).build(); - node.setExecInfo(executorInfo); - } + Protos.Offer offer = consumedOffer.getOffers().get(0); + Container container = rmContainer.getContainer(); + Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build(); - return Protos.TaskInfo.newBuilder() - .setName("task_" + taskId.getValue()) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) - .addResources(Protos.Resource.newBuilder() - .setName("cpus") - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(container.getResource().getVirtualCores()))) - .addResources(Protos.Resource.newBuilder() - .setName("mem") - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(container.getResource().getMemory()))) - .setExecutor(executorInfo) - .build(); + // TODO (sdaingade) Remove ExecutorInfo from the Node object + // as this is now cached in the NodeTask object in scheduler state. + Protos.ExecutorInfo executorInfo = node.getExecInfo(); + if (executorInfo == null) { + executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX).getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build(); + node.setExecInfo(executorInfo); } + + return Protos.TaskInfo.newBuilder().setName("task_" + taskId.getValue()).setTaskId(taskId).setSlaveId(offer.getSlaveId()).addResources(Protos.Resource.newBuilder().setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar + .newBuilder().setValue(container.getResource().getVirtualCores()))).addResources(Protos.Resource.newBuilder().setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(container.getResource() + .getMemory()))).setExecutor(executorInfo).build(); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java index a6fd0b8..c480c09 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadCapacityScheduler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java index 4aa23d0..5991be2 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFairScheduler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -34,53 +34,53 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule */ public class MyriadFairScheduler extends FairScheduler { - private RMContext rmContext; - private YarnSchedulerInterceptor yarnSchedulerInterceptor; - private RMNodeEventHandler rmNodeEventHandler; - private Configuration conf; + private RMContext rmContext; + private YarnSchedulerInterceptor yarnSchedulerInterceptor; + private RMNodeEventHandler rmNodeEventHandler; + private Configuration conf; - public MyriadFairScheduler() { - super(); - } + public MyriadFairScheduler() { + super(); + } - /** - * Register an event handler that receives {@link RMNodeEvent} events. - * This event handler is registered ahead of RM's own event handler for RMNodeEvents. - * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by - * RM and the scheduler. - * - * @param rmContext - */ - @Override - public synchronized void setRMContext(RMContext rmContext) { - this.rmContext = rmContext; - this.yarnSchedulerInterceptor = new CompositeInterceptor(); - rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext); - rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler); - super.setRMContext(rmContext); - } + /** + * Register an event handler that receives {@link RMNodeEvent} events. + * This event handler is registered ahead of RM's own event handler for RMNodeEvents. + * For e.g. myriad can inspect a node's HB (RMNodeStatusEvent) before the HB is handled by + * RM and the scheduler. + * + * @param rmContext + */ + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + this.yarnSchedulerInterceptor = new CompositeInterceptor(); + rmNodeEventHandler = new RMNodeEventHandler(yarnSchedulerInterceptor, rmContext); + rmContext.getDispatcher().register(RMNodeEventType.class, rmNodeEventHandler); + super.setRMContext(rmContext); + } - /** - * ******** Methods overridden from YARN {@link FairScheduler} ********************* - */ + /** + * ******** Methods overridden from YARN {@link FairScheduler} ********************* + */ - @Override - public synchronized void serviceInit(Configuration conf) throws Exception { - this.conf = conf; - super.serviceInit(conf); - } + @Override + public synchronized void serviceInit(Configuration conf) throws Exception { + this.conf = conf; + super.serviceInit(conf); + } - @Override - public synchronized void serviceStart() throws Exception { - this.yarnSchedulerInterceptor.init(conf, this, rmContext); - super.serviceStart(); - } + @Override + public synchronized void serviceStart() throws Exception { + this.yarnSchedulerInterceptor.init(conf, this, rmContext); + super.serviceStart(); + } - @Override - public synchronized void handle(SchedulerEvent event) { - this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event); - super.handle(event); - this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event); - } + @Override + public synchronized void handle(SchedulerEvent event) { + this.yarnSchedulerInterceptor.beforeSchedulerEventHandled(event); + super.handle(event); + this.yarnSchedulerInterceptor.afterSchedulerEventHandled(event); + } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java index 6c73244..008953e 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/MyriadFifoScheduler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/67ecf063/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java index e63e42b..c758ea4 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/yarn/RMNodeEventHandler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,17 +27,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; * Passes the {@link RMNodeEvent} events into the {@link YarnSchedulerInterceptor}. */ public class RMNodeEventHandler implements EventHandler<RMNodeEvent> { - private final YarnSchedulerInterceptor interceptor; - private final RMContext rmContext; + private final YarnSchedulerInterceptor interceptor; + private final RMContext rmContext; - public RMNodeEventHandler(YarnSchedulerInterceptor interceptor, RMContext rmContext) { - this.interceptor = interceptor; - this.rmContext = rmContext; - } + public RMNodeEventHandler(YarnSchedulerInterceptor interceptor, RMContext rmContext) { + this.interceptor = interceptor; + this.rmContext = rmContext; + } - @Override - public void handle(RMNodeEvent event) { - interceptor.beforeRMNodeEventHandled(event, rmContext); + @Override + public void handle(RMNodeEvent event) { + interceptor.beforeRMNodeEventHandled(event, rmContext); - } + } }