Repository: incubator-myriad Updated Branches: refs/heads/master fceaf7301 -> d29ee338e
MYRIAD-157 Concurrency issues while trying to flexup and flexdown NMs at the same time. When someone tries to flexup and flex down multiple NMs at the same time there are multiple issues that surface, starting from ConcurrentModificationException and ending with NodeTask being null. This closes: #12 Review: https://github.com/apache/incubator-myriad/pull/12 Project: http://git-wip-us.apache.org/repos/asf/incubator-myriad/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-myriad/commit/d29ee338 Tree: http://git-wip-us.apache.org/repos/asf/incubator-myriad/tree/d29ee338 Diff: http://git-wip-us.apache.org/repos/asf/incubator-myriad/diff/d29ee338 Branch: refs/heads/master Commit: d29ee338e017673e6714e6f39eb02cc9e27fc116 Parents: fceaf73 Author: Yuliya Feldman <yfeld...@maprtech.com> Authored: Mon Oct 19 17:46:31 2015 -0700 Committer: Santosh Marella <mare...@gmail.com> Committed: Mon Oct 19 17:46:31 2015 -0700 ---------------------------------------------------------------------- .../ebay/myriad/scheduler/MyriadOperations.java | 11 +++++------ .../ebay/myriad/scheduler/TaskTerminator.java | 15 +++++++++++++-- .../handlers/ResourceOffersEventHandler.java | 13 +++++++++++++ .../event/handlers/StatusUpdateEventHandler.java | 15 ++++++++++----- .../com/ebay/myriad/state/SchedulerState.java | 19 +++++++++++++------ 5 files changed, 54 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java index aff604c..903120a 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java +++ 
b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/MyriadOperations.java @@ -28,7 +28,6 @@ import com.ebay.myriad.scheduler.constraints.LikeConstraint; import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.google.inject.Inject; import org.apache.mesos.Protos; @@ -141,7 +140,7 @@ public class MyriadOperations { // Flex down Pending tasks, if any if (numScaledDown < numInstancesToScaleDown) { - Set<Protos.TaskID> pendingTasks = Sets.newHashSet(this.schedulerState.getPendingTaskIds(serviceName)); + Collection<Protos.TaskID> pendingTasks = this.schedulerState.getPendingTaskIds(serviceName); for (Protos.TaskID taskId : pendingTasks) { this.schedulerState.makeTaskKillable(taskId); @@ -155,7 +154,7 @@ public class MyriadOperations { // Flex down Staging tasks, if any if (numScaledDown < numInstancesToScaleDown) { - Set<Protos.TaskID> stagingTasks = Sets.newHashSet(this.schedulerState.getStagingTaskIds(serviceName)); + Collection<Protos.TaskID> stagingTasks = this.schedulerState.getStagingTaskIds(serviceName); for (Protos.TaskID taskId : stagingTasks) { this.schedulerState.makeTaskKillable(taskId); @@ -167,7 +166,7 @@ public class MyriadOperations { } int numStagingTasksScaledDown = numScaledDown - numPendingTasksScaledDown; - Set<NodeTask> activeTasks = Sets.newHashSet(this.schedulerState.getActiveTasksByType(serviceName)); + Set<NodeTask> activeTasks = this.schedulerState.getActiveTasksByType(serviceName); if (numScaledDown < numInstancesToScaleDown) { for (NodeTask nodeTask : activeTasks) { this.schedulerState.makeTaskKillable(nodeTask.getTaskStatus().getTaskId()); @@ -191,7 +190,7 @@ public class MyriadOperations { profile, constraint, numInstancesToScaleDown) : 0; } - private int flexDownStagingTasks(ServiceResourceProfile profile, Constraint constraint, int numInstancesToScaleDown) { + private int flexDownStagingTasks(ServiceResourceProfile 
profile, Constraint constraint, int numInstancesToScaleDown) { return numInstancesToScaleDown > 0 ? flexDownTasks(schedulerState.getStagingTaskIDsForProfile(profile), profile, constraint, numInstancesToScaleDown) : 0; } @@ -220,7 +219,7 @@ public class MyriadOperations { } } return numInstancesScaledDown; - } + } private boolean meetsConstraint(NodeTask nodeTask, Constraint constraint) { if (constraint != null) { http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java index 9840099..a048f30 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/TaskTerminator.java @@ -19,11 +19,15 @@ package com.ebay.myriad.scheduler; import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager; +import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; + import java.util.Set; + import javax.inject.Inject; + import org.apache.commons.collections.CollectionUtils; import org.apache.mesos.Protos.Status; import org.apache.mesos.Protos.TaskID; @@ -71,8 +75,15 @@ public class TaskTerminator implements Runnable { this.schedulerState.removeTask(taskIdToKill); } else { Status status = this.driverManager.kill(taskIdToKill); - offerLifeCycleManager.declineOutstandingOffers(schedulerState.getTask(taskIdToKill).getHostname()); - this.schedulerState.removeTask(taskIdToKill); + NodeTask task = schedulerState.getTask(taskIdToKill); + if (task != null) { + offerLifeCycleManager.declineOutstandingOffers( + task.getHostname()); + 
this.schedulerState.removeTask(taskIdToKill); + } else { + schedulerState.removeTask(taskIdToKill); + LOGGER.warn("NodeTask with taskId: {} does not exist", taskIdToKill); + } Preconditions.checkState(status == Status.DRIVER_RUNNING); } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java index 1ca9ec0..3abf9ab 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/ResourceOffersEventHandler.java @@ -30,6 +30,7 @@ import com.ebay.myriad.scheduler.event.ResourceOffersEvent; import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager; import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; +import com.google.common.collect.Sets; import com.lmax.disruptor.EventHandler; import java.util.Iterator; @@ -111,11 +112,20 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv for (NodeTask nodeTask : nodeTasks) { nodeTask.setSlaveAttributes(offer.getAttributesList()); } + // keep this in case SchedulerState gets out of sync. 
This should not happen with + // synchronizing addNodes method in SchedulerState + // but to keep it safe + final Set<Protos.TaskID> missingTasks = Sets.newHashSet(); Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds(); if (CollectionUtils.isNotEmpty(pendingTasks)) { for (Protos.TaskID pendingTaskId : pendingTasks) { NodeTask taskToLaunch = schedulerState .getTask(pendingTaskId); + if (taskToLaunch == null) { + missingTasks.add(pendingTaskId); + LOGGER.warn("Node task for TaskID: {} does not exist", pendingTaskId); + continue; + } String taskPrefix = taskToLaunch.getTaskPrefix(); ServiceResourceProfile profile = taskToLaunch.getProfile(); Constraint constraint = taskToLaunch.getConstraint(); @@ -154,6 +164,9 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv } } } + for (Protos.TaskID taskId : missingTasks) { + schedulerState.removeTask(taskId); + } } } http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java ---------------------------------------------------------------------- diff --git a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java index bd46747..628f09e 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/scheduler/event/handlers/StatusUpdateEventHandler.java @@ -20,9 +20,12 @@ package com.ebay.myriad.scheduler.event.handlers; import com.ebay.myriad.scheduler.event.StatusUpdateEvent; import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager; +import com.ebay.myriad.state.NodeTask; import com.ebay.myriad.state.SchedulerState; import com.lmax.disruptor.EventHandler; + import javax.inject.Inject; + import 
org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; @@ -52,8 +55,10 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> TaskStatus status = event.getStatus(); this.schedulerState.updateTask(status); TaskID taskId = status.getTaskId(); - if (!schedulerState.hasTask(taskId)) { + NodeTask task = schedulerState.getTask(taskId); + if (task == null) { LOGGER.warn("Task: {} not found, status: {}", taskId.getValue(), status.getState()); + schedulerState.removeTask(taskId); return; } LOGGER.info("Status Update for task: {} | state: {}", taskId.getValue(), status.getState()); @@ -70,20 +75,20 @@ public class StatusUpdateEventHandler implements EventHandler<StatusUpdateEvent> schedulerState.makeTaskActive(taskId); break; case TASK_FINISHED: - offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname()); + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); schedulerState.removeTask(taskId); break; case TASK_FAILED: // Add to pending tasks - offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname()); + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); schedulerState.makeTaskPending(taskId); break; case TASK_KILLED: - offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname()); + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); schedulerState.removeTask(taskId); break; case TASK_LOST: - offerLifecycleManager.declineOutstandingOffers(schedulerState.getTask(taskId).getHostname()); + offerLifecycleManager.declineOutstandingOffers(task.getHostname()); schedulerState.makeTaskPending(taskId); break; default: http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/d29ee338/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java ---------------------------------------------------------------------- diff --git 
a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java index f0ea778..08a4dfa 100644 --- a/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java +++ b/myriad-scheduler/src/main/java/com/ebay/myriad/state/SchedulerState.java @@ -60,7 +60,12 @@ public class SchedulerState { loadStateStore(); } - public void addNodes(Collection<NodeTask> nodes) { + /** + * Making method synchronized, so if someone tries flexup/down at the same time + * addNodes and removeTask will not put data into an inconsistent state + * @param nodes + */ + public synchronized void addNodes(Collection<NodeTask> nodes) { if (CollectionUtils.isEmpty(nodes)) { LOGGER.info("No nodes to add"); return; @@ -453,11 +458,13 @@ public class SchedulerState { public SchedulerStateForType(String taskPrefix) { this.taskPrefix = taskPrefix; - this.pendingTasks = new HashSet<>(); - this.stagingTasks = new HashSet<>(); - this.activeTasks = new HashSet<>(); - this.lostTasks = new HashSet<>(); - this.killableTasks = new HashSet<>(); + // Since Sets.newConcurrentHashSet is available only starting form Guava version 15 + // and so far (Hadoop 2.7) uses guava 13 we can not easily use it + this.pendingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); + this.stagingTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); + this.activeTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); + this.lostTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); + this.killableTasks = Collections.newSetFromMap(new ConcurrentHashMap<Protos.TaskID, Boolean>()); } @SuppressWarnings("unused")