Author: cdouglas
Date: Thu Jul 3 16:47:01 2008
New Revision: 673869
URL: http://svn.apache.org/viewvc?rev=673869&view=rev
Log:
HADOOP-3665. Modify WritableComparator so that it creates instances of the
key type only if the type does not define its own WritableComparator; for a
type that does, calling the superclass compare will throw a
NullPointerException. Also define a RawComparator for NullWritable and
permit it to be written as a key to SequenceFiles.
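
For readers unfamiliar with the pattern, here is a minimal sketch (not part of
this patch; FourByteKey is a hypothetical type) of how a Writable key registers
its own raw comparator, mirroring what this change does for NullWritable:

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;

  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;

  /** Hypothetical fixed-width key illustrating the registration pattern. */
  public class FourByteKey implements WritableComparable<FourByteKey> {
    private int value;

    public void write(DataOutput out) throws IOException { out.writeInt(value); }
    public void readFields(DataInput in) throws IOException { value = in.readInt(); }
    public int compareTo(FourByteKey o) {
      return value < o.value ? -1 : value == o.value ? 0 : 1;
    }

    /** Raw comparator: orders serialized keys without deserializing them. */
    public static class Comparator extends WritableComparator {
      public Comparator() { super(FourByteKey.class); }
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int v1 = readInt(b1, s1);  // static helper inherited from WritableComparator
        int v2 = readInt(b2, s2);
        return v1 < v2 ? -1 : v1 == v2 ? 0 : 1;
      }
    }

    static {  // register, so WritableComparator.get(FourByteKey.class) returns it
      WritableComparator.define(FourByteKey.class, new Comparator());
    }
  }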
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/io/NullWritable.java
hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java
hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableComparator.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=673869&r1=673868&r2=673869&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jul 3 16:47:01 2008
@@ -160,6 +160,12 @@
if they are not necessary e.g. for Maps with no side-effect files.
(acmurthy)
+ HADOOP-3665. Modify WritableComparator so that it creates instances
+ of the key type only if the type does not define its own
+ WritableComparator; for a type that does, calling the superclass compare
+ will throw a NullPointerException. Also define a RawComparator for
+ NullWritable and permit it to be written as a key to SequenceFiles. (cdouglas)
+
NEW FEATURES
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/NullWritable.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/NullWritable.java?rev=673869&r1=673868&r2=673869&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/NullWritable.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/NullWritable.java Thu Jul 3 16:47:01 2008
@@ -45,5 +45,26 @@
public boolean equals(Object other) { return other instanceof NullWritable; }
public void readFields(DataInput in) throws IOException {}
public void write(DataOutput out) throws IOException {}
+
+ /** A Comparator "optimized" for NullWritable. */
+ public static class Comparator extends WritableComparator {
+ public Comparator() {
+ super(NullWritable.class);
+ }
+
+ /**
+ * Compare the buffers in serialized form.
+ */
+ public int compare(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+ assert 0 == l1;
+ assert 0 == l2;
+ return 0;
+ }
+ }
+
+ static { // register this comparator
+ WritableComparator.define(NullWritable.class, new Comparator());
+ }
}
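
As a quick sanity check (a hedged sketch, not in the patch): once NullWritable
has been initialized (referencing the class literal alone does not run the
static registration block), WritableComparator.get returns the comparator
defined above, which treats any two zero-length serialized keys as equal:

  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.WritableComparator;

  public class NullComparatorCheck {
    public static void main(String[] args) {
      NullWritable.get();  // force class init so the static block registers the comparator
      WritableComparator cmp = WritableComparator.get(NullWritable.class);
      // NullWritable serializes to zero bytes; the raw compare reports any two as equal
      System.out.println(cmp.compare(new byte[0], 0, 0, new byte[0], 0, 0));  // prints 0
    }
  }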
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java?rev=673869&r1=673868&r2=673869&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/SequenceFile.java Thu Jul 3 16:47:01 2008
@@ -992,8 +992,8 @@
// Append the 'key'
keySerializer.serialize(key);
int keyLength = buffer.getLength();
- if (keyLength == 0)
- throw new IOException("zero length keys not allowed: " + key);
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed: " + key);
// Append the 'value'
if (compress) {
@@ -1014,8 +1014,8 @@
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {
- if (keyLength == 0)
- throw new IOException("zero length keys not allowed: " + keyLength);
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed: " +
keyLength);
int valLength = val.getSize();
@@ -1119,8 +1119,8 @@
// Append the 'key'
keySerializer.serialize(key);
int keyLength = buffer.getLength();
- if (keyLength == 0)
- throw new IOException("zero length keys not allowed: " + key);
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed: " + key);
// Compress 'value' and append it
deflateFilter.resetState();
@@ -1139,8 +1139,8 @@
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {
- if (keyLength == 0)
- throw new IOException("zero length keys not allowed");
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed: " +
keyLength);
int valLength = val.getSize();
@@ -1302,8 +1302,8 @@
int oldKeyLength = keyBuffer.getLength();
keySerializer.serialize(key);
int keyLength = keyBuffer.getLength() - oldKeyLength;
- if (keyLength == 0)
- throw new IOException("zero length keys not allowed: " + key);
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed: " + key);
WritableUtils.writeVInt(keyLenBuffer, keyLength);
int oldValLength = valBuffer.getLength();
@@ -1325,8 +1325,8 @@
public synchronized void appendRaw(byte[] keyData, int keyOffset,
int keyLength, ValueBytes val) throws IOException {
- if (keyLength == 0)
- throw new IOException("zero length keys not allowed");
+ if (keyLength < 0)
+ throw new IOException("negative length keys not allowed");
int valLength = val.getSize();
@@ -2229,7 +2229,7 @@
/** Sort and merge files containing the named classes. */
public Sorter(FileSystem fs, Class keyClass, Class valClass, Configuration conf) {
- this(fs, new WritableComparator(keyClass), keyClass, valClass, conf);
+ this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
}
/** Sort and merge using an arbitrary {@link RawComparator}. */
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableComparator.java?rev=673869&r1=673868&r2=673869&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableComparator.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/WritableComparator.java Thu Jul 3 16:47:01 2008
@@ -21,6 +21,8 @@
import java.io.*;
import java.util.*;
+import org.apache.hadoop.util.ReflectionUtils;
+
/** A Comparator for {@link WritableComparable}s.
*
* <p>This base implementation uses the natural ordering. To define alternate
@@ -39,7 +41,7 @@
public static synchronized WritableComparator get(Class c) {
WritableComparator comparator = comparators.get(c);
if (comparator == null)
- comparator = new WritableComparator(c);
+ comparator = new WritableComparator(c, true);
return comparator;
}
@@ -51,17 +53,26 @@
}
- private DataInputBuffer buffer = new DataInputBuffer();
-
- private Class keyClass;
- private WritableComparable key1;
- private WritableComparable key2;
+ private final Class keyClass;
+ private final WritableComparable key1;
+ private final WritableComparable key2;
+ private final DataInputBuffer buffer;
/** Construct for a {@link WritableComparable} implementation. */
protected WritableComparator(Class keyClass) {
+ this(keyClass, false);
+ }
+
+ private WritableComparator(Class keyClass, boolean createInstances) {
this.keyClass = keyClass;
- this.key1 = newKey();
- this.key2 = newKey();
+ if (createInstances) {
+ key1 = newKey();
+ key2 = newKey();
+ buffer = new DataInputBuffer();
+ } else {
+ key1 = key2 = null;
+ buffer = null;
+ }
}
/** Returns the WritableComparable implementation class. */
@@ -69,13 +80,8 @@
/** Construct a new {@link WritableComparable} instance. */
public WritableComparable newKey() {
- try {
- return (WritableComparable)keyClass.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
+ return (WritableComparable)
+ ReflectionUtils.newInstance(keyClass, null);
}
/** Optimization hook. Override this to make SequenceFile.Sorter scream.
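
To see the NullPointerException the log message warns about (a hedged sketch;
LazyComparator is hypothetical), consider a subclass built through the
protected constructor: it now receives no key instances and no input buffer,
so falling through to the inherited raw compare fails:

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.WritableComparator;

  /** Hypothetical subclass that does not override the raw compare. */
  public class LazyComparator extends WritableComparator {
    public LazyComparator() {
      super(IntWritable.class);  // protected ctor: key1, key2, and buffer stay null
    }
    // No compare(byte[],...) override: the inherited version deserializes
    // both keys through the (now null) DataInputBuffer.

    public static void main(String[] args) {
      WritableComparator c = new LazyComparator();
      byte[] a = {0, 0, 0, 1}, b = {0, 0, 0, 2};
      c.compare(a, 0, 4, b, 0, 4);  // throws NullPointerException after this change
    }
  }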
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=673869&r1=673868&r2=673869&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Thu Jul 3 16:47:01 2008
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
@@ -346,7 +347,58 @@
}
assertFalse("should fail for partition >= numPartitions", pass);
}
-
+
+ public static class NullMapper
+ implements Mapper<NullWritable,Text,NullWritable,Text> {
+ public void map(NullWritable key, Text val,
+ OutputCollector<NullWritable,Text> output, Reporter reporter)
+ throws IOException {
+ output.collect(NullWritable.get(), val);
+ }
+ public void configure(JobConf conf) { }
+ public void close() { }
+ }
+
+ public void testNullKeys() throws Exception {
+ JobConf conf = new JobConf(TestMapRed.class);
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path testdir = new Path(
+ System.getProperty("test.build.data","/tmp")).makeQualified(fs);
+ fs.delete(testdir, true);
+ Path inFile = new Path(testdir, "nullin/blah");
+ SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile,
+ NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
+ Text t = new Text();
+ t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
+ t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
+ t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
+ t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
+ t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
+ t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
+ t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
+ t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
+ w.close();
+ FileInputFormat.setInputPaths(conf, inFile);
+ FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout"));
+ conf.setMapperClass(NullMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+ conf.setOutputKeyClass(NullWritable.class);
+ conf.setOutputValueClass(Text.class);
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+ conf.setNumReduceTasks(1);
+
+ JobClient.runJob(conf);
+
+ SequenceFile.Reader r = new SequenceFile.Reader(fs,
+ new Path(testdir, "nullout/part-00000"), conf);
+ String m = "AAAAAAAAAAAAAA";
+ for (int i = 1; r.next(NullWritable.get(), t); ++i) {
+ assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString()));
+ m = m.replace((char)('A' + i - 1), (char)('A' + i));
+ }
+ }
+
private void checkCompression(boolean compressMapOutputs,
CompressionType redCompression,
boolean includeCombine