Author: knoguchi Date: Mon Jun 26 16:44:24 2017 New Revision: 1799949 URL: http://svn.apache.org/viewvc?rev=1799949&view=rev Log: PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi)
Modified: pig/branches/branch-0.17/CHANGES.txt pig/branches/branch-0.17/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java pig/branches/branch-0.17/test/org/apache/pig/test/TestMultiQuery.java Modified: pig/branches/branch-0.17/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/branches/branch-0.17/CHANGES.txt?rev=1799949&r1=1799948&r2=1799949&view=diff ============================================================================== --- pig/branches/branch-0.17/CHANGES.txt (original) +++ pig/branches/branch-0.17/CHANGES.txt Mon Jun 26 16:44:24 2017 @@ -18,6 +18,18 @@ Pig Change Log +Release 0.17.1 - Unreleased + +INCOMPATIBLE CHANGES + +IMPROVEMENTS + +OPTIMIZATIONS + +BUG FIXES + +PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi) + Release 0.17.0  INCOMPATIBLE CHANGES Modified: pig/branches/branch-0.17/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.17/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=1799949&r1=1799948&r2=1799949&view=diff ============================================================================== --- pig/branches/branch-0.17/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original) +++ pig/branches/branch-0.17/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Mon Jun 26 16:44:24 2017 @@ -272,7 +272,19 @@ public class PODemux extends PhysicalOpe } } else { curLeaf = leaf; - res = leaf.getNextTuple(); + while (true) { + res = leaf.getNextTuple(); + + if (res.returnStatus == POStatus.STATUS_OK || + res.returnStatus == POStatus.STATUS_EOP || + res.returnStatus == POStatus.STATUS_ERR) { + break; + } else if (res.returnStatus == POStatus.STATUS_NULL) { + // this "else if" is unnecessary but keeping it to + // be 
sync with runPipeline() + continue; + } + } if (res.returnStatus == POStatus.STATUS_EOP) { processedSet.set(idx++); Modified: pig/branches/branch-0.17/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/pig/branches/branch-0.17/test/org/apache/pig/test/TestMultiQuery.java?rev=1799949&r1=1799948&r2=1799949&view=diff ============================================================================== --- pig/branches/branch-0.17/test/org/apache/pig/test/TestMultiQuery.java (original) +++ pig/branches/branch-0.17/test/org/apache/pig/test/TestMultiQuery.java Mon Jun 26 16:44:24 2017 @@ -65,6 +65,14 @@ public class TestMultiQuery { deleteOutputFiles(); } + private static final String simpleEchoStreamingCommand; + static { + if (Util.WINDOWS) + simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; + else + simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'"; + } + @Before public void setUp() throws Exception { deleteOutputFiles(); @@ -955,6 +963,40 @@ public class TestMultiQuery { .translateSchema(myPig.dumpSchema("C2")), Util.isSparkExecType(Util.getLocalTestMode())); } + @Test + public void testMultiQueryJiraPig4548() throws Exception { + Storage.Data data = Storage.resetData(myPig); + data.set("inputLocation", + Storage.tuple("1", "f"), + Storage.tuple("2", "f"), + Storage.tuple("3", "f"), + Storage.tuple("4", "f"), + Storage.tuple("5", "f"), + Storage.tuple("6", "f")); + myPig.setBatchOn(); + myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (f1:chararray, f2:chararray);"); + myPig.registerQuery("SPLIT A into T if (f2 == 'T'), F otherwise;"); + myPig.registerQuery("T2 = group T by f1;"); + myPig.registerQuery("store T2 into 'output1' using mock.Storage();"); + myPig.registerQuery("F2 = group F by f1;"); + myPig.registerQuery("F3 = stream F2 through `" + simpleEchoStreamingCommand + + "` as (group:chararray, F:bag{tuple(f1: chararray,f2: chararray)});"); + myPig.registerQuery("store F3 into 'output2' using 
mock.Storage();"); + myPig.executeBatch(); + + List<Tuple> actualResults = data.get("output1"); + assertEquals("Number of records for output1 should be 0",0,actualResults.size()); + actualResults = data.get("output2"); + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[]{"('1',{('1','f')})", + "('2',{('2','f')})", + "('3',{('3','f')})", + "('4',{('4','f')})", + "('5',{('5','f')})", + "('6',{('6','f')})"}); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); + } + // -------------------------------------------------------------------------- // Helper methods