gates
Mon, 14 Apr 2008 14:04:39 -0700
Author: gates
Date: Mon Apr 14 14:04:05 2008
New Revision: 647997

URL: http://svn.apache.org/viewvc?rev=647997&view=rev
Log:
PIG-188: Fix mismatches between pig slicer changes and new streaming feature.

Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/Slice.java
    incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=647997&r1=647996&r2=647997&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Mon Apr 14 14:04:05 2008
@@ -228,3 +228,6 @@
     1k caused pig to freeze. (kali via gates)
 
     PIG-204: Repair broken input splits (acmurthy via gates).
+
+    PIG-188: Fix mismatches between pig slicer changes and new streaming
+    feature (acmurthy via gates).

Modified: incubator/pig/trunk/src/org/apache/pig/Slice.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/Slice.java?rev=647997&r1=647996&r2=647997&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/Slice.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/Slice.java Mon Apr 14 14:04:05 2008
@@ -41,6 +41,11 @@
     void init(DataStorage store) throws IOException;
 
     /**
+     * Returns the offset from which data in this Slice will be processed.
+     */
+    long getStart();
+
+    /**
      * Returns the length in bytes of all of the data that will be processed by
      * this Slice.
      * <p>

Modified: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=647997&r1=647996&r2=647997&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Mon Apr 14 14:04:05 2008
@@ -48,6 +48,10 @@
         return new String[] { file };
     }
 
+    public long getStart() {
+        return start;
+    }
+
     public long getLength() {
         return length;
     }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java?rev=647997&r1=647996&r2=647997&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java Mon Apr 14 14:04:05 2008
@@ -92,6 +92,15 @@
         DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
         store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
         wrapped.init(store);
+
+        // Mimic org.apache.hadoop.mapred.FileSplit if feasible...
+        String[] locations = wrapped.getLocations();
+        if (locations.length > 0) {
+            job.set("map.input.file", locations[0]);
+            job.setLong("map.input.start", wrapped.getStart());
+            job.setLong("map.input.length", wrapped.getLength());
+        }
+
         return new RecordReader<Text, Tuple>() {
 
             public void close() throws IOException {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=647997&r1=647996&r2=647997&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Mon Apr 14 14:04:05 2008
@@ -187,12 +187,13 @@
         }
         processError("\nCommand: " + sb.toString());
         processError("\nStart time: " + new Date(System.currentTimeMillis()));
-        processError("\nInput-split file: " + job.get("map.input.file"));
-        processError("\nInput-split start-offset: " +
-                     job.getLong("map.input.start", -1));
-        processError("\nInput-split length: " +
-                     job.getLong("map.input.length", -1));
-
+        if (job.getBoolean("mapred.task.is.map", false)) {
+            processError("\nInput-split file: " + job.get("map.input.file"));
+            processError("\nInput-split start-offset: " +
+                         job.getLong("map.input.start", -1));
+            processError("\nInput-split length: " +
+                         job.getLong("map.input.length", -1));
+        }
         processError("\n===== * * * =====\n");
     }

Modified: incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java?rev=647997&r1=647996&r2=647997&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java Mon Apr 14 14:04:05 2008
@@ -68,6 +68,10 @@
             return new String[0];
         }
 
+        public long getStart() {
+            return 0;
+        }
+
         public long getPos () throws IOException {