Author: rohini
Date: Mon Oct 5 20:55:51 2015
New Revision: 1706921
URL: http://svn.apache.org/viewvc?rev=1706921&view=rev
Log:
PIG-4688: Limit followed by POPartialAgg can give empty or partial results in
Tez (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1706921&r1=1706920&r2=1706921&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Oct 5 20:55:51 2015
@@ -49,6 +49,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4688: Limit followed by POPartialAgg can give empty or partial results in Tez (rohini)
+
PIG-4635: NPE while running pig script in tez mode (daijy)
PIG-4683: Nested order is broken after PIG-3591 in some cases (daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1706921&r1=1706920&r2=1706921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Mon Oct 5 20:55:51 2015
@@ -34,6 +34,7 @@ import org.apache.pig.JVMReuseImpl;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -197,6 +198,18 @@ public class PigProcessor extends Abstra
runPipeline(leaf);
+ if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))
+ && !execPlan.endOfAllInput) {
+ // If there is a stream in the pipeline or if this map job belongs to merge-join we could
+ // potentially have more to process - so lets
+ // set the flag stating that all map input has been sent
+ // already and then lets run the pipeline one more time
+ // This will result in nothing happening in the case
+ // where there is no stream or it is not a merge-join in the pipeline
+ execPlan.endOfAllInput = true;
+ runPipeline(leaf);
+ }
+
// Calling EvalFunc.finish()
UDFFinishVisitor finisher = new UDFFinishVisitor(execPlan,
new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
Modified: pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLimitVariable.java?rev=1706921&r1=1706920&r2=1706921&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLimitVariable.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLimitVariable.java Mon Oct 5 20:55:51 2015
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
@@ -64,20 +65,24 @@ public class TestLimitVariable {
@Test
public void testLimitVariable1() throws IOException {
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "" + true);
String query =
- "a = load '" + inputFile.getName() + "';" +
+ "a = load '" + inputFile.getName() + "' as (f1:int, f2:int);" +
"b = group a all;" +
"c = foreach b generate COUNT(a) as sum;" +
"d = order a by $0 DESC;" +
- "e = limit d c.sum/2;" // return top half of the tuples
+ "e = limit d c.sum/2;" + // return top half of the tuples
+ "f = group e all;" +
+ "g = foreach f generate AVG(e.$0), SUM(e.$1);"
;
Util.registerMultiLineQuery(pigServer, query);
- Iterator<Tuple> it = pigServer.openIterator("e");
+ Iterator<Tuple> it = pigServer.openIterator("g");
List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] {
- "(6,15)", "(5,10)", "(4,11)" });
+ "(5.0,36)"});
Util.checkQueryOutputs(it, expectedRes);
+ pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG);
}
@Test