Author: acmurthy
Date: Mon Jul 21 14:50:22 2008
New Revision: 678579
URL: http://svn.apache.org/viewvc?rev=678579&view=rev
Log:
HADOOP-3728. Fix SleepJob so that it doesn't depend on temporary files; this
ensures we can now run more than one instance of SleepJob simultaneously.
Contributed by Chris Douglas.
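
For context: the race this patch removes is that every instance of the old job
wrote the same /tmp/sleep.job.data SequenceFile (see the deleted run() code
below). A minimal sketch of exercising the fixed job concurrently through its
Tool/ToolRunner entry point — the harness class and the chosen option values
here are purely illustrative, not part of the patch:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.examples.SleepJob;
  import org.apache.hadoop.util.ToolRunner;

  // Hypothetical harness: start two SleepJob runs at the same time. With the
  // old implementation both runs raced on /tmp/sleep.job.data; with the
  // synthetic SleepInputFormat introduced below there is no shared on-disk
  // state, so the runs are independent.
  public class ConcurrentSleepJobs {
    public static void main(String[] args) throws Exception {
      Runnable launch = new Runnable() {
        public void run() {
          try {
            // -m maps, -r reduces, -mt/-rt total sleep per task in msec
            ToolRunner.run(new Configuration(), new SleepJob(), new String[] {
                "-m", "2", "-r", "1", "-mt", "1000", "-rt", "1000" });
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      };
      Thread a = new Thread(launch);
      Thread b = new Thread(launch);
      a.start(); b.start();
      a.join(); b.join();
    }
  }
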
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=678579&r1=678578&r2=678579&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jul 21 14:50:22 2008
@@ -146,6 +146,10 @@
HADOOP-3771. Ensure that Lzo compressors/decompressors correctly handle the
case where native libraries aren't available. (Chris Douglas via acmurthy)
+ HADOOP-3728. Fix SleepJob so that it doesn't depend on temporary files,
+ this ensures we can now run more than one instance of SleepJob
+ simultaneously. (Chris Douglas via acmurthy)
+
Release 0.18.0 - Unreleased
INCOMPATIBLE CHANGES
Modified:
hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java?rev=678579&r1=678578&r2=678579&view=diff
==============================================================================
--- hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java (original)
+++ hadoop/core/trunk/src/examples/org/apache/hadoop/examples/SleepJob.java Mon Jul 21 14:50:22 2008
@@ -18,6 +18,8 @@
package org.apache.hadoop.examples;
import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.util.Iterator;
import java.util.Random;
@@ -26,16 +28,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Partitioner;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -48,71 +43,120 @@
* some disk space.
*/
public class SleepJob extends Configured implements Tool,
- Mapper<IntWritable, IntWritable, IntWritable, IntWritable>,
- Reducer<IntWritable, IntWritable, IntWritable, IntWritable>,
- Partitioner<IntWritable, IntWritable>{
-
- private long mapSleepTime = 100;
- private long reduceSleepTime = 100;
- private long mapSleepCount = 1;
- private long reduceSleepCount = 1;
- private int numReduce;
-
- private boolean firstRecord = true;
- private long count = 0;
-
- public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
- return key.get() % numPartitions;
+ Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
+ Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
+ Partitioner<IntWritable,NullWritable> {
+
+ private long mapSleepDuration = 100;
+ private long reduceSleepDuration = 100;
+ private int mapSleepCount = 1;
+ private int reduceSleepCount = 1;
+ private int count = 0;
+
+ public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
+ return k.get() % numPartitions;
}
+ public static class EmptySplit implements InputSplit {
+ public void write(DataOutput out) throws IOException { }
+ public void readFields(DataInput in) throws IOException { }
+ public long getLength() { return 0L; }
+ public String[] getLocations() { return new String[0]; }
+ }
+
+ public static class SleepInputFormat extends Configured
+ implements InputFormat<IntWritable,IntWritable> {
+ public void validateInput(JobConf conf) { }
+ public InputSplit[] getSplits(JobConf conf, int numSplits) {
+ InputSplit[] ret = new InputSplit[numSplits];
+ for (int i = 0; i < numSplits; ++i) {
+ ret[i] = new EmptySplit();
+ }
+ return ret;
+ }
+ public RecordReader<IntWritable,IntWritable> getRecordReader(
+ InputSplit ignored, JobConf conf, Reporter reporter)
+ throws IOException {
+ final int count = conf.getInt("sleep.job.map.sleep.count", 1);
+ if (count < 0) throw new IOException("Invalid map count: " + count);
+ final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);
+ if (redcount < 0)
+ throw new IOException("Invalid reduce count: " + redcount);
+ final int emitPerMapTask = (redcount * conf.getNumReduceTasks());
+ return new RecordReader<IntWritable,IntWritable>() {
+ private int records = 0;
+ private int emitCount = 0;
+
+ public boolean next(IntWritable key, IntWritable value)
+ throws IOException {
+ key.set(emitCount);
+ int emit = emitPerMapTask / count;
+ if ((emitPerMapTask) % count > records) {
+ ++emit;
+ }
+ emitCount += emit;
+ value.set(emit);
+ return records++ < count;
+ }
+ public IntWritable createKey() { return new IntWritable(); }
+ public IntWritable createValue() { return new IntWritable(); }
+ public long getPos() throws IOException { return records; }
+ public void close() throws IOException { }
+ public float getProgress() throws IOException {
+ return records / ((float)count);
+ }
+ };
+ }
+ }
+
public void map(IntWritable key, IntWritable value,
- OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
+ OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
+ throws IOException {
//it is expected that every map processes mapSleepCount number of records.
try {
- long left = mapSleepCount - count ;
- if(left < 0) left = 0;
- reporter.setStatus("Sleeping... (" + ( mapSleepTime / mapSleepCount * left) + ") ms left");
- Thread.sleep(mapSleepTime / mapSleepCount);
+ reporter.setStatus("Sleeping... (" +
+ (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
+ Thread.sleep(mapSleepDuration);
}
catch (InterruptedException ex) {
+ throw (IOException)new IOException(
+ "Interrupted while sleeping").initCause(ex);
}
- count++;
- if(firstRecord) {
-
- //output reduceSleepCount * numReduce number of random values, so that each reducer will get
- //reduceSleepCount number of keys.
- for(int i=0; i < reduceSleepCount * numReduce; i++) {
- output.collect(new IntWritable(i), value);
- }
+ ++count;
+ // output reduceSleepCount * numReduce number of random values, so that
+ // each reducer will get reduceSleepCount number of keys.
+ int k = key.get();
+ for (int i = 0; i < value.get(); ++i) {
+ output.collect(new IntWritable(k + i), NullWritable.get());
}
- firstRecord = false;
}
- public void reduce(IntWritable key, Iterator<IntWritable> values,
- OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {
-
+ public void reduce(IntWritable key, Iterator<NullWritable> values,
+ OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
+ throws IOException {
try {
- long left = reduceSleepCount - count ;
- if(left < 0) left = 0;
-
- reporter.setStatus("Sleeping... ("
- +( reduceSleepTime / reduceSleepCount * left) + ") ms left");
- Thread.sleep(reduceSleepTime / reduceSleepCount);
+ reporter.setStatus("Sleeping... (" +
+ (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
+ Thread.sleep(reduceSleepDuration);
}
catch (InterruptedException ex) {
+ throw (IOException)new IOException(
+ "Interrupted while sleeping").initCause(ex);
}
- firstRecord = false;
count++;
}
public void configure(JobConf job) {
- this.mapSleepTime = job.getLong("sleep.job.map.sleep.time" , mapSleepTime);
- this.reduceSleepTime = job.getLong("sleep.job.reduce.sleep.time" , reduceSleepTime);
- this.mapSleepCount = job.getLong("sleep.job.map.sleep.count", mapSleepCount);
- this.reduceSleepCount = job.getLong("sleep.job.reduce.sleep.count", reduceSleepCount);
- numReduce = job.getNumReduceTasks();
+ this.mapSleepCount =
+ job.getInt("sleep.job.map.sleep.count", mapSleepCount);
+ this.reduceSleepCount =
+ job.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+ this.mapSleepDuration =
+ job.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
+ this.reduceSleepDuration =
+ job.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
}
public void close() throws IOException {
@@ -123,41 +167,28 @@
System.exit(res);
}
- public int run(int numMapper, int numReducer, long mapSleepTime
- , long mapSleepCount, long reduceSleepTime
- , long reduceSleepCount) throws Exception {
- Random random = new Random();
- FileSystem fs = FileSystem.get(getConf());
- Path tempPath = new Path("/tmp/sleep.job.data");
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf()
- , tempPath, IntWritable.class, IntWritable.class);
- for(int i=0; i<numMapper * mapSleepCount ;i++) {
- writer.append(new IntWritable(random.nextInt()), new IntWritable(random.nextInt()));
- }
- writer.close();
- try {
- JobConf job = new JobConf(getConf(), SleepJob.class);
- job.setNumMapTasks(numMapper);
- job.setNumReduceTasks(numReducer);
- job.setMapperClass(SleepJob.class);
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setReducerClass(SleepJob.class);
- job.setOutputFormat(NullOutputFormat.class);
- job.setInputFormat(SequenceFileInputFormat.class);
- job.setSpeculativeExecution(false);
- job.setJobName("Sleep job");
- FileInputFormat.addInputPath(job, tempPath);
- job.setLong("sleep.job.map.sleep.time", mapSleepTime);
- job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
- job.setLong("sleep.job.map.sleep.count", mapSleepCount);
- job.setLong("sleep.job.reduce.sleep.count", reduceSleepCount);
-
- JobClient.runJob(job);
- }
- finally {
- fs.delete(tempPath, true);
- }
+ public int run(int numMapper, int numReducer, long mapSleepTime,
+ int mapSleepCount, long reduceSleepTime,
+ int reduceSleepCount) throws IOException {
+ JobConf job = new JobConf(getConf(), SleepJob.class);
+ job.setNumMapTasks(numMapper);
+ job.setNumReduceTasks(numReducer);
+ job.setMapperClass(SleepJob.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setReducerClass(SleepJob.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setInputFormat(SleepInputFormat.class);
+ job.setPartitionerClass(SleepJob.class);
+ job.setSpeculativeExecution(false);
+ job.setJobName("Sleep job");
+ FileInputFormat.addInputPath(job, new Path("ignored"));
+ job.setLong("sleep.job.map.sleep.time", mapSleepTime);
+ job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
+ job.setInt("sleep.job.map.sleep.count", mapSleepCount);
+ job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
+
+ JobClient.runJob(job);
return 0;
}
@@ -165,14 +196,15 @@
if(args.length < 1) {
System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
- " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)] ");
+ " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
+ " [-recordt recordSleepTime (msec)]");
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}
int numMapper = 1, numReducer = 1;
- long mapSleepTime = 100, reduceSleepTime = 100;
- long mapSleepCount = 1, reduceSleepCount = 1;
+ long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
+ int mapSleepCount = 1, reduceSleepCount = 1;
for(int i=0; i < args.length; i++ ) {
if(args[i].equals("-m")) {
@@ -187,13 +219,17 @@
else if(args[i].equals("-rt")) {
reduceSleepTime = Long.parseLong(args[++i]);
}
+ else if (args[i].equals("-recordt")) {
+ recSleepTime = Long.parseLong(args[++i]);
+ }
}
- mapSleepCount = (long)Math.ceil(mapSleepTime / 100.0d);
- reduceSleepCount = (long)Math.ceil(reduceSleepTime / 100.0d);
+ // sleep for *SleepTime duration in Task by recSleepTime per record
+ mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
+ reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
- return run(numMapper, numReducer, mapSleepTime, mapSleepCount
- , reduceSleepTime, reduceSleepCount);
+ return run(numMapper, numReducer, mapSleepTime, mapSleepCount,
+ reduceSleepTime, reduceSleepCount);
}
}
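
A worked example of the new -recordt option, following the arithmetic in run()
and configure() above (the option values are illustrative): passing -mt 10000
-rt 4000 -recordt 500 yields mapSleepCount = ceil(10000/500) = 20 and
reduceSleepCount = ceil(4000/500) = 8. configure() then derives
mapSleepDuration = 10000 / 20 = 500 msec and reduceSleepDuration = 4000 / 8 =
500 msec, so each map task sleeps 500 msec for each of its 20 records (about
10 seconds total) and each reduce task sleeps about 4 seconds, while
SleepInputFormat arranges for every reducer to receive reduceSleepCount keys.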