[ https://issues.apache.org/jira/browse/PIG-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13185145#comment-13185145 ]

Daniel Dai commented on PIG-2462:
---------------------------------

Patch looks good. Writing a test is a little complex, but it is possible. We 
need to add a test case.
                
> getWrappedSplit is incorrectly returning the first split instead of the 
> current split.
> --------------------------------------------------------------------------------------
>
>                 Key: PIG-2462
>                 URL: https://issues.apache.org/jira/browse/PIG-2462
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.9.1, 0.11
>            Reporter: Alex Rovner
>             Fix For: 0.11
>
>         Attachments: split_fix_take2.patch, splitsfix.patch
>
>
> If your loader needs to know which file is currently being read (let's say 
> for schema information), Pig currently provides this ability by calling 
> prepareToRead every time a new split is read. This is critical for 
> CombinedInputFormat, as each mapper can read more than one file. For the 
> load function to know which file is currently being read, it should call 
> getWrappedSplit() to get that information. However, getWrappedSplit() always 
> returns the first split in the list. Code from PigSplit.java:
>     /**
>      * This methods returns the actual InputSplit (as returned by the 
>      * {@link InputFormat}) which this class is wrapping.
>      * @return the wrappedSplit
>      */
>     public InputSplit getWrappedSplit() {
>         return wrappedSplits[0];
>     }
> Furthermore, in PigRecordReader.java the splitIndex is never incremented when 
> changing from split to split. So in fact, even if getWrappedSplit() were 
> changed to return wrappedSplits[splitIndex], it would still return the 
> wrong split. 
> This can be fixed by changing PigRecordReader to increment 
> PigSplit.splitIndex every time the split changes in the following code:
>     /**
>      * Get the record reader for the next chunk in this CombineFileSplit.
>      */
>     protected boolean initNextRecordReader() throws IOException, InterruptedException {
>         if (curReader != null) {
>             curReader.close();
>             curReader = null;
>             if (idx > 0) {
>                 progress += pigSplit.getLength(idx-1);    // done processing so far
>             }
>         }
>         // if all chunks have been processed, nothing more to do.
>         if (idx == pigSplit.getNumPaths()) {
>             return false;
>         }
>         // get a record reader for the idx-th chunk
>         try {
>           
>             curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
>             LOG.info("Current split being processed " + pigSplit.getWrappedSplit(idx));
>             if (idx > 0) {
>                 // initialize() for the first RecordReader will be called by MapTask;
>                 // we're responsible for initializing subsequent RecordReaders.
>                 curReader.initialize(pigSplit.getWrappedSplit(idx), context);
>                 pigSplit.get
>                 loadfunc.prepareToRead(curReader, pigSplit);
>             }
>         } catch (Exception e) {
>             throw new RuntimeException (e);
>         }
>         idx++;
>         return true;
>     }
> }
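
As a rough illustration of the behaviour a test case would need to check, here is a minimal, self-contained sketch. It does not use the real Pig or Hadoop classes: CombinedSplit and CombinedReader are hypothetical stand-ins for PigSplit and PigRecordReader, splits are modelled as plain file-name strings, and setCurrentIdx is an assumed name for whatever method the patch uses to update the split index. The point is only that the reader advances the split's index before the loader asks for the wrapped split.

```java
import java.util.ArrayList;
import java.util.List;

class SplitIndexSketch {

    /** Hypothetical stand-in for PigSplit: wraps several underlying splits. */
    static class CombinedSplit {
        private final String[] wrappedSplits;
        private int splitIndex = 0; // index of the split currently being read

        CombinedSplit(String... wrappedSplits) {
            this.wrappedSplits = wrappedSplits;
        }

        int getNumPaths() {
            return wrappedSplits.length;
        }

        // Proposed behaviour: return the current split, not wrappedSplits[0].
        String getWrappedSplit() {
            return wrappedSplits[splitIndex];
        }

        // Assumed setter name; the reader calls this when it moves on.
        void setCurrentIdx(int idx) {
            this.splitIndex = idx;
        }
    }

    /** Hypothetical stand-in for PigRecordReader's chunk iteration. */
    static class CombinedReader {
        private final CombinedSplit split;
        private int idx = 0;

        CombinedReader(CombinedSplit split) {
            this.split = split;
        }

        boolean initNextRecordReader() {
            if (idx == split.getNumPaths()) {
                return false; // all chunks have been processed
            }
            split.setCurrentIdx(idx); // keep the split's index in step with the reader
            idx++;
            return true;
        }
    }

    /** Collects what a loader would see from getWrappedSplit() on each new chunk. */
    static List<String> visitedSplits(String... files) {
        CombinedSplit split = new CombinedSplit(files);
        CombinedReader reader = new CombinedReader(split);
        List<String> seen = new ArrayList<>();
        while (reader.initNextRecordReader()) {
            seen.add(split.getWrappedSplit());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(visitedSplits("a.txt", "b.txt", "c.txt"));
    }
}
```

With the index kept in step, each file is reported once and in order. If getWrappedSplit() returned wrappedSplits[0], or if the setCurrentIdx call were dropped, the same loop would report the first file every time, which is the symptom described in this issue.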

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
