Author: omalley
Date: Fri Mar 4 03:48:03 2011
New Revision: 1077167
URL: http://svn.apache.org/viewvc?rev=1077167&view=rev
Log:
commit db8f1bf2ddc04574c8a7a1b9844ddfdacafcfa4c
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Sun Feb 14 14:50:27 2010 +0530
HADOOP-2141 from
https://issues.apache.org/jira/secure/attachment/12435253/hadoop-2141-yahoo-v1.4.8.patch
(only test-related changes)
+++ b/YAHOO-CHANGES.txt
+ HADOOP-2141. Backport changes made in the original JIRA to aid
+ fast unit tests in Map/Reduce. (Amar Kamat via yhemanth)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Fri Mar 4 03:48:03 2011
@@ -338,7 +338,8 @@ public class TestCapacityScheduler exten
FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
boolean isMap, FakeJobInProgress job) {
- super(jId, "", JobSplit.EMPTY_TASK_SPLIT, null, jobConf, job, 0, 1);
+ super(jId, "", JobSplit.EMPTY_TASK_SPLIT, job.jobtracker, jobConf, job,
+ 0, 1);
this.isMap = isMap;
this.fakeJob = job;
activeTasks = new TreeMap<TaskAttemptID, String>();
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
Fri Mar 4 03:48:03 2011
@@ -87,15 +87,6 @@ public class FairScheduler extends TaskS
double reduceFairShare = 0; // Fair share of reduce slots at last update
}
- /**
- * A clock class - can be mocked out for testing.
- */
- static class Clock {
- long getTime() {
- return System.currentTimeMillis();
- }
- }
-
public FairScheduler() {
this(new Clock(), true);
}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Fri Mar 4 03:48:03 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.JobStatu
import org.apache.hadoop.mapred.FairScheduler.JobInfo;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
public class TestFairScheduler extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -236,19 +237,6 @@ public class TestFairScheduler extends T
}
}
- protected class FakeClock extends FairScheduler.Clock {
- private long time = 0;
-
- public void advance(long millis) {
- time += millis;
- }
-
- @Override
- long getTime() {
- return time;
- }
- }
-
protected JobConf conf;
protected FairScheduler scheduler;
private FakeTaskTrackerManager taskTrackerManager;
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java?rev=1077167&view=auto
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
(added)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Clock.java
Fri Mar 4 03:48:03 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A clock class - can be mocked out for testing.
+ */
+class Clock {
+ long getTime() {
+ return System.currentTimeMillis();
+ }
+}
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
Fri Mar 4 03:48:03 2011
@@ -111,6 +111,15 @@ public class ClusterStatus implements Wr
}
/**
+ * Construct a new cluster status.
+ * @param trackers no. of tasktrackers in the cluster
+ * @param blacklists no of blacklisted task trackers in the cluster
+ * @param ttExpiryInterval the tasktracker expiry interval
+ * @param maps no. of currently running map-tasks in the cluster
+ * @param reduces no. of currently running reduce-tasks in the cluster
+ * @param maxMaps the maximum no. of map tasks in the cluster
+ * @param maxReduces the maximum no. of reduce tasks in the cluster
+ * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
* @param numDecommissionedNodes number of decommission trackers
*/
ClusterStatus(int trackers, int blacklists, long ttExpiryInterval,
@@ -151,6 +160,15 @@ public class ClusterStatus implements Wr
}
/**
+ * Construct a new cluster status.
+ * @param activeTrackers active tasktrackers in the cluster
+ * @param blacklistedTrackers blacklisted tasktrackers in the cluster
+ * @param ttExpiryInterval the tasktracker expiry interval
+ * @param maps no. of currently running map-tasks in the cluster
+ * @param reduces no. of currently running reduce-tasks in the cluster
+ * @param maxMaps the maximum no. of map tasks in the cluster
+ * @param maxReduces the maximum no. of reduce tasks in the cluster
+ * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
* @param numDecommissionNodes number of decommission trackers
*/
ClusterStatus(Collection<String> activeTrackers,
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Fri Mar 4 03:48:03 2011
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
@@ -286,6 +288,17 @@ class JobInProgress {
this.anyCacheLevel = this.maxLevel+1;
this.jobtracker = tracker;
this.restartCount = 0;
+ hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+ hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+ this.nonLocalMaps = new LinkedList<TaskInProgress>();
+ this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+ this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+ this.nonRunningReduces = new LinkedList<TaskInProgress>();
+ this.runningReduces = new LinkedHashSet<TaskInProgress>();
+ this.resourceEstimator = new ResourceEstimator(this);
+ this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+ this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+ (numMapTasks + numReduceTasks + 10);
try {
this.userUGI = UserGroupInformation.getCurrentUser();
} catch (IOException ie){
@@ -319,7 +332,7 @@ class JobInProgress {
this.jobtracker = jobtracker;
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
- this.startTime = System.currentTimeMillis();
+ this.startTime = jobtracker.getClock().getTime();
status.setStartTime(startTime);
this.localFs = jobtracker.getLocalFileSystem();
@@ -430,7 +443,7 @@ class JobInProgress {
for (int i = 0; i < splits.length; i++) {
String[] splitLocations = splits[i].getLocations();
- if (splitLocations.length == 0) {
+ if (splitLocations == null || splitLocations.length == 0) {
nonLocalMaps.add(maps[i]);
continue;
}
@@ -588,7 +601,7 @@ class JobInProgress {
}
// set the launch time
- this.launchTime = System.currentTimeMillis();
+ this.launchTime = jobtracker.getClock().getTime();
//
// Create reduce tasks
@@ -1586,7 +1599,7 @@ class JobInProgress {
Map<TaskTracker, FallowSlotInfo> map =
(type == TaskType.MAP) ? trackersReservedForMaps :
trackersReservedForReduces;
- long now = System.currentTimeMillis();
+ long now = jobtracker.getClock().getTime();
FallowSlotInfo info = map.get(taskTracker);
int reservedSlots = 0;
@@ -1632,7 +1645,7 @@ class JobInProgress {
return;
}
- long now = System.currentTimeMillis();
+ long now = jobtracker.getClock().getTime();
Enum<Counter> counter =
(type == TaskType.MAP) ?
@@ -1720,7 +1733,7 @@ class JobInProgress {
String[] splitLocations = tip.getSplitLocations();
// Remove the TIP from the list for running non-local maps
- if (splitLocations.length == 0) {
+ if (splitLocations == null || splitLocations.length == 0) {
nonLocalRunningMaps.remove(tip);
return;
}
@@ -1760,7 +1773,7 @@ class JobInProgress {
* Adds a map tip to the list of running maps.
* @param tip the tip that needs to be scheduled as running
*/
- private synchronized void scheduleMap(TaskInProgress tip) {
+ protected synchronized void scheduleMap(TaskInProgress tip) {
if (runningMapCache == null) {
LOG.warn("Running cache for maps is missing!! "
@@ -1770,7 +1783,7 @@ class JobInProgress {
String[] splitLocations = tip.getSplitLocations();
// Add the TIP to the list of non-local running TIPs
- if (splitLocations.length == 0) {
+ if (splitLocations == null || splitLocations.length == 0) {
nonLocalRunningMaps.add(tip);
return;
}
@@ -1795,7 +1808,7 @@ class JobInProgress {
* Adds a reduce tip to the list of running reduces
* @param tip the tip that needs to be scheduled as running
*/
- private synchronized void scheduleReduce(TaskInProgress tip) {
+ protected synchronized void scheduleReduce(TaskInProgress tip) {
if (runningReduces == null) {
LOG.warn("Running cache for reducers missing!! "
+ "Job details are missing.");
@@ -1822,7 +1835,7 @@ class JobInProgress {
String[] splitLocations = tip.getSplitLocations();
// Add the TIP in the front of the list for non-local non-running maps
- if (splitLocations.length == 0) {
+ if (splitLocations == null || splitLocations.length == 0) {
nonLocalMaps.add(0, tip);
return;
}
@@ -2106,7 +2119,7 @@ class JobInProgress {
//
if (hasSpeculativeMaps) {
- long currentTime = System.currentTimeMillis();
+ long currentTime = jobtracker.getClock().getTime();
// 1. Check bottom up for speculative tasks from the running cache
if (node != null) {
@@ -2214,7 +2227,7 @@ class JobInProgress {
// 2. check for a reduce tip to be speculated
if (hasSpeculativeReduces) {
tip = findSpeculativeTask(runningReduces, tts, avgProgress,
- System.currentTimeMillis(), false);
+ jobtracker.getClock().getTime(), false);
if (tip != null) {
scheduleReduce(tip);
return tip.getIdWithinJob();
@@ -2435,7 +2448,7 @@ class JobInProgress {
if (reduces.length == 0) {
this.status.setReduceProgress(1.0f);
}
- this.finishTime = System.currentTimeMillis();
+ this.finishTime = jobtracker.getClock().getTime();
LOG.info("Job " + this.status.getJobID() +
" has completed successfully.");
@@ -2460,7 +2473,7 @@ class JobInProgress {
private synchronized void terminateJob(int jobTerminationState) {
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
- this.finishTime = System.currentTimeMillis();
+ this.finishTime = jobtracker.getClock().getTime();
this.status.setMapProgress(1.0f);
this.status.setReduceProgress(1.0f);
this.status.setCleanupProgress(1.0f);
@@ -2848,10 +2861,10 @@ class JobInProgress {
// update the actual start-time of the attempt
TaskStatus oldStatus = tip.getTaskStatus(taskid);
long startTime = oldStatus == null
- ? System.currentTimeMillis()
+ ? jobtracker.getClock().getTime()
: oldStatus.getStartTime();
status.setStartTime(startTime);
- status.setFinishTime(System.currentTimeMillis());
+ status.setFinishTime(jobtracker.getClock().getTime());
boolean wasComplete = tip.isComplete();
updateTaskStatus(tip, status);
boolean isComplete = tip.isComplete();
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Fri Mar 4 03:48:03 2011
@@ -191,6 +191,8 @@ public class JobTracker implements MRCon
// system files should have 700 permission
final static FsPermission SYSTEM_FILE_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx------
+
+ private Clock clock;
private TokenStorage tokenStorage;
private final JobTokenSecretManager jobTokenSecretManager
@@ -225,6 +227,10 @@ public class JobTracker implements MRCon
private int nextJobId = 1;
public static final Log LOG = LogFactory.getLog(JobTracker.class);
+
+ public Clock getClock() {
+ return clock;
+ }
/**
* Start the JobTracker with given configuration.
@@ -314,7 +320,7 @@ public class JobTracker implements MRCon
try {
// Every 3 minutes check for any tasks that are overdue
Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
- long now = System.currentTimeMillis();
+ long now = clock.getTime();
LOG.debug("Starting launching task sweep");
synchronized (JobTracker.this) {
synchronized (launchingTasks) {
@@ -368,7 +374,7 @@ public class JobTracker implements MRCon
public void addNewTask(TaskAttemptID taskName) {
synchronized (launchingTasks) {
launchingTasks.put(taskName,
- System.currentTimeMillis());
+ clock.getTime());
}
}
@@ -412,7 +418,7 @@ public class JobTracker implements MRCon
synchronized (JobTracker.this) {
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
- long now = System.currentTimeMillis();
+ long now = clock.getTime();
TaskTrackerStatus leastRecent = null;
while ((trackerExpiryQueue.size() > 0) &&
(leastRecent = trackerExpiryQueue.first()) != null &&
@@ -548,7 +554,7 @@ public class JobTracker implements MRCon
try {
Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
- long now = System.currentTimeMillis();
+ long now = clock.getTime();
long retireBefore = now - RETIRE_JOB_INTERVAL;
synchronized (jobs) {
@@ -636,9 +642,9 @@ public class JobTracker implements MRCon
private boolean isHealthy;
private HashMap<ReasonForBlackListing, String>rfbMap;
- FaultInfo() {
+ FaultInfo(long time) {
numFaults = 0;
- lastUpdated = System.currentTimeMillis();
+ lastUpdated = time;
blacklisted = false;
rfbMap = new HashMap<ReasonForBlackListing, String>();
}
@@ -730,7 +736,7 @@ public class JobTracker implements MRCon
int numFaults = fi.getFaultCount();
++numFaults;
fi.setFaultCount(numFaults);
- fi.setLastUpdated(System.currentTimeMillis());
+ fi.setLastUpdated(clock.getTime());
if (exceedsFaults(fi)) {
LOG.info("Adding " + hostName + " to the blacklist"
+ " across all jobs");
@@ -814,7 +820,7 @@ public class JobTracker implements MRCon
boolean createIfNeccessary) {
FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
if (fi == null && createIfNeccessary) {
- fi = new FaultInfo();
+ fi = new FaultInfo(clock.getTime());
potentiallyFaultyTrackers.put(hostName, fi);
}
return fi;
@@ -1362,7 +1368,7 @@ public class JobTracker implements MRCon
TaskTrackerStatus ttStatus =
new TaskTrackerStatus(trackerName, trackerHostName, port,
ttStatusList,
0 , 0, 0);
- ttStatus.setLastSeen(System.currentTimeMillis());
+ ttStatus.setLastSeen(clock.getTime());
synchronized (JobTracker.this) {
synchronized (taskTrackers) {
@@ -1688,7 +1694,7 @@ public class JobTracker implements MRCon
}
}
- long recoveryStartTime = System.currentTimeMillis();
+ long recoveryStartTime = clock.getTime();
// II. Recover each job
idIter = jobsToRecover.iterator();
@@ -1745,14 +1751,14 @@ public class JobTracker implements MRCon
}
}
- recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
+ recoveryDuration = clock.getTime() - recoveryStartTime;
hasRecovered = true;
// III. Finalize the recovery
synchronized (trackerExpiryQueue) {
// Make sure that the tracker statuses in the expiry-tracker queue
// are updated
- long now = System.currentTimeMillis();
+ long now = clock.getTime();
int size = trackerExpiryQueue.size();
for (int i = 0; i < size ; ++i) {
// Get the first tasktracker
@@ -1947,6 +1953,12 @@ public class JobTracker implements MRCon
JobTracker(final JobConf conf, String identifier)
throws IOException, InterruptedException {
+ this(conf, identifier, new Clock());
+ }
+
+ JobTracker(final JobConf conf, String identifier, Clock clock)
+ throws IOException, InterruptedException {
+ this.clock = clock;
// find the owner of the process
// get the desired principal to load
String keytabFilename = conf.get(JT_KEYTAB_FILE);
@@ -2064,7 +2076,7 @@ public class JobTracker implements MRCon
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
String infoBindAddress = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
- this.startTime = System.currentTimeMillis();
+ this.startTime = clock.getTime();
infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
tmpInfoPort == 0, conf);
infoServer.setAttribute("job.tracker", this);
@@ -2598,7 +2610,7 @@ public class JobTracker implements MRCon
final JobTrackerInstrumentation metrics = getInstrumentation();
metrics.finalizeJob(conf, id);
- long now = System.currentTimeMillis();
+ long now = clock.getTime();
// mark the job for cleanup at all the trackers
addJobForCleanup(id);
@@ -2979,7 +2991,7 @@ public class JobTracker implements MRCon
// First check if the last heartbeat response got through
String trackerName = status.getTrackerName();
- long now = System.currentTimeMillis();
+ long now = clock.getTime();
boolean isBlacklisted = false;
if (restarted) {
faultyTrackers.markTrackerHealthy(status.getHost());
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
Fri Mar 4 03:48:03 2011
@@ -226,7 +226,7 @@ class TaskInProgress {
* Initialization common to Map and Reduce
*/
void init(JobID jobId) {
- this.startTime = System.currentTimeMillis();
+ this.startTime = jobtracker.getClock().getTime();
this.id = new TaskID(jobId, isMapTask(), partition);
this.skipping = startSkipping();
}
@@ -635,7 +635,7 @@ class TaskInProgress {
// tasktracker went down and failed time was not reported.
if (0 == status.getFinishTime()){
- status.setFinishTime(System.currentTimeMillis());
+ status.setFinishTime(jobtracker.getClock().getTime());
}
}
@@ -740,7 +740,7 @@ class TaskInProgress {
//
this.completes++;
- this.execFinishTime = System.currentTimeMillis();
+ this.execFinishTime = jobtracker.getClock().getTime();
recomputeProgress();
}
@@ -779,7 +779,7 @@ class TaskInProgress {
}
this.failed = true;
killed = true;
- this.execFinishTime = System.currentTimeMillis();
+ this.execFinishTime = jobtracker.getClock().getTime();
recomputeProgress();
}
@@ -903,7 +903,7 @@ class TaskInProgress {
public Task getTaskToRun(String taskTracker) throws IOException {
if (0 == execStartTime){
// assume task starts running now
- execStartTime = System.currentTimeMillis();
+ execStartTime = jobtracker.getClock().getTime();
}
// Create the 'taskid'; do not count the 'killed' tasks against the job!
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/findbugsExcludeFile.xml
Fri Mar 4 03:48:03 2011
@@ -89,4 +89,16 @@
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
</Match>
+ <!--
+ JobTracker's static variables should be ignored
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.mapred.JobTracker" />
+ <Or>
+ <Field name="RETIRE_JOB_INTERVAL" />
+ <Field name="TASKTRACKER_EXPIRY_INTERVAL" />
+ <Field name="RETIRE_JOB_CHECK_INTERVAL" />
+ </Or>
+ <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
+ </Match>
</FindBugsFilter>
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
Fri Mar 4 03:48:03 2011
@@ -48,7 +48,7 @@ public class TestResourceEstimation exte
JobSplit.TaskSplitMetaInfo split =
new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
TaskInProgress tip =
- new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
+ new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
}
assertEquals(2* singleMapOutputSize, re.getEstimatedMapOutputSize());
@@ -86,7 +86,7 @@ public class TestResourceEstimation exte
new JobSplit.TaskSplitMetaInfo(new String[0], 0,
singleMapInputSize);
TaskInProgress tip =
- new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
+ new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
}
@@ -99,7 +99,7 @@ public class TestResourceEstimation exte
JobSplit.TaskSplitMetaInfo split =
new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
TaskInProgress tip =
- new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
+ new TaskInProgress(jid, "", split, jip.jobtracker, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
long expectedTotalMapOutSize = (singleMapOutputSize*11) *
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077167&r1=1077166&r2=1077167&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
(original)
+++
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
Fri Mar 4 03:48:03 2011
@@ -30,7 +30,6 @@ import java.util.Properties;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -678,6 +677,19 @@ public class UtilsForTests {
return job;
}
+ static class FakeClock extends Clock {
+ long time = 0;
+
+ public void advance(long millis) {
+ time += millis;
+ }
+
+ @Override
+ long getTime() {
+ return time;
+ }
+ }
+
// Mapper that fails
static class FailMapper extends MapReduceBase implements
Mapper<WritableComparable, Writable, WritableComparable, Writable> {