Author: sharad
Date: Tue Jun 9 05:16:04 2009
New Revision: 782881
URL: http://svn.apache.org/viewvc?rev=782881&view=rev
Log:
HADOOP-5694. Change org.apache.hadoop.examples.dancing to use new mapreduce
api. Contributed by Amareshwari Sriramadasu.
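
At a high level, the patch moves the example from the old org.apache.hadoop.mapred interfaces (a Mapper interface plus OutputCollector/Reporter, configured through configure(JobConf), driven by JobConf and JobClient.runJob) to the new org.apache.hadoop.mapreduce classes (a Mapper base class with a single Context, configured in setup(Context), driven by Job and waitForCompletion). A minimal sketch of the two mapper shapes follows; the names MapperApiSketch, OldApiMapper, NewApiMapper and the identity-style map bodies are illustrative only and are not part of this patch.

  import java.io.IOException;

  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.MapReduceBase;
  import org.apache.hadoop.mapred.OutputCollector;
  import org.apache.hadoop.mapred.Reporter;

  public class MapperApiSketch {

    // Old-style (org.apache.hadoop.mapred) mapper: implements the Mapper
    // interface, emits through an OutputCollector, updates counters through a
    // Reporter, and would read configuration by overriding configure(JobConf)
    // inherited from MapReduceBase.
    public static class OldApiMapper extends MapReduceBase
        implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, Text> {
      public void map(LongWritable key, Text value,
                      OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        output.collect(value, value);
      }
    }

    // New-style (org.apache.hadoop.mapreduce) mapper: extends the Mapper base
    // class, emits and updates counters through a single Context, and reads
    // configuration in setup(Context).
    public static class NewApiMapper
        extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
      @Override
      protected void setup(Context context) {
        // context.getConfiguration() replaces configure(JobConf)
      }
      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(value, value);
      }
    }
  }

The driver changes in the same spirit: JobConf plus JobClient.runJob(conf) in the old code becomes Job plus job.waitForCompletion(true) in the diff below.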
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=782881&r1=782880&r2=782881&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jun 9 05:16:04 2009
@@ -435,6 +435,9 @@
HADOOP-5961. DataNode process understands generic hadoop command line
options (like -Ddfs.property=value). (Raghu Angadi)
+ HADOOP-5694. Change org.apache.hadoop.examples.dancing to use new
+ mapreduce api. (Amareshwari Sriramadasu via sharad)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java?rev=782881&r1=782880&r2=782881&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/dancing/DistributedPentomino.java Tue Jun 9 05:16:04 2009
@@ -28,8 +28,9 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.*;
/**
@@ -43,21 +44,25 @@
*/
public class DistributedPentomino extends Configured implements Tool {
+ private static final int PENT_DEPTH = 5;
+ private static final int PENT_WIDTH = 9;
+ private static final int PENT_HEIGHT = 10;
+ private static final int DEFAULT_MAPS = 2000;
+
/**
* Each map takes a line, which represents a prefix move and finds all of
* the solutions that start with that prefix. The output is the prefix as
* the key and the solution as the value.
*/
- public static class PentMap extends MapReduceBase
- implements Mapper<WritableComparable, Text, Text, Text> {
+ public static class PentMap extends
+ Mapper<WritableComparable<?>, Text, Text, Text> {
private int width;
private int height;
private int depth;
private Pentomino pent;
private Text prefixString;
- private OutputCollector<Text, Text> output;
- private Reporter reporter;
+ private Context context;
/**
* For each solution, generate the prefix and a string representation
@@ -72,10 +77,12 @@
public void solution(List<List<Pentomino.ColumnName>> answer) {
String board = Pentomino.stringifySolution(width, height, answer);
try {
- output.collect(prefixString, new Text("\n" + board));
- reporter.incrCounter(pent.getCategory(answer), 1);
+ context.write(prefixString, new Text("\n" + board));
+ context.getCounter(pent.getCategory(answer)).increment(1);
} catch (IOException e) {
System.err.println(StringUtils.stringifyException(e));
+ } catch (InterruptedException ie) {
+ System.err.println(StringUtils.stringifyException(ie));
}
}
}
@@ -85,11 +92,8 @@
* will be selected for each column in order). Find all solutions with
* that prefix.
*/
- public void map(WritableComparable key, Text value,
- OutputCollector<Text, Text> output, Reporter reporter
- ) throws IOException {
- this.output = output;
- this.reporter = reporter;
+ public void map(WritableComparable<?> key, Text value, Context context)
+ throws IOException {
prefixString = value;
StringTokenizer itr = new StringTokenizer(prefixString.toString(), ",");
int[] prefix = new int[depth];
@@ -102,10 +106,12 @@
}
@Override
- public void configure(JobConf conf) {
- depth = conf.getInt("pent.depth", -1);
- width = conf.getInt("pent.width", -1);
- height = conf.getInt("pent.height", -1);
+ public void setup(Context context) {
+ this.context = context;
+ Configuration conf = context.getConfiguration();
+ depth = conf.getInt("pent.depth", PENT_DEPTH);
+ width = conf.getInt("pent.width", PENT_WIDTH);
+ height = conf.getInt("pent.height", PENT_HEIGHT);
pent = (Pentomino)
ReflectionUtils.newInstance(conf.getClass("pent.class",
OneSidedPentomino.class),
@@ -123,16 +129,17 @@
* @param pent the puzzle
* @param depth the depth to explore when generating prefixes
*/
- private static void createInputDirectory(FileSystem fs,
+ private static long createInputDirectory(FileSystem fs,
Path dir,
Pentomino pent,
int depth
) throws IOException {
fs.mkdirs(dir);
List<int[]> splits = pent.getSplits(depth);
+ Path input = new Path(dir, "part1");
PrintStream file =
new PrintStream(new BufferedOutputStream
- (fs.create(new Path(dir, "part1")), 64*1024));
+ (fs.create(input), 64*1024));
for(int[] prefix: splits) {
for(int i=0; i < prefix.length; ++i) {
if (i != 0) {
@@ -143,6 +150,7 @@
file.print('\n');
}
file.close();
+ return fs.getFileStatus(input).getLen();
}
/**
@@ -151,57 +159,54 @@
* Splits the job into 2000 maps and 1 reduce.
*/
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(), new DistributedPentomino(), args);
+ int res = ToolRunner.run(new Configuration(),
+ new DistributedPentomino(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
- JobConf conf;
- int depth = 5;
- int width = 9;
- int height = 10;
- Class<? extends Pentomino> pentClass;
if (args.length == 0) {
System.out.println("pentomino <output>");
ToolRunner.printGenericCommandUsage(System.out);
- return -1;
+ return 2;
}
-
- conf = new JobConf(getConf());
- width = conf.getInt("pent.width", width);
- height = conf.getInt("pent.height", height);
- depth = conf.getInt("pent.depth", depth);
- pentClass = conf.getClass("pent.class", OneSidedPentomino.class, Pentomino.class);
-
+
+ Configuration conf = getConf();
+ int width = conf.getInt("pent.width", PENT_WIDTH);
+ int height = conf.getInt("pent.height", PENT_HEIGHT);
+ int depth = conf.getInt("pent.depth", PENT_DEPTH);
+ Class<? extends Pentomino> pentClass = conf.getClass("pent.class",
+ OneSidedPentomino.class, Pentomino.class);
+ int numMaps = conf.getInt("mapred.map.tasks", DEFAULT_MAPS);
Path output = new Path(args[0]);
Path input = new Path(output + "_input");
FileSystem fileSys = FileSystem.get(conf);
try {
- FileInputFormat.setInputPaths(conf, input);
- FileOutputFormat.setOutputPath(conf, output);
- conf.setJarByClass(PentMap.class);
+ Job job = new Job(conf);
+ FileInputFormat.setInputPaths(job, input);
+ FileOutputFormat.setOutputPath(job, output);
+ job.setJarByClass(PentMap.class);
- conf.setJobName("dancingElephant");
+ job.setJobName("dancingElephant");
Pentomino pent = ReflectionUtils.newInstance(pentClass, conf);
pent.initialize(width, height);
- createInputDirectory(fileSys, input, pent, depth);
+ long inputSize = createInputDirectory(fileSys, input, pent, depth);
+ // for forcing the number of maps
+ FileInputFormat.setMaxInputSplitSize(job, (inputSize/numMaps));
// the keys are the prefix strings
- conf.setOutputKeyClass(Text.class);
+ job.setOutputKeyClass(Text.class);
// the values are puzzle solutions
- conf.setOutputValueClass(Text.class);
+ job.setOutputValueClass(Text.class);
- conf.setMapperClass(PentMap.class);
- conf.setReducerClass(IdentityReducer.class);
+ job.setMapperClass(PentMap.class);
+ job.setReducerClass(Reducer.class);
- conf.setNumMapTasks(2000);
- conf.setNumReduceTasks(1);
+ job.setNumReduceTasks(1);
- JobClient.runJob(conf);
+ return (job.waitForCompletion(true) ? 0 : 1);
} finally {
fileSys.delete(input, true);
}
- return 0;
}
-
}