Repository: incubator-myriad
Updated Branches:
  refs/heads/master fceaf7301 -> d29ee338e


MYRIAD-157 Concurrency issues while trying to flexup and flexdown NMs…

… at the same time

When someone tries to flex up and flex down multiple NMs at the same time,
multiple issues surface, ranging from a ConcurrentModificationException
to NodeTask being null.

This closes: #12
Review: https://github.com/apache/incubator-myriad/pull/12


Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/d29ee338
Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/d29ee338
Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/d29ee338

Branch: refs/heads/master
Commit: d29ee338e017673e6714e6f39eb02cc9e27fc116
Parents: fceaf73
Author: Yuliya Feldman <yfeld...@maprtech.com>
Authored: Mon Oct 19 17:46:31 2015 -0700
Committer: Santosh Marella <mare...@gmail.com>
Committed: Mon Oct 19 17:46:31 2015 -0700

----------------------------------------------------------------------
 .../ebay/myriad/scheduler/MyriadOperations.java  | 11 +++++------
 .../ebay/myriad/scheduler/TaskTerminator.java    | 15 +++++++++++++--
 .../handlers/ResourceOffersEventHandler.java     | 13 +++++++++++++
 .../event/handlers/StatusUpdateEventHandler.java | 15 ++++++++++-----
 .../com/ebay/myriad/state/SchedulerState.java    | 19 +++++++++++++------
 5 files changed, 54 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
index aff604c..903120a 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java
@@ -28,7 +28,6 @@ import com.ebay.myriad.scheduler.constraints.LikeConstraint;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import com.google.inject.Inject;
 
 import org.apache.mesos.Protos;
@@ -141,7 +140,7 @@ public class MyriadOperations {
       
       // Flex down Pending tasks, if any
       if (numScaledDown < numInstancesToScaleDown) {
-        Set<Protos.TaskID> pendingTasks = 
Sets.newHashSet(this.schedulerState.getPendingTaskIds(serviceName));
+        Collection<Protos.TaskID> pendingTasks = 
this.schedulerState.getPendingTaskIds(serviceName);
 
         for (Protos.TaskID taskId : pendingTasks) {
             this.schedulerState.makeTaskKillable(taskId);
@@ -155,7 +154,7 @@ public class MyriadOperations {
       
       // Flex down Staging tasks, if any
       if (numScaledDown < numInstancesToScaleDown) {
-          Set<Protos.TaskID> stagingTasks = 
Sets.newHashSet(this.schedulerState.getStagingTaskIds(serviceName));
+          Collection<Protos.TaskID> stagingTasks = 
this.schedulerState.getStagingTaskIds(serviceName);
 
           for (Protos.TaskID taskId : stagingTasks) {
               this.schedulerState.makeTaskKillable(taskId);
@@ -167,7 +166,7 @@ public class MyriadOperations {
       }
       int numStagingTasksScaledDown = numScaledDown - 
numPendingTasksScaledDown;
 
-      Set<NodeTask> activeTasks = 
Sets.newHashSet(this.schedulerState.getActiveTasksByType(serviceName));
+      Set<NodeTask> activeTasks = 
this.schedulerState.getActiveTasksByType(serviceName);
       if (numScaledDown < numInstancesToScaleDown) {
         for (NodeTask nodeTask : activeTasks) {
           
this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId());
@@ -191,7 +190,7 @@ public class MyriadOperations {
           profile, constraint, numInstancesToScaleDown) : 0;
     }
 
-  private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint 
constraint, int numInstancesToScaleDown) {
+    private int flexDownStagingTasks(ServiceResourceProfile profile, 
Constraint constraint, int numInstancesToScaleDown) {
       return numInstancesToScaleDown > 0 ? 
flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile),
           profile, constraint, numInstancesToScaleDown) : 0;
     }
@@ -220,7 +219,7 @@ public class MyriadOperations {
         }
       }
       return numInstancesScaledDown;
-    }
+  }
 
   private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) {
     if (constraint != null) {

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
index 9840099..a048f30 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java
@@ -19,11 +19,15 @@
 package com.ebay.myriad.scheduler;
 
 import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
+import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+
 import java.util.Set;
+
 import javax.inject.Inject;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.mesos.Protos.Status;
 import org.apache.mesos.Protos.TaskID;
@@ -71,8 +75,15 @@ public class TaskTerminator implements Runnable {
               this.schedulerState.removeTask(taskIdToKill);
             } else {
               Status status = this.driverManager.kill(taskIdToKill);
-              
offerLifeCycleManager.declineOutstandingOffers(schedulerState.getTask(taskIdToKill).getHostname());
-              this.schedulerState.removeTask(taskIdToKill);
+              NodeTask task = schedulerState.getTask(taskIdToKill);
+              if (task != null) {
+                offerLifeCycleManager.declineOutstandingOffers(
+                    task.getHostname());
+                this.schedulerState.removeTask(taskIdToKill);
+              } else {
+                schedulerState.removeTask(taskIdToKill);
+                LOGGER.warn("NodeTask with taskId: {} does not exist", 
taskIdToKill);
+              }
               Preconditions.checkState(status == Status.DRIVER_RUNNING);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
index 1ca9ec0..3abf9ab 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java
@@ -30,6 +30,7 @@ import com.ebay.myriad.scheduler.event.ResourceOffersEvent;
 import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
 import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
+import com.google.common.collect.Sets;
 import com.lmax.disruptor.EventHandler;
 
 import java.util.Iterator;
@@ -111,11 +112,20 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
         for (NodeTask nodeTask : nodeTasks) {
           nodeTask.setSlaveAttributes(offer.getAttributesList());
         }
+        // keep this in case SchedulerState gets out of sync. This should not 
happen with 
+        // synchronizing addNodes method in SchedulerState
+        // but to keep it safe
+        final Set<Protos.TaskID> missingTasks = Sets.newHashSet();
         Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
         if (CollectionUtils.isNotEmpty(pendingTasks)) {
           for (Protos.TaskID pendingTaskId : pendingTasks) {
             NodeTask taskToLaunch = schedulerState
                 .getTask(pendingTaskId);
+            if (taskToLaunch == null) {
+              missingTasks.add(pendingTaskId);
+              LOGGER.warn("Node task for TaskID: {} does not exist", 
pendingTaskId);
+              continue;
+            }
             String taskPrefix = taskToLaunch.getTaskPrefix();
             ServiceResourceProfile profile = taskToLaunch.getProfile();
             Constraint constraint = taskToLaunch.getConstraint();
@@ -154,6 +164,9 @@ public class ResourceOffersEventHandler implements 
EventHandler<ResourceOffersEv
               }
             }
           }
+          for (Protos.TaskID taskId : missingTasks) {
+            schedulerState.removeTask(taskId);
+          }
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
index bd46747..628f09e 100644
--- 
a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
+++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java
@@ -20,9 +20,12 @@ package com.ebay.myriad.scheduler.event.handlers;
 
 import com.ebay.myriad.scheduler.event.StatusUpdateEvent;
 import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
+import com.ebay.myriad.state.NodeTask;
 import com.ebay.myriad.state.SchedulerState;
 import com.lmax.disruptor.EventHandler;
+
 import javax.inject.Inject;
+
 import org.apache.mesos.Protos.TaskID;
 import org.apache.mesos.Protos.TaskState;
 import org.apache.mesos.Protos.TaskStatus;
@@ -52,8 +55,10 @@ public class StatusUpdateEventHandler implements 
EventHandler<StatusUpdateEvent>
         TaskStatus status = event.getStatus();
         this.schedulerState.updateTask(status);
         TaskID taskId = status.getTaskId();
-        if (!schedulerState.hasTask(taskId)) {
+        NodeTask task = schedulerState.getTask(taskId);
+        if (task == null) {
             LOGGER.warn("Task: {} not found, status: {}", taskId.getValue(), 
status.getState());
+            schedulerState.removeTask(taskId);
             return;
         }
         LOGGER.info("Status Update for task: {} | state: {}", 
taskId.getValue(), status.getState());
@@ -70,20 +75,20 @@ public class StatusUpdateEventHandler implements 
EventHandler<StatusUpdateEvent>
                 schedulerState.makeTaskActive(taskId);
                 break;
             case TASK_FINISHED:
-                
offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname());
+                
offerLifecycleManager.declineOutstandingOffers(task.getHostname());
                 schedulerState.removeTask(taskId);
                 break;
             case TASK_FAILED:
                 // Add to pending tasks
-              
offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname());
+              
offerLifecycleManager.declineOutstandingOffers(task.getHostname());
               schedulerState.makeTaskPending(taskId);
                 break;
             case TASK_KILLED:
-              
offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname());
+              
offerLifecycleManager.declineOutstandingOffers(task.getHostname());
                 schedulerState.removeTask(taskId);
                 break;
             case TASK_LOST:
