Author: edwardyoon
Date: Wed Jun 4 03:46:27 2014
New Revision: 1599853
URL: http://svn.apache.org/r1599853
Log:
HAMA-909: Improve Mesos Scheduler's Fault Tolerance (Jeff Fenchel via
edwardyoon)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java
hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Jun 4 03:46:27 2014
@@ -17,6 +17,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-909: Improve Mesos Scheduler's Fault Tolerance (Jeff Fenchel via edwardyoon)
HAMA-823: Remove javadoc warnings (Victor Lee via edwardyoon)
HAMA-886: Refactoring core.bundle package (edwardyoon)
HAMA-899: Add getAdjacentPeerNames() that returns the names of locally
adjacent peers (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Wed Jun 4 03:46:27 2014
@@ -200,6 +200,13 @@ public class BSPMaster implements JobSub
} else {
jip.status.setRunState(JobStatus.FAILED);
jip.failedTask(tip, ts);
+ for (JobInProgressListener listener : jobInProgressListeners) {
+ try {
+ listener.jobRemoved(jip);
+ } catch (IOException ioe) {
+ LOG.error("Fail to alter scheduler a job is moved.", ioe);
+ }
+ }
}
}
if (jip.getStatus().getRunState() == JobStatus.SUCCEEDED) {
@@ -224,6 +231,14 @@ public class BSPMaster implements JobSub
throw new DirectiveException("Error when dispatching kill task"
+ " action.", ioe);
}
+
+ for (JobInProgressListener listener : jobInProgressListeners) {
+ try {
+ listener.jobRemoved(jip);
+ } catch (IOException ioe) {
+ LOG.error("Fail to alter scheduler a job is moved.", ioe);
+ }
+ }
}
}
} else {
Modified:
hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java (original)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/ResourceManager.java Wed Jun 4 03:46:27 2014
@@ -23,12 +23,14 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -36,7 +38,6 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hama.bsp.TaskWorkerManager.TaskWorker;
-import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.apache.mesos.Protos.CommandInfo;
@@ -59,11 +60,13 @@ public class ResourceManager {
private Configuration conf;
private static long launchedTasks = 0;
- private Set<JobInProgress> executing = new HashSet<JobInProgress>();
- private Set<TaskInProgress> executingTasks = new HashSet<TaskInProgress>();
+ private Set<JobInProgress> executingJobs = Collections.synchronizedSet(new HashSet<JobInProgress>());
+ private Set<TaskInProgress> executingTasks = Collections.synchronizedSet(new HashSet<TaskInProgress>());
private Map<String, java.util.Queue<TaskInProgress>> tasksToRunByGroom;
private Set<TaskInProgress> tasksToRun;
+ private Set<TaskInProgress> recoveryTasks = Collections.synchronizedSet(new HashSet<TaskInProgress>());
+
private long slotMemory;
// Overhead requirements for the container groom server
double groomCpus;
@@ -75,44 +78,40 @@ public class ResourceManager {
/**
* Constructor for the mesos resource manager
*
- * @param conf
- * The configuration options for hama
- * @param serverManager
- * A reference to the groom server manager
- * @param driver
- * The mesos driver. This is required to terminate tasks
+ * @param conf The configuration options for hama
+ * @param serverManager A reference to the groom server manager
+ * @param driver The mesos driver. This is required to terminate tasks
*/
public ResourceManager(Configuration conf,
AtomicReference<GroomServerManager> serverManager, SchedulerDriver
driver) {
- tasksToRunByGroom = new HashMap<String, java.util.Queue<TaskInProgress>>();
- tasksToRunByGroom.put(anyGroomServer, new LinkedList<TaskInProgress>());
+ tasksToRunByGroom = new ConcurrentHashMap<String, java.util.Queue<TaskInProgress>>();
+ tasksToRunByGroom.put(anyGroomServer, new ConcurrentLinkedQueue<TaskInProgress>());
tasksToRun = new HashSet<TaskInProgress>();
slotMemory = parseMemory(conf);
- taskDelegator = new TaskDelegator(serverManager, driver, executingTasks);
+ taskDelegator = new TaskDelegator(serverManager, driver, recoveryTasks);
serverManager.get().addGroomStatusListener(taskDelegator);
this.conf = conf;
-
+
groomCpus = conf.getInt("hama.mesos.groom.cpu", 0);
- groomMem = conf.getInt("hama.mesos.groom.mem", 200);;
- groomDisk = conf.getInt("hama.mesos.groom.disk", 0);;
+ groomMem = conf.getInt("hama.mesos.groom.mem", 200);
+ groomDisk = conf.getInt("hama.mesos.groom.disk", 0);
}
/**
* Handle a resource offer by the mesos framework
*
- * @param schedulerDriver
- * The mesos scheduler driver
- * @param offers
- * A list of offers from mesos
+ * @param schedulerDriver The mesos scheduler driver
+ * @param offers A list of offers from mesos
*/
public void resourceOffers(SchedulerDriver schedulerDriver, List<Offer>
offers) {
if (tasksToRun.isEmpty()) {
- //there is no need to track executing tasks if everything is started
+ // there is no need to track executing tasks if everything is
+ // started
clearQueues();
-
+
for (Offer offer : offers) {
schedulerDriver.declineOffer(offer.getId());
}
@@ -124,14 +123,14 @@ public class ResourceManager {
}
private void clearQueues() {
- synchronized (tasksToRunByGroom) {
- for ( java.util.Queue<TaskInProgress> queue : tasksToRunByGroom.values()) {
- queue.clear();
- }
- executingTasks.clear();
- }
+ synchronized (tasksToRunByGroom) {
+ for (java.util.Queue<TaskInProgress> queue : tasksToRunByGroom.values()) {
+ queue.clear();
+ }
+ executingTasks.clear();
+ }
}
-
+
private void useOffer(SchedulerDriver schedulerDriver, Offer offer) {
log.debug("Received offer From: " + offer.getHostname());
@@ -189,31 +188,31 @@ public class ResourceManager {
@Override
public Boolean call() throws Exception {
log.debug("Task Worker called: " + jip.tasks.length);
- if (!jip.isRecoveryPending()) {
- for (TaskInProgress tip : jip.tasks) {
- String[] grooms = jip.getPreferredGrooms(tip, null, null);
- if (grooms == null) {
- grooms = new String[] { anyGroomServer };
- }
- log.info("Prefered Groom for tip " + tip.idWithinJob() + ": "
- + grooms[0]);
- synchronized (tasksToRunByGroom) {
- for (String groom : grooms) {
- if (!tasksToRunByGroom.containsKey(groom)) {
- tasksToRunByGroom.put(groom, new LinkedList<TaskInProgress>());
- log.info("Received request for groom: " + groom);
- }
- tasksToRunByGroom.get(groom).add(tip);
+ for (TaskInProgress tip : jip.tasks) {
+ if (jip.isRecoveryPending()) {
+ recoveryTasks.add(tip);
+ }
+ String[] grooms = jip.getPreferredGrooms(tip, null, null);
+
+ if (grooms == null) {
+ grooms = new String[] { anyGroomServer };
+ }
+ log.info("Prefered Groom for tip " + tip.idWithinJob() + ": "
+ + grooms[0]);
+ synchronized (tasksToRunByGroom) {
+ for (String groom : grooms) {
+ if (!tasksToRunByGroom.containsKey(groom)) {
+ tasksToRunByGroom.put(groom, new ConcurrentLinkedQueue<TaskInProgress>());
+ log.info("Received request for groom: " + groom);
}
- tasksToRun.add(tip);
- }
+ tasksToRunByGroom.get(groom).add(tip);
+ }
+ tasksToRun.add(tip);
}
- } else {
- throw new UnsupportedOperationException("This feature is not yet implemented");
- //TODO: Handle task recovery
}
- executing.add(jip);
+
+ executingJobs.add(jip);
return true;
}
}
@@ -421,8 +420,7 @@ public class ResourceManager {
/**
* Get the amount of memory requested in MiB
*
- * @param javaOpts
- * java options
+ * @param javaOpts java options
* @return mesos formated memory argument
*/
private static long parseMemory(Configuration conf) {
@@ -439,7 +437,8 @@ public class ResourceManager {
value = value * 1024;
}
- // remove memory request from the child java opts so it may be added later
+ // remove memory request from the child java opts so it may be added
+ // later
conf.set("bsp.child.java.opts", memMatcher.replaceAll(""));
return value;
Modified: hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java?rev=1599853&r1=1599852&r2=1599853&view=diff
==============================================================================
--- hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java (original)
+++ hama/trunk/mesos/src/main/java/org/apache/hama/bsp/TaskDelegator.java Wed Jun 4 03:46:27 2014
@@ -18,8 +18,9 @@
package org.apache.hama.bsp;
import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@@ -35,14 +36,15 @@ import org.apache.mesos.SchedulerDriver;
public class TaskDelegator implements GroomStatusListener {
public static final Log LOG = LogFactory.getLog(MesosScheduler.class);
- private Set<TaskInProgress> executingTasks;
+ private Set<TaskInProgress> recoveryTasks;
-
/**
* Map to hold assignments from groomServerNames to TasksInProgress
*/
private MultiValueMap assignments = new MultiValueMap();
+ private MultiValueMap jobAssignments = new MultiValueMap();
+
private AtomicReference<GroomServerManager> groomServerManager;
/**
@@ -56,10 +58,12 @@ public class TaskDelegator implements Gr
private SchedulerDriver driver;
public TaskDelegator(AtomicReference<GroomServerManager> groomServerManager,
- SchedulerDriver driver, Set<TaskInProgress> executingTasks) {
+ SchedulerDriver driver, Set<TaskInProgress> recoveryTasks) {
this.groomServerManager = groomServerManager;
+ groomServerManager.get().addJobInProgressListener(
+ new TaskDelegatorJobListener());
this.driver = driver;
- this.executingTasks = executingTasks;
+ this.recoveryTasks = recoveryTasks;
}
@Override
@@ -82,10 +86,8 @@ public class TaskDelegator implements Gr
/**
* Add a task for execution when the groom server becomes available
*
- * @param tip
- * The TaskInProgress to execute
- * @param hostName
- * The hostname where the resource reservation was made
+ * @param tip The TaskInProgress to execute
+ * @param hostName The hostname where the resource reservation was made
*/
public void addTask(TaskInProgress tip, Protos.TaskID taskId,
String hostName, Integer port) {
@@ -99,16 +101,37 @@ public class TaskDelegator implements Gr
execute(tip, groomServers.get(key));
} else {
assignments.put(key, tip);
+ jobAssignments.put(tip.getJob(), new Pair<Object, Object>(key, tip));
}
}
private void execute(TaskInProgress tip, GroomServerStatus status) {
Task task = tip.constructTask(status);
+ GroomServerAction[] actions;
GroomProtocol worker = groomServerManager.get().findGroomServer(status);
- GroomServerAction[] actions = new GroomServerAction[1];
- actions[0] = new LaunchTaskAction(task);
+ if (!recoveryTasks.contains(tip)) {
+ actions = new GroomServerAction[1];
+ actions[0] = new LaunchTaskAction(task);
+ } else {
+ LOG.trace("Executing a recovery task");
+ recoveryTasks.remove(tip);
+ HashMap<String, GroomServerStatus> groomStatuses = new HashMap<String, GroomServerStatus>(
+ 1);
+ groomStatuses.put(status.hostName, status);
+ Map<GroomServerStatus, List<GroomServerAction>> actionMap = new HashMap<GroomServerStatus, List<GroomServerAction>>(
+ 2 * groomStatuses.size());
+ try {
+ tip.getJob().recoverTasks(groomStatuses, actionMap);
+ } catch (IOException e) {
+ LOG.warn("Task recovery failed", e);
+ }
+
+ List<GroomServerAction> actionList = actionMap.get(status);
+ actions = new GroomServerAction[actionList.size()];
+ actionList.toArray(actions);
+ }
Directive d1 = new DispatchTasksDirective(actions);
try {
worker.dispatch(d1);
@@ -125,10 +148,41 @@ public class TaskDelegator implements Gr
status.rpcServer).getPort());
groomServers.put(key, status);
assignments.remove(key, task);
-
+ jobAssignments.remove(task.getJob(), new Pair<Object, Object>(key, task));
+
if (assignments.getCollection(key) == null) {
groomServers.remove(key);
driver.killTask(groomTaskIDs.get(key));
}
}
+
+ private class TaskDelegatorJobListener extends JobInProgressListener {
+
+ @Override
+ public void jobAdded(JobInProgress job) throws IOException {
+
+ }
+
+ @Override
+ public void jobRemoved(JobInProgress job) throws IOException {
+ @SuppressWarnings("unchecked")
+ Collection<Pair<Object, Object>> remainingTasks = jobAssignments
+ .getCollection(job);
+ if (remainingTasks != null) {
+ for (Pair<Object, Object> taskToRemove : remainingTasks) {
+ assignments.remove(taskToRemove.getKey(), taskToRemove.getValue());
+ if (assignments.getCollection(taskToRemove.getKey()) == null) {
+ groomServers.remove(taskToRemove.getKey());
+ driver.killTask(groomTaskIDs.get(taskToRemove.getKey()));
+ }
+ }
+ jobAssignments.remove(job);
+ }
+ }
+
+ @Override
+ public void recoverTaskInJob(JobInProgress job) throws IOException {
+
+ }
+ }
}