Author: knoguchi
Date: Mon Jun 26 16:37:39 2017
New Revision: 1799947
URL: http://svn.apache.org/viewvc?rev=1799947&view=rev
Log:
PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1799947&r1=1799946&r2=1799947&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun 26 16:37:39 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi)
+
PIG-5262: Fix jdiff related issues: fail build upon error, correct xml
character escaping (szita)
PIG-5225: Several unit tests are not annotated with @Test (nkollar via rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=1799947&r1=1799946&r2=1799947&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Mon Jun 26 16:37:39 2017
@@ -272,7 +272,19 @@ public class PODemux extends PhysicalOpe
}
} else {
curLeaf = leaf;
- res = leaf.getNextTuple();
+ while (true) {
+ res = leaf.getNextTuple();
+
+ if (res.returnStatus == POStatus.STATUS_OK ||
+ res.returnStatus == POStatus.STATUS_EOP ||
+ res.returnStatus == POStatus.STATUS_ERR) {
+ break;
+ } else if (res.returnStatus == POStatus.STATUS_NULL) {
+ // this "else if" is unnecessary but keeping it to
+ // be sync with runPipeline()
+ continue;
+ }
+ }
if (res.returnStatus == POStatus.STATUS_EOP) {
processedSet.set(idx++);
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1799947&r1=1799946&r2=1799947&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon Jun 26 16:37:39 2017
@@ -65,6 +65,14 @@ public class TestMultiQuery {
deleteOutputFiles();
}
+ private static final String simpleEchoStreamingCommand; // command for `stream ... through`: perl prints each input line ($_) back out
+ static {
+ if (Util.WINDOWS)
+ simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; // inner double quotes are additionally backslash-escaped; NOTE(review): presumably for Windows command-line quoting — confirm
+ else
+ simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+ }
+
@Before
public void setUp() throws Exception {
deleteOutputFiles();
@@ -955,6 +963,40 @@ public class TestMultiQuery {
.translateSchema(myPig.dumpSchema("C2")),
Util.isSparkExecType(Util.getLocalTestMode()));
}
+ @Test
+ public void testMultiQueryJiraPig4548() throws Exception { // PIG-4548 regression: SPLIT + group + stream in one batch must not lose records
+ Storage.Data data = Storage.resetData(myPig);
+ data.set("inputLocation",
+ Storage.tuple("1", "f"),
+ Storage.tuple("2", "f"),
+ Storage.tuple("3", "f"),
+ Storage.tuple("4", "f"),
+ Storage.tuple("5", "f"),
+ Storage.tuple("6", "f"));
+ myPig.setBatchOn();
+ myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (f1:chararray, f2:chararray);");
+ myPig.registerQuery("SPLIT A into T if (f2 == 'T'), F otherwise;"); // every input row has f2 == 'f', so T is empty and F receives all six rows
+ myPig.registerQuery("T2 = group T by f1;");
+ myPig.registerQuery("store T2 into 'output1' using mock.Storage();");
+ myPig.registerQuery("F2 = group F by f1;");
+ myPig.registerQuery("F3 = stream F2 through `" + simpleEchoStreamingCommand
+ + "` as (group:chararray, F:bag{tuple(f1: chararray,f2: chararray)});"); // echo stream: F3 should match the grouped input F2
+ myPig.registerQuery("store F3 into 'output2' using mock.Storage();");
+ myPig.executeBatch();
+
+ List<Tuple> actualResults = data.get("output1");
+ assertEquals("Number of records for output1 should be 0",0,actualResults.size());
+ actualResults = data.get("output2");
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[]{"('1',{('1','f')})",
+ "('2',{('2','f')})",
+ "('3',{('3','f')})",
+ "('4',{('4','f')})",
+ "('5',{('5','f')})",
+ "('6',{('6','f')})"});
+ Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); // all six groups must survive the streaming branch
+ }
+
// --------------------------------------------------------------------------
// Helper methods