-              
offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname());
+              
offerLifecycleManager.declineOutstandingOffers(task.getHostname());
                 schedulerState.makeTaskPending(taskId);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
----------------------------------------------------------------------
diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java 
b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
index f0ea778..08a4dfa 100644
--- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
+++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java
@@ -60,7 +60,12 @@ public class SchedulerState {
         loadStateStore();
     }
 
-    public void addNodes(Collection<NodeTask> nodes) {
+    /**
+     * Making method synchronized, so if someone tries flexup/down at the same 
time
+     * addNodes and removeTask will not put data into an inconsistent state
+     * @param nodes
+     */
+    public synchronized void addNodes(Collection<NodeTask> nodes) {
         if (CollectionUtils.isEmpty(nodes)) {
             LOGGER.info("No nodes to add");
             return;
@@ -453,11 +458,13 @@ public class SchedulerState {
 
     public SchedulerStateForType(String taskPrefix) {
       this.taskPrefix = taskPrefix;
-      this.pendingTasks = new HashSet<>();
-      this.stagingTasks = new HashSet<>();
-      this.activeTasks = new HashSet<>();
-      this.lostTasks = new HashSet<>();
-      this.killableTasks = new HashSet<>();
+      // Since Sets.newConcurrentHashSet is available only starting form Guava 
version 15
+      // and so far (Hadoop 2.7) uses guava 13 we can not easily use it
+      this.pendingTasks = Collections.newSetFromMap(new 
ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.stagingTasks = Collections.newSetFromMap(new 
ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.activeTasks = Collections.newSetFromMap(new 
ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.lostTasks = Collections.newSetFromMap(new 
ConcurrentHashMap<Protos.TaskID, Boolean>());
+      this.killableTasks = Collections.newSetFromMap(new 
ConcurrentHashMap<Protos.TaskID, Boolean>());
 
     }
     @SuppressWarnings("unused")

Reply via email to