Author: rohini
Date: Tue Mar 22 10:44:07 2016
New Revision: 1736180

URL: http://svn.apache.org/viewvc?rev=1736180&view=rev
Log:
PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of no vertex groups (rohini)

Added:
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 22 10:44:07 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of 
no vertex groups (rohini)
+
 PIG-4843: Turn off combiner in reducer vertex for Tez if bags are in combine 
plan (rohini)
 
 PIG-4796: Authenticate with Kerberos using a keytab file (nielsbasjes via 
daijy)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 Tue Mar 22 10:44:07 2016
@@ -18,8 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -68,10 +70,23 @@ public class MultiQueryOptimizerTez exte
                 return;
             }
 
+            // Using a list instead of set to have consistently ordered plans
             List<TezOperator> splittees = new ArrayList<TezOperator>();
             Set<TezOperator> mergedNonPackageInputSuccessors = new 
HashSet<TezOperator>();
 
+            // When there is a union successor with unsupported storefunc, 
those splittees
+            // can only be merged into the split if all the union members will 
be from the split
+            // This is to ensure that there are no vertex groups created with 
unsupported storefunc.
+            Map<TezOperator, Set<OperatorKey>> tentativeMergeUnionMembers = 
new HashMap<TezOperator, Set<OperatorKey>>();
+
             List<TezOperator> successors = getPlan().getSuccessors(tezOp);
