[ 
https://issues.apache.org/jira/browse/HADOOP-3630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12607685#action_12607685
 ] 

jly edited comment on HADOOP-3630 at 6/24/08 10:08 AM:
--------------------------------------------------------------

I've pasted an example program below that demonstrates this:

{code:java}
        public static final void main(String[] args) throws Exception {
                String input = args[0], output = args[1];

                JobConf conf = new JobConf(TestJoin.class);
                
                FileSystem fs = FileSystem.get(conf);
                Path inputPath = new Path(input);
                
                Path[] src = createInputFiles(inputPath,fs,conf);
                
                String expr = CompositeInputFormat.compose("outer", 
SequenceFileInputFormat.class, src);
                conf.set("mapred.join.expr", expr);
                
                conf.addInputPath(inputPath);
                conf.setOutputPath(new Path(output));
                
                conf.setInputFormat(CompositeInputFormat.class);
                conf.setOutputFormat(SequenceFileOutputFormat.class);
                
                conf.setOutputKeyClass(ExampleWritable.class);
                conf.setOutputValueClass(TupleWritable.class);
                
                conf.setNumReduceTasks(0);
                
                conf.setMapperClass(IdentityMapper.class);
                conf.setReducerClass(IdentityReducer.class);
                
                JobClient.runJob(conf);
        }

        public static class ExampleWritable implements WritableComparable       
{
                String str = null;

                public void readFields(DataInput in) throws IOException {
                        str = in.readUTF();
                }

                public void write(DataOutput out) throws IOException    {
                        out.writeUTF(str);
                }

                public int compareTo(Object o){
                        ExampleWritable other = (ExampleWritable) o;
                        return str.compareTo(other.str);
                }
                
        }
        
        private static Path[] createInputFiles(Path inputPath, FileSystem fs, 
JobConf conf)
                throws IOException      {
                Path[] src = new Path[10];
                
                for (int i = 0; i < src.length; ++i)    {
                        src[i] = new Path(inputPath, Integer.toString(i + 10, 
36));
                }
            
                SequenceFile.Writer out[] = new SequenceFile.Writer[10];
                for (int i = 0; i < src.length; ++i)    {
                        out[i] = new SequenceFile.Writer(fs, conf, src[i],
                                        ExampleWritable.class, 
ExampleWritable.class);
                }
                
                ExampleWritable key = new ExampleWritable();
                ExampleWritable val = new ExampleWritable();

                // write arbitrary values to only one of the sequence files
                for (int i=0; i< 10; i++)       {
                        key.str = String.valueOf(i);
                        val.str = String.valueOf(i);
                        out[1].append(key, val);
                }
                
                for (SequenceFile.Writer o:out) {
                        o.close();
                }
                
                return src;
        }

{code}

(I hope that still compiles and makes sense :S) 

which gives me the following exception when I run it:

{code}
java.lang.NullPointerException
        at com.detica.netreveal.hadoop.TestJoin$ExampleWritable.compare
To(TestJoin.java:81)
        at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.ja
va:109)
        at org.apache.hadoop.mapred.join.CompositeRecordReader$2.compare(Composi
teRecordReader.java:134)
        at org.apache.hadoop.mapred.join.CompositeRecordReader$2.compare(Composi
teRecordReader.java:132)
        at java.util.PriorityQueue.fixUp(PriorityQueue.java:544)
        at java.util.PriorityQueue.offer(PriorityQueue.java:304)
        at java.util.PriorityQueue.add(PriorityQueue.java:327)
        at org.apache.hadoop.mapred.join.CompositeRecordReader.add(CompositeReco
rdReader.java:138)
        at org.apache.hadoop.mapred.join.Parser$CNode.getRecordReader(Parser.jav
a:419)
        at org.apache.hadoop.mapred.join.CompositeInputFormat.getRecordReader(Co
mpositeInputFormat.java:142)
        at org.apache.hadoop.mapred.join.CompositeInputFormat.getRecordReader(Co
mpositeInputFormat.java:49)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:211)
        at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124
)
{code}

I think this occurs because CompositeRecordReader#add() [Hadoop 0.17.0, line 
138] doesn't call rr.hasNext() to check if the RecordReader has any records 
before it adds it to the PriorityQueue. Is this a bug or expected behaviour?


      was (Author: jly):
    I've pasted an example program below to demonstrate:

