Author: cdouglas
Date: Tue Jul 8 17:55:58 2008
New Revision: 675072
URL: http://svn.apache.org/viewvc?rev=675072&view=rev
Log:
HADOOP-3630. Fix NullPointerException in CompositeRecordReader from empty
sources.
Added:
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/FakeIF.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/IncomparableKey.java
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=675072&r1=675071&r2=675072&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Tue Jul 8 17:55:58 2008
@@ -734,6 +734,9 @@
HADOOP-3691. Fix streaming and tutorial docs. (Jothi Padmanabhan via ddas)
+ HADOOP-3630. Fix NullPointerException in CompositeRecordReader from empty
+ sources. (cdouglas)
+
Release 0.17.2 - Unreleased
BUG FIXES
Modified:
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=675072&r1=675071&r2=675072&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
(original)
+++
hadoop/core/branches/branch-0.18/src/mapred/org/apache/hadoop/mapred/join/CompositeRecordReader.java
Tue Jul 8 17:55:58 2008
@@ -135,7 +135,9 @@
}
});
}
- q.add(rr);
+ if (rr.hasNext()) {
+ q.add(rr);
+ }
}
/**
@@ -357,7 +359,9 @@
}
for (ComposableRecordReader<K,?> rr : tmp) {
rr.skip(key);
- q.add(rr);
+ if (rr.hasNext()) {
+ q.add(rr);
+ }
}
}
Added:
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/FakeIF.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/FakeIF.java?rev=675072&view=auto
==============================================================================
---
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/FakeIF.java
(added)
+++
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/FakeIF.java
Tue Jul 8 17:55:58 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class FakeIF // Test-only InputFormat: always one dummy split and a reader that yields no records.
+ implements InputFormat<IncomparableKey,NullWritable> {
+
+ public static class FakeSplit implements InputSplit { // Placeholder split that carries no state.
+ public void write(DataOutput out) throws IOException { } // nothing to serialize
+ public void readFields(DataInput in) throws IOException { } // nothing to deserialize
+ public long getLength() { return 0L; } // always reports zero length
+ public String[] getLocations() { return new String[0]; } // no location hints
+ }
+
+ public FakeIF() { }
+
+ public void validateInput(JobConf conf) { } // no-op: performs no validation
+
+ public InputSplit[] getSplits(JobConf conf, int splits) { // both arguments are ignored
+ return new InputSplit[] { new FakeSplit() }; // always exactly one FakeSplit
+ }
+
+ public RecordReader<IncomparableKey,NullWritable> getRecordReader(
+ InputSplit ignored, JobConf conf, Reporter reporter) { // none of the arguments is used
+ return new RecordReader<IncomparableKey,NullWritable>() { // reader over an empty source
+ public boolean next(IncomparableKey key, NullWritable value)
+ throws IOException { return false; } // never produces a record
+ public IncomparableKey createKey() { return new IncomparableKey(); }
+ public NullWritable createValue() { return NullWritable.get(); }
+ public long getPos() throws IOException { return 0L; }
+ public void close() throws IOException { }
+ public float getProgress() throws IOException { return 0.0f; }
+ };
+ }
+}
Added:
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/IncomparableKey.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/IncomparableKey.java?rev=675072&view=auto
==============================================================================
---
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/IncomparableKey.java
(added)
+++
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/IncomparableKey.java
Tue Jul 8 17:55:58 2008
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.join;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+
+import org.apache.hadoop.io.WritableComparable;
+
+public class IncomparableKey implements WritableComparable { // Test key whose comparison always throws.
+ public void write(DataOutput out) { } // writes nothing
+ public void readFields(DataInput in) { } // reads nothing
+ public int compareTo(Object o) { // any attempt to compare this key fails loudly
+ throw new RuntimeException("Should never see this.");
+ }
+}
Modified:
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java?rev=675072&r1=675071&r2=675072&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
(original)
+++
hadoop/core/branches/branch-0.18/src/test/org/apache/hadoop/mapred/join/TestDatamerge.java
Tue Jul 8 17:55:58 2008
@@ -29,6 +29,7 @@
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -40,6 +41,8 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
public class TestDatamerge extends TestCase {
@@ -251,4 +254,22 @@
CompositeInputFormat cif = new CompositeInputFormat();
cif.validateInput(conf);
}
+
+ /**
+ * Runs an "outer" join over three {@link FakeIF} sources, none of which
+ * yields a record (regression test for HADOOP-3630). There are no explicit
+ * assertions: the test passes when the job completes without throwing.
+ *
+ * @throws Exception if the job fails or the test directory cannot be removed
+ */
+ public void testEmptyJoin() throws Exception {
+ JobConf job = new JobConf();
+ Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
+ // Anchor every source under base so all test paths share one root
+ // (i1 and i2 were previously created without the base parent).
+ Path[] src = { new Path(base,"i0"), new Path(base,"i1"), new Path(base,"i2") };
+ job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
+ FakeIF.class, src));
+ job.setInputFormat(CompositeInputFormat.class);
+ FileOutputFormat.setOutputPath(job, new Path(base, "out"));
+
+ job.setMapperClass(IdentityMapper.class);
+ job.setReducerClass(IdentityReducer.class);
+ // IncomparableKey throws from compareTo, so any key comparison fails the job.
+ job.setOutputKeyClass(IncomparableKey.class);
+ job.setOutputValueClass(NullWritable.class);
+
+ JobClient.runJob(job);
+ // Remove the test directory, including the job output written under it.
+ base.getFileSystem(job).delete(base, true);
+ }
}