Author: cdouglas
Date: Mon Aug 25 22:12:52 2008
New Revision: 688960
URL: http://svn.apache.org/viewvc?rev=688960&view=rev
Log:
HADOOP-3705. Fix the mapred.join parser to accept InputFormats named with
underscores and static inner classes.
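For context: CompositeInputFormat.compose builds the join expression from the
binary class name, so a static inner InputFormat contributes a '$' and, in this
test, a '_' as well. A hedged sketch of how such an expression is assembled,
mirroring the testEmptyJoin() call in the diff below (the standalone wrapper
class and the paths are illustrative, not part of this change):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.join.CompositeInputFormat;
import org.apache.hadoop.mapred.join.TestDatamerge;

public class ComposeDemo {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // Compose an outer join over the fake InputFormat; paths are made up.
    String expr = CompositeInputFormat.compose("outer",
        TestDatamerge.Fake_IF.class,
        new Path("/empty/i0"), new Path("/empty/i1"), new Path("/empty/i2"));
    // expr embeds the binary class name, e.g.
    //   outer(tbl(org.apache.hadoop.mapred.join.TestDatamerge$Fake_IF,"/empty/i0"),...)
    // which the pre-fix tokenizer split at '$' and '_'.
    job.set("mapred.join.expr", expr);
    job.setInputFormat(CompositeInputFormat.class);
  }
}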
Removed:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/FakeIF.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=688960&r1=688959&r2=688960&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Aug 25 22:12:52 2008
@@ -368,6 +368,9 @@
HADOOP-3506. Fix a rare NPE caused by error handling in S3. (Tom White via
cdouglas)
+ HADOOP-3705. Fix the mapred.join parser to accept InputFormats named with
+ underscores and static inner classes. (cdouglas)
+
Release 0.18.0 - 2008-08-19
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java?rev=688960&r1=688959&r2=688960&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/join/Parser.java Mon Aug 25 22:12:52 2008
@@ -135,6 +135,8 @@
tok.ordinaryChar(',');
tok.ordinaryChar('(');
tok.ordinaryChar(')');
+ tok.wordChars('$','$');
+ tok.wordChars('_','_');
}
Token next() throws IOException {
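The two wordChars calls are the whole fix: by default StreamTokenizer treats
'$' (36) and '_' (95) as ordinary characters, so a binary class name such as
TestDatamerge$Fake_IF is split into several tokens and the parser rejects it.
A standalone sketch (JDK only, not the Parser class itself) showing the
before/after tokenization:

import java.io.IOException;
import java.io.StreamTokenizer;
import java.io.StringReader;

public class TokDemo {
  static void scan(boolean patched) throws IOException {
    StreamTokenizer tok =
        new StreamTokenizer(new StringReader("TestDatamerge$Fake_IF"));
    tok.ordinaryChar(',');
    tok.ordinaryChar('(');
    tok.ordinaryChar(')');
    if (patched) {            // the two lines added by this change
      tok.wordChars('$', '$');
      tok.wordChars('_', '_');
    }
    while (tok.nextToken() != StreamTokenizer.TT_EOF) {
      String t = (tok.ttype == StreamTokenizer.TT_WORD)
          ? tok.sval : String.valueOf((char) tok.ttype);
      System.out.print("[" + t + "] ");
    }
    System.out.println();
  }

  public static void main(String[] args) throws IOException {
    scan(false); // [TestDatamerge] [$] [Fake] [_] [IF] -- name is split
    scan(true);  // [TestDatamerge$Fake_IF]             -- one identifier
  }
}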
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=688960&r1=688959&r2=688960&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java Mon Aug 25 22:12:52 2008
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred.join;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
@@ -26,25 +28,31 @@
import junit.extensions.TestSetup;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.util.ReflectionUtils;
public class TestDatamerge extends TestCase {
@@ -285,7 +293,7 @@
if (i + 1 != SOURCES) sb.append(",");
}
sb.append("),outer(");
- sb.append(CompositeInputFormat.compose(FakeIF.class,"foobar"));
+ sb.append(CompositeInputFormat.compose(Fake_IF.class,"foobar"));
sb.append(",");
for (int i = 0; i < SOURCES; ++i) {
sb.append(
@@ -293,13 +301,13 @@
src[i].toString()));
sb.append(",");
}
- sb.append(CompositeInputFormat.compose(FakeIF.class,"raboof") + "))");
+ sb.append(CompositeInputFormat.compose(Fake_IF.class,"raboof") + "))");
job.set("mapred.join.expr", sb.toString());
job.setInputFormat(CompositeInputFormat.class);
Path outf = new Path(base, "out");
FileOutputFormat.setOutputPath(job, outf);
- FakeIF.setKeyClass(job, IntWritable.class);
- FakeIF.setValClass(job, IntWritable.class);
+ Fake_IF.setKeyClass(job, IntWritable.class);
+ Fake_IF.setValClass(job, IntWritable.class);
job.setMapperClass(IdentityMapper.class);
job.setReducerClass(IdentityReducer.class);
@@ -345,7 +353,7 @@
Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
- FakeIF.class, src));
+ Fake_IF.class, src));
job.setInputFormat(CompositeInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(base, "out"));
@@ -357,4 +365,56 @@
JobClient.runJob(job);
base.getFileSystem(job).delete(base, true);
}
+
+ public static class Fake_IF<K,V>
+ implements InputFormat<K,V>, JobConfigurable {
+
+ public static class FakeSplit implements InputSplit {
+ public void write(DataOutput out) throws IOException { }
+ public void readFields(DataInput in) throws IOException { }
+ public long getLength() { return 0L; }
+ public String[] getLocations() { return new String[0]; }
+ }
+
+ public static void setKeyClass(JobConf job, Class<?> k) {
+ job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
+ }
+
+ public static void setValClass(JobConf job, Class<?> v) {
+ job.setClass("test.fakeif.valclass", v, Writable.class);
+ }
+
+ private Class<? extends K> keyclass;
+ private Class<? extends V> valclass;
+
+ @SuppressWarnings("unchecked")
+ public void configure(JobConf job) {
+ keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
+ IncomparableKey.class, WritableComparable.class);
+ valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
+ NullWritable.class, WritableComparable.class);
+ }
+
+ public Fake_IF() { }
+
+ public InputSplit[] getSplits(JobConf conf, int splits) {
+ return new InputSplit[] { new FakeSplit() };
+ }
+
+ public RecordReader<K,V> getRecordReader(
+ InputSplit ignored, JobConf conf, Reporter reporter) {
+ return new RecordReader<K,V>() {
+ public boolean next(K key, V value) throws IOException { return false; }
+ public K createKey() {
+ return ReflectionUtils.newInstance(keyclass, null);
+ }
+ public V createValue() {
+ return ReflectionUtils.newInstance(valclass, null);
+ }
+ public long getPos() throws IOException { return 0L; }
+ public void close() throws IOException { }
+ public float getProgress() throws IOException { return 0.0f; }
+ };
+ }
+ }
}
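A note on the renamed test class: moving FakeIF into TestDatamerge as the
static inner class Fake_IF is itself part of the regression test, since
Class.getName() on a static inner class returns the binary name with a '$'
separator, and the '_' exercises the second new word character. A quick
illustration (the Demo wrapper is hypothetical):

public class Demo {
  static class Fake_IF { }
  public static void main(String[] args) {
    // Prints "Demo$Fake_IF": the '$' comes from nesting, the '_' from the name.
    System.out.println(Fake_IF.class.getName());
  }
}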