olga
Thu, 03 Apr 2008 11:41:04 -0700
Author: olga Date: Thu Apr 3 11:40:33 2008 New Revision: 644437 URL: http://svn.apache.org/viewvc?rev=644437&view=rev Log: PIG-174,180: bug fixes in streaming Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=644437&r1=644436&r2=644437&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu Apr 3 11:40:33 2008 @@ -196,6 +196,7 @@ PIG-122: Added build and src-gen to the list of ignore files in the top level directory (joa23 via gates). 
- PIG-94: M3 code update for streaming + PIG-94: M3 code update for streaming (arunc via olgan) - PIG-55: added custom splitter + PIG-55: added custom splitter (groves via olgan) + PIG-174,180: bug fixes in streaming (arunc via olgan) Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=644437&r1=644436&r2=644437&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Thu Apr 3 11:40:33 2008 @@ -104,9 +104,9 @@ oc = output; - setupMapPipe(properties, reporter); - try { + setupMapPipe(properties, reporter); + // allocate key & value instances that are re-used for all entries WritableComparable key = input.createKey(); Writable value = input.createValue(); @@ -115,12 +115,15 @@ } } finally { try { - evalPipe.finishPipe(); // EOF marker - evalPipe = null; + if (evalPipe != null) { + evalPipe.finishPipe(); // EOF marker + evalPipe = null; + } } finally { // Close the writer if (pigWriter != null) { pigWriter.close(reporter); + pigWriter = null; } } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=644437&r1=644436&r2=644437&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Thu Apr 3 
11:40:33 2008 @@ -69,12 +69,7 @@ // and input files are to be processed as-is StreamSpec streamSpec = (StreamSpec)spec; StreamingCommand command = streamSpec.getCommand(); - List<HandleSpec> inputSpecs = - command.getHandleSpecs(Handle.INPUT); - HandleSpec streamInputSpec = - (inputSpecs == null) ? - new HandleSpec("stdin" , "PigStorage()") : - inputSpecs.get(0); + HandleSpec streamInputSpec = command.getInputSpec(); FileSpec loadFileSpec = load.getInputFileSpec(); @@ -91,7 +86,9 @@ // Since they both are the same, we can flip them // for BinaryStorage load.setInputFileSpec(new FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName())); + streamInputSpec.setSpec(BinaryStorage.class.getName()); + command.setInputSpec(streamInputSpec); optimize = true; } Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=644437&r1=644436&r2=644437&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java Thu Apr 3 11:40:33 2008 @@ -94,12 +94,7 @@ // Try and optimize if the store and stream output specs match StreamSpec streamSpec = (StreamSpec)spec; StreamingCommand command = streamSpec.getCommand(); - List<HandleSpec> outputSpecs = - command.getHandleSpecs(Handle.OUTPUT); - HandleSpec streamOutputSpec = - (outputSpecs == null) ? 
- new HandleSpec("stdout" , "PigStorage()") : - outputSpecs.get(0); + HandleSpec streamOutputSpec = command.getOutputSpec(); FileSpec storeFileSpec = s.getOutputFileSpec(); @@ -116,7 +111,9 @@ // Since they both are the same, we can flip them // for BinaryStorage s.setOutputFileSpec(new FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName())); + streamOutputSpec.setSpec(BinaryStorage.class.getName()); + command.setOutputSpec(streamOutputSpec); optimize = true; } Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=644437&r1=644436&r2=644437&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu Apr 3 11:40:33 2008 @@ -163,6 +163,64 @@ } /** + * Set the input specification for the <code>StreamingCommand</code>. + * + * @param spec input specification + */ + public void setInputSpec(HandleSpec spec) { + List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT); + if (inputSpecs == null || inputSpecs.size() == 0) { + addHandleSpec(Handle.INPUT, spec); + } else { + inputSpecs.set(0, spec); + } + } + + /** + * Get the input specification of the <code>StreamingCommand</code>. + * + * @return input specification of the <code>StreamingCommand</code> + */ + public HandleSpec getInputSpec() { + List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT); + if (inputSpecs == null || inputSpecs.size() == 0) { + addHandleSpec(Handle.INPUT, new HandleSpec("stdin", PigStorage.class.getName())); + } + return getHandleSpecs(Handle.INPUT).get(0); + } + + /** + * Set the specification for the primary output of the + * <code>StreamingCommand</code>. 
+ * + * @param spec specification for the primary output of the + * <code>StreamingCommand</code> + */ + public void setOutputSpec(HandleSpec spec) { + List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT); + if (outputSpecs == null || outputSpecs.size() == 0) { + addHandleSpec(Handle.OUTPUT, spec); + } else { + outputSpecs.set(0, spec); + } + } + + /** + * Get the specification of the primary output of the + * <code>StreamingCommand</code>. + * + * @return specification of the primary output of the + * <code>StreamingCommand</code> + */ + public HandleSpec getOutputSpec() { + List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT); + if (outputSpecs == null || outputSpecs.size() == 0) { + addHandleSpec(Handle.OUTPUT, new HandleSpec("stdout", PigStorage.class.getName())); + } + return getHandleSpecs(Handle.OUTPUT).get(0); + } + + /** * Get specifications for the given <code>Handle</code>. * * @param handle <code>Handle</code> of the stream