getWrappedSplit is incorrectly returning the first split instead of the current 
split.
-------------------------------------------------------------------------------------

                 Key: PIG-2462
                 URL: https://issues.apache.org/jira/browse/PIG-2462
             Project: Pig
          Issue Type: Bug
            Reporter: Alex Rovner
             Fix For: 0.9.1, 0.11


If your loader needs to know which file is currently being read (say, for 
schema information), Pig provides this ability by calling prepareToRead every 
time a new split is read. This is critical for CombinedInputFormat, as each 
mapper can read more than one file. For the load function to know which file 
is currently being read, it should call getWrappedSplit(). However, 
getWrappedSplit() always returns the first split in the list. Code from 
PigSplit.java:

    /**
     * This methods returns the actual InputSplit (as returned by the 
     * {@link InputFormat}) which this class is wrapping.
     * @return the wrappedSplit
     */
    public InputSplit getWrappedSplit() {
        return wrappedSplits[0];
    }


Furthermore, in PigRecordReader.java the splitIndex is never incremented when 
moving from split to split. So even if getWrappedSplit() were changed to 
return wrappedSplits[splitIndex], it would still return the split at the 
wrong index.
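
To make the two halves of the problem concrete, here is a minimal, 
self-contained sketch. It is not the Pig source: ToyPigSplit is a hypothetical 
stand-in, plain Strings play the role of the wrapped InputSplits, and 
setCurrentIdx is an assumed setter name used only for illustration.

```java
// Toy stand-in for PigSplit; Strings play the role of the wrapped InputSplits.
final class ToyPigSplit {
    private final String[] wrappedSplits;
    private int splitIndex = 0; // index of the split currently being read

    ToyPigSplit(String... wrappedSplits) {
        this.wrappedSplits = wrappedSplits.clone();
    }

    // Behaviour quoted in the report: always the first split.
    String getFirstWrappedSplit() {
        return wrappedSplits[0];
    }

    // Suggested behaviour: the split at the current index.
    String getWrappedSplit() {
        return wrappedSplits[splitIndex];
    }

    // Hypothetical setter; the record reader would have to call it
    // every time it moves on to the next split.
    void setCurrentIdx(int idx) {
        if (idx < 0 || idx >= wrappedSplits.length) {
            throw new IndexOutOfBoundsException("split index " + idx);
        }
        splitIndex = idx;
    }
}

final class ToyPigSplitDemo {
    public static void main(String[] args) {
        ToyPigSplit split = new ToyPigSplit("part-0", "part-1");
        // Nobody has advanced the index yet, so both getters agree.
        System.out.println(split.getWrappedSplit());
        split.setCurrentIdx(1);
        // Only the index-aware getter follows the reader to the second split.
        System.out.println(split.getWrappedSplit());
        System.out.println(split.getFirstWrappedSplit());
    }
}
```

Changing the getter alone is not enough: until something calls the setter, 
the index-aware getter behaves exactly like the one that returns element 0.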

This can be fixed by changing PigRecordReader to increment PigSplit.splitIndex 
every time the split changes in the following code:


    /**
     * Get the record reader for the next chunk in this CombineFileSplit.
     */
    protected boolean initNextRecordReader()
            throws IOException, InterruptedException {

        if (curReader != null) {
            curReader.close();
            curReader = null;
            if (idx > 0) {
                // done processing so far
                progress += pigSplit.getLength(idx-1);
            }
        }

        // if all chunks have been processed, nothing more to do.
        if (idx == pigSplit.getNumPaths()) {
            return false;
        }

        // get a record reader for the idx-th chunk
        try {
          

            curReader = inputformat.createRecordReader(
                    pigSplit.getWrappedSplit(idx), context);
            LOG.info("Current split being processed "
                    + pigSplit.getWrappedSplit(idx));

            if (idx > 0) {
                // initialize() for the first RecordReader will be called by
                // MapTask;
                // we're responsible for initializing subsequent RecordReaders.
                curReader.initialize(pigSplit.getWrappedSplit(idx), context);
                pigSplit.get
                loadfunc.prepareToRead(curReader, pigSplit);
            }
        } catch (Exception e) {
            throw new RuntimeException (e);
        }
        idx++;
        return true;
    }
}
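
A rough sketch of the proposed change, under stated assumptions: SketchSplit 
and SketchReader are hypothetical toy classes rather than the Pig ones, 
setSplitIndex is an assumed name, and prepareToRead is reduced to recording 
which split the loader would see. Unlike the real method, the toy calls 
prepareToRead for every chunk, including the first.

```java
import java.util.ArrayList;
import java.util.List;

// Toy split: Strings stand in for the wrapped InputSplits.
final class SketchSplit {
    private final String[] paths;
    private int splitIndex = 0;

    SketchSplit(String... paths) {
        this.paths = paths.clone();
    }

    int getNumPaths() {
        return paths.length;
    }

    // Index-aware getter, as suggested in the report.
    String getWrappedSplit() {
        return paths[splitIndex];
    }

    // Hypothetical setter used by the reader below.
    void setSplitIndex(int index) {
        splitIndex = index;
    }
}

// Toy reader that walks the chunks the way initNextRecordReader() does.
final class SketchReader {
    private final SketchSplit split;
    private final List<String> seenByLoader = new ArrayList<>();
    private int idx = 0;

    SketchReader(SketchSplit split) {
        this.split = split;
    }

    boolean initNextRecordReader() {
        // If all chunks have been processed, nothing more to do.
        if (idx == split.getNumPaths()) {
            return false;
        }
        // The step the report says is missing: keep the split's own
        // index in step with the reader before the loader is told.
        split.setSplitIndex(idx);
        prepareToRead(split);
        idx++;
        return true;
    }

    // Stand-in for LoadFunc.prepareToRead: record what the loader would see.
    private void prepareToRead(SketchSplit s) {
        seenByLoader.add(s.getWrappedSplit());
    }

    List<String> seenByLoader() {
        return seenByLoader;
    }
}
```

Calling initNextRecordReader() in a loop until it returns false should leave 
seenByLoader() holding each path once, in order. Without the setSplitIndex 
call, every entry would be the first path.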


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
