more user control on customized RecordReader
--------------------------------------------

                 Key: HADOOP-5368
                 URL: https://issues.apache.org/jira/browse/HADOOP-5368
             Project: Hadoop Core
          Issue Type: Wish
            Reporter: he yongqiang


Currently a user can define their own InputFormat and RecordReader, but has 
little control over them.
For example, suppose we feed multiple files into a job and want each mapper to 
handle its input differently depending on which file it is working on.
At first glance this can easily be done as follows:
      public class BlockMapRunner implements MapRunnable {

        private BlockMapper mapper;

        @Override
        public void run(RecordReader input, OutputCollector output,
                        Reporter reporter) throws IOException {
                if (mapper == null)
                        return;
                // let the mapper inspect the reader (e.g. which file it is
                // reading) before the map loop starts
                BlockReader blkReader = (BlockReader) input;
                this.mapper.initialize(input);
                ...........
        }

        @Override
        public void configure(JobConf job) {
                JobConf work = new JobConf(job);
                // getBlockMapperClass() stands for however the job records the
                // BlockMapper class; it is not a real JobConf method
                Class<? extends BlockMapper> mapCls = work.getBlockMapperClass();
                if (mapCls != null) {
                        this.mapper = (BlockMapper) ReflectionUtils
                                        .newInstance(mapCls, job);
                }
        }
}

BlockMapper implements Mapper and is initialized from the RecordReader, from 
which it learns which file this mapper is working on and picks the right 
strategy for it.
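
BlockMapper itself is not spelled out here; roughly, it is an abstract base 
class over the old org.apache.hadoop.mapred API along these lines (a sketch 
only, the names are illustrative, not Hadoop API):

public abstract class BlockMapper implements Mapper {

        // funnel Hadoop's configure(JobConf) into a plain-Configuration hook
        @Override
        public void configure(JobConf job) {
                configure((Configuration) job);
        }

        public abstract void configure(Configuration job);

        // called by BlockMapRunner before the map loop so the mapper can ask
        // the RecordReader which file it is reading
        public abstract void initialize(RecordReader reader) throws IOException;

        @Override
        public void close() throws IOException {
        }
}

A concrete subclass then looks like this: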


public class ExtendedMapper extends BlockMapper {

        private Strategy strategy;

        private Configuration work;

        @Override
        public void configure(Configuration job) {
                this.work = job;
        }

        @Override
        public void initialize(RecordReader reader) throws IOException {
                // this cast is wrong: the reader handed to us here is not our
                // UserDefinedRecordReader (see below)
                String path = ((UserDefinedRecordReader) reader)
                                .which_File_We_Are_Working_On();
                this.strategy = this.work.getStrategy(path);
        }

        @Override
        public void map(Key k, V value, OutputCollector output, Reporter reporter)
                        throws IOException {
                strategy.handle(k, value);
        }
}
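
For completeness, the kind of user-defined reader assumed above could look 
roughly like this (a sketch over the old org.apache.hadoop.mapred API; the 
class name and which_File_We_Are_Working_On() are just the names used in this 
example):

public class UserDefinedRecordReader implements RecordReader<LongWritable, Text> {

        private final LineRecordReader delegate;
        private final Path file;

        public UserDefinedRecordReader(JobConf job, FileSplit split) throws IOException {
                // delegate the actual record parsing to a stock line reader
                this.delegate = new LineRecordReader(job, split);
                // remember which file this split belongs to
                this.file = split.getPath();
        }

        // the per-file information the mapper wants to read back out later
        public String which_File_We_Are_Working_On() {
                return file.toString();
        }

        public boolean next(LongWritable key, Text value) throws IOException {
                return delegate.next(key, value);
        }

        public LongWritable createKey() {
                return delegate.createKey();
        }

        public Text createValue() {
                return delegate.createValue();
        }

        public long getPos() throws IOException {
                return delegate.getPos();
        }

        public float getProgress() throws IOException {
                return delegate.getProgress();
        }

        public void close() throws IOException {
                delegate.close();
        }
}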


However, the above code does not work. The reader passed into the mapper is 
wrapped by MapTask and is either a SkippingRecordReader or a TrackedRecordReader. 
We cannot cast it back to our own reader, so we cannot pass any information 
through the user-defined RecordReader. If SkippingRecordReader and 
TrackedRecordReader had a method for getting the raw reader, this problem would 
go away.
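
As a sketch of what that could look like (purely hypothetical: nothing named 
RawReaderAccess or getRawReader() exists in Hadoop today), the wrappers could 
expose their delegate:

public interface RawReaderAccess {

        // return the user-defined reader that MapTask wrapped
        RecordReader getRawReader();
}

If SkippingRecordReader and TrackedRecordReader implemented it, 
ExtendedMapper.initialize() above could unwrap before casting:

        @Override
        public void initialize(RecordReader reader) throws IOException {
                RecordReader raw = reader;
                if (reader instanceof RawReaderAccess) {
                        // unwrap the MapTask wrapper to reach the user reader
                        raw = ((RawReaderAccess) reader).getRawReader();
                }
                String path = ((UserDefinedRecordReader) raw)
                                .which_File_We_Are_Working_On();
                this.strategy = this.work.getStrategy(path);
        }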

This problem could be worked around by launching many map-reduce jobs, one per 
file, but that is clearly not what we want.

Or are there other solutions? 
Any comments are appreciated.
