Author: omalley
Date: Wed Jul 30 14:05:40 2008
New Revision: 681202
URL: http://svn.apache.org/viewvc?rev=681202&view=rev
Log:
HADOOP-1230. Partial update:
1. made Partitioner and OutputFormat abstract classes.
2. fixed reference to WritableComparable
3. made Job set methods throw if they are in the wrong (ie. submitted) state
4. removed the Writable interface from InputSplit, although it remains on
FileSplit
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/InputSplit.java
Wed Jul 30 14:05:40 2008
@@ -20,7 +20,6 @@
import java.io.IOException;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -31,12 +30,12 @@
*
* <p>Typically, it presents a byte-oriented view on the input and is the
* responsibility of [EMAIL PROTECTED] RecordReader} of the job to process
this and present
- * a record-oriented view. Although the type is Writable, only the InputFormat
+ * a record-oriented view.
*
* @see InputFormat
* @see RecordReader
*/
-public abstract class InputSplit implements Writable {
+public abstract class InputSplit {
/**
* Get the size of the split, so that the input splits can be sorted by size.
* @return the number of bytes in the split
@@ -47,8 +46,7 @@
/**
* Get the list of nodes by name where the data for the split would be local.
- * The locations do not need to be serialized by calls to
- * [EMAIL PROTECTED] Writable#write(java.io.DataOutput)}
+ * The locations do not need to be serialized.
* @return a new array of the node nodes.
* @throws IOException
* @throws InterruptedException
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java Wed Jul
30 14:05:40 2008
@@ -29,7 +29,9 @@
/**
* The job submitter's view of the Job. It allows the user to configure the
- * job, submit it, control its execution, and query the state.
+ * job, submit it, control its execution, and query the state. The set methods
+ * only work until the job is submitted, afterwards they will throw an
+ * IllegalStateException.
*/
public class Job extends JobContext {
@@ -49,8 +51,9 @@
/**
* Set the number of reduce tasks for the job.
* @param tasks the number of reduce tasks
+ * @throws IllegalStateException if the job is submitted
*/
- public void setNumReduceTasks(int tasks) {
+ public void setNumReduceTasks(int tasks) throws IllegalStateException {
conf.setInt(NUM_REDUCES_ATTR, tasks);
}
@@ -58,6 +61,7 @@
* Set the current working directory for the default file system.
*
* @param dir the new current working directory.
+ * @throws IllegalStateException if the job is submitted
*/
public void setWorkingDirectory(Path dir) throws IOException {
dir = dir.makeQualified(FileSystem.get(conf));
@@ -67,48 +71,60 @@
/**
* Set the [EMAIL PROTECTED] InputFormat} for the job.
* @param cls the <code>InputFormat</code> to use
+ * @throws IllegalStateException if the job is submitted
*/
- public void setInputFormatClass(Class<? extends InputFormat<?,?>> cls) {
+ public void setInputFormatClass(Class<? extends InputFormat<?,?>> cls
+ ) throws IllegalStateException {
conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
}
/**
* Set the [EMAIL PROTECTED] OutputFormat} for the job.
* @param cls the <code>OutputFormat</code> to use
+ * @throws IllegalStateException if the job is submitted
*/
- public void setOutputFormatClass(Class<? extends OutputFormat<?,?>> cls) {
+ public void setOutputFormatClass(Class<? extends OutputFormat<?,?>> cls
+ ) throws IllegalStateException {
conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
}
/**
* Set the [EMAIL PROTECTED] Mapper} for the job.
* @param cls the <code>Mapper</code> to use
+ * @throws IllegalStateException if the job is submitted
*/
- public void setMapperClass(Class<? extends Mapper<?,?,?,?>> cls) {
+ public void setMapperClass(Class<? extends Mapper<?,?,?,?>> cls
+ ) throws IllegalStateException {
conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
}
/**
* Set the combiner class for the job.
* @param cls the combiner to use
+ * @throws IllegalStateException if the job is submitted
*/
- public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> cls) {
+ public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> cls
+ ) throws IllegalStateException {
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
/**
* Set the [EMAIL PROTECTED] Reducer} for the job.
* @param cls the <code>Reducer</code> to use
+ * @throws IllegalStateException if the job is submitted
*/
- public void setReducerClass(Class<? extends Reducer<?,?,?,?>> cls) {
+ public void setReducerClass(Class<? extends Reducer<?,?,?,?>> cls
+ ) throws IllegalStateException {
conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
}
/**
* Set the [EMAIL PROTECTED] Partitioner} for the job.
* @param cls the <code>Partitioner</code> to use
+ * @throws IllegalStateException if the job is submitted
*/
- public void setPartitionerClass(Class<? extends Partitioner<?,?>> cls) {
+ public void setPartitionerClass(Class<? extends Partitioner<?,?>> cls
+ ) throws IllegalStateException {
conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
}
@@ -118,8 +134,10 @@
* value class.
*
* @param theClass the map output key class.
+ * @throws IllegalStateException if the job is submitted
*/
- public void setMapOutputKeyClass(Class<?> theClass) {
+ public void setMapOutputKeyClass(Class<?> theClass
+ ) throws IllegalStateException {
conf.setClass(MAP_OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
}
@@ -129,8 +147,10 @@
* value class.
*
* @param theClass the map output value class.
+ * @throws IllegalStateException if the job is submitted
*/
- public void setMapOutputValueClass(Class<?> theClass) {
+ public void setMapOutputValueClass(Class<?> theClass
+ ) throws IllegalStateException {
conf.setClass(MAP_OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
}
@@ -138,8 +158,10 @@
* Set the key class for the job output data.
*
* @param theClass the key class for the job output data.
+ * @throws IllegalStateException if the job is submitted
*/
- public void setOutputKeyClass(Class<?> theClass) {
+ public void setOutputKeyClass(Class<?> theClass
+ ) throws IllegalStateException {
conf.setClass(OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
}
@@ -147,8 +169,10 @@
* Set the value class for job outputs.
*
* @param theClass the value class for job outputs.
+ * @throws IllegalStateException if the job is submitted
*/
- public void setOutputValueClass(Class<?> theClass) {
+ public void setOutputValueClass(Class<?> theClass
+ ) throws IllegalStateException {
conf.setClass(OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
}
@@ -156,8 +180,10 @@
* Define the comparator that controls how the keys are sorted before they
* are passed to the [EMAIL PROTECTED] Reducer}.
* @param cls the raw comparator
+ * @throws IllegalStateException if the job is submitted
*/
- public void setSortComparatorClass(Class<? extends RawComparator<?>> cls) {
+ public void setSortComparatorClass(Class<? extends RawComparator<?>> cls
+ ) throws IllegalStateException {
conf.setClass(SORT_COMPARATOR_ATTR, cls, RawComparator.class);
}
@@ -166,8 +192,10 @@
* for a single call to
* [EMAIL PROTECTED] Reducer#reduce(Object, Iterable,
org.apache.hadoop.mapreduce.Reducer.Context)}
* @param cls the raw comparator to use
+ * @throws IllegalStateException if the job is submitted
*/
- public void setGroupingComparatorClass(Class<? extends RawComparator<?>>
cls){
+ public void setGroupingComparatorClass(Class<? extends RawComparator<?>> cls
+ ) throws IllegalStateException {
conf.setClass(GROUPING_COMPARATOR_ATTR, cls, RawComparator.class);
}
@@ -175,8 +203,9 @@
* Set the user-specified job name.
*
* @param name the job's new name.
+ * @throws IllegalStateException if the job is submitted
*/
- public void setJobName(String name) {
+ public void setJobName(String name) throws IllegalStateException {
conf.set(JOB_NAME_ATTR, name);
}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
Wed Jul 30 14:05:40 2008
@@ -26,7 +26,6 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
@@ -255,7 +254,7 @@
}
/**
- * Get the user defined [EMAIL PROTECTED] WritableComparable} comparator for
+ * Get the user defined [EMAIL PROTECTED] RawComparator} comparator for
* grouping keys of inputs to the reduce.
*
* @return comparator set by the user for grouping values.
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
Wed Jul 30 14:05:40 2008
@@ -41,7 +41,7 @@
*
* @see RecordWriter
*/
-public interface OutputFormat<K, V> {
+public abstract class OutputFormat<K, V> {
/**
* Get the [EMAIL PROTECTED] RecordWriter} for the given task.
@@ -50,8 +50,9 @@
* @return a [EMAIL PROTECTED] RecordWriter} to write the output for the job.
* @throws IOException
*/
- RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
- throws IOException, InterruptedException;
+ public abstract RecordWriter<K, V>
+ getRecordWriter(TaskAttemptContext context
+ ) throws IOException, InterruptedException;
/**
* Check for validity of the output-specification for the job.
@@ -64,7 +65,8 @@
* @param context information about the job
* @throws IOException when output should not be attempted
*/
- void checkOutputSpecs(JobContext context
- ) throws IOException, InterruptedException;
+ public abstract void checkOutputSpecs(JobContext context
+ ) throws IOException,
+ InterruptedException;
}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
Wed Jul 30 14:05:40 2008
@@ -30,7 +30,7 @@
*
* @see Reducer
*/
-public interface Partitioner<KEY, VALUE> {
+public abstract class Partitioner<KEY, VALUE> {
/**
* Get the partition number for a given key (hence record) given the total
@@ -43,5 +43,5 @@
* @param numPartitions the total number of partitions.
* @return the partition number for the <code>key</code>.
*/
- int getPartition(KEY key, VALUE value, int numPartitions);
+ public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
(original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
Wed Jul 30 14:05:40 2008
@@ -37,7 +37,9 @@
}
/**
- * Iterate through the values for the current key.
+ * Iterate through the values for the current key, reusing the same value
+ * object, which is stored in the context.
+ * @return the series of values associated with the current key
*/
public abstract
Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
Wed Jul 30 14:05:40 2008
@@ -27,11 +27,12 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
/** A section of an input file. Returned by [EMAIL PROTECTED]
* InputFormat#getSplits(JobContext)} and passed to
* [EMAIL PROTECTED]
InputFormat#createRecordReader(InputSplit,TaskAttemptContext)}. */
-public class FileSplit extends InputSplit {
+public class FileSplit extends InputSplit implements Writable {
private Path file;
private long start;
private long length;
@@ -60,20 +61,24 @@
public long getStart() { return start; }
/** The number of bytes in the file to process. */
+ @Override
public long getLength() { return length; }
+ @Override
public String toString() { return file + ":" + start + "+" + length; }
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
+ @Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, file.toString());
out.writeLong(start);
out.writeLong(length);
}
+ @Override
public void readFields(DataInput in) throws IOException {
file = new Path(Text.readString(in));
start = in.readLong();
@@ -81,6 +86,7 @@
hosts = null;
}
+ @Override
public String[] getLocations() throws IOException {
if (this.hosts == null) {
return new String[]{};
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
Wed Jul 30 14:05:40 2008
@@ -33,7 +33,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/** A base class for [EMAIL PROTECTED] OutputFormat}s that read from [EMAIL
PROTECTED] FileSystem}s.*/
-public abstract class FileOutputFormat<K, V> implements OutputFormat<K, V> {
+public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
private static final String TEMP_DIR_NAME = "_temp";
/**
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
Wed Jul 30 14:05:40 2008
@@ -26,7 +26,7 @@
/**
* Consume all outputs and put them in /dev/null.
*/
-public class NullOutputFormat<K, V> implements OutputFormat<K, V> {
+public class NullOutputFormat<K, V> extends OutputFormat<K, V> {
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
return new RecordWriter<K, V>(){
Modified:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java?rev=681202&r1=681201&r2=681202&view=diff
==============================================================================
---
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
(original)
+++
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/partition/HashPartitioner.java
Wed Jul 30 14:05:40 2008
@@ -21,7 +21,7 @@
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their [EMAIL PROTECTED] Object#hashCode()}. */
-public class HashPartitioner<K, V> implements Partitioner<K, V> {
+public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use [EMAIL PROTECTED] Object#hashCode()} to partition. */
public int getPartition(K key, V value,