Author: edwardyoon Date: Thu Apr 5 11:21:50 2012 New Revision: 1309761 URL: http://svn.apache.org/viewvc?rev=1309761&view=rev Log: HAMA-543: Make a best-effort attempt to start each BSP task on the host where its input split is located.
Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1309761&r1=1309760&r2=1309761&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Thu Apr 5 11:21:50 2012 @@ -16,6 +16,8 @@ Release 0.5 - Unreleased IMPROVEMENTS + HAMA-543: Make best effort to start BSP Task on the host + where the input split is located. (Suraj Menon via edwardyoon) HAMA-527: Update commons-configuration version (edwardyoon) HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut) HAMA-485: Fill Counters with useful information (tjungblut) Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1309761&r1=1309760&r2=1309761&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Thu Apr 5 11:21:50 2012 @@ -19,6 +19,9 @@ package org.apache.hama.bsp; import java.io.DataInputStream; import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -59,6 +62,7 @@ class JobInProgress { Path jobFile = null; Path localJobFile = null; Path localJarFile = null; + private LocalFileSystem localFs; // Indicates how many times the job got restarted 
private int restartCount; @@ -81,6 +85,10 @@ class JobInProgress { int clusterSize; String jobSplit; + Map<Task, GroomServerStatus> taskToGroomMap; + // Used only for scheduling! + Map<GroomServerStatus, Integer> tasksInGroomMap; + public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master, Configuration conf) throws IOException { this.conf = conf; @@ -89,6 +97,10 @@ class JobInProgress { this.jobFile = jobFile; this.master = master; + this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length); + + this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>(); + this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP.value(), counters); this.startTime = System.currentTimeMillis(); @@ -231,9 +243,17 @@ class JobInProgress { LOG.info("Job is initialized."); } - public synchronized Task obtainNewTask(GroomServerStatus status, - int clusterSize) { - this.clusterSize = clusterSize; + public Iterator<GroomServerStatus> getGroomsForTask() { + return null; + } + + public GroomServerStatus getGroomStatusForTask(Task t) { + return this.taskToGroomMap.get(t); + } + + public synchronized Task obtainNewTask( + Map<String, GroomServerStatus> groomStatuses) { + this.clusterSize = groomStatuses.size(); if (this.status.getRunState() != JobStatus.RUNNING) { LOG.info("Cannot create task split for " + profile.getJobID()); @@ -241,10 +261,18 @@ class JobInProgress { } Task result = null; + try { for (int i = 0; i < tasks.length; i++) { if (!tasks[i].isRunning() && !tasks[i].isComplete()) { - result = tasks[i].getTaskToRun(status); + result = tasks[i].getTaskToRun(groomStatuses, tasksInGroomMap); + if (result != null) + this.taskToGroomMap.put(result, tasks[i].getGroomServerStatus()); + int taskInGroom = 0; + if (tasksInGroomMap.containsKey(tasks[i].getGroomServerStatus())) { + taskInGroom = tasksInGroomMap.get(tasks[i].getGroomServerStatus()); + } + tasksInGroomMap.put(tasks[i].getGroomServerStatus(), taskInGroom + 1); break; } } Modified: 
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1309761&r1=1309760&r2=1309761&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Thu Apr 5 11:21:50 2012 @@ -22,29 +22,34 @@ import static java.util.concurrent.TimeU import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import static java.util.concurrent.TimeUnit.*; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hama.HamaConfiguration; -import org.apache.hama.bsp.GroomServerStatus; import org.apache.hama.ipc.GroomProtocol; import org.apache.hama.monitor.Federator; -import org.apache.hama.monitor.Federator.Act; -import org.apache.hama.monitor.Federator.CollectorHandler; import org.apache.hama.monitor.Metric; import org.apache.hama.monitor.MetricsRecord; import org.apache.hama.monitor.Monitor; import 
org.apache.hama.monitor.ZKCollector; +import org.apache.hama.monitor.Federator.Act; +import org.apache.hama.monitor.Federator.CollectorHandler; import org.apache.zookeeper.ZooKeeper; /** @@ -108,8 +113,9 @@ class SimpleTaskScheduler extends TaskSc // schedule Collection<GroomServerStatus> glist = groomServerManager .groomServerStatusKeySet(); - schedule(j, (GroomServerStatus[]) glist - .toArray(new GroomServerStatus[glist.size()])); + schedule(j, + (GroomServerStatus[]) glist.toArray(new GroomServerStatus[glist + .size()])); } } @@ -124,25 +130,47 @@ class SimpleTaskScheduler extends TaskSc ClusterStatus clusterStatus = groomServerManager.getClusterStatus(false); final int numGroomServers = clusterStatus.getGroomServers(); final ScheduledExecutorService sched = Executors - .newScheduledThreadPool(statuses.length + 5); - for (GroomServerStatus status : statuses) { - sched - .schedule(new TaskWorker(status, numGroomServers, job), 0, SECONDS); - }// for + .newScheduledThreadPool(1);// statuses.length + 5); + + ScheduledFuture<Boolean> jobScheduleResult = sched.schedule( + new TaskWorker(statuses, numGroomServers, job), 0, SECONDS); + + Boolean jobResult = Boolean.FALSE; + + try { + jobResult = jobScheduleResult.get(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + jobResult = Boolean.FALSE; + LOG.error("Error submitting job", e); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + jobResult = Boolean.FALSE; + LOG.error("Error submitting job", e); + } + if (Boolean.FALSE.equals(jobResult)) { + LOG.error(new StringBuffer(512).append("Scheduling of job ") + .append(job.getJobName()) + .append(" could not be done successfully. 
Killing it!").toString()); + job.kill(); + } } } - private class TaskWorker implements Runnable { - private final GroomServerStatus stus; + private class TaskWorker implements Callable<Boolean> { + private final Map<String, GroomServerStatus> groomStatuses; private final int groomNum; private final JobInProgress jip; - TaskWorker(final GroomServerStatus stus, final int num, + TaskWorker(final GroomServerStatus[] stus, final int num, final JobInProgress jip) { - this.stus = stus; + this.groomStatuses = new HashMap<String, GroomServerStatus>(2 * num); + for (GroomServerStatus status : stus) { + this.groomStatuses.put(status.hostName, status); + } this.groomNum = num; this.jip = jip; - if (null == this.stus) + if (null == this.groomStatuses) throw new NullPointerException("Target groom server is not " + "specified."); if (-1 == this.groomNum) @@ -151,35 +179,71 @@ class SimpleTaskScheduler extends TaskSc throw new NullPointerException("No job is specified."); } - public void run() { - // obtain tasks - List<GroomServerAction> actions = new ArrayList<GroomServerAction>(); + public Boolean call() { + + // Action to be sent for each task to the respective groom server. 
+ Map<GroomServerStatus, List<LaunchTaskAction>> actionMap = + new HashMap<GroomServerStatus, List<LaunchTaskAction>>( + 2 * this.groomStatuses.size()); + Set<Task> taskSet = new HashSet<Task>(2 * jip.tasks.length); Task t = null; int cnt = 0; - while((t = jip.obtainNewTask(this.stus, groomNum) ) != null) { - actions.add(new LaunchTaskAction(t)); - cnt++; - - if(cnt > (this.stus.getMaxTasks() - 1)) + while ((t = jip.obtainNewTask(this.groomStatuses)) != null) { + taskSet.add(t); + // Scheduled all tasks + if (++cnt == this.jip.tasks.length) { break; + } } - + + // if all tasks could not be scheduled + if (cnt != this.jip.tasks.length) { + return Boolean.FALSE; + } + // assembly into actions - // List<Task> tasks = new ArrayList<Task>(); - if (jip.getStatus().getRunState() == JobStatus.RUNNING) { - GroomProtocol worker = groomServerManager.findGroomServer(this.stus); + Iterator<Task> taskIter = taskSet.iterator(); + while (taskIter.hasNext()) { + Task task = taskIter.next(); + GroomServerStatus groomStatus = jip.getGroomStatusForTask(task); + List<LaunchTaskAction> taskActions = actionMap.get(groomStatus); + if (taskActions == null) { + taskActions = new ArrayList<LaunchTaskAction>( + groomStatus.getMaxTasks()); + } + taskActions.add(new LaunchTaskAction(task)); + actionMap.put(groomStatus, taskActions); + } + + Iterator<GroomServerStatus> groomIter = actionMap.keySet().iterator(); + while (jip.getStatus().getRunState() == JobStatus.RUNNING + && groomIter.hasNext()) { + + GroomServerStatus groomStatus = groomIter.next(); + List<LaunchTaskAction> actionList = actionMap.get(groomStatus); + + GroomProtocol worker = groomServerManager.findGroomServer(groomStatus); try { // dispatch() to the groom server - Directive d1 = new DispatchTasksDirective(actions.toArray(new GroomServerAction[0])); + GroomServerAction[] actions = new GroomServerAction[actionList.size()]; + actionList.toArray(actions); + Directive d1 = new DispatchTasksDirective(actions); worker.dispatch(d1); } 
catch (IOException ioe) { - LOG.error("Fail to dispatch tasks to GroomServer " - + this.stus.getGroomName(), ioe); + LOG.error( + "Fail to dispatch tasks to GroomServer " + + groomStatus.getGroomName(), ioe); } - } else { + + } + + if (groomIter.hasNext() + && jip.getStatus().getRunState() != JobStatus.RUNNING) { LOG.warn("Currently master only shcedules job in running state. " + "This may be refined in the future. JobId:" + jip.getJobID()); } + + return Boolean.TRUE; } } Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1309761&r1=1309760&r2=1309761&view=diff ============================================================================== --- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original) +++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Thu Apr 5 11:21:50 2012 @@ -18,6 +18,8 @@ package org.apache.hama.bsp; import java.io.IOException; +import java.util.Iterator; +import java.util.Map; import java.util.TreeMap; import java.util.TreeSet; @@ -49,6 +51,8 @@ class TaskInProgress { private TaskID id; private JobInProgress job; private int completes = 0; + + private GroomServerStatus myGroomStatus = null; // Status // private double progress = 0; @@ -114,7 +118,8 @@ class TaskInProgress { /** * Return a Task that can be sent to a GroomServer for execution. 
*/ - public Task getTaskToRun(GroomServerStatus status) throws IOException { + public Task getTaskToRun(Map<String, GroomServerStatus> grooms, + Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException { Task t = null; TaskAttemptID taskid = null; @@ -131,13 +136,42 @@ class TaskInProgress { String splitClass = null; BytesWritable split = null; + GroomServerStatus selectedGroom = null; if(rawSplit != null){ splitClass = rawSplit.getClassName(); split = rawSplit.getBytes(); + String[] possibleLocations = rawSplit.getLocations(); + for (int i = 0; i < possibleLocations.length; ++i){ + String location = possibleLocations[i]; + GroomServerStatus groom = grooms.get(location); + Integer taskInGroom = tasksInGroomMap.get(groom); + taskInGroom = (taskInGroom == null)?0:taskInGroom; + if(taskInGroom < groom.getMaxTasks() && + location.equals(groom.getGroomHostName())){ + selectedGroom = groom; + t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split); + activeTasks.put(taskid, groom.getGroomName()); + + break; + } + } + } + //Failed in attempt to get data locality or there was no input split. 
+ if(selectedGroom == null){ + Iterator<String> groomIter = grooms.keySet().iterator(); + while(groomIter.hasNext()) { + GroomServerStatus groom = grooms.get(groomIter.next()); + Integer taskInGroom = tasksInGroomMap.get(groom); + taskInGroom = (taskInGroom == null)?0:taskInGroom; + if(taskInGroom < groom.getMaxTasks()){ + selectedGroom = groom; + t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split); + activeTasks.put(taskid, groom.getGroomName()); + } + } } - t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split); - activeTasks.put(taskid, status.getGroomName()); + myGroomStatus = selectedGroom; return t; } @@ -170,6 +204,10 @@ class TaskInProgress { public TreeMap<TaskAttemptID, String> getTasks() { return activeTasks; } + + public GroomServerStatus getGroomServerStatus(){ + return myGroomStatus; + } /** * Is the Task associated with taskid is the first attempt of the tip?