olga
Thu, 01 May 2008 19:31:04 -0700
Author: olga Date: Thu May 1 19:30:37 2008 New Revision: 652735 URL: http://svn.apache.org/viewvc?rev=652735&view=rev Log: PIG-226: fix for streaming optimization bug Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652735&r1=652734&r2=652735&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu May 1 19:30:37 2008 @@ -257,3 +257,6 @@ PIG-151: fixes to code that handles bzip files PIG-222: fix build break + + PIG-226: fix for streaming optimization bug + Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=652735&r1=652734&r2=652735&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Thu May 1 19:30:37 2008 @@ -20,6 +20,7 @@ import java.util.List; import org.apache.pig.LoadFunc; +import org.apache.pig.StoreFunc; import org.apache.pig.builtin.BinaryStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; @@ -73,27 +74,31 @@ FileSpec loadFileSpec = load.getInputFileSpec(); - // Instantiate both LoadFunc objects to compare them for - // equality - LoadFunc streamLoader = - (LoadFunc)PigContext.instantiateFuncFromSpec( + // Instantiate both to compare them for equality + StoreFunc streamStorer = + (StoreFunc)PigContext.instantiateFuncFromSpec( streamInputSpec.getSpec()); LoadFunc inputLoader = (LoadFunc)PigContext.instantiateFuncFromSpec( loadFileSpec.getFuncSpec()); - // Check if both LoadFunc objects belong to the same type + // Check if the streaming command's inputSpec also implements + // LoadFunc and if it does, are they of the same type? boolean sameType = false; try { - streamLoader.getClass().cast(inputLoader); - sameType = true; + // TODO: We should actually check if the streamStorer + // is _reversible_ as the inputLoader ... + if (streamStorer instanceof LoadFunc) { + streamStorer.getClass().cast(inputLoader); + sameType = true; + } } catch (ClassCastException cce) { sameType = false; } // Check if both LoadFunc objects belong to the same type and // are equivalent - if (sameType && streamLoader.equals(inputLoader)) { + if (sameType && streamStorer.equals(inputLoader)) { // Since they both are the same, we can flip them // for BinaryStorage load.setInputFileSpec(new FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName())); Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=652735&r1=652734&r2=652735&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java Thu May 1 19:30:37 2008 @@ -19,6 +19,7 @@ import java.util.List; +import org.apache.pig.LoadFunc; import org.apache.pig.StoreFunc; import org.apache.pig.builtin.BinaryStorage; import org.apache.pig.impl.PigContext; @@ -98,28 +99,32 @@ FileSpec storeFileSpec = s.getOutputFileSpec(); - // Instantiate both StoreFunc objects to compare them for - // equality - StoreFunc streamStorer = - (StoreFunc)PigContext.instantiateFuncFromSpec( + // Instantiate both to compare them for equality + LoadFunc streamLoader = + (LoadFunc)PigContext.instantiateFuncFromSpec( streamOutputSpec.getSpec()); StoreFunc outputStorer = (StoreFunc)PigContext.instantiateFuncFromSpec( storeFileSpec.getFuncSpec()); - // Check if both LoadFunc objects belong to the same type + // Check if the streaming command's outputSpec also implements + // StoreFunc and if it does, are they of the same type? boolean sameType = false; try { - streamStorer.getClass().cast(outputStorer); - sameType = true; + // TODO: We should actually check if the streamLoader + // is _reversible_ as the outputStorer ... + if (streamLoader instanceof StoreFunc) { + streamLoader.getClass().cast(outputStorer); + sameType = true; + } } catch (ClassCastException cce) { sameType = false; } // Check if both LoadFunc objects belong to the same type and // are equivalent - if (sameType && streamStorer.equals(outputStorer)) { + if (sameType && streamLoader.equals(outputStorer)) { // Since they both are the same, we can flip them // for BinaryStorage s.setOutputFileSpec(new FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName())); Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=652735&r1=652734&r2=652735&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu May 1 19:30:37 2008 @@ -470,4 +470,42 @@ Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); } + @Test + public void testLocalNegativeLoadStoreOptimization() throws Exception { + testNegativeLoadStoreOptimization(ExecType.LOCAL); + } + + @Test + public void testMRNegativeLoadStoreOptimization() throws Exception { + testNegativeLoadStoreOptimization(ExecType.MAPREDUCE); + } + + private void testNegativeLoadStoreOptimization(ExecType execType) + throws Exception { + PigServer pigServer = createPigServer(execType); + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", "D,2", + "A,5", "B,5", "C,8", "A,8", + "D,8", "A,9"}); + + // Expected results + String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"}; + int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9}; + Tuple[] expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + + // Pig query to run + pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand + + "` input(stdin using PigDump());"); + pigServer.registerQuery("IP = load 'file:" + input + "' using " + + PigStorage.class.getName() + "(',') " + + "split by 'file';"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); + pigServer.registerQuery("OP = stream FILTERED_DATA through `" + + simpleEchoStreamingCommand + "`;"); + + // Run the query and check the results + Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults); + } + }