Author: omalley
Date: Fri Mar 4 03:53:24 2011
New Revision: 1077223
URL: http://svn.apache.org/viewvc?rev=1077223&view=rev
Log:
commit dc981e7d0d74cc1ea1d0089b3fbc7d89af78a8b8
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Thu Feb 25 20:48:43 2010 +0530
MAPREDUCE-1354 from
https://issues.apache.org/jira/secure/attachment/12437010/mr-1354-y20.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1354. Make incremental changes in jobtracker for
+ improving scalability (acmurthy)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Fri Mar 4 03:53:24 2011
@@ -461,7 +461,24 @@ class CapacityTaskScheduler extends Task
Collections.sort(qsiForAssigningTasks, queueComparator);
}
-
+ /**
+ * Ceil of result of dividing two integers.
+ *
+ * This is *not* a utility method.
+ * Neither <code>a</code> or <code>b</code> should be negative.
+ *
+ * @param a
+ * @param b
+ * @return ceil of the result of a/b
+ */
+ private int divideAndCeil(int a, int b) {
+ if (b == 0) {
+ LOG.info("divideAndCeil called with a=" + a + " b=" + b);
+ return 0;
+ }
+ return (a + (b - 1)) / b;
+ }
+
private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
// what is our current capacity? It is equal to the queue-capacity if
// we're running below capacity. If we're running over capacity, then its
@@ -475,13 +492,15 @@ class CapacityTaskScheduler extends Task
else {
currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
}
- int limit = Math.max((int)(Math.ceil((double)currentCapacity/
- (double)qsi.numJobsByUser.size())),
- (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+ int limit =
+ Math.max(divideAndCeil(currentCapacity, qsi.numJobsByUser.size()),
+ divideAndCeil(qsi.ulMin*currentCapacity, 100));
String user = j.getProfile().getUser();
if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
- LOG.debug("User " + user + " is over limit, num slots occupied = " +
- tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User " + user + " is over limit, num slots occupied=" +
+ tsi.numSlotsOccupiedByUser.get(user) + ", limit=" + limit);
+ }
return true;
}
else {
@@ -814,8 +833,7 @@ class CapacityTaskScheduler extends Task
@Override
int getSlotsPerTask(JobInProgress job) {
- return
-
job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());
+ return job.getNumSlotsPerTask(TaskType.MAP);
}
@Override
@@ -831,7 +849,7 @@ class CapacityTaskScheduler extends Task
boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
//Check if job supports speculative map execution first then
//check if job has speculative maps.
- return (job.getJobConf().getMapSpeculativeExecution())&& (
+ return (job.getMapSpeculativeExecution())&& (
hasSpeculativeTask(job.getTasks(TaskType.MAP),
job.getStatus().mapProgress(), tts));
}
@@ -877,8 +895,7 @@ class CapacityTaskScheduler extends Task
@Override
int getSlotsPerTask(JobInProgress job) {
- return
-
job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot());
+ return job.getNumSlotsPerTask(TaskType.REDUCE);
}
@Override
@@ -894,7 +911,7 @@ class CapacityTaskScheduler extends Task
boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
//check if the job supports reduce speculative execution first then
//check if the job has speculative tasks.
- return (job.getJobConf().getReduceSpeculativeExecution()) && (
+ return (job.getReduceSpeculativeExecution()) && (
hasSpeculativeTask(job.getTasks(TaskType.REDUCE),
job.getStatus().reduceProgress(), tts));
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
Fri Mar 4 03:53:24 2011
@@ -106,11 +106,11 @@ class MemoryMatcher {
long totalMemUsableOnTT = 0;
long memForThisTask = 0;
if (taskType == TaskType.MAP) {
- memForThisTask = job.getJobConf().getMemoryForMapTask();
+ memForThisTask = job.getMemoryForMapTask();
totalMemUsableOnTT =
scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
} else if (taskType == TaskType.REDUCE) {
- memForThisTask = job.getJobConf().getMemoryForReduceTask();
+ memForThisTask = job.getMemoryForReduceTask();
totalMemUsableOnTT =
scheduler.getMemSizeForReduceSlot()
* taskTracker.getMaxReduceSlots();
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar 4 03:53:24 2011
@@ -98,8 +98,11 @@ public class JobInProgress {
TaskInProgress setup[] = new TaskInProgress[0];
int numMapTasks = 0;
int numReduceTasks = 0;
- int numSlotsPerMap = 1;
- int numSlotsPerReduce = 1;
+ final long memoryPerMap;
+ final long memoryPerReduce;
+ volatile int numSlotsPerMap = 1;
+ volatile int numSlotsPerReduce = 1;
+ final int maxTaskFailuresPerTracker;
// Counters to track currently running/finished/failed Map/Reduce
task-attempts
int runningMapTasks = 0;
@@ -207,8 +210,8 @@ public class JobInProgress {
private LocalFileSystem localFs;
private FileSystem fs;
private JobID jobId;
- private boolean hasSpeculativeMaps;
- private boolean hasSpeculativeReduces;
+ volatile private boolean hasSpeculativeMaps;
+ volatile private boolean hasSpeculativeReduces;
private long inputLength = 0;
private String user;
private String historyFile = "";
@@ -299,6 +302,10 @@ public class JobInProgress {
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
this.profile = new JobProfile(conf.getUser(), jobid, "", "",
conf.getJobName(), conf.getQueueName());
+ this.memoryPerMap = conf.getMemoryForMapTask();
+ this.memoryPerReduce = conf.getMemoryForReduceTask();
+ this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
try {
@@ -374,12 +381,18 @@ public class JobInProgress {
this.numMapTasks = conf.getNumMapTasks();
this.numReduceTasks = conf.getNumReduceTasks();
+
+ this.memoryPerMap = conf.getMemoryForMapTask();
+ this.memoryPerReduce = conf.getMemoryForReduceTask();
+
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+ this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
MetricsContext metricsContext = MetricsUtil.getContext("mapred");
this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
this.jobMetrics.setTag("user", conf.getUser());
@@ -499,11 +512,27 @@ public class JobInProgress {
return restartCount > 0;
}
+ boolean getMapSpeculativeExecution() {
+ return hasSpeculativeMaps;
+ }
+
+ boolean getReduceSpeculativeExecution() {
+ return hasSpeculativeReduces;
+ }
+
+ long getMemoryForMapTask() {
+ return memoryPerMap;
+ }
+
+ long getMemoryForReduceTask() {
+ return memoryPerReduce;
+ }
+
/**
* Get the number of slots required to run a single map task-attempt.
* @return the number of slots required to run a single map task-attempt
*/
- synchronized int getNumSlotsPerMap() {
+ int getNumSlotsPerMap() {
return numSlotsPerMap;
}
@@ -512,7 +541,7 @@ public class JobInProgress {
* This is typically set by schedulers which support high-ram jobs.
* @param slots the number of slots required to run a single map task-attempt
*/
- synchronized void setNumSlotsPerMap(int numSlotsPerMap) {
+ void setNumSlotsPerMap(int numSlotsPerMap) {
this.numSlotsPerMap = numSlotsPerMap;
}
@@ -520,7 +549,7 @@ public class JobInProgress {
* Get the number of slots required to run a single reduce task-attempt.
* @return the number of slots required to run a single reduce task-attempt
*/
- synchronized int getNumSlotsPerReduce() {
+ int getNumSlotsPerReduce() {
return numSlotsPerReduce;
}
@@ -530,7 +559,7 @@ public class JobInProgress {
* @param slots the number of slots required to run a single reduce
* task-attempt
*/
- synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) {
+ void setNumSlotsPerReduce(int numSlotsPerReduce) {
this.numSlotsPerReduce = numSlotsPerReduce;
}
@@ -724,7 +753,7 @@ public class JobInProgress {
return numReduceTasks - runningReduceTasks - failedReduceTIPs -
finishedReduceTasks + speculativeReduceTasks;
}
- public synchronized int getNumSlotsPerTask(TaskType taskType) {
+ public int getNumSlotsPerTask(TaskType taskType) {
if (taskType == TaskType.MAP) {
return numSlotsPerMap;
} else if (taskType == TaskType.REDUCE) {
@@ -1594,7 +1623,7 @@ public class JobInProgress {
trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
// Check if this tasktracker has turned 'flaky'
- if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
+ if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
++flakyTaskTrackers;
// Cancel reservations if appropriate
@@ -1704,7 +1733,7 @@ public class JobInProgress {
List<String> getBlackListedTrackers() {
List<String> blackListedTrackers = new ArrayList<String>();
for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
- if (e.getValue().intValue() >= conf.getMaxTaskFailuresPerTracker()) {
+ if (e.getValue().intValue() >= maxTaskFailuresPerTracker) {
blackListedTrackers.add(e.getKey());
}
}
@@ -2261,7 +2290,7 @@ public class JobInProgress {
//
int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) &&
- taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
+ taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
if (LOG.isDebugEnabled()) {
String flakyTracker = convertTrackerNameToHostName(taskTracker);
LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Mar 4 03:53:24 2011
@@ -1840,7 +1840,8 @@ public class JobTracker implements MRCon
//
// All the known jobs. (jobid->JobInProgress)
- Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
+ Map<JobID, JobInProgress> jobs =
+ Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());
// (user -> list of JobInProgress)
TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
@@ -3609,15 +3610,20 @@ public class JobTracker implements MRCon
* of the JobTracker. But JobInProgress adds info that's useful for
* the JobTracker alone.
*/
- public synchronized JobStatus submitJob(
- JobID jobId, String jobSubmitDir, TokenStorage ts) throws IOException {
- if(jobs.containsKey(jobId)) {
- //job already running, don't start twice
- return jobs.get(jobId).getStatus();
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
+ throws IOException {
+ JobInfo jobInfo = null;
+ synchronized (this) {
+ if (jobs.containsKey(jobId)) {
+ // job already running, don't start twice
+ return jobs.get(jobId).getStatus();
+ }
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
+ new Path(jobSubmitDir));
}
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
- new Path(jobSubmitDir));
+ // Create the JobInProgress, do not lock the JobTracker since
+ // we are about to copy job.xml from HDFS
JobInProgress job = null;
tokenStorage = ts;
try {
@@ -3626,43 +3632,45 @@ public class JobTracker implements MRCon
throw new IOException(e);
}
- String queue = job.getProfile().getQueueName();
- if(!(queueManager.getQueues().contains(queue))) {
- job.fail();
- throw new IOException("Queue \"" + queue + "\" does not exist");
- }
+ synchronized (this) {
+ String queue = job.getProfile().getQueueName();
+ if (!(queueManager.getQueues().contains(queue))) {
+ job.fail();
+ throw new IOException("Queue \"" + queue + "\" does not exist");
+ }
- // check for access
- try {
- checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
- } catch (IOException ioe) {
- LOG.warn("Access denied for user " + job.getJobConf().getUser()
- + ". Ignoring job " + jobId, ioe);
- job.fail();
- throw ioe;
- }
+ // check for access
+ try {
+ checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+ } catch (IOException ioe) {
+ LOG.warn("Access denied for user " + job.getJobConf().getUser()
+ + ". Ignoring job " + jobId, ioe);
+ job.fail();
+ throw ioe;
+ }
- // Check the job if it cannot run in the cluster because of invalid memory
- // requirements.
- try {
- checkMemoryRequirements(job);
- } catch (IOException ioe) {
- throw ioe;
- }
- boolean recovered = true; //TODO: Once the Job recovery code is there,
- //(MAPREDUCE-873) we
- //must pass the "recovered" flag accurately.
- //This is handled in the trunk/0.22
- if (!recovered) {
- //Store the information in a file so that the job can be recovered
- //later (if at all)
- Path jobDir = getSystemDirectoryForJob(jobId);
- FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
- FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
- jobInfo.write(out);
- out.close();
+ // Check the job if it cannot run in the cluster because of invalid
memory
+ // requirements.
+ try {
+ checkMemoryRequirements(job);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ boolean recovered = true; // TODO: Once the Job recovery code is there,
+ // (MAPREDUCE-873) we
+ // must pass the "recovered" flag accurately.
+ // This is handled in the trunk/0.22
+ if (!recovered) {
+ // Store the information in a file so that the job can be recovered
+ // later (if at all)
+ Path jobDir = getSystemDirectoryForJob(jobId);
+ FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+ FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+ jobInfo.write(out);
+ out.close();
+ }
+ return addJob(jobId, job);
}
- return addJob(jobId, job);
}
/**
@@ -3941,10 +3949,23 @@ public class JobTracker implements MRCon
completedJobStatusStore.store(job);
}
+ /**
+ * Check if the <code>job</code> has been initialized.
+ *
+ * @param job {@link JobInProgress} to be checked
+ * @return <code>true</code> if the job has been initialized,
+ * <code>false</code> otherwise
+ */
+ private boolean isJobInited(JobInProgress job) {
+ return job.inited();
+ }
+
public JobProfile getJobProfile(JobID jobid) {
synchronized (this) {
JobInProgress job = jobs.get(jobid);
if (job != null) {
+ // Safe to call JobInProgress.getProfile while holding the lock
+ // on the JobTracker since it isn't a synchronized method
return job.getProfile();
} else {
RetireJobInfo info = retireJobs.get(jobid);
@@ -3955,6 +3976,7 @@ public class JobTracker implements MRCon
}
return completedJobStatusStore.readJobProfile(jobid);
}
+
public JobStatus getJobStatus(JobID jobid) {
if (null == jobid) {
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
@@ -3963,6 +3985,8 @@ public class JobTracker implements MRCon
synchronized (this) {
JobInProgress job = jobs.get(jobid);
if (job != null) {
+ // Safe to call JobInProgress.getStatus while holding the lock
+ // on the JobTracker since it isn't a synchronized method
return job.getStatus();
} else {
@@ -3974,19 +3998,24 @@ public class JobTracker implements MRCon
}
return completedJobStatusStore.readJobStatus(jobid);
}
+
+ private static final Counters EMPTY_COUNTERS = new Counters();
public Counters getJobCounters(JobID jobid) {
synchronized (this) {
JobInProgress job = jobs.get(jobid);
if (job != null) {
- return job.getCounters();
+ return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
}
}
return completedJobStatusStore.readCounters(jobid);
}
+
+ private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
+
public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
- if (job == null) {
- return new TaskReport[0];
+ if (job == null || !isJobInited(job)) {
+ return EMPTY_TASK_REPORTS;
} else {
Vector<TaskReport> reports = new Vector<TaskReport>();
Vector<TaskInProgress> completeMapTasks =
@@ -4007,8 +4036,8 @@ public class JobTracker implements MRCon
public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
- if (job == null) {
- return new TaskReport[0];
+ if (job == null || !isJobInited(job)) {
+ return EMPTY_TASK_REPORTS;
} else {
Vector<TaskReport> reports = new Vector<TaskReport>();
Vector completeReduceTasks = job.reportTasksInProgress(false, true);
@@ -4027,8 +4056,8 @@ public class JobTracker implements MRCon
public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
- if (job == null) {
- return new TaskReport[0];
+ if (job == null || !isJobInited(job)) {
+ return EMPTY_TASK_REPORTS;
} else {
Vector<TaskReport> reports = new Vector<TaskReport>();
Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
@@ -4050,8 +4079,8 @@ public class JobTracker implements MRCon
public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
JobInProgress job = jobs.get(jobid);
- if (job == null) {
- return new TaskReport[0];
+ if (job == null || !isJobInited(job)) {
+ return EMPTY_TASK_REPORTS;
} else {
Vector<TaskReport> reports = new Vector<TaskReport>();
Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
@@ -4070,8 +4099,6 @@ public class JobTracker implements MRCon
}
}
- TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
-
static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
"mapred.cluster.map.memory.mb";
static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
@@ -4087,21 +4114,22 @@ public class JobTracker implements MRCon
* starting from fromEventId.
* @see
org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String,
int, int)
*/
- public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+ public TaskCompletionEvent[] getTaskCompletionEvents(
JobID jobid, int fromEventId, int maxEvents) throws IOException{
- synchronized (this) {
- JobInProgress job = this.jobs.get(jobid);
- if (null != job) {
- if (job.inited()) {
- return job.getTaskCompletionEvents(fromEventId, maxEvents);
- } else {
- return EMPTY_EVENTS;
- }
- }
+ JobInProgress job = this.jobs.get(jobid);
+
+ if (null != job) {
+ return isJobInited(job) ?
+ job.getTaskCompletionEvents(fromEventId, maxEvents) :
+ TaskCompletionEvent.EMPTY_ARRAY;
}
- return completedJobStatusStore.readJobTaskCompletionEvents(jobid,
fromEventId, maxEvents);
+
+ return completedJobStatusStore.readJobTaskCompletionEvents(jobid,
+ fromEventId,
+ maxEvents);
}
+ private static final String[] EMPTY_TASK_DIAGNOSTICS = new String[0];
/**
* Get the diagnostics for a given task
* @param taskId the id of the task
@@ -4113,7 +4141,7 @@ public class JobTracker implements MRCon
JobID jobId = taskId.getJobID();
TaskID tipId = taskId.getTaskID();
JobInProgress job = jobs.get(jobId);
- if (job != null) {
+ if (job != null && isJobInited(job)) {
TaskInProgress tip = job.getTaskInProgress(tipId);
if (tip != null) {
taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
@@ -4121,8 +4149,8 @@ public class JobTracker implements MRCon
}
- return ((taskDiagnosticInfo == null) ? new String[0]
- : taskDiagnosticInfo.toArray(new String[0]));
+ return ((taskDiagnosticInfo == null) ? EMPTY_TASK_DIAGNOSTICS :
+ taskDiagnosticInfo.toArray(new
String[taskDiagnosticInfo.size()]));
}
/** Get all the TaskStatuses from the tipid. */
@@ -4722,8 +4750,8 @@ public class JobTracker implements MRCon
boolean invalidJob = false;
String msg = "";
- long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
- long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+ long maxMemForMapTask = job.getMemoryForMapTask();
+ long maxMemForReduceTask = job.getMemoryForReduceTask();
if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
|| maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {