Author: tomwhite Date: Wed Apr 18 13:51:47 2007 New Revision: 530153 URL: http://svn.apache.org/viewvc?view=rev&rev=530153 Log: HADOOP-1190. Fix unchecked warnings in mapred package.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/DefaultJobHistoryParser.java Wed Apr 18 13:51:47 2007 @@ -158,9 +158,11 @@ // call this only for jobs that succeeded for better results. static class BadNodesFilter implements JobHistory.Listener { - private Map<String, Set<String>> badNodesToNumFaiedTasks = new HashMap(); + private Map<String, Set<String>> badNodesToNumFailedTasks = + new HashMap<String, Set<String>>(); + Map<String, Set<String>> getValues(){ - return badNodesToNumFaiedTasks; + return badNodesToNumFailedTasks; } public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values) throws IOException { @@ -171,11 +173,11 @@ if( Values.FAILED.name().equals(values.get(Keys.TASK_STATUS) ) ){ String hostName = values.get(Keys.HOSTNAME) ; String taskid = values.get(Keys.TASKID); - Set tasks = badNodesToNumFaiedTasks.get(hostName); + Set<String> tasks = badNodesToNumFailedTasks.get(hostName); if( null == tasks ){ - tasks = new TreeSet(); + tasks = new TreeSet<String>(); tasks.add(taskid); - badNodesToNumFaiedTasks.put(hostName, tasks); + badNodesToNumFailedTasks.put(hostName, tasks); }else{ tasks.add(taskid); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Wed Apr 18 13:51:47 2007 @@ -81,7 +81,7 @@ if (dirs.length == 0) { throw new IOException("No input paths specified in job"); } - List<Path> result = new ArrayList(); + List<Path> result = new ArrayList<Path>(); for (Path p: dirs) { FileSystem fs = p.getFileSystem(job); Path[] matches = @@ -100,7 +100,7 @@ throw new IOException("No input paths specified in input"); } - List<IOException> result = new ArrayList(); + List<IOException> result = new ArrayList<IOException>(); int totalFiles = 0; for (Path p: inputDirs) { FileSystem fs = p.getFileSystem(job); @@ -161,7 +161,8 @@ long minSize = Math.max(job.getLong("mapred.min.split.size", 1), minSplitSize); - ArrayList splits = new ArrayList(numSplits); // generate splits + // generate splits + ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); for (int i = 0; i < files.length; i++) { Path file = files[i]; FileSystem fs = file.getFileSystem(job); @@ -188,7 +189,7 @@ } } LOG.debug( "Total # of splits: " + splits.size() ); - return (FileSplit[])splits.toArray(new FileSplit[splits.size()]); + return splits.toArray(new FileSplit[splits.size()]); } private static long computeSplitSize(long goalSize, long minSize, Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Apr 18 13:51:47 2007 @@ -88,7 +88,7 @@ private static ClassLoader makeClassLoader(JobConf conf, File workDir) throws IOException { - List cp = new ArrayList(); + List<URL> cp = new ArrayList<URL>(); String jar = conf.getJar(); if (jar != null) { // if jar exists, it into workDir @@ -101,7 +101,7 @@ cp.add(new URL("file:" + new File(workDir, "classes/").toString())); cp.add(new URL("file:" + workDir.toString() + "/")); } - return new URLClassLoader((URL[]) cp.toArray(new URL[cp.size()])); + return new URLClassLoader(cp.toArray(new URL[cp.size()])); } /** Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Apr 18 13:51:47 2007 @@ -334,11 +334,11 @@ job.getInputFormat().getSplits(job, job.getNumMapTasks()); // sort the splits into order based on size, so that the biggest // go first - Arrays.sort(splits, new Comparator() { - public int compare(Object a, Object b) { + Arrays.sort(splits, new Comparator<InputSplit>() { + public int compare(InputSplit a, InputSplit b) { try { - long left = ((InputSplit) a).getLength(); - long right = ((InputSplit) b).getLength(); + long left = a.getLength(); + long right = b.getLength(); if (left == right) { return 0; } else if (left < right) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Apr 18 13:51:47 2007 @@ -64,7 +64,8 @@ int failedMapTasks = 0 ; int failedReduceTasks = 0 ; JobTracker jobtracker = null; - Map<String,List<TaskInProgress>> hostToMaps = new HashMap(); + Map<String,List<TaskInProgress>> hostToMaps = + new HashMap<String,List<TaskInProgress>>(); private int taskCompletionEventTracker = 0 ; List<TaskCompletionEvent> taskCompletionEvents ; @@ -120,7 +121,7 @@ this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); - this.taskCompletionEvents = new ArrayList( + this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>( numMapTasks + numReduceTasks + 10); JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), @@ -184,7 +185,7 @@ for(String host: splits[i].getLocations()) { List<TaskInProgress> hostMaps = hostToMaps.get(host); if (hostMaps == null) { - hostMaps = new ArrayList(); + hostMaps = new ArrayList<TaskInProgress>(); hostToMaps.put(host, hostMaps); } hostMaps.add(maps[i]); @@ -280,10 +281,12 @@ } /** - * Return a treeset of completed TaskInProgress objects + * Return a vector of completed TaskInProgress objects */ - public Vector reportTasksInProgress(boolean shouldBeMap, boolean shouldBeComplete) { - Vector results = new Vector(); + public Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap, + boolean shouldBeComplete) { + + Vector<TaskInProgress> results = new Vector<TaskInProgress>(); TaskInProgress tips[] = null; if (shouldBeMap) { tips = maps; Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Apr 18 13:51:47 2007 @@ -158,7 +158,8 @@ * but that have not yet been seen in a status report. * map: task-id (String) -> time-assigned (Long) */ - private Map launchingTasks = new LinkedHashMap(); + private Map<String, Long> launchingTasks = + new LinkedHashMap<String, Long>(); public void run() { while (shouldRun) { @@ -169,16 +170,17 @@ LOG.debug("Starting launching task sweep"); synchronized (JobTracker.this) { synchronized (launchingTasks) { - Iterator itr = launchingTasks.entrySet().iterator(); + Iterator<Map.Entry<String, Long>> itr = + launchingTasks.entrySet().iterator(); while (itr.hasNext()) { - Map.Entry pair = (Map.Entry) itr.next(); - String taskId = (String) pair.getKey(); - long age = now - ((Long) pair.getValue()).longValue(); + Map.Entry<String, Long> pair = itr.next(); + String taskId = pair.getKey(); + long age = now - (pair.getValue()).longValue(); LOG.info(taskId + " is " + age + " ms debug."); if (age > TASKTRACKER_EXPIRY_INTERVAL) { LOG.info("Launching task " + taskId + " timed out."); TaskInProgress tip = null; - tip = (TaskInProgress) taskidToTIPMap.get(taskId); + tip = taskidToTIPMap.get(taskId); if (tip != null) { JobInProgress job = tip.getJob(); String trackerName = getAssignedTracker(taskId); @@ -269,7 +271,7 @@ long now = System.currentTimeMillis(); TaskTrackerStatus leastRecent = null; while ((trackerExpiryQueue.size() > 0) && - ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) && + ((leastRecent = trackerExpiryQueue.first()) != null) && (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) { // Remove profile from head of queue @@ -277,7 +279,7 @@ String trackerName = leastRecent.getTrackerName(); // Figure out if last-seen time should be updated, or if tracker is dead - TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName()); + TaskTrackerStatus newProfile = taskTrackers.get(leastRecent.getTrackerName()); // Items might leave the taskTracker set through other means; the // status stored in 'taskTrackers' might be null, which means the // tracker has already been destroyed. @@ -328,7 +330,7 @@ while (shouldRun) { try { Thread.sleep(RETIRE_JOB_CHECK_INTERVAL); - List<JobInProgress> retiredJobs = new ArrayList(); + List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>(); long retireBefore = System.currentTimeMillis() - RETIRE_JOB_INTERVAL; synchronized (jobsByArrival) { @@ -507,27 +509,31 @@ // // All the known jobs. (jobid->JobInProgress) - Map<String, JobInProgress> jobs = new TreeMap(); - List<JobInProgress> jobsByArrival = new ArrayList(); + Map<String, JobInProgress> jobs = new TreeMap<String, JobInProgress>(); + List<JobInProgress> jobsByArrival = new ArrayList<JobInProgress>(); // (user -> list of JobInProgress) - TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap(); + TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = + new TreeMap<String, ArrayList<JobInProgress>>(); // All the known TaskInProgress items, mapped to by taskids (taskid->TIP) - Map<String, TaskInProgress> taskidToTIPMap = new TreeMap(); + Map<String, TaskInProgress> taskidToTIPMap = + new TreeMap<String, TaskInProgress>(); // (taskid --> trackerID) - TreeMap taskidToTrackerMap = new TreeMap(); + TreeMap<String, String> taskidToTrackerMap = new TreeMap<String, String>(); // (trackerID->TreeSet of taskids running at that tracker) - TreeMap trackerToTaskMap = new TreeMap(); + TreeMap<String, Set<String>> trackerToTaskMap = + new TreeMap<String, Set<String>>(); // (trackerID -> TreeSet of completed taskids running at that tracker) - TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap(); + TreeMap<String, Set<String>> trackerToMarkedTasksMap = + new TreeMap<String, Set<String>>(); // (trackerID --> last sent HeartBeatResponse) Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = - new TreeMap(); + new TreeMap<String, HeartbeatResponse>(); // // Watch and expire TaskTracker objects using these structures. @@ -535,8 +541,9 @@ // int totalMaps = 0; int totalReduces = 0; - private TreeMap taskTrackers = new TreeMap(); - List<JobInProgress> jobInitQueue = new ArrayList(); + private TreeMap<String, TaskTrackerStatus> taskTrackers = + new TreeMap<String, TaskTrackerStatus>(); + List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>(); ExpireTrackers expireTrackers = new ExpireTrackers(); Thread expireTrackersThread = null; RetireJobs retireJobs = new RetireJobs(); @@ -556,19 +563,20 @@ * object has been updated in the taskTracker table, the latest status is * reinserted. Otherwise, we assume the tracker has expired. */ - TreeSet trackerExpiryQueue = new TreeSet(new Comparator() { - public int compare(Object o1, Object o2) { - TaskTrackerStatus p1 = (TaskTrackerStatus) o1; - TaskTrackerStatus p2 = (TaskTrackerStatus) o2; - if (p1.getLastSeen() < p2.getLastSeen()) { - return -1; - } else if (p1.getLastSeen() > p2.getLastSeen()) { - return 1; - } else { - return (p1.getTrackerName().compareTo(p2.getTrackerName())); + TreeSet<TaskTrackerStatus> trackerExpiryQueue = + new TreeSet<TaskTrackerStatus>( + new Comparator<TaskTrackerStatus>() { + public int compare(TaskTrackerStatus p1, TaskTrackerStatus p2) { + if (p1.getLastSeen() < p2.getLastSeen()) { + return -1; + } else if (p1.getLastSeen() > p2.getLastSeen()) { + return 1; + } else { + return (p1.getTrackerName().compareTo(p2.getTrackerName())); + } } } - }); + ); // Used to provide an HTML view on Job, Task, and TaskTracker structures StatusHttpServer infoServer; @@ -746,9 +754,9 @@ taskidToTrackerMap.put(taskid, taskTracker); // tracker --> taskid - TreeSet taskset = (TreeSet) trackerToTaskMap.get(taskTracker); + Set<String> taskset = trackerToTaskMap.get(taskTracker); if (taskset == null) { - taskset = new TreeSet(); + taskset = new TreeSet<String>(); trackerToTaskMap.put(taskTracker, taskset); } taskset.add(taskid); @@ -759,11 +767,11 @@ void removeTaskEntry(String taskid) { // taskid --> tracker - String tracker = (String) taskidToTrackerMap.remove(taskid); + String tracker = taskidToTrackerMap.remove(taskid); // tracker --> taskid if (tracker != null) { - TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker); + Set<String> trackerSet = trackerToTaskMap.get(tracker); if (trackerSet != null) { trackerSet.remove(taskid); } @@ -784,9 +792,9 @@ */ void markCompletedTaskAttempt(String taskTracker, String taskid) { // tracker --> taskid - TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker); + Set<String> taskset = trackerToMarkedTasksMap.get(taskTracker); if (taskset == null) { - taskset = new TreeSet(); + taskset = new TreeSet<String>(); trackerToMarkedTasksMap.put(taskTracker, taskset); } taskset.add(taskid); @@ -828,8 +836,8 @@ */ private void removeMarkedTasks(String taskTracker) { // Purge all the 'marked' tasks which were running at taskTracker - TreeSet<String> markedTaskSet = - (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker); + Set<String> markedTaskSet = + trackerToMarkedTasksMap.get(taskTracker); if (markedTaskSet != null) { for (String taskid : markedTaskSet) { removeTaskEntry(taskid); @@ -954,8 +962,8 @@ public long getStartTime() { return startTime; } - public Vector runningJobs() { - Vector v = new Vector(); + public Vector<JobInProgress> runningJobs() { + Vector<JobInProgress> v = new Vector<JobInProgress>(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); @@ -974,8 +982,8 @@ return (List<JobInProgress>) runningJobs(); } } - public Vector failedJobs() { - Vector v = new Vector(); + public Vector<JobInProgress> failedJobs() { + Vector<JobInProgress> v = new Vector<JobInProgress>(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); @@ -985,8 +993,8 @@ } return v; } - public Vector completedJobs() { - Vector v = new Vector(); + public Vector<JobInProgress> completedJobs() { + Vector<JobInProgress> v = new Vector<JobInProgress>(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); @@ -1003,7 +1011,7 @@ } public TaskTrackerStatus getTaskTracker(String trackerID) { synchronized (taskTrackers) { - return (TaskTrackerStatus) taskTrackers.get(trackerID); + return taskTrackers.get(trackerID); } } @@ -1075,7 +1083,7 @@ // Initialize the response to be sent for the heartbeat HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); - List<TaskTrackerAction> actions = new ArrayList(); + List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>(); // Check for new tasks to be executed on the tasktracker if (acceptNewTasks) { @@ -1140,8 +1148,7 @@ */ private boolean updateTaskTrackerStatus(String trackerName, TaskTrackerStatus status) { - TaskTrackerStatus oldStatus = - (TaskTrackerStatus) taskTrackers.get(trackerName); + TaskTrackerStatus oldStatus = taskTrackers.get(trackerName); if (oldStatus != null) { totalMaps -= oldStatus.countMapTasks(); totalReduces -= oldStatus.countReduceTasks(); @@ -1214,7 +1221,7 @@ synchronized (taskTrackers) { numTaskTrackers = taskTrackers.size(); - tts = (TaskTrackerStatus) taskTrackers.get(taskTracker); + tts = taskTrackers.get(taskTracker); } if (tts == null) { LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker); @@ -1346,13 +1353,15 @@ * A tracker wants to know if any of its Tasks have been * closed (because the job completed, whether successfully or not) */ - private synchronized List getTasksToKill(String taskTracker) { - Set<String> taskIds = (TreeSet) trackerToTaskMap.get(taskTracker); + private synchronized List<TaskTrackerAction> getTasksToKill( + String taskTracker) { + + Set<String> taskIds = trackerToTaskMap.get(taskTracker); if (taskIds != null) { - List<TaskTrackerAction> killList = new ArrayList(); - Set<String> killJobIds = new TreeSet(); + List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>(); + Set<String> killJobIds = new TreeSet<String>(); for (String killTaskId : taskIds ) { - TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId); + TaskInProgress tip = taskidToTIPMap.get(killTaskId); if (tip.shouldCloseForClosedJob(killTaskId)) { // // This is how the JobTracker ends a task at the TaskTracker. @@ -1447,12 +1456,12 @@ } public synchronized void killJob(String jobid) { - JobInProgress job = (JobInProgress) jobs.get(jobid); + JobInProgress job = jobs.get(jobid); job.kill(); } public synchronized JobProfile getJobProfile(String jobid) { - JobInProgress job = (JobInProgress) jobs.get(jobid); + JobInProgress job = jobs.get(jobid); if (job != null) { return job.getProfile(); } else { @@ -1460,7 +1469,7 @@ } } public synchronized JobStatus getJobStatus(String jobid) { - JobInProgress job = (JobInProgress) jobs.get(jobid); + JobInProgress job = jobs.get(jobid); if (job != null) { return job.getStatus(); } else { @@ -1468,7 +1477,7 @@ } } public synchronized Counters getJobCounters(String jobid) { - JobInProgress job = (JobInProgress) jobs.get(jobid); + JobInProgress job = jobs.get(jobid); if (job != null) { return job.getCounters(); } else { @@ -1476,31 +1485,33 @@ } } public synchronized TaskReport[] getMapTaskReports(String jobid) { - JobInProgress job = (JobInProgress) jobs.get(jobid); + JobInProgress job = jobs.get(jobid); if (job == null) { return new TaskReport[0]; } else { - Vector reports = new Vector(); - Vector completeMapTasks = job.reportTasksInProgress(true, true); + Vector<TaskReport> reports = new Vector<TaskReport>(); + Vector<TaskInProgress> completeMapTasks = + job.reportTasksInProgress(true, true); for (Iterator it = completeMapTasks.iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); reports.add(tip.generateSingleReport()); } - Vector incompleteMapTasks = job.reportTasksInProgress(true, false); + Vector<TaskInProgress> incompleteMapTasks = + job.reportTasksInProgress(true, false); for (Iterator it = incompleteMapTasks.iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); reports.add(tip.generateSingleReport()); } - return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]); + return reports.toArray(new TaskReport[reports.size()]); } } public synchronized TaskReport[] getReduceTaskReports(String jobid) { - JobInProgress job = (JobInProgress) jobs.get(jobid); + JobInProgress job = jobs.get(jobid); if (job == null) { return new TaskReport[0]; } else { - Vector reports = new Vector(); + Vector<TaskReport> reports = new Vector<TaskReport>(); Vector completeReduceTasks = job.reportTasksInProgress(false, true); for (Iterator it = completeReduceTasks.iterator(); it.hasNext(); ) { TaskInProgress tip = (TaskInProgress) it.next(); @@ -1511,7 +1522,7 @@ TaskInProgress tip = (TaskInProgress) it.next(); reports.add(tip.generateSingleReport()); } - return (TaskReport[]) reports.toArray(new TaskReport[reports.size()]); + return reports.toArray(new TaskReport[reports.size()]); } } @@ -1523,7 +1534,7 @@ public synchronized TaskCompletionEvent[] getTaskCompletionEvents( String jobid, int fromEventId, int maxEvents) throws IOException{ TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY; - JobInProgress job = (JobInProgress)this.jobs.get(jobid); + JobInProgress job = this.jobs.get(jobid); if (null != job) { events = job.getTaskCompletionEvents(fromEventId, maxEvents); } @@ -1540,7 +1551,7 @@ public synchronized List<String> getTaskDiagnostics(String jobId, String tipId, String taskId) { - JobInProgress job = (JobInProgress) jobs.get(jobId); + JobInProgress job = jobs.get(jobId); if (job == null) { throw new IllegalArgumentException("Job " + jobId + " not found."); } @@ -1577,7 +1588,7 @@ * Returns specified TaskInProgress, or null. */ private TaskInProgress getTip(String jobid, String tipid) { - JobInProgress job = (JobInProgress) jobs.get(jobid); + JobInProgress job = jobs.get(jobid); return (job == null ? null : (TaskInProgress) job.getTaskInProgress(tipid)); } @@ -1588,11 +1599,11 @@ * @return The name of the task tracker */ public synchronized String getAssignedTracker(String taskId) { - return (String) taskidToTrackerMap.get(taskId); + return taskidToTrackerMap.get(taskId); } public JobStatus[] jobsToComplete() { - Vector v = new Vector(); + Vector<JobStatus> v = new Vector<JobStatus>(); for (Iterator it = jobs.values().iterator(); it.hasNext(); ) { JobInProgress jip = (JobInProgress) it.next(); JobStatus status = jip.getStatus(); @@ -1603,14 +1614,14 @@ v.add(status); } } - return (JobStatus[]) v.toArray(new JobStatus[v.size()]); + return v.toArray(new JobStatus[v.size()]); } /////////////////////////////////////////////////////////////// // JobTracker methods /////////////////////////////////////////////////////////////// public JobInProgress getJob(String jobid) { - return (JobInProgress) jobs.get(jobid); + return jobs.get(jobid); } /** * Grab random num for job id @@ -1633,7 +1644,7 @@ for (TaskStatus report : status.getTaskReports()) { report.setTaskTracker(status.getTrackerName()); String taskId = report.getTaskId(); - TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); + TaskInProgress tip = taskidToTIPMap.get(taskId); if (tip == null) { LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskId()); } else { @@ -1650,13 +1661,13 @@ */ void lostTaskTracker(String trackerName, String hostname) { LOG.info("Lost tracker '" + trackerName + "'"); - TreeSet lostTasks = (TreeSet) trackerToTaskMap.get(trackerName); + Set<String> lostTasks = trackerToTaskMap.get(trackerName); trackerToTaskMap.remove(trackerName); if (lostTasks != null) { for (Iterator it = lostTasks.iterator(); it.hasNext(); ) { String taskId = (String) it.next(); - TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId); + TaskInProgress tip = taskidToTIPMap.get(taskId); // Completed reduce tasks never need to be failed, because // their outputs go to dfs Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Wed Apr 18 13:51:47 2007 @@ -37,7 +37,7 @@ LogFactory.getLog("org.apache.hadoop.mapred.LocalJobRunner"); private FileSystem fs; - private HashMap jobs = new HashMap(); + private HashMap<String, Job> jobs = new HashMap<String, Job>(); private Configuration conf; private int map_tasks = 0; private int reduce_tasks = 0; @@ -56,7 +56,7 @@ private Random random = new Random(); private JobStatus status; - private ArrayList mapIds = new ArrayList(); + private ArrayList<String> mapIds = new ArrayList<String>(); private MapOutputFile mapoutputFile; private JobProfile profile; private Path localFile; @@ -132,7 +132,7 @@ // move map output to reduce input String reduceId = "reduce_" + newId(); for (int i = 0; i < mapIds.size(); i++) { - String mapId = (String)mapIds.get(i); + String mapId = mapIds.get(i); Path mapOut = this.mapoutputFile.getOutputFile(mapId); Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId); if (!localFs.mkdirs(reduceIn.getParent())) { @@ -252,11 +252,11 @@ } public void killJob(String id) { - ((Thread)jobs.get(id)).stop(); + jobs.get(id).stop(); } public JobProfile getJobProfile(String id) { - Job job = (Job)jobs.get(id); + Job job = jobs.get(id); return job.getProfile(); } @@ -268,12 +268,12 @@ } public JobStatus getJobStatus(String id) { - Job job = (Job)jobs.get(id); + Job job = jobs.get(id); return job.status; } public Counters getJobCounters(String id) { - Job job = (Job)jobs.get(id); + Job job = jobs.get(id); return job.currentCounters; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Apr 18 13:51:47 2007 @@ -496,7 +496,8 @@ Sorter sorter = new Sorter(localFs, keyClass, valClass, job); for (int parts = 0; parts < partitions; parts++){ - List<SegmentDescriptor> segmentList = new ArrayList(numSpills); + List<SegmentDescriptor> segmentList = + new ArrayList<SegmentDescriptor>(numSpills); for(int i = 0; i < numSpills; i++) { FSDataInputStream indexIn = localFs.open(indexFileName[i]); indexIn.seek(parts * 16); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Apr 18 13:51:47 2007 @@ -266,7 +266,7 @@ // since we don't know how many map outputs got merged in memory, we have // to check whether a given map output exists, and if it does, add it in // the list of files to merge, otherwise not. - List <Path> mapFilesList = new ArrayList(); + List<Path> mapFilesList = new ArrayList<Path>(); for(int i=0; i < numMaps; i++) { Path f = mapOutputFile.getInputFile(i, getTaskId()); if (lfs.exists(f)) @@ -423,12 +423,12 @@ /** * the list of map outputs currently being copied */ - private List scheduledCopies; + private List<MapOutputLocation> scheduledCopies; /** * the results of dispatched copy attempts */ - private List copyResults; + private List<CopyResult> copyResults; /** * the number of outputs to copy in parallel @@ -446,12 +446,12 @@ * busy hosts from which copies are being backed off * Map of host -> next contact time */ - private Map penaltyBox; + private Map<String, Long> penaltyBox; /** * the set of unique hosts from which we are copying */ - private Set uniqueHosts; + private Set<String> uniqueHosts; /** * the last time we polled the job tracker @@ -511,7 +511,8 @@ /** * a hashmap from mapId to MapOutputLocation for retrials */ - private Map<Integer, MapOutputLocation> retryFetches = new HashMap(); + private Map<Integer, MapOutputLocation> retryFetches = + new HashMap<Integer, MapOutputLocation>(); /** * a TreeSet for needed map outputs @@ -632,7 +633,7 @@ while (scheduledCopies.isEmpty()) { scheduledCopies.wait(); } - loc = (MapOutputLocation)scheduledCopies.remove(0); + loc = scheduledCopies.remove(0); } try { @@ -774,8 +775,8 @@ configureClasspath(conf); this.umbilical = umbilical; this.reduceTask = ReduceTask.this; - this.scheduledCopies = new ArrayList(100); - this.copyResults = new ArrayList(100); + this.scheduledCopies = new ArrayList<MapOutputLocation>(100); + this.copyResults = new ArrayList<CopyResult>(100); this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5); this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300); this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000); @@ -793,10 +794,10 @@ conf.getMapOutputValueClass(), conf); // hosts -> next contact time - this.penaltyBox = new Hashtable(); + this.penaltyBox = new Hashtable<String, Long>(); // hostnames - this.uniqueHosts = new HashSet(); + this.uniqueHosts = new HashSet<String>(); this.lastPollTime = 0; @@ -896,7 +897,7 @@ while (locIt.hasNext()) { MapOutputLocation loc = (MapOutputLocation)locIt.next(); - Long penaltyEnd = (Long)penaltyBox.get(loc.getHost()); + Long penaltyEnd = penaltyBox.get(loc.getHost()); boolean penalized = false, duplicate = false; if (penaltyEnd != null && currentTime < penaltyEnd.longValue()) { @@ -1086,7 +1087,7 @@ if (copyResults.isEmpty()) { return null; } else { - return (CopyResult) copyResults.remove(0); + return copyResults.remove(0); } } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Apr 18 13:51:47 2007 @@ -89,18 +89,19 @@ // Map from task Id -> TaskTracker Id, contains tasks that are // currently runnings - private TreeMap<String, String> activeTasks = new TreeMap(); + private TreeMap<String, String> activeTasks = new TreeMap<String, String>(); private JobConf conf; private boolean runSpeculative; - private Map<String,List<String>> taskDiagnosticData = new TreeMap(); + private Map<String,List<String>> taskDiagnosticData = + new TreeMap<String,List<String>>(); /** * Map from taskId -> TaskStatus */ private TreeMap<String,TaskStatus> taskStatuses = new TreeMap<String,TaskStatus>(); - private TreeSet machinesWhereFailed = new TreeSet(); - private TreeSet tasksReportedClosed = new TreeSet(); + private TreeSet<String> machinesWhereFailed = new TreeSet<String>(); + private TreeSet<String> tasksReportedClosed = new TreeSet<String>(); private Counters counters = new Counters(); @@ -288,13 +289,13 @@ * component task-threads that have ever been started. */ synchronized TaskReport generateSingleReport() { - ArrayList diagnostics = new ArrayList(); - for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) { - diagnostics.addAll((List)i.next()); + ArrayList<String> diagnostics = new ArrayList<String>(); + for (List<String> l : taskDiagnosticData.values()) { + diagnostics.addAll(l); } TaskReport report = new TaskReport (getTIPId(), (float)progress, state, - (String[])diagnostics.toArray(new String[diagnostics.size()]), + diagnostics.toArray(new String[diagnostics.size()]), execStartTime, execFinishTime, counters); return report ; @@ -326,9 +327,9 @@ boolean changed = true; if (diagInfo != null && diagInfo.length() > 0) { LOG.info("Error from "+taskid+": "+diagInfo); - List diagHistory = (List) taskDiagnosticData.get(taskid); + List<String> diagHistory = taskDiagnosticData.get(taskid); if (diagHistory == null) { - diagHistory = new ArrayList(); + diagHistory = new ArrayList<String>(); taskDiagnosticData.put(taskid, diagHistory); } diagHistory.add(diagInfo); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Apr 18 13:51:47 2007 @@ -115,7 +115,7 @@ boolean shuttingDown = false; - Map<String, TaskInProgress> tasks = new HashMap(); + Map<String, TaskInProgress> tasks = new HashMap<String, TaskInProgress>(); /** * Map from taskId -> TaskInProgress. */ @@ -126,7 +126,7 @@ boolean justStarted = true; //dir -> DF - Map localDirsDf = new HashMap(); + Map<String, DF> localDirsDf = new HashMap<String, DF>(); long minSpaceStart = 0; //must have this much space free to start new tasks boolean acceptNewTasks = true; @@ -186,7 +186,7 @@ * A list of tips that should be cleaned up. */ private BlockingQueue<TaskTrackerAction> tasksToCleanup = - new LinkedBlockingQueue(); + new LinkedBlockingQueue<TaskTrackerAction>(); /** * A daemon-thread that pulls tips off the list of things to cleanup. @@ -231,7 +231,7 @@ if (!runningJobs.containsKey(jobId)) { rJob = new RunningJob(jobId, localJobFile); rJob.localized = false; - rJob.tasks = new HashSet(); + rJob.tasks = new HashSet<TaskInProgress>(); rJob.jobFile = localJobFile; runningJobs.put(jobId, rJob); } else { @@ -297,8 +297,8 @@ // Clear out state tables this.tasks.clear(); - this.runningTasks = new TreeMap(); - this.runningJobs = new TreeMap(); + this.runningTasks = new TreeMap<String, TaskInProgress>(); + this.runningJobs = new TreeMap<String, RunningJob>(); this.mapTotal = 0; this.reduceTotal = 0; this.acceptNewTasks = true; @@ -611,10 +611,10 @@ // Kill running tasks. Do this in a 2nd vector, called 'tasksToClose', // because calling jobHasFinished() may result in an edit to 'tasks'. // - TreeMap tasksToClose = new TreeMap(); + TreeMap<String, TaskInProgress> tasksToClose = + new TreeMap<String, TaskInProgress>(); tasksToClose.putAll(tasks); - for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) { - TaskInProgress tip = (TaskInProgress) it.next(); + for (TaskInProgress tip : tasksToClose.values()) { tip.jobHasFinished(); } @@ -1048,7 +1048,7 @@ for (int i = 0; i < localDirs.length; i++) { DF df = null; if (localDirsDf.containsKey(localDirs[i])) { - df = (DF) localDirsDf.get(localDirs[i]); + df = localDirsDf.get(localDirs[i]); } else { df = new DF(new File(localDirs[i]), fConf); localDirsDf.put(localDirs[i], df); @@ -1648,7 +1648,7 @@ RunningJob(String jobid, Path jobFile) { this.jobid = jobid; localized = false; - tasks = new HashSet(); + tasks = new HashSet<TaskInProgress>(); this.jobFile = jobFile; keepJobFiles = false; } @@ -1768,7 +1768,7 @@ * @return a copy of the list of TaskStatus objects */ synchronized List<TaskStatus> getRunningTaskStatuses() { - List<TaskStatus> result = new ArrayList(runningTasks.size()); + List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size()); for(TaskInProgress tip: runningTasks.values()) { result.add(tip.createStatus()); } @@ -1780,7 +1780,7 @@ * @return */ synchronized List<TaskStatus> getNonRunningTasks() { - List<TaskStatus> result = new ArrayList(tasks.size()); + List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size()); for(Map.Entry<String, TaskInProgress> task: tasks.entrySet()) { if (!runningTasks.containsKey(task.getKey())) { result.add(task.getValue().createStatus()); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Apr 18 13:51:47 2007 @@ -51,7 +51,7 @@ /** */ public TaskTrackerStatus() { - taskReports = new ArrayList(); + taskReports = new ArrayList<TaskStatus>(); } /** @@ -63,7 +63,7 @@ this.host = host; this.httpPort = httpPort; - this.taskReports = new ArrayList(taskReports); + this.taskReports = new ArrayList<TaskStatus>(taskReports); this.failures = failures; } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java?view=diff&rev=530153&r1=530152&r2=530153 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/JobControl.java Wed Apr 18 13:51:47 2007 @@ -48,11 +48,11 @@ private int runnerState; // the thread state - private Hashtable waitingJobs; - private Hashtable readyJobs; - private Hashtable runningJobs; - private Hashtable successfulJobs; - private Hashtable failedJobs; + private Hashtable<String, Job> waitingJobs; + private Hashtable<String, Job> readyJobs; + private Hashtable<String, Job> runningJobs; + private Hashtable<String, Job> successfulJobs; + private Hashtable<String, Job> failedJobs; private long nextJobID; private String groupName; @@ -62,23 +62,22 @@ * @param groupName a name identifying this group */ public JobControl(String groupName) { - this.waitingJobs = new Hashtable(); - this.readyJobs = new Hashtable(); - this.runningJobs = new Hashtable(); - this.successfulJobs = new Hashtable(); - this.failedJobs = new Hashtable(); + this.waitingJobs = new Hashtable<String, Job>(); + this.readyJobs = new Hashtable<String, Job>(); + this.runningJobs = new Hashtable<String, Job>(); + this.successfulJobs = new Hashtable<String, Job>(); + this.failedJobs = new Hashtable<String, Job>(); this.nextJobID = -1; this.groupName = groupName; this.runnerState = JobControl.READY; } - private static ArrayList toArrayList(Hashtable jobs) { - ArrayList retv = new ArrayList(); + private static ArrayList<Job> toArrayList(Hashtable<String, Job> jobs) { + ArrayList<Job> retv = new ArrayList<Job>(); synchronized (jobs) { - Iterator iter = jobs.values().iterator(); - while (iter.hasNext()) { - retv.add(iter.next()); + for (Job job : jobs.values()) { + retv.add(job); } } @@ -88,32 +87,32 @@ /** * @return the jobs in the waiting state */ - public ArrayList getWaitingJobs() { + public ArrayList<Job> getWaitingJobs() { return JobControl.toArrayList(this.waitingJobs); } /** * @return the jobs in the running state */ - public ArrayList getRunningJobs() { + public ArrayList<Job> getRunningJobs() { return JobControl.toArrayList(this.runningJobs); } /** * @return the jobs in the ready state */ - public ArrayList getReadyJobs() { + public ArrayList<Job> getReadyJobs() { return JobControl.toArrayList(this.readyJobs); } /** * @return the jobs in the success state */ - public ArrayList getSuccessfulJobs() { + public ArrayList<Job> getSuccessfulJobs() { return JobControl.toArrayList(this.successfulJobs); } - public ArrayList getFailedJobs() { + public ArrayList<Job> getFailedJobs() { return JobControl.toArrayList(this.failedJobs); } @@ -122,19 +121,19 @@ return this.groupName + this.nextJobID; } - private static void addToQueue(Job aJob, Hashtable queue) { + private static void addToQueue(Job aJob, Hashtable<String, Job> queue) { synchronized(queue) { queue.put(aJob.getJobID(), aJob); } } private void addToQueue(Job aJob) { - Hashtable queue = getQueue(aJob.getState()); + Hashtable<String, Job> queue = getQueue(aJob.getState()); addToQueue(aJob, queue); } - private Hashtable getQueue(int state) { - Hashtable retv = null; + private Hashtable<String, Job> getQueue(int state) { + Hashtable<String, Job> retv = null; if (state == Job.WAITING) { retv = this.waitingJobs; } else if (state == Job.READY) { @@ -208,13 +207,11 @@ synchronized private void checkRunningJobs() { - Hashtable oldJobs = null; + Hashtable<String, Job> oldJobs = null; oldJobs = this.runningJobs; - this.runningJobs = new Hashtable(); + this.runningJobs = new Hashtable<String, Job>(); - Iterator jobs = oldJobs.values().iterator(); - while (jobs.hasNext()) { - Job nextJob = (Job)jobs.next(); + for (Job nextJob : oldJobs.values()) { int state = nextJob.checkState(); /* if (state != Job.RUNNING) { @@ -227,13 +224,11 @@ } synchronized private void checkWaitingJobs() { - Hashtable oldJobs = null; + Hashtable<String, Job> oldJobs = null; oldJobs = this.waitingJobs; - this.waitingJobs = new Hashtable(); + this.waitingJobs = new Hashtable<String, Job>(); - Iterator jobs = oldJobs.values().iterator(); - while (jobs.hasNext()) { - Job nextJob = (Job)jobs.next(); + for (Job nextJob : oldJobs.values()) { int state = nextJob.checkState(); /* if (state != Job.WAITING) { @@ -246,13 +241,11 @@ } synchronized private void startReadyJobs() { - Hashtable oldJobs = null; + Hashtable<String, Job> oldJobs = null; oldJobs = this.readyJobs; - this.readyJobs = new Hashtable(); + this.readyJobs = new Hashtable<String, Job>(); - Iterator jobs = oldJobs.values().iterator(); - while (jobs.hasNext()) { - Job nextJob = (Job)jobs.next(); + for (Job nextJob : oldJobs.values()) { //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName()); nextJob.submit(); //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());