+
+            Set<OperatorKey> splitterAndSuccessorKeys = new 
HashSet<OperatorKey>();
+            splitterAndSuccessorKeys.add(tezOp.getOperatorKey());
+            for (TezOperator successor : successors) {
+                splitterAndSuccessorKeys.add(successor.getOperatorKey());
+            }
+
             for (TezOperator successor : successors) {
                 List<TezOperator> predecessors = new 
ArrayList<TezOperator>(getPlan().getPredecessors(successor));
                 predecessors.remove(tezOp);
@@ -129,6 +144,7 @@ public class MultiQueryOptimizerTez exte
                 // Only in case of POPackage(POShuffleTezLoad) multiple inputs 
can be handled from a Split
                 Set<TezOperator> nonPackageInputSuccessors = new 
HashSet<TezOperator>();
                 boolean canMerge = true;
+                Set<TezOperator> successorUnsupportedStoreUnions = new 
HashSet<TezOperator>();
 
                 mergedSuccessors.addAll(successors);
                 for (TezOperator splittee : splittees) {
@@ -140,11 +156,21 @@ public class MultiQueryOptimizerTez exte
                     for (TezOperator succSuccessor : 
getPlan().getSuccessors(successor)) {
                         if (succSuccessor.isUnion()) {
                             if (!(unionOptimizerOn &&
-                                    UnionOptimizer.isOptimizable(succSuccessor,
-                                            unionSupportedStoreFuncs,
-                                            unionUnsupportedStoreFuncs))) {
+                                    
UnionOptimizer.isOptimizable(succSuccessor))) {
                                 toNotMergeSuccessors.add(succSuccessor);
                             } else {
+                                if (unionOptimizerOn && 
!UnionOptimizer.isOptimizableStoreFunc(succSuccessor,unionSupportedStoreFuncs,unionUnsupportedStoreFuncs))
 {
+                                    // This optimization of using 
UnionOptimizer for unsupported storefunc
+                                    // is only good for one level of split and 
does not handle multiple level of split.
+                                    Set<OperatorKey> unionMembers = new 
HashSet<OperatorKey>(succSuccessor.getUnionMembers());
+                                    
unionMembers.removeAll(splitterAndSuccessorKeys);
+                                    if(unionMembers.isEmpty()) {
+                                        
successorUnsupportedStoreUnions.add(succSuccessor);
+                                    } else {
+                                        
toNotMergeSuccessors.add(succSuccessor);
+                                        continue;
+                                    }
+                                }
                                 toMergeSuccessors.add(succSuccessor);
                                 List<TezOperator> unionSuccessors = 
getPlan().getSuccessors(succSuccessor);
                                 if (unionSuccessors != null) {
@@ -187,11 +213,47 @@ public class MultiQueryOptimizerTez exte
 
                 mergedSuccessors.retainAll(toNotMergeSuccessors);
                 if (mergedSuccessors.isEmpty()) { // no shared edge after merge
-                    splittees.add(successor);
                     
mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
+                    if (successorUnsupportedStoreUnions.isEmpty()) {
+                        splittees.add(successor);
+                    } else {
+                        // If all other conditions were satisfied, but it had 
a successor union
+                        // with unsupported storefunc keep it in the tentative 
list
+                        for (TezOperator unionOp : 
successorUnsupportedStoreUnions) {
+                            Set<OperatorKey> tentativeSuccessors = 
tentativeMergeUnionMembers.get(unionOp);
+                            if (tentativeSuccessors == null) {
+                                tentativeSuccessors = new 
HashSet<OperatorKey>();
+                                tentativeMergeUnionMembers.put(unionOp, 
tentativeSuccessors);
+                            }
+                            
tentativeSuccessors.add(successor.getOperatorKey());
+                        }
+                    }
                 }
             }
 
+            Set<TezOperator> spliteesToRemove = new HashSet<TezOperator>();
+
+            for (Entry<TezOperator, Set<OperatorKey>> entry : 
tentativeMergeUnionMembers.entrySet()) {
+                Set<OperatorKey> unionMembers = new 
HashSet<OperatorKey>(entry.getKey().getUnionMembers());
+                if (entry.getValue().containsAll(unionMembers)) {
+                    // If all the union members were tentative splittees then 
add them
+                    for (OperatorKey key : entry.getValue()) {
+                        TezOperator splittee = getPlan().getOperator(key);
+                        if (!splittees.contains(splittee)) {
+                            splittees.add(splittee);
+                        }
+                    }
+                } else {
+                    for (OperatorKey key : entry.getValue()) {
+                        spliteesToRemove.add(getPlan().getOperator(key));
+                    }
+                }
+            }
+
+            for (TezOperator op : spliteesToRemove) {
+                splittees.remove(op);
+            }
+
             if (splittees.size() == 0) {
                 return;
             }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Tue Mar 22 10:44:07 2016
@@ -104,9 +104,7 @@ public class UnionOptimizer extends TezO
         this.unsupportedStoreFuncs = unsupportedStoreFuncs;
     }
 
-    public static boolean isOptimizable(TezOperator tezOp,
-            List<String> supportedStoreFuncs, List<String> 
unsupportedStoreFuncs)
-            throws VisitorException {
+    public static boolean isOptimizable(TezOperator tezOp) throws 
VisitorException {
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && 
tezOp.getRequestedParallelism() == 1) {
             return false;
         }
@@ -116,6 +114,12 @@ public class UnionOptimizer extends TezO
         if (tezOp.isRankCounter()) {
             return false;
         }
+        return true;
+    }
+
+    public static boolean isOptimizableStoreFunc(TezOperator tezOp,
+            List<String> supportedStoreFuncs, List<String> 
unsupportedStoreFuncs)
+            throws VisitorException {
         if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
             List<POStoreTez> stores = 
PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
             for (POStoreTez store : stores) {
@@ -148,7 +152,7 @@ public class UnionOptimizer extends TezO
             return;
         }
 
-        if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) 
{
+        if (!isOptimizable(tezOp)) {
             return;
         }
 
@@ -156,34 +160,40 @@ public class UnionOptimizer extends TezO
         String scope = unionOp.getOperatorKey().scope;
         PhysicalPlan unionOpPlan = unionOp.plan;
 
-        // TODO: PIG-3856 Handle replicated join and skewed join sample.
-        // Replicate join small table/skewed join sample that was broadcast to 
union vertex
-        // now needs to be broadcast to all the union predecessors. How do we 
do that??
-        // Wait for shared edge and do it or write multiple times??
-        // For now don't optimize except in the case of Split where we need to 
write only once
-
         Set<OperatorKey> uniqueUnionMembers = new 
HashSet<OperatorKey>(unionOp.getUnionMembers());
         List<TezOperator> predecessors = new 
ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
         List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null 
? null
                 : new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
 
+        if (uniqueUnionMembers.size() != 1) {
+
+            if (!isOptimizableStoreFunc(tezOp, supportedStoreFuncs, 
unsupportedStoreFuncs)) {
+                return;
+            }
 
-        if (successors != null && uniqueUnionMembers.size() > 1) {
-            for (TezOperator succ : successors) {
-                for (TezOperator pred : predecessors) {
-                    if (succ.inEdges.containsKey(pred.getOperatorKey())) {
-                        // Stop here, we cannot convert the node into vertex 
group
-                        // Otherwise, we will end up with a parallel edge 
between pred
-                        // and succ
-                        return;
+            if (successors != null) {
+                for (TezOperator succ : successors) {
+                    for (TezOperator pred : predecessors) {
+                        if (succ.inEdges.containsKey(pred.getOperatorKey())) {
+                            // Stop here, we cannot convert the node into 
vertex group
+                            // Otherwise, we will end up with a parallel edge 
between pred
+                            // and succ
+                            return;
+                        }
                     }
                 }
             }
+
+            // TODO: PIG-3856 Handle replicated join and skewed join sample.
+            // Replicate join small table/skewed join sample that was 
broadcast to union vertex
+            // now needs to be broadcast to all the union predecessors. How do 
we do that??
+            // Wait for shared edge and do it or write multiple times??
+            // For now don't optimize except in the case of Split where we 
need to write only once
+            if (predecessors.size() > unionOp.getUnionMembers().size()) {
+                return;
+            }
         }
-        if (predecessors.size() > unionOp.getUnionMembers().size()
-                && uniqueUnionMembers.size() != 1) {
-            return; // TODO: PIG-3856
-        }
+
         if (uniqueUnionMembers.size() == 1) {
             // We actually don't need VertexGroup in this case. The multiple
             // sub-plans of Split can write to same MROutput or the Tez 
LogicalOutput

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java 
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Tue Mar 
22 10:44:07 2016
@@ -23,6 +23,7 @@ import static org.apache.pig.tools.pigst
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.pig.PigCounters;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -286,7 +288,14 @@ public class TezVertexStats extends JobS
             multiStoreCounters.putAll(msGroup);
         }
 
+        // Split followed by union will have multiple stores writing to same 
location
+        Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
         for (POStore sto : stores) {
+            POStoreTez store = (POStoreTez) sto;
+            uniqueOutputs.put(store.getOutputKey(), store);
+        }
+
+        for (POStore sto : uniqueOutputs.values()) {
             if (sto.isTmpStore()) {
                 continue;
             }

Added: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld?rev=1736180&view=auto
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
 (added)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld
 Tue Mar 22 10:44:07 2016
@@ -0,0 +1,143 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-79    ->      Tez vertex scope-81,Tez vertex scope-98,Tez 
vertex scope-122,Tez vertex scope-124,
+Tez vertex scope-98    ->      Tez vertex scope-104,
+Tez vertex scope-122   ->      Tez vertex scope-81,
+Tez vertex scope-81    ->      Tez vertex scope-104,Tez vertex scope-126,
+Tez vertex scope-104
+Tez vertex scope-124   ->      Tez vertex scope-126,
+Tez vertex scope-126
+
+Tez vertex scope-79
+# Plan on vertex
+POValueOutputTez - scope-80    ->       [scope-122, scope-124, scope-81, 
scope-98]
+|
+|---a: New For Each(false,false)[bag] - scope-7
+    |   |
+    |   Cast[int] - scope-2
+    |   |
+    |   |---Project[bytearray][0] - scope-1
+    |   |
+    |   Cast[chararray] - scope-5
+    |   |
+    |   |---Project[bytearray][1] - scope-4
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-98
+# Plan on vertex
+POValueOutputTez - scope-106   ->       [scope-104]
+|
+|---POValueInputTez - scope-99 <-       scope-79
+Tez vertex scope-122
+# Plan on vertex
+Local Rearrange[tuple]{int}(false) - scope-67  ->       scope-81
+|   |
+|   Project[int][0] - scope-63
+|
+|---POValueInputTez - scope-123        <-       scope-79
+Tez vertex scope-81
+# Plan on vertex
+1-1: Split - scope-133
+|   |
+|   u3: Store(file:///tmp/output/u3:org.apache.pig.builtin.PigStorage) - 
scope-135     ->       scope-57
+|   |
+|   |---e: Filter[bag] - scope-43
+|       |   |
+|       |   Not[boolean] - scope-55
+|       |   |
+|       |   |---Or[boolean] - scope-54
+|       |       |
+|       |       |---Or[boolean] - scope-50
+|       |       |   |
+|       |       |   |---Greater Than[boolean] - scope-46
+|       |       |   |   |
+|       |       |   |   |---Project[int][0] - scope-44
+|       |       |   |   |
+|       |       |   |   |---Constant(5) - scope-45
+|       |       |   |
+|       |       |   |---Equal To[boolean] - scope-49
+|       |       |       |
+|       |       |       |---Project[int][0] - scope-47
+|       |       |       |
+|       |       |       |---Constant(7) - scope-48
+|       |       |
+|       |       |---Equal To[boolean] - scope-53
+|       |           |
+|       |           |---Project[int][0] - scope-51
+|       |           |
+|       |           |---Constant(8) - scope-52
+|   |
+|   d: Split - scope-132
+|   |   |
+|   |   u3: Store(file:///tmp/output/u3:org.apache.pig.builtin.PigStorage) - 
scope-134 ->       scope-57
+|   |   |
+|   |   POValueOutputTez - scope-128   ->       [scope-126]
+|   |   |
+|   |   |---j1: New For Each(false,false)[bag] - scope-74
+|   |       |   |
+|   |       |   Project[int][0] - scope-70
+|   |       |   |
+|   |       |   Project[chararray][1] - scope-72
+|   |       |
+|   |       |---j1: FRJoin[tuple] - scope-64   <-       scope-122
+|   |           |   |
+|   |           |   Project[int][0] - scope-62
+|   |           |   |
+|   |           |   Project[int][0] - scope-63
+|   |
+|   |---d: Filter[bag] - scope-36
+|       |   |
+|       |   Equal To[boolean] - scope-39
+|       |   |
+|       |   |---Project[int][0] - scope-37
+|       |   |
+|       |   |---Constant(8) - scope-38
+|   |
+|   b: Split - scope-130
+|   |   |
+|   |   u1: Store(file:///tmp/output/u1:org.apache.pig.builtin.PigStorage) - 
scope-136 ->       scope-27
+|   |   |
+|   |   POValueOutputTez - scope-107   ->       [scope-104]
+|   |
+|   |---b: Filter[bag] - scope-12
+|       |   |
+|       |   Greater Than[boolean] - scope-15
+|       |   |
+|       |   |---Project[int][0] - scope-13
+|       |   |
+|       |   |---Constant(5) - scope-14
+|   |
+|   c: Split - scope-131
+|   |   |
+|   |   u1: Store(file:///tmp/output/u1:org.apache.pig.builtin.PigStorage) - 
scope-137 ->       scope-27
+|   |   |
+|   |   POValueOutputTez - scope-108   ->       [scope-104]
+|   |
+|   |---c: Filter[bag] - scope-19
+|       |   |
+|       |   Equal To[boolean] - scope-22
+|       |   |
+|       |   |---Project[int][0] - scope-20
+|       |   |
+|       |   |---Constant(7) - scope-21
+|
+|---POValueInputTez - scope-82 <-       scope-79
+Tez vertex scope-104
+# Plan on vertex
+u2: Store(file:///tmp/output/u2:org.apache.pig.builtin.PigStorage) - scope-35
+|
+|---POShuffledValueInputTez - scope-105        <-       [scope-81, scope-98]
+Tez vertex scope-124
+# Plan on vertex
+POValueOutputTez - scope-129   ->       [scope-126]
+|
+|---POValueInputTez - scope-125        <-       scope-79
+Tez vertex scope-126
+# Plan on vertex
+u4: Store(file:///tmp/output/u4:org.apache.pig.builtin.PigStorage) - scope-78
+|
+|---POShuffledValueInputTez - scope-127        <-       [scope-124, scope-81]

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1736180&r1=1736179&r2=1736180&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Tue Mar 22 10:44:07 
2016
@@ -661,6 +661,28 @@ public class TestTezCompiler {
                 "store c into 'file:///tmp/output' using " + 
DummyStoreWithOutputFormat.class.getName() + "();";
         // Plan should not have union optimization applied
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-DummyStore-OPTOFF.gld");
+
+        resetScope();
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, 
PigStorage.class.getName());
+        setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, 
null);
+        query =
+                "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
+                "split a into b if x > 5, c if x == 7, d if x == 8, e 
otherwise;" +
+                "u1 = union onschema b, c;" +
+                "store u1 into 'file:///tmp/output/u1';" +
+                //TODO: multiple levels of split not merged
+                "u2 = union onschema a, b, c;" +
+                "store u2 into 'file:///tmp/output/u2';" +
+                "u3 = union onschema d, e;" +
+                "store u3 into 'file:///tmp/output/u3';" +
+                "j1 = join d by x, a by x using 'replicated';" +
+                "j1 = foreach j1 generate d::x as x, d::y as y;" +
+                "u4 = union onschema j1, a;" +
+                "store u4 into 'file:///tmp/output/u4';";
+
+        // Plan should have union optimization applied even for unsupported 
storefunc
+        run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1-SplitStore.gld");
+
         // Restore the value
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS, 
oldSupported);
         setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS, 
oldUnSupported);


Reply via email to