Author: sharad
Date: Mon Jun 1 12:01:28 2009
New Revision: 780621
URL: http://svn.apache.org/viewvc?rev=780621&view=rev
Log:
HADOOP-5696. Change org.apache.hadoop.examples.Sort to use new mapreduce api.
Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java
hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun 1 12:01:28 2009
@@ -411,6 +411,9 @@
HADOOP-5698. Change org.apache.hadoop.examples.MultiFileWordCount to
use new mapreduce api. (Amareshwari Sriramadasu via sharad)
+ HADOOP-5696. Change org.apache.hadoop.examples.Sort to use new
+ mapreduce api. (Amareshwari Sriramadasu via sharad)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/Sort.java Mon Jun 1 12:01:28 2009
@@ -29,11 +29,15 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.mapred.lib.InputSampler;
-import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
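
For reference, the wiring these new imports support looks roughly like the
following minimal sketch of an identity sort driver on the new API (the
class name and the input/output paths are hypothetical; the base Mapper
and Reducer pass records through unchanged, so the framework's sort does
all the work):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class IdentitySortSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration());
        job.setJobName("sorter");
        job.setJarByClass(IdentitySortSketch.class);
        job.setMapperClass(Mapper.class);      // identity map
        job.setReducerClass(Reducer.class);    // identity reduce
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.setInputPaths(job, new Path("/sort/in"));    // hypothetical
        FileOutputFormat.setOutputPath(job, new Path("/sort/out"));  // hypothetical
        // waitForCompletion(true) submits the job, prints progress, and
        // returns true on success -- the same pattern run() now uses.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }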
@@ -42,7 +46,7 @@
* other than use the framework to fragment and sort the input values.
*
* To run: bin/hadoop jar build/hadoop-examples.jar sort
- * [-m <i>maps</i>] [-r <i>reduces</i>]
+ * [-r <i>reduces</i>]
* [-inFormat <i>input format class</i>]
* [-outFormat <i>output format class</i>]
* [-outKey <i>output key class</i>]
@@ -51,10 +55,10 @@
* <i>in-dir</i> <i>out-dir</i>
*/
public class Sort<K,V> extends Configured implements Tool {
- private RunningJob jobResult = null;
+ private Job job = null;
static int printUsage() {
- System.out.println("sort [-m <maps>] [-r <reduces>] " +
+ System.out.println("sort [-r <reduces>] " +
"[-inFormat <input format class>] " +
"[-outFormat <output format class>] " +
"[-outKey <output key class>] " +
@@ -62,7 +66,7 @@
"[-totalOrder <pcnt> <num samples> <max splits>] " +
"<input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
- return -1;
+ return 2;
}
/**
@@ -73,16 +77,11 @@
*/
public int run(String[] args) throws Exception {
- JobConf jobConf = new JobConf(getConf(), Sort.class);
- jobConf.setJobName("sorter");
-
- jobConf.setMapperClass(IdentityMapper.class);
- jobConf.setReducerClass(IdentityReducer.class);
-
- JobClient client = new JobClient(jobConf);
+ Configuration conf = getConf();
+ JobClient client = new JobClient(conf);
ClusterStatus cluster = client.getClusterStatus();
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
- String sort_reduces = jobConf.get("test.sort.reduces_per_host");
+ String sort_reduces = conf.get("test.sort.reduces_per_host");
if (sort_reduces != null) {
num_reduces = cluster.getTaskTrackers() *
Integer.parseInt(sort_reduces);
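
For example, on a cluster whose trackers advertise 20 reduce slots in
total, num_reduces defaults to (int)(20 * 0.9) = 18; with
test.sort.reduces_per_host set to 2 on a 10-tracker cluster, it becomes
10 * 2 = 20 instead.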
@@ -97,9 +96,7 @@
InputSampler.Sampler<K,V> sampler = null;
for(int i=0; i < args.length; ++i) {
try {
- if ("-m".equals(args[i])) {
- jobConf.setNumMapTasks(Integer.parseInt(args[++i]));
- } else if ("-r".equals(args[i])) {
+ if ("-r".equals(args[i])) {
num_reduces = Integer.parseInt(args[++i]);
} else if ("-inFormat".equals(args[i])) {
inputFormatClass =
@@ -132,15 +129,21 @@
return printUsage(); // exits
}
}
-
// Set user-supplied (possibly default) job configs
- jobConf.setNumReduceTasks(num_reduces);
+ job = new Job(conf);
+ job.setJobName("sorter");
+ job.setJarByClass(Sort.class);
- jobConf.setInputFormat(inputFormatClass);
- jobConf.setOutputFormat(outputFormatClass);
+ job.setMapperClass(Mapper.class);
+ job.setReducerClass(Reducer.class);
- jobConf.setOutputKeyClass(outputKeyClass);
- jobConf.setOutputValueClass(outputValueClass);
+ job.setNumReduceTasks(num_reduces);
+
+ job.setInputFormatClass(inputFormatClass);
+ job.setOutputFormatClass(outputFormatClass);
+
+ job.setOutputKeyClass(outputKeyClass);
+ job.setOutputValueClass(outputValueClass);
// Make sure there are exactly 2 parameters left.
if (otherArgs.size() != 2) {
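
Note that Mapper.class and Reducer.class above are the new API's concrete
base classes, whose default map() and reduce() are identity operations;
that is why they can stand in for the old IdentityMapper/IdentityReducer.
Their defaults are effectively:

    // Default map() of org.apache.hadoop.mapreduce.Mapper:
    // pass the input pair straight through to the output.
    protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }

    // Default reduce() of org.apache.hadoop.mapreduce.Reducer:
    // emit each value unchanged under its key.
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
      for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
      }
    }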
@@ -148,37 +151,37 @@
otherArgs.size() + " instead of 2.");
return printUsage();
}
- FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
- FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
-
+ FileInputFormat.setInputPaths(job, otherArgs.get(0));
+ FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
+
if (sampler != null) {
System.out.println("Sampling input to effect total-order sort...");
- jobConf.setPartitionerClass(TotalOrderPartitioner.class);
- Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
- inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
+ job.setPartitionerClass(TotalOrderPartitioner.class);
+ Path inputDir = FileInputFormat.getInputPaths(job)[0];
+ inputDir = inputDir.makeQualified(inputDir.getFileSystem(conf));
Path partitionFile = new Path(inputDir, "_sortPartitioning");
- TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
- InputSampler.<K,V>writePartitionFile(jobConf, sampler);
+ TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
+ InputSampler.<K,V>writePartitionFile(job, sampler);
URI partitionUri = new URI(partitionFile.toString() +
"#" + "_sortPartitioning");
- DistributedCache.addCacheFile(partitionUri, jobConf);
- DistributedCache.createSymlink(jobConf);
+ DistributedCache.addCacheFile(partitionUri, conf);
+ DistributedCache.createSymlink(conf);
}
System.out.println("Running on " +
cluster.getTaskTrackers() +
" nodes to sort from " +
- FileInputFormat.getInputPaths(jobConf)[0] + " into " +
- FileOutputFormat.getOutputPath(jobConf) +
+ FileInputFormat.getInputPaths(job)[0] + " into " +
+ FileOutputFormat.getOutputPath(job) +
" with " + num_reduces + " reduces.");
Date startTime = new Date();
System.out.println("Job started: " + startTime);
- jobResult = JobClient.runJob(jobConf);
+ int ret = job.waitForCompletion(true) ? 0 : 1;
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
- return 0;
+ return ret;
}
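
A note on the total-order branch above: the "#" fragment in the cache URI
names the symlink that DistributedCache.createSymlink(conf) creates in
each task's working directory, so tasks can open the sampled partition
file under the local name _sortPartitioning. Condensed, the pattern is
(the RandomSampler construction is illustrative; the real sampler comes
from the -totalOrder option parsing earlier in run()):

    // Condensed total-order setup. The RandomSampler arguments
    // (frequency, numSamples, maxSplitsSampled) are illustrative only.
    InputSampler.Sampler<K, V> sampler =
        new InputSampler.RandomSampler<K, V>(0.1, 10000, 10);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
    InputSampler.<K, V>writePartitionFile(job, sampler);
    // Ship the partition file to every task; the fragment after '#'
    // becomes the symlink name in the task working directory.
    DistributedCache.addCacheFile(
        new URI(partitionFile.toString() + "#_sortPartitioning"), conf);
    DistributedCache.createSymlink(conf);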
@@ -192,7 +195,7 @@
* Get the last job that was run using this instance.
* @return the results of the last job that was run
*/
- public RunningJob getResult() {
- return jobResult;
+ public Job getResult() {
+ return job;
}
}
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java Mon Jun 1 12:01:28 2009
@@ -210,9 +210,9 @@
try {
URI inputURI = new URI(job.get("map.input.file"));
String inputFile = inputURI.getPath();
- partition = Integer.valueOf(
-     inputFile.substring(inputFile.lastIndexOf("part")+5)
-     ).intValue();
+ // part file is of the form part-r-xxxxx
+ partition = Integer.valueOf(inputFile.substring(
+ inputFile.lastIndexOf("part") + 7)).intValue();
noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
} catch (Exception e) {
System.err.println("Caught: " + e);
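
The offset change from +5 to +7 tracks the new API's output file naming:
reduce outputs are now named part-r-00000 rather than part-00000, so the
numeric suffix starts two characters later. For example (path is
illustrative):

    // "part-r-00003": lastIndexOf("part") finds the start of "part",
    // and +7 skips past "part-r-" to reach the digits.
    String inputFile = "/output/part-r-00003";
    int partition = Integer.valueOf(
        inputFile.substring(inputFile.lastIndexOf("part") + 7));  // == 3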
Modified: hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java?rev=780621&r1=780620&r2=780621&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java (original)
+++ hadoop/core/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRDFSSort.java Mon Jun 1 12:01:28 2009
@@ -96,7 +96,7 @@
// Run Sort
Sort sort = new Sort();
assertEquals(ToolRunner.run(job, sort, sortArgs), 0);
- Counters counters = sort.getResult().getCounters();
+ org.apache.hadoop.mapreduce.Counters counters =
+     sort.getResult().getCounters();
long mapInput = counters.findCounter(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP,
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ).
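
Since getResult() now returns an org.apache.hadoop.mapreduce.Job, the
test reads counters through the new API. The lookup pattern, assuming a
completed new-API Job in a variable named job, is:

    // Read the map-side bytes-read counter from a finished job.
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    long mapInput = counters.findCounter(
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.COUNTER_GROUP,
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.BYTES_READ)
        .getValue();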
@@ -106,7 +106,7 @@
// the hdfs read should be between 100% and 110% of the map input bytes
assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead,
(hdfsRead < (mapInput * 1.1)) &&
- (hdfsRead > mapInput));
+ (hdfsRead >= mapInput));
}
private static void runSortValidator(JobConf job,