[ https://issues.apache.org/jira/browse/HADOOP-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12607685#action_12607685 ]
Jingkei Ly commented on HADOOP-3630:
------------------------------------
I've pasted an example program below to demonstrate:
{code:java}
public static final void main(String[] args) throws Exception {
    String input = args[0], output = args[1];
    JobConf conf = new JobConf(TestJoin.class);
    FileSystem fs = FileSystem.get(conf);
    Path inputPath = new Path(input);
    Path[] src = createInputFiles(inputPath, fs, conf);
    String expr = CompositeInputFormat.compose("outer",
            SequenceFileInputFormat.class, src);
    conf.set("mapred.join.expr", expr);
    conf.addInputPath(inputPath);
    conf.setOutputPath(new Path(output));
    conf.setInputFormat(CompositeInputFormat.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);
    conf.setOutputKeyClass(ExampleWritable.class);
    conf.setOutputValueClass(TupleWritable.class);
    conf.setNumReduceTasks(0);
    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);
    JobClient.runJob(conf);
}

public static class ExampleWritable implements WritableComparable {
    String str = null;

    public void readFields(DataInput in) throws IOException {
        str = in.readUTF();
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(str);
    }

    public int compareTo(Object o) {
        ExampleWritable other = (ExampleWritable) o;
        return str.compareTo(other.str);
    }
}

private static Path[] createInputFiles(Path inputPath, FileSystem fs,
        JobConf conf) throws IOException {
    Path[] src = new Path[10];
    for (int i = 0; i < src.length; ++i) {
        src[i] = new Path(inputPath, Integer.toString(i + 10, 36));
    }
    SequenceFile.Writer[] out = new SequenceFile.Writer[10];
    for (int i = 0; i < src.length; ++i) {
        out[i] = new SequenceFile.Writer(fs, conf, src[i],
                ExampleWritable.class,
                ExampleWritable.class);
    }
    ExampleWritable key = new ExampleWritable();
    ExampleWritable val = new ExampleWritable();
    // write arbitrary values to only one of the sequence files
    for (int i = 0; i < 10; i++) {
        key.str = String.valueOf(i);
        val.str = String.valueOf(i);
        out[1].append(key, val);
    }
    for (SequenceFile.Writer o : out) {
        o.close();
    }
    return src;
}
{code}
(I hope that still compiles and makes sense :S)
which gives me the following exception when I run it:
{code}
java.lang.NullPointerException
    at com.detica.netreveal.hadoop.TestJoin$ExampleWritable.compareTo(TestJoin.java:81)
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:109)
    at org.apache.hadoop.mapred.join.CompositeRecordReader$2.compare(CompositeRecordReader.java:134)
    at org.apache.hadoop.mapred.join.CompositeRecordReader$2.compare(CompositeRecordReader.java:132)
    at java.util.PriorityQueue.fixUp(PriorityQueue.java:544)
    at java.util.PriorityQueue.offer(PriorityQueue.java:304)
    at java.util.PriorityQueue.add(PriorityQueue.java:327)
    at org.apache.hadoop.mapred.join.CompositeRecordReader.add(CompositeRecordReader.java:138)
    at org.apache.hadoop.mapred.join.Parser$CNode.getRecordReader(Parser.java:419)
    at org.apache.hadoop.mapred.join.CompositeInputFormat.getRecordReader(CompositeInputFormat.java:142)
    at org.apache.hadoop.mapred.join.CompositeInputFormat.getRecordReader(CompositeInputFormat.java:49)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:211)
    at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124)
{code}
I think this occurs because CompositeRecordReader#add() [Hadoop 0.17.0, line
138] doesn't call rr.hasNext() to check if the RecordReader has any records
before it adds it to the PriorityQueue. Is this a bug or expected behaviour?
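
To make the suspected cause concrete, here is a minimal standalone sketch of the pattern, not the actual Hadoop code. The names GuardedJoinQueue, Source, BY_KEY and buildQueue are hypothetical stand-ins: Source plays the role of a RecordReader whose key was never initialized because its file is empty, and the guard flag models a hasNext() check before adding to the queue.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class GuardedJoinQueue {

    /** Hypothetical stand-in for a RecordReader; key is null when the source is empty. */
    static final class Source {
        final String key;

        Source(String firstKey) {
            this.key = firstKey;
        }

        /** True only if a first record was read, i.e. the key is initialized. */
        boolean hasNext() {
            return key != null;
        }
    }

    /** Orders sources by current key; throws NullPointerException if either key is null. */
    static final Comparator<Source> BY_KEY = new Comparator<Source>() {
        public int compare(Source a, Source b) {
            return a.key.compareTo(b.key);
        }
    };

    /** Builds the queue; with guard set, empty sources are skipped instead of being compared. */
    static PriorityQueue<Source> buildQueue(List<Source> sources, boolean guard) {
        PriorityQueue<Source> q =
                new PriorityQueue<Source>(Math.max(1, sources.size()), BY_KEY);
        for (Source s : sources) {
            if (guard && !s.hasNext()) {
                continue; // the comparator never sees a null key
            }
            q.add(s);
        }
        return q;
    }

    public static void main(String[] args) {
        // Mirrors the example program: only one of the sources has records.
        List<Source> sources = Arrays.asList(
                new Source(null), new Source("1"), new Source(null));
        System.out.println("guarded size: " + buildQueue(sources, true).size());
        try {
            buildQueue(sources, false);
        } catch (NullPointerException e) {
            System.out.println("unguarded add failed with NullPointerException");
        }
    }
}
```

This only illustrates why an unchecked add can fail inside the comparator. How an outer join should represent the skipped empty sources in its output tuples is a separate question that the sketch does not address.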
> CompositeRecordReader: key and values can be in uninitialized state if files
> being joined have no records
> ---------------------------------------------------------------------------------------------------------
>
> Key: HADOOP-3630
> URL: https://issues.apache.org/jira/browse/HADOOP-3630
> Project: Hadoop Core
> Issue Type: Bug
> Components: mapred
> Affects Versions: 0.17.0
> Reporter: Jingkei Ly
>
> I am using org.apache.hadoop.mapred.join.CompositeInputFormat to do an
> outer-join across a number of SequenceFiles. This works fine in most
> circumstances, but I get NullPointerExceptions/uninitialized data (where
> Writable#readFields() has not been called) when some of the files being
> joined have no records in them.