Addressing review comments and another fix

* Addressed review comments
* Fixed problems with NodeManager restart
  Credits to Santosh Marella for this
  If a NodeManager dies, the Myriad scheduler relaunches it.
  Previously, we ignored the node-added event if we had a record
  of a previous NM running on that node. Fixed this.
  Also, the RM does not immediately detect the loss of an NM;
  it does so only after a timeout interval, at which point the
  NM node is removed from the scheduler. However, during the
  interval between the NM dying and the RM detecting this, a new NM
  could be launched on the node. After the timeout, this newly
  running NM was being removed from the scheduler. Fixed this.

* Successfully tested failing over the RM + scheduler while a job was in progress.
* Tested killing an NM: the NM was relaunched successfully and continued running
  YARN containers (and the corresponding Mesos tasks) after the restart.


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/e7c81e4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/e7c81e4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/e7c81e4f

Branch: refs/heads/master
Commit: e7c81e4f690c7f7374dbc866c3c87ec68ad79995
Parents: 9677ef8
Author: Swapnil Daingade <sdaing...@maprtech.com>
Authored: Wed Sep 2 10:45:00 2015 -0700
Committer: Swapnil Daingade <sdaing...@maprtech.com>
Committed: Wed Sep 2 10:45:00 2015 -0700

----------------------------------------------------------------------
 .../java/com/ebay/myriad/executor/MyriadExecutor.java     | 10 ++++------
 .../java/com/ebay/myriad/scheduler/MyriadOperations.java  |  1 +
 .../com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java |  5 -----
 .../myriad/scheduler/fgs/YarnNodeCapacityManager.java     |  6 +-----
 .../main/java/com/ebay/myriad/state/SchedulerState.java   |  5 ++---
 5 files changed, 8 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
----------------------------------------------------------------------
diff --git 
a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java 
b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
index 42636f5..4f1e4e9 100644
--- a/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
+++ b/myriad-executor/src/main/java/com/ebay/myriad/executor/MyriadExecutor.java
@@ -39,9 +39,6 @@ public class MyriadExecutor implements Executor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MyriadExecutor.class);
 
-  private static final String YARN_CONTAINER_TASK_ID_PREFIX
-     = "yarn_";
-
   private Set<String> containerIds;
 
   public MyriadExecutor(Set<String> containerTaskIds) {
@@ -79,14 +76,15 @@ public class MyriadExecutor implements Executor {
     LOGGER.debug("killTask received for taskId: " + taskId.getValue());
     TaskStatus status;
 
-    if (!taskId.toString().contains(YARN_CONTAINER_TASK_ID_PREFIX)) {
+    if (!taskId.toString().contains(
+      MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX)) {
       // Inform mesos of killing all tasks corresponding to yarn containers 
that are
       // currently running 
       synchronized (containerIds) {
         for (String containerId : containerIds) {
           Protos.TaskID containerTaskId = Protos.TaskID.newBuilder()
-            .setValue(YARN_CONTAINER_TASK_ID_PREFIX + containerId)
-            .build();
+            .setValue(MyriadExecutorAuxService.YARN_CONTAINER_TASK_ID_PREFIX
+              + containerId).build();
             status = TaskStatus.newBuilder().setTaskId(containerTaskId)
               .setState(TaskState.TASK_KILLED)
               .build();

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index 8c15bfa..4947647 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -82,6 +82,7 @@ public class MyriadOperations {
             for (NodeTask nodeTask : activeTasks) {
                 if (nodeTask.getHostname().equals(nodeToScaleDown)) {
                     nodePresentInMyriad = true;    
+                    break;
                 }
             }
             if (!nodePresentInMyriad) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
index 47393a4..ba2c3b7 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/NMHeartBeatHandler.java
@@ -89,11 +89,6 @@ public class NMHeartBeatHandler extends BaseInterceptor {
       }
       break;
 
-      case EXPIRE: {
-        nodeStore.remove(event.getNodeId().getHost());
-      }
-      break;
-
       case STATUS_UPDATE: {
         handleStatusUpdate(event, context);
       }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
index 12bbe73..696e82f 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/fgs/YarnNodeCapacityManager.java
@@ -100,10 +100,6 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
               NodeAddedSchedulerEvent nodeAddedEvent = 
(NodeAddedSchedulerEvent) event;
               NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID();
               String host = nodeId.getHost();
-              if (nodeStore.isPresent(host)) {
-                LOGGER.warn("Ignoring duplicate node registration. Host: {}", 
host);
-                return;
-              }
 
               SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId);
               nodeStore.add(node);
@@ -197,7 +193,7 @@ public class YarnNodeCapacityManager extends 
BaseInterceptor {
   public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
     rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
     rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
-    LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), 
newCapacity);
+    LOGGER.debug("Setting capacity for node {} to {}", rmNode.getHostName(), 
newCapacity);
     // updates the scheduler with the new capacity for the NM.
     // the event is handled by the scheduler asynchronously
     rmContext.getDispatcher().getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/e7c81e4f/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index f589056..4b5aff3 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -48,7 +48,6 @@ public class SchedulerState {
     private Set<Protos.TaskID> activeTasks;
     private Set<Protos.TaskID> lostTasks;
     private Set<Protos.TaskID> killableTasks;
-    //private MyriadState myriadState;
     private Protos.FrameworkID frameworkId;
     private MyriadStateStore stateStore;
 
@@ -152,7 +151,7 @@ public class SchedulerState {
         updateStateStore();
     }
 
-    public Set<Protos.TaskID> getKillableTasks() {
+    public synchronized Set<Protos.TaskID> getKillableTasks() {
         return Collections.unmodifiableSet(this.killableTasks);
     }
 
@@ -175,7 +174,7 @@ public class SchedulerState {
         return Collections.unmodifiableSet(this.pendingTasks);
     }
 
-    public Set<Protos.TaskID> getActiveTaskIds() {
+    public synchronized Set<Protos.TaskID> getActiveTaskIds() {
         return Collections.unmodifiableSet(this.activeTasks);
     }
 

Reply via email to