[ https://issues.apache.org/jira/browse/HADOOP-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12710040#action_12710040 ]

He Yongqiang commented on HADOOP-5368:
--------------------------------------

One possible solution:

1) Split TrackedRecordReader out of MapTask and add a public getRawRecordReader() method that returns the underlying RecordReader. Users could then cast the RecordReader passed into their MapRunnable's run() method to TrackedRecordReader and unwrap it; a sketch follows below.

2) Since Hadoop has introduced a new set of MapReduce APIs, we also need to add a public getRecordReader() method to the MapContext class.
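A minimal sketch of what option 1) might look like from the user side. It assumes TrackedRecordReader were promoted to a public class with a getRawRecordReader() accessor; neither exists today, since TrackedRecordReader is currently a private inner class of MapTask:

{code}
import java.io.IOException;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class UnwrappingMapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {

  @Override
  public void configure(JobConf job) {
    // Nothing to configure in this sketch.
  }

  @Override
  public void run(RecordReader<K1, V1> input,
      OutputCollector<K2, V2> output, Reporter reporter)
      throws IOException {
    // Hypothetical API from option 1): the framework passes in a
    // TrackedRecordReader, and getRawRecordReader() hands back the
    // user-defined reader it wraps.
    RecordReader<K1, V1> raw = input;
    if (input instanceof TrackedRecordReader) {
      raw = ((TrackedRecordReader<K1, V1>) input).getRawRecordReader();
    }
    // 'raw' is now the user-defined RecordReader; it can be queried
    // (for example, for the file it is reading) before running the
    // usual createKey()/createValue()/next() map loop.
  }
}
{code}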
BTW, this is not an isolated need; Prasad Chakka also raised a related problem:

{quote}
Hi,

I am currently using the getPos() method of RecordReader to get the current position of the row, and I use it for indexing. HADOOP-1230 removed this method from the new RecordReader class in the org.apache.hadoop.mapreduce package, and I didn't find any explicit reason for this in the JIRA. Are there any concerns about adding this method back, and possibly something like seek(pos) as well?

Thanks,
Prasad
{quote}

Besides getPos(), I think a seek function may also be useful to users; both are sketched below. I do not know whether there is a reason why the current implementation hides the RecordReader so thoroughly.
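For concreteness, here is a sketch of the two methods being asked for, written as a hypothetical abstract subclass of the new-API reader; neither getPos() nor seek() exists on org.apache.hadoop.mapreduce.RecordReader today:

{code}
import java.io.IOException;

import org.apache.hadoop.mapreduce.RecordReader;

// Hypothetical: the two methods requested above, expressed as an
// abstract subclass of the new-API RecordReader.
public abstract class SeekableRecordReader<KEYIN, VALUEIN>
    extends RecordReader<KEYIN, VALUEIN> {

  /** Returns the byte offset of the current record, as the old-API
   *  getPos() did. */
  public abstract long getPos() throws IOException;

  /** Repositions the reader at the given byte offset, so that callers
   *  such as indexers can jump back to a recorded position. */
  public abstract void seek(long pos) throws IOException;
}
{code}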
> more user control on customized RecordReader
> --------------------------------------------
>
>                 Key: HADOOP-5368
>                 URL: https://issues.apache.org/jira/browse/HADOOP-5368
>             Project: Hadoop Core
>          Issue Type: Wish
>            Reporter: He Yongqiang
>
> Currently a user can define their own InputFormat and RecordReader, but has little control over them.
> For example, suppose we feed multiple files into the mapper and want to handle each one differently depending on which file the mapper is working on.
> This could easily be done as follows:
> {code}
> public class BlockMapRunner implements MapRunnable {
>   private BlockMapper mapper;
>
>   @Override
>   public void run(RecordReader input, OutputCollector output,
>       Reporter reporter) throws IOException {
>     if (mapper == null)
>       return;
>     // Cast the reader back to the user-defined type ...
>     BlockReader blkReader = (BlockReader) input;
>     // ... and hand it to the mapper so it can inspect it.
>     this.mapper.initialize(blkReader);
>     // ...
>   }
>
>   @Override
>   public void configure(JobConf job) {
>     JobConf work = new JobConf(job);
>     // getBlockMapperClass() is a helper on our own JobConf subclass.
>     Class<? extends BlockMapper> mapCls = work.getBlockMapperClass();
>     if (mapCls != null) {
>       this.mapper = (BlockMapper) ReflectionUtils.newInstance(mapCls, job);
>     }
>   }
> }
>
> /*
>  * BlockMapper implements Mapper and is initialized from the RecordReader,
>  * from which we learn which file this mapper is working on and pick the
>  * right strategy for it.
>  */
> public class ExtendedMapper extends BlockMapper {
>   private Strategy strategy;
>   private Configuration work;
>
>   @Override
>   public void configure(Configuration job) {
>     this.work = job;
>   }
>
>   @Override
>   public void initialize(RecordReader reader) throws IOException {
>     String path = ((UserDefinedRecordReader) reader)
>         .which_File_We_Are_Working_On(); // ((UserDefinedRecordReader) reader) is wrong!
>     this.strategy = this.work.getStrategy(path);
>   }
>
>   @Override
>   public void map(Key k, V value, OutputCollector output, Reporter reporter)
>       throws IOException {
>     strategy.handle(k, value);
>   }
> }
> {code}
> {color:red}
> However, the above code does not work. The reader passed into the mapper is wrapped by MapTask and is either a SkippingRecordReader or a TrackedRecordReader. We cannot cast it back, so we cannot pass any information through the user-defined RecordReader. If SkippingRecordReader and TrackedRecordReader had a method for getting the raw reader, this problem would not exist.
> {color}
> This problem could be resolved by launching many map-reduce jobs, one job for each file (a sketch of this workaround follows below). But that apparently is not what we want. Or do other solutions exist?
> Any comments are appreciated.
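For reference, a sketch of the per-file workaround dismissed above; the driver class and the strategy.for.this.job property name are invented for illustration:

{code}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// The dismissed workaround: launch one job per input file, so each job
// can be configured up front for that single file's handling strategy.
public class PerFileJobs {
  public static void main(String[] args) throws Exception {
    for (String file : args) {
      JobConf job = new JobConf(PerFileJobs.class);
      FileInputFormat.setInputPaths(job, new Path(file));
      // Invented property name: each job carries its own strategy, so
      // the mapper never has to ask which file it is reading.
      job.set("strategy.for.this.job", file);
      // ... remaining job setup (mapper, output path, etc.) omitted ...
      JobClient.runJob(job);
    }
  }
}
{code}

The cost is one full job per input file, which is exactly the overhead the description wants to avoid.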