olga
Fri, 02 May 2008 13:06:57 -0700
Author: olga Date: Fri May 2 13:06:31 2008 New Revision: 652887 URL: http://svn.apache.org/viewvc?rev=652887&view=rev Log: PIG-228: make multiple streaming outputs adhere to spec Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652887&r1=652886&r2=652887&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Fri May 2 13:06:31 2008 @@ -260,3 +260,5 @@ PIG-226: fix for streaming optimization bug (acmurthy via olgan) + PIG-228: make multiple streaming outputs adhere to spec (acmurthy via olgan) + Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=652887&r1=652886&r2=652887&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Fri May 2 13:06:31 2008 @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.text.NumberFormat; import java.util.Date; import java.util.List; import java.util.Properties; @@ -45,6 +46,16 @@ * of the managed process and also persists the logs of the tasks on HDFS. */ public class HadoopExecutableManager extends ExecutableManager { + // The part-<partition> file name, similar to Hadoop's outputs + private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); + static { + NUMBER_FORMAT.setMinimumIntegerDigits(5); + NUMBER_FORMAT.setGroupingUsed(false); + } + + static String getOutputName(int partition) { + return "part-" + NUMBER_FORMAT.format(partition); + } JobConf job; @@ -122,10 +133,13 @@ for (int i=1; i < outputSpecs.size(); ++i) { String fileName = outputSpecs.get(i).getName(); try { + int partition = job.getInt("mapred.task.partition", -1); fs.copyFromLocalFile(false, true, new Path(fileName), - new Path(scriptOutputDir, - taskId+"-"+fileName) - ); + new Path( + new Path(scriptOutputDir, + fileName), + getOutputName(partition)) + ); } catch (IOException ioe) { System.err.println("Failed to save secondary output '" + fileName + "' of task: " + taskId + Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=652887&r1=652886&r2=652887&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Fri May 2 13:06:31 2008 @@ -324,9 +324,9 @@ "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";", "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[2].\"!: $!\";", "while (<STDIN>) {", - " print OUTFILE \"A,10\n\";", + " print OUTFILE \"$_\n\";", " print STDERR \"STDERR: $_\n\";", - " print OUTFILE2 \"Secondary Output: $_\n\";", + " print OUTFILE2 \"A,10\n\";", "}", }; File command = Util.createInputFile("script", "pl", script); @@ -354,7 +354,8 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); + InputStream op = FileLocalizer.open(output+"/bar", + pigServer.getPigContext()); PigStorage ps = new PigStorage(","); ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); List<Tuple> outputs = new ArrayList<Tuple>(); @@ -388,7 +389,7 @@ " chomp $_;", " print OUTFILE \"$_\n\";", " print STDERR \"STDERR: $_\n\";", - " print OUTFILE2 \"Secondary Output: $_\n\";", + " print OUTFILE2 \"$_\n\";", "}", }; File command = Util.createInputFile("script", "pl", script); @@ -417,7 +418,8 @@ pigServer.deleteFile(output); pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); - InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); + InputStream op = FileLocalizer.open(output+"/foobar", + pigServer.getPigContext()); PigStorage ps = new PigStorage(","); ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); List<Tuple> outputs = new ArrayList<Tuple>();