{code:java}
        public static final void main(String[] args) throws Exception {
                String input = args[0], output = args[1];

                JobConf conf = new JobConf(TestJoin.class);
                
                FileSystem fs = FileSystem.get(conf);
                Path inputPath = new Path(input);
                
                Path[] src = createInputFiles(inputPath,fs,conf);
                
                String expr = CompositeInputFormat.compose("outer", 
SequenceFileInputFormat.class, src);
                conf.set("mapred.join.expr", expr);
                
                conf.addInputPath(inputPath);
                conf.setOutputPath(new Path(output));
                
                conf.setInputFormat(CompositeInputFormat.class);
                conf.setOutputFormat(SequenceFileOutputFormat.class);
                
                conf.setOutputKeyClass(ExampleWritable.class);
                conf.setOutputValueClass(TupleWritable.class);
                
                conf.setNumReduceTasks(0);
                
                conf.setMapperClass(IdentityMapper.class);
                conf.setReducerClass(IdentityReducer.class);
                
                JobClient.runJob(conf);
        }

        public static class ExampleWritable implements WritableComparable       
{
                String str = null;

                public void readFields(DataInput in) throws IOException {
                        str = in.readUTF();
                }

                public void write(DataOutput out) throws IOException    {
                        out.writeUTF(str);
                }

                public int compareTo(Object o){
                        ExampleWritable other = (ExampleWritable) o;
                        return str.compareTo(other.str);
                }
                
        }
        
        private static Path[] createInputFiles(Path inputPath, FileSystem fs, 
JobConf conf)
                throws IOException      {
                Path[] src = new Path[10];
                
                for (int i = 0; i < src.length; ++i)    {
                        src[i] = new Path(inputPath, Integer.toString(i + 10, 
36));
                }
            
                SequenceFile.Writer out[] = new SequenceFile.Writer[10];
                for (int i = 0; i < src.length; ++i)    {
                        out[i] = new SequenceFile.Writer(fs, conf, src[i],
                                        ExampleWritable.class, 
ExampleWritable.class);
                }
                
                ExampleWritable key = new ExampleWritable();
                ExampleWritable val = new ExampleWritable();

                // write arbitrary values to only one of the sequence files
                for (int i=0; i< 10; i++)       {
                        key.str = String.valueOf(i);
                        val.str = String.valueOf(i);
                        out[1].append(key, val);
                }
                
                for (SequenceFile.Writer o:out) {
                        o.close();
                }
                
                return src;
        }

{code}

(I hope that still compiles and makes sense :S) 

which gives me the following exception when I run it:

{code}
java.lang.NullPointerException
        at com.detica.netreveal.hadoop.TestJoin$ExampleWritable.compare
To(TestJoin.java:81)
        at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.ja
va:109)
        at org.apache.hadoop.mapred.join.CompositeRecordReader$2.compare(Composi
teRecordReader.java:134)
        at org.apache.hadoop.mapred.join.CompositeRecordReader$2.compare(Composi
teRecordReader.java:132)
        at java.util.PriorityQueue.fixUp(PriorityQueue.java:544)
        at java.util.PriorityQueue.offer(PriorityQueue.java:304)
        at java.util.PriorityQueue.add(PriorityQueue.java:327)
        at org.apache.hadoop.mapred.join.CompositeRecordReader.add(CompositeReco
rdReader.java:138)
        at org.apache.hadoop.mapred.join.Parser$CNode.getRecordReader(Parser.jav
a:419)
        at org.apache.hadoop.mapred.join.CompositeInputFormat.getRecordReader(Co
mpositeInputFormat.java:142)
        at org.apache.hadoop.mapred.join.CompositeInputFormat.getRecordReader(Co
mpositeInputFormat.java:49)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:211)
        at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2124
)
{code}

I think this occurs because CompositeRecordReader#add() [Hadoop 0.17.0, line 
138] doesn't call rr.hasNext() to check if the RecordReader has any records 
before it adds it to the PriorityQueue. Is this a bug or expected behaviour?

  
> CompositeRecordReader: key and values can be in uninitialized state if files 
> being joined have no records
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-3630
>                 URL: https://issues.apache.org/jira/browse/HADOOP-3630
>             Project: Hadoop Core
>          Issue Type: Bug
>          Components: mapred
>    Affects Versions: 0.17.0
>            Reporter: Jingkei Ly
>
> I am using org.apache.hadoop.mapred.join.CompositeInputFormat to do an 
> outer-join across a number of SequenceFiles. This works fine in most 
> circumstances, but I get NullPointerExceptions/uninitialized data (where 
> Writable#readFields() has not been called) when some of the files being 
> joined have no records in them.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to