[
https://issues.apache.org/jira/browse/HADOOP-5368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12680491#action_12680491
]
he yongqiang commented on HADOOP-5368:
--------------------------------------
Actually, the FileSplit solution does not apply everywhere; sometimes we
cannot determine exactly how to process a file from its path. Currently the
FileSplit solution complicates our code a lot. In our situation there are
many input files for the maps, and these input files are further split into
groups; the group determines how to process a file. Sometimes we do not know
in advance which group a file belongs to, so we have to find other ways to
determine its group than by looking at its path.
Any comments are appreciated!
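To make the grouping idea concrete, here is a minimal, self-contained sketch. All names (`Strategy`, `GroupDemo`, `groupOf`, the header format) are hypothetical stand-ins, not Hadoop API: the point is only that the strategy is looked up by group, and the group is read from the file itself (here, a fake header line) rather than derived from its path.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupDemo {
    // Hypothetical per-group processing strategy.
    interface Strategy {
        String handle(String record);
    }

    // Group name -> strategy; assumed configuration, not Hadoop API.
    static final Map<String, Strategy> STRATEGIES = new HashMap<>();
    static {
        STRATEGIES.put("logs", r -> "log:" + r);
        STRATEGIES.put("clicks", r -> "click:" + r);
    }

    // Stand-in for inspecting the file's own content/metadata;
    // a real reader would examine the file, not its path.
    static String groupOf(String header) {
        return header.split(":")[0];
    }

    public static void main(String[] args) {
        String header = "clicks:v1";              // pretend first line of a file
        Strategy s = STRATEGIES.get(groupOf(header));
        System.out.println(s.handle("x=1"));      // click:x=1
    }
}
```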
> more user control on customized RecordReader
> --------------------------------------------
>
> Key: HADOOP-5368
> URL: https://issues.apache.org/jira/browse/HADOOP-5368
> Project: Hadoop Core
> Issue Type: Wish
> Reporter: he yongqiang
>
> Currently a user can define their own InputFormat and RecordReader, but has
> little control over them.
> For example, we feed multiple files into the mapper and want to handle them
> in different ways depending on which file the mapper is working on.
> This can easily be done as follows:
> {code}
> public class BlockMapRunner implements MapRunnable {
>   private BlockMapper mapper;
>
>   @Override
>   public void run(RecordReader input, OutputCollector output,
>       Reporter reporter) throws IOException {
>     if (mapper == null)
>       return;
>     BlockReader blkReader = (BlockReader) input;
>     this.mapper.initialize(input);
>     ...........
>   }
>
>   @Override
>   public void configure(JobConf job) {
>     JobConf work = new JobConf(job);
>     Class<? extends BlockMapper> mapCls = work.getBlockMapperClass();
>     if (mapCls != null) {
>       this.mapper = (BlockMapper) ReflectionUtils.newInstance(mapCls, job);
>     }
>   }
> }
> /*
>  * BlockMapper implements Mapper and is initialized from the RecordReader,
>  * from which we learn which file the mapper is working on and choose the
>  * right strategy for it.
>  */
> public class ExtendedMapper extends BlockMapper {
>   private Strategy strategy;
>   private Configuration work;
>
>   @Override
>   public void configure(Configuration job) {
>     this.work = job;
>   }
>
>   @Override
>   public void initialize(RecordReader reader) throws IOException {
>     // ((UserDefinedRecordReader) reader) is wrong!
>     String path = ((UserDefinedRecordReader) reader).which_File_We_Are_Working_On();
>     this.strategy = this.work.getStrategy(path);
>   }
>
>   @Override
>   public void map(Key k, V value, OutputCollector output, Reporter reporter)
>       throws IOException {
>     strategy.handle(k, value);
>   }
> }
> {code}
> {color:red}
> However, the above code does not work. The reader passed into the mapper is
> wrapped by MapTask and is either a SkippingRecordReader or a
> TrackedRecordReader. We cannot cast it back, and we cannot pass any
> information through the user-defined RecordReader. If SkippingRecordReader
> and TrackedRecordReader had a method for getting the raw reader, this
> problem would not occur.
> {color}
> This problem could be resolved by launching many map-reduce jobs, one job
> for each file, but that is apparently not what we want.
> Do other solutions exist?
> Any comments are appreciated.
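
The raw-reader accessor proposed in the red note could be sketched as below. This is a minimal, self-contained model of the wrapping problem, not the real org.apache.hadoop.mapred classes: `UserReader` stands in for the user-defined RecordReader, `TrackingReader` stands in for the TrackedRecordReader wrapper that MapTask installs, and `getRawReader()` is the hypothetical accessor being requested.

```java
// Simplified stand-in for the Hadoop RecordReader interface.
interface RecordReader<K, V> {
    boolean next(K key, V value);
}

// Plays the role of the user-defined RecordReader.
class UserReader implements RecordReader<StringBuilder, StringBuilder> {
    private final String path;
    UserReader(String path) { this.path = path; }
    String whichFileWeAreWorkingOn() { return path; }
    public boolean next(StringBuilder k, StringBuilder v) { return false; }
}

// Plays the role of TrackedRecordReader: the wrapper the framework
// installs around the user's reader. getRawReader() is the proposed fix.
class TrackingReader<K, V> implements RecordReader<K, V> {
    private final RecordReader<K, V> raw;
    TrackingReader(RecordReader<K, V> raw) { this.raw = raw; }
    public boolean next(K k, V v) { return raw.next(k, v); } // plus framework bookkeeping
    RecordReader<K, V> getRawReader() { return raw; }
}

public class UnwrapDemo {
    public static void main(String[] args) {
        RecordReader<StringBuilder, StringBuilder> input =
                new TrackingReader<>(new UserReader("/data/group-a/part-0"));

        // Casting the wrapper straight to the user type fails:
        System.out.println(input instanceof UserReader);  // false

        // With the accessor, the mapper can recover the raw reader:
        UserReader raw = (UserReader) ((TrackingReader<?, ?>) input).getRawReader();
        System.out.println(raw.whichFileWeAreWorkingOn());  // /data/group-a/part-0
    }
}
```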
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.