Author: rohini
Date: Tue Apr  7 23:21:01 2015
New Revision: 1671973

URL: http://svn.apache.org/r1671973
Log:
PIG-4495: Better multi-query planning in case of multiple edges (rohini)

Added:
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Cross-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-6.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-7.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SelfJoin-3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-11.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-12.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-14.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-15.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/test/e2e/pig/tests/multiquery.conf
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/test/TestFRJoin.java
    pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Apr  7 23:21:01 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4495: Better multi-query planning in case of multiple edges (rohini)
+
 PIG-3294: Allow Pig use Hive UDFs (daijy)
 
 PIG-4476: Fix logging in AvroStorage* classes and SchemaTuple class (rdsr via 
rohini)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
 Tue Apr  7 23:21:01 2015
@@ -107,11 +107,7 @@ public class POPoissonSample extends Phy
                 if (res.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 } else if (res.returnStatus == POStatus.STATUS_EOP) {
-                    if (this.parentPlan.endOfAllInput) {
-                        return eop;
-                    } else {
-                        continue;
-                    }
+                    return res;
                 } else if (res.returnStatus == POStatus.STATUS_ERR) {
                     return res;
                 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 Tue Apr  7 23:21:01 2015
@@ -109,6 +109,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
@@ -739,6 +740,8 @@ public class TezDagBuilder extends TezOp
                     additionalLocalResources));
         }
 
+        // Union within a split can have multiple stores writing to same output
+        Set<String> uniqueStoreOutputs = new HashSet<String>();
         for (POStore store : stores) {
 
             ArrayList<POStore> emptyList = new ArrayList<POStore>();
@@ -763,10 +766,13 @@ public class TezDagBuilder extends TezOp
                     continue;
                 }
             }
-            vertex.addDataSink(store.getOperatorKey().toString(),
-                    new DataSinkDescriptor(storeOutDescriptor,
-                    OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
-                    dag.getCredentials()));
+            String outputKey = ((POStoreTez) store).getOutputKey();
+            if (uniqueStoreOutputs.add(outputKey)) { // add() returns false if this output was already registered
+                vertex.addDataSink(outputKey,
+                        new DataSinkDescriptor(storeOutDescriptor,
+                        OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
+                        dag.getCredentials()));
+            }
         }
 
         // LoadFunc and StoreFunc add delegation tokens to Job Credentials in

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
 Tue Apr  7 23:21:01 2015
@@ -403,11 +403,12 @@ public class TezLauncher extends Launche
             skOptimizer.visit();
         }
 
+        boolean isUnionOpt = 
conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
         boolean isMultiQuery = 
conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
         if (isMultiQuery) {
             // reduces the number of TezOpers in the Tez plan generated
             // by multi-query (multi-store) script.
-            MultiQueryOptimizerTez mqOptimizer = new 
MultiQueryOptimizerTez(tezPlan);
+            MultiQueryOptimizerTez mqOptimizer = new 
MultiQueryOptimizerTez(tezPlan, isUnionOpt);
             mqOptimizer.visit();
         }
 
@@ -419,7 +420,6 @@ public class TezLauncher extends Launche
         }
 
         // Use VertexGroup in Tez
-        boolean isUnionOpt = 
conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
         if (isUnionOpt) {
             UnionOptimizer uo = new UnionOptimizer(tezPlan);
             uo.visit();

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 Tue Apr  7 23:21:01 2015
@@ -284,6 +284,11 @@ public class TezCompiler extends PhyPlan
                     FuncSpec newSpec = new 
FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
                     userFunc.setFuncSpec(newSpec);
 
+                    //Remove unused store filename
+                    if (userFunc.getInputs().size() == 2) {
+                        userFunc.getInputs().remove(1);
+                    }
+
                     if (storeSeen.containsKey(store)) {
                         
storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString());
                     } else {
@@ -292,9 +297,6 @@ public class TezCompiler extends PhyPlan
                         
from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
                         from.plan.addAsLeaf(output);
                         storeSeen.put(store, output);
-
-                        //Remove unused store filename
-                        userFunc.getInputs().remove(1);
                     }
 
                     if (tezPlan.getPredecessors(tezOp)==null || 
!tezPlan.getPredecessors(tezOp).contains(from)) {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 Tue Apr  7 23:21:01 2015
@@ -441,7 +441,7 @@ public class TezOperator extends Operato
         this.useSecondaryKey = useSecondaryKey;
     }
 
-    public List<OperatorKey> getUnionPredecessors() {
+    public List<OperatorKey> getUnionMembers() {
         return vertexGroupMembers;
     }
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
 Tue Apr  7 23:21:01 2015
@@ -182,8 +182,8 @@ public class TezPOPackageAnnotator exten
 
             Integer index = Integer.valueOf(lrearrange.getIndex());
             if(keyInfo.get(index) != null) {
-                if (isPOSplit && predTezOpVertexGrp != null ) {
-                    // Case of POSplit having more than one member of the 
vertex group
+                if (isPOSplit) {
+                    // Case of POSplit having more than one input in case of 
self join or union
                     loRearrangeFound--;
                 } else {
                     // something is wrong - we should not be getting key info

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 Tue Apr  7 23:21:01 2015
@@ -53,11 +53,13 @@ public class POFRJoinTez extends POFRJoi
     private static final Log log = LogFactory.getLog(POFRJoinTez.class);
     private static final long serialVersionUID = 1L;
 
-    // For replicated tables
-    private List<LogicalInput> replInputs = Lists.newArrayList();
-    private List<KeyValueReader> replReaders = Lists.newArrayList();
     private List<String> inputKeys;
+
+    // For replicated tables
+    private transient List<LogicalInput> replInputs;
+    private transient List<KeyValueReader> replReaders;
     private transient boolean isInputCached;
+    private transient String cacheKey;
 
     public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws 
ExecException {
        super(copy);
@@ -78,7 +80,7 @@ public class POFRJoinTez extends POFRJoi
 
     @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
-        String cacheKey = "replicatemap-" + getOperatorKey().toString();
+        cacheKey = "replicatemap-" + inputKeys.toString();
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
         if (cacheValue != null) {
             isInputCached = true;
@@ -93,10 +95,14 @@ public class POFRJoinTez extends POFRJoi
             return;
         }
         try {
+            this.replInputs = Lists.newArrayList();
+            this.replReaders = Lists.newArrayList();
             for (String key : inputKeys) {
                 LogicalInput input = inputs.get(key);
-                this.replInputs.add(input);
-                this.replReaders.add((KeyValueReader) input.getReader());
+                if (!this.replInputs.contains(input)) {
+                    this.replInputs.add(input);
+                    this.replReaders.add((KeyValueReader) input.getReader());
+                }
             }
         } catch (Exception e) {
             throw new ExecException(e);
@@ -110,10 +116,11 @@ public class POFRJoinTez extends POFRJoi
      */
     @Override
     protected void setUpHashMap() throws ExecException {
-        String cacheKey = "replicatemap-" + getOperatorKey().toString();
 
-        if (isInputCached) {
-            Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        // Re-check again in case of Split + union + replicate join
+        // where same POFRJoinTez occurs in different Split sub-plans
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
             replicates = (TupleToMapKey[]) cacheValue;
             log.info("Found " + (replicates.length - 1) + " replication hash 
tables in Tez cache. cachekey=" + cacheKey);
             return;

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
 Tue Apr  7 23:21:01 2015
@@ -42,6 +42,7 @@ import org.apache.pig.impl.builtin.Parti
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Pair;
 
@@ -255,4 +256,16 @@ public class POPartitionRearrangeTez ext
         cache.cache(reducerMapCacheKey, reducerMap);
         inited = true;
     }
+
+    @Override
+    public POPartitionRearrangeTez clone() throws CloneNotSupportedException {
+        POPartitionRearrangeTez clone = new POPartitionRearrangeTez(new 
OperatorKey(
+                mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
+                        mKey.scope)), requestedParallelism);
+        deepCopyTo(clone);
+        clone.isSkewedJoin = isSkewedJoin;
+        clone.connectedToPackage = connectedToPackage;
+        clone.setOutputKey(outputKey);
+        return clone;
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 Tue Apr  7 23:21:01 2015
@@ -50,15 +50,14 @@ public class POShuffleTezLoad extends PO
     private static final long serialVersionUID = 1L;
 
     protected List<String> inputKeys = new ArrayList<String>();
-    protected List<LogicalInput> inputs = new ArrayList<LogicalInput>();
-    protected List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>();
-
-    private boolean[] finished;
-    private boolean[] readOnce;
-
-    private WritableComparator comparator = null;
     private boolean isSkewedJoin = false;
 
+    private transient List<LogicalInput> inputs;
+    private transient List<KeyValuesReader> readers;
+    private transient int numTezInputs;
+    private transient boolean[] finished;
+    private transient boolean[] readOnce;
+    private transient WritableComparator comparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
 
@@ -86,33 +85,39 @@ public class POShuffleTezLoad extends PO
     public void attachInputs(Map<String, LogicalInput> inputs, Configuration 
conf)
             throws ExecException {
         this.conf = conf;
-        comparator = (WritableComparator) 
ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+        this.inputs = new ArrayList<LogicalInput>();
+        this.readers = new ArrayList<KeyValuesReader>();
+        this.comparator = (WritableComparator) 
ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+        this.accumulativeBatchSize = 
AccumulatorOptimizerUtil.getAccumulativeBatchSize();
+
         try {
-            for (String key : inputKeys) {
-                LogicalInput input = inputs.get(key);
-                this.inputs.add(input);
-                this.readers.add((KeyValuesReader)input.getReader());
+            for (String inputKey : inputKeys) {
+                LogicalInput input = inputs.get(inputKey);
+                // 1) Case of self join/cogroup/cross with Split.
+                //     - Same TezInput will contain multiple indexes in case 
of join
+                // 2) data unioned within Split
+                //     - Input key will be repeated, but index would be same 
within a TezInput
+                if (!this.inputs.contains(input)) {
+                    this.inputs.add(input);
+                    this.readers.add((KeyValuesReader)input.getReader());
+                }
             }
 
-            // We need to adjust numInputs because it's possible for both
-            // OrderedGroupedKVInput and non-OrderedGroupedKVInput to be 
attached
-            // to the same vertex. If so, we're only interested in
-            // OrderedGroupedKVInputs. So we ignore the others.
-            this.numInputs = this.inputs.size();
+            this.numInputs = this.pkgr.getKeyInfo().size();
+            this.numTezInputs = this.inputs.size();
 
             readOnce = new boolean[numInputs];
             for (int i = 0; i < numInputs; i++) {
                 readOnce[i] = false;
             }
 
-            finished = new boolean[numInputs];
-            for (int i = 0; i < numInputs; i++) {
+            finished = new boolean[numTezInputs];
+            for (int i = 0; i < numTezInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
         } catch (Exception e) {
             throw new ExecException(e);
         }
-        accumulativeBatchSize = 
AccumulatorOptimizerUtil.getAccumulativeBatchSize();
     }
 
     @Override
@@ -131,7 +136,7 @@ public class POShuffleTezLoad extends PO
             int minIndex = -1;
 
             try {
-                for (int i = 0; i < numInputs; i++) {
+                for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         hasData = true;
                         cur = readers.get(i).getCurrentKey();
@@ -172,8 +177,10 @@ public class POShuffleTezLoad extends PO
                 } else {
 
                     for (int i = 0; i < numInputs; i++) {
+                        bags[i] = new InternalCachedBag(numInputs);
+                    }
 
-                        DataBag bag = null;
+                    for (int i = 0; i < numTezInputs; i++) {
 
                         if (!finished[i]) {
                             cur = readers.get(i).getCurrentKey();
@@ -181,14 +188,12 @@ public class POShuffleTezLoad extends PO
                             while (comparator.compare(min, cur) == 0
                                     && (!min.isNull() || (min.isNull() && i == 
minIndex))) {
                                 Iterable<Object> vals = 
readers.get(i).getCurrentValues();
-                                bag = bags[i] == null ? new 
InternalCachedBag(numInputs) : bags[i];
                                 for (Object val : vals) {
                                     NullableTuple nTup = (NullableTuple) val;
                                     int index = nTup.getIndex();
                                     Tuple tup = 
pkgr.getValueTuple(keyWritable, nTup, index);
-                                    bag.add(tup);
+                                    bags[index].add(tup);
                                 }
-                                bags[i] = bag;
                                 finished[i] = !readers.get(i).next();
                                 if (finished[i]) {
                                     break;
@@ -196,10 +201,6 @@ public class POShuffleTezLoad extends PO
                                 cur = readers.get(i).getCurrentKey();
                             }
                         }
-
-                        if (bag == null) {
-                            bags[i] = new InternalCachedBag(numInputs);
-                        }
                     }
                 }
 
@@ -269,7 +270,7 @@ public class POShuffleTezLoad extends PO
         public boolean hasNextBatch() {
             Object cur = null;
             try {
-                for (int i = 0; i < numInputs; i++) {
+                for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
                         if (comparator.compare(min, cur) == 0
@@ -292,7 +293,7 @@ public class POShuffleTezLoad extends PO
                 bags[i].clear();
             }
             try {
-                for (int i = 0; i < numInputs; i++) {
+                for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
                         int batchCount = 0;
@@ -300,7 +301,9 @@ public class POShuffleTezLoad extends PO
                                 min.isNull() && i==minIndex)) {
                             Iterator<Object> iter = 
readers.get(i).getCurrentValues().iterator();
                             while (iter.hasNext() && batchCount < batchSize) {
-                                bags[i].add(pkgr.getValueTuple(keyWritable, 
(NullableTuple) iter.next(), i));
+                                NullableTuple nTup = (NullableTuple) 
iter.next();
+                                int index = nTup.getIndex();
+                                
bags[index].add(pkgr.getValueTuple(keyWritable, nTup, index));
                                 batchCount++;
                             }
                             if (batchCount == batchSize) {
@@ -333,7 +336,7 @@ public class POShuffleTezLoad extends PO
             // early termination of accumulator
             Object cur = null;
             try {
-                for (int i = 0; i < numInputs; i++) {
+                for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
                         while (comparator.compare(min, cur) == 0 && 
(!min.isNull() ||

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 Tue Apr  7 23:21:01 2015
@@ -43,10 +43,12 @@ import org.apache.tez.runtime.library.ap
 public class POStoreTez extends POStore implements TezOutput, 
TezTaskConfigurable {
 
     private static final long serialVersionUID = 1L;
+
+    private String outputKey;
+
     private transient MROutput output;
     private transient KeyValueWriter writer;
-    private String outputKey;
-    private TezCounter outputRecordCounter;
+    private transient TezCounter outputRecordCounter;
 
     public POStoreTez(OperatorKey k) {
         super(k);
@@ -143,4 +145,9 @@ public class POStoreTez extends POStore
         return res;
     }
 
+    @Override
+    public String name() {
+        return super.name() + (getOperatorKey().toString().equals(outputKey) ? 
"" : "\t->\t " +outputKey);
+    }
+
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
 Tue Apr  7 23:21:01 2015
@@ -97,7 +97,7 @@ public class POValueOutputTez extends Ph
     @Override
     public void replaceOutput(String oldOutputKey, String newOutputKey) {
         if (outputKeys.remove(oldOutputKey)) {
-            outputKeys.add(oldOutputKey);
+            outputKeys.add(newOutputKey);
         }
     }
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 Tue Apr  7 23:21:01 2015
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.lang.ArrayUtils;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -35,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -42,8 +44,21 @@ import org.apache.pig.impl.plan.ReverseD
 import org.apache.pig.impl.plan.VisitorException;
 
 public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
-    public MultiQueryOptimizerTez(TezOperPlan plan) {
+
+    private boolean unionOptimizerOn;
+
+    public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, 
TezOperPlan>(plan));
+        this.unionOptimizerOn = unionOptimizerOn;
+    }
+
+    private void addAllPredecessors(TezOperator tezOp, List<TezOperator> 
predsList) {
+        if (getPlan().getPredecessors(tezOp) != null) {
+            for (TezOperator pred : getPlan().getPredecessors(tezOp)) {
+                predsList.add(pred);
+                addAllPredecessors(pred, predsList);
+            }
+        }
     }
 
     @Override
@@ -57,19 +72,55 @@ public class MultiQueryOptimizerTez exte
 
             List<TezOperator> successors = getPlan().getSuccessors(tezOp);
             for (TezOperator successor : successors) {
+                List<TezOperator> predecessors = new 
ArrayList<TezOperator>(getPlan().getPredecessors(successor));
+                predecessors.remove(tezOp);
+                if (!predecessors.isEmpty()) {
+                    // If it has another dependency that conflicts with other 
splittees, don't merge into split
+                    // For eg: self replicate join/skewed join
+                    // But if replicate input is from a different operator 
allow it, but ensure
+                    // that we don't have more than one input coming from that 
operator into the split
+
+                    // Check if other splittees or its predecessors (till the 
root) are not present in
+                    // the predecessors (till the root) of this splittee.
+                    // Need to check the whole predecessors hierarchy till 
root as the conflict
+                    // could be multiple levels up
+                    for (TezOperator predecessor : 
getPlan().getPredecessors(successor)) {
+                        if (predecessor != tezOp) {
+                            predecessors.add(predecessor);
+                            addAllPredecessors(predecessor, predecessors);
+                        }
+                    }
+                    List<TezOperator> toMergeSuccPredecessors = new 
ArrayList<TezOperator>(successors);
+                    toMergeSuccPredecessors.remove(successor);
+                    for (TezOperator splittee : splittees) {
+                        for (TezOperator spliteePred : 
getPlan().getPredecessors(splittee)) {
+                            if (spliteePred != tezOp) {
+                                toMergeSuccPredecessors.add(spliteePred);
+                                addAllPredecessors(spliteePred, 
toMergeSuccPredecessors);
+                            }
+                        }
+                    }
+                    if (predecessors.removeAll(toMergeSuccPredecessors)) {
+                        continue;
+                    }
+                }
 
-                // If has other dependency, don't merge into split,
-                if (getPlan().getPredecessors(successor).size()!=1) {
+                // Split contains right input of different skewed joins
+                if (successor.getSampleOperator() != null
+                        && tezOp.getSampleOperator() != null
+                        && !successor.getSampleOperator().equals(
+                                tezOp.getSampleOperator())) {
                     continue;
                 }
 
-                // Detect diamond shape, we cannot merge it into split, since 
Tez
-                // does not handle double edge between vertexes
-                // TODO:
-                //    - Vertex groups handles double edges though. For the 
case where the
-                //      double edges are unioned (successor is a union vertex),
-                //      try merge into split if union optimizer is turned on.
-                //    - PIG-3876 to handle this by writing to same edge
+                // Detect diamond shape into successor operator, we cannot 
merge it into split,
+                // since Tez does not handle double edge between vertexes
+                // Successor could be
+                //    - union operator (if no union optimizer changing it to 
vertex group which supports multiple edges)
+                //    - self replicate join
+                //    - self skewed join
+                // Self hash joins can write to same output edge and are 
handled by POShuffleTezLoad
+                // TODO: PIG-3876 to handle this by writing to same edge
                 Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
                 Set<TezOperator> toMergeSuccessors = new 
HashSet<TezOperator>();
                 mergedSuccessors.addAll(successors);
@@ -79,15 +130,26 @@ public class MultiQueryOptimizerTez exte
                     }
                 }
                 if (getPlan().getSuccessors(successor) != null) {
-                    
toMergeSuccessors.addAll(getPlan().getSuccessors(successor));
+                    for (TezOperator succSuccessor : 
getPlan().getSuccessors(successor)) {
+                        if (succSuccessor.isUnion()) {
+                            if (!(unionOptimizerOn
+                                    && 
UnionOptimizer.isOptimizable(succSuccessor))) {
+                                toMergeSuccessors.add(succSuccessor);
+                            }
+                        } else if (successors.contains(succSuccessor)) {
+                                // Self replicate/skewed join
+                                toMergeSuccessors.add(succSuccessor);
+                        }
+                    }
                 }
+
                 mergedSuccessors.retainAll(toMergeSuccessors);
                 if (mergedSuccessors.isEmpty()) { // no shared edge after merge
                     splittees.add(successor);
                 }
             }
 
-            if (splittees.size()==0) {
+            if (splittees.size() == 0) {
                 return;
             }
 
@@ -140,31 +202,61 @@ public class MultiQueryOptimizerTez exte
         }
     }
 
-    static public void removeSplittee(TezOperPlan plan, TezOperator splitter, 
TezOperator splittee) throws PlanException {
-        if (plan.getSuccessors(splittee)!=null) {
-            List<TezOperator> succs = new ArrayList<TezOperator>();
-            succs.addAll(plan.getSuccessors(splittee));
-            plan.disconnect(splitter, splittee);
+    private void removeSplittee(TezOperPlan plan, TezOperator splitter,
+            TezOperator splittee) throws PlanException, VisitorException {
+
+        plan.disconnect(splitter, splittee);
+
+        String spliteeKey = splittee.getOperatorKey().toString();
+        String splitterKey = splitter.getOperatorKey().toString();
+
+        if (plan.getPredecessors(splittee) != null) {
+            for (TezOperator pred : new 
ArrayList<TezOperator>(plan.getPredecessors(splittee))) {
+                List<TezOutput> tezOutputs = 
PlanHelper.getPhysicalOperators(pred.plan,
+                        TezOutput.class);
+                for (TezOutput tezOut : tezOutputs) {
+                    if (ArrayUtils.contains(tezOut.getTezOutputs(), 
spliteeKey)) {
+                        tezOut.replaceOutput(spliteeKey, splitterKey);
+                    }
+                }
+
+                TezEdgeDescriptor edge = 
pred.outEdges.remove(splittee.getOperatorKey());
+                if (edge == null) {
+                    throw new VisitorException("Edge description is empty");
+                }
+                pred.outEdges.put(splitter.getOperatorKey(), edge);
+                splitter.inEdges.put(pred.getOperatorKey(), edge);
+                plan.disconnect(pred, splittee);
+                plan.connect(pred, splitter);
+            }
+        }
+
+        if (plan.getSuccessors(splittee) != null) {
+            List<TezOperator> succs = new 
ArrayList<TezOperator>(plan.getSuccessors(splittee));
+            List<TezOperator> splitterSuccs = plan.getSuccessors(splitter);
             for (TezOperator succTezOperator : succs) {
                 TezEdgeDescriptor edge = 
succTezOperator.inEdges.get(splittee.getOperatorKey());
-
                 splitter.outEdges.remove(splittee.getOperatorKey());
                 succTezOperator.inEdges.remove(splittee.getOperatorKey());
                 plan.disconnect(splittee, succTezOperator);
-                TezCompilerUtil.connect(plan, splitter, succTezOperator, edge);
+
+                // Do not connect again in case of self join/cross/cogroup or 
union
+                if (splitterSuccs == null || 
!splitterSuccs.contains(succTezOperator)) {
+                    TezCompilerUtil.connect(plan, splitter, succTezOperator, 
edge);
+                }
 
                 try {
                     List<TezInput> inputs = 
PlanHelper.getPhysicalOperators(succTezOperator.plan, TezInput.class);
                     for (TezInput input : inputs) {
-                        
input.replaceInput(splittee.getOperatorKey().toString(),
-                                splitter.getOperatorKey().toString());
+                        input.replaceInput(spliteeKey,
+                                splitterKey);
                     }
                     List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(succTezOperator.plan, POUserFunc.class);
                     for (POUserFunc userFunc : userFuncs) {
                         if (userFunc.getFunc() instanceof ReadScalarsTez) {
                             TezInput tezInput = (TezInput)userFunc.getFunc();
-                            
tezInput.replaceInput(splittee.getOperatorKey().toString(),
-                                    splitter.getOperatorKey().toString());
+                            tezInput.replaceInput(spliteeKey,
+                                    splitterKey);
                             
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
                         }
                     }
@@ -173,9 +265,10 @@ public class MultiQueryOptimizerTez exte
                 }
 
                 if (succTezOperator.isUnion()) {
-                    int index = 
succTezOperator.getUnionPredecessors().indexOf(splittee.getOperatorKey());
-                    if (index > -1) {
-                        succTezOperator.getUnionPredecessors().set(index, 
splitter.getOperatorKey());
+                    int index = 
succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey());
+                    while (index > -1) {
+                        succTezOperator.getUnionMembers().set(index, 
splitter.getOperatorKey());
+                        index = 
succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey());
                     }
                 }
             }
@@ -183,7 +276,7 @@ public class MultiQueryOptimizerTez exte
         plan.remove(splittee);
     }
 
-    static public void addSubPlanPropertiesToParent(TezOperator parentOper, 
TezOperator subPlanOper) {
+    private void addSubPlanPropertiesToParent(TezOperator parentOper, 
TezOperator subPlanOper) {
         // Copy only map side properties. For eg: crossKeys.
         // Do not copy reduce side specific properties. For eg: 
useSecondaryKey, segmentBelow, sortOrder, etc
         if (subPlanOper.getCrossKeys() != null) {
@@ -193,6 +286,11 @@ public class MultiQueryOptimizerTez exte
         }
         parentOper.copyFeatures(subPlanOper, null);
 
+        // For skewed join right input
+        if (subPlanOper.getSampleOperator() !=  null) {
+            parentOper.setSampleOperator(subPlanOper.getSampleOperator());
+        }
+
         if (subPlanOper.getRequestedParallelism() > 
parentOper.getRequestedParallelism()) {
             
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
         }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 Tue Apr  7 23:21:01 2015
@@ -157,7 +157,7 @@ public class TezOperDependencyParallelis
         List<TezOperator> preds = plan.getPredecessors(tezOper);
         for (TezOperator pred : preds) {
             if (pred.isVertexGroup()) {
-                for (OperatorKey unionPred : pred.getUnionPredecessors()) {
+                for (OperatorKey unionPred : pred.getVertexGroupMembers()) {
                     if (unionPred.toString().equals(inputKey)) {
                         return plan.getOperator(unionPred);
                     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Tue Apr  7 23:21:01 2015
@@ -19,11 +19,16 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Set;
 
+import org.apache.commons.lang.ArrayUtils;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -35,10 +40,12 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.VertexGroupInfo;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -63,8 +70,17 @@ import org.apache.tez.runtime.library.ou
  */
 public class UnionOptimizer extends TezOpPlanVisitor {
 
+    private TezOperPlan tezPlan;
     public UnionOptimizer(TezOperPlan plan) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, 
TezOperPlan>(plan));
+        tezPlan = plan;
+    }
+
+    public static boolean isOptimizable(TezOperator tezOp) {
+        if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && 
tezOp.getRequestedParallelism() == 1) {
+            return false;
+        }
+        return true;
     }
 
     @Override
@@ -73,34 +89,66 @@ public class UnionOptimizer extends TezO
             return;
         }
 
-        if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && 
tezOp.getRequestedParallelism() == 1) {
+        if (!isOptimizable(tezOp)) {
             return;
         }
 
         TezOperator unionOp = tezOp;
-        String unionOpKey = unionOp.getOperatorKey().toString();
         String scope = unionOp.getOperatorKey().scope;
-        TezOperPlan tezPlan = getPlan();
+        PhysicalPlan unionOpPlan = unionOp.plan;
 
-        //TODO: PIG-3856 Handle replicated join. Replicate join input that was 
broadcast to union vertex
+        // TODO: PIG-3856 Handle replicated join and skewed join sample.
+        // Replicate join small table/skewed join sample that was broadcast to 
union vertex
         // now needs to be broadcast to all the union predecessors. How do we 
do that??
         // Wait for shared edge and do it or write multiple times??
-        // For now don't optimize
-        // Create a copy as disconnect while iterating modifies the original 
list
+        // For now don't optimize except in the case of Split where we need to 
write only once
+
+        Set<OperatorKey> uniqueUnionMembers = new 
HashSet<OperatorKey>(unionOp.getUnionMembers());
         List<TezOperator> predecessors = new 
ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
-        if (predecessors.size() > unionOp.getVertexGroupMembers().size()) {
-            return;
+        List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null 
? null
+                : new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
+
+        if (predecessors.size() > unionOp.getUnionMembers().size()
+                && uniqueUnionMembers.size() != 1) {
+            return; // TODO: PIG-3856
         }
+        if (uniqueUnionMembers.size() == 1) {
+            // We actually don't need VertexGroup in this case. The multiple
+            // sub-plans of Split can write to same MROutput or the Tez 
LogicalOutput
+            OperatorKey splitPredKey = uniqueUnionMembers.iterator().next();
+            TezOperator splitPredOp = tezPlan.getOperator(splitPredKey);
+            PhysicalPlan splitPredPlan = splitPredOp.plan;
+            if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It 
has to be. But check anyways
+
+                try {
+                    connectUnionNonMemberPredecessorsToSplit(unionOp, 
splitPredOp, predecessors);
+
+                    // Remove POShuffledValueInputTez from union plan root
+                    unionOpPlan.remove(unionOpPlan.getRoots().get(0));
+                    // Clone union plan into split subplans
+                    for (int i=0; i < 
Collections.frequency(unionOp.getUnionMembers(), splitPredKey); i++ ) {
+                        cloneAndMergeUnionPlan(unionOp, splitPredOp);
+                    }
+                    copyOperatorProperties(splitPredOp, unionOp);
+                    tezPlan.disconnect(splitPredOp, unionOp);
 
-        PhysicalPlan unionOpPlan = unionOp.plan;
+                    connectSplitOpToUnionSuccessors(unionOp, splitPredOp, 
successors);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
 
-        // Union followed by Split followed by Store could have multiple stores
+                //Remove union operator from the plan
+                tezPlan.remove(unionOp);
+                return;
+            } else {
+                throw new VisitorException("Expected POSplit but found " + 
splitPredPlan.getLeaves().get(0));
+            }
+        }
+
+        // Create vertex group operator for each store. Union followed by Split
+        // followed by Store could have multiple stores
         List<POStoreTez> unionStoreOutputs = 
PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
         TezOperator[] storeVertexGroupOps = new 
TezOperator[unionStoreOutputs.size()];
-        List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
-        // Create a copy as disconnect while iterating modifies the original 
list
-        List<TezOperator> successors = succs == null ? null : new 
ArrayList<TezOperator>(succs);
-        
         for (int i=0; i < storeVertexGroupOps.length; i++) {
             TezOperator existingVertexGroup = null;
             if (successors != null) {
@@ -116,12 +164,13 @@ public class UnionOptimizer extends TezO
                 storeVertexGroupOps[i] = new 
TezOperator(OperatorKey.genOpKey(scope));
                 storeVertexGroupOps[i].setVertexGroupInfo(new 
VertexGroupInfo(unionStoreOutputs.get(i)));
                 
storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
-                
storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+                
storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getUnionMembers());
                 tezPlan.add(storeVertexGroupOps[i]);
             }
         }
 
-        // Case of split, orderby, skewed join, rank, etc will have multiple 
outputs
+        // Create vertex group operator for each output. Case of split, 
orderby,
+        // skewed join, rank, etc will have multiple outputs
         List<TezOutput> unionOutputs = 
PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class);
         // One TezOutput can write to multiple LogicalOutputs (POCounterTez, 
POValueOutputTez, etc)
         List<String> unionOutputKeys = new ArrayList<String>();
@@ -133,133 +182,343 @@ public class UnionOptimizer extends TezO
                 unionOutputKeys.add(key);
             }
         }
-
-        // Create vertex group operator for each output
         TezOperator[] outputVertexGroupOps = new 
TezOperator[unionOutputKeys.size()];
         String[] newOutputKeys = new String[unionOutputKeys.size()];
         for (int i=0; i < outputVertexGroupOps.length; i++) {
             outputVertexGroupOps[i] = new 
TezOperator(OperatorKey.genOpKey(scope));
             outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
             
outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
-            
outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+            
outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getUnionMembers());
             newOutputKeys[i] = 
outputVertexGroupOps[i].getOperatorKey().toString();
             tezPlan.add(outputVertexGroupOps[i]);
         }
 
+        // Change plan from Predecessors -> Union -> Successor(s) to
+        // Predecessors -> Vertex Group(s) -> Successor(s)
         try {
-
-             // Clone plan of union and merge it into the predecessor operators
              // Remove POShuffledValueInputTez from union plan root
             unionOpPlan.remove(unionOpPlan.getRoots().get(0));
-            for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
+
+            for (OperatorKey predKey : unionOp.getUnionMembers()) {
                 TezOperator pred = tezPlan.getOperator(predKey);
-                PhysicalPlan predPlan = pred.plan;
-                PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
-                // if predLeaf not POValueOutputTez
-                if (predLeaf instanceof POSplit) {
-                    // Find the subPlan that connects to the union operator
-                    predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey);
-                    predLeaf = predPlan.getLeaves().get(0);
-                }
+                PhysicalPlan clonePlan = cloneAndMergeUnionPlan(unionOp, pred);
+                connectPredecessorsToVertexGroups(unionOp, pred, clonePlan,
+                        storeVertexGroupOps, outputVertexGroupOps);
+            }
 
-                PhysicalPlan clonePlan = unionOpPlan.clone();
-                //Clone changes the operator keys
-                List<POStoreTez> clonedUnionStoreOutputs = 
PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class);
-
-                // Remove POValueOutputTez from predecessor leaf
-                predPlan.remove(predLeaf);
-                boolean isEmptyPlan = predPlan.isEmpty();
-                if (!isEmptyPlan) {
-                    predLeaf = predPlan.getLeaves().get(0);
-                }
-                predPlan.merge(clonePlan);
-                if (!isEmptyPlan) {
-                    predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
-                }
+            connectVertexGroupsToSuccessors(unionOp, successors,
+                    unionOutputKeys, outputVertexGroupOps);
 
-                // Connect predecessor to the storeVertexGroups
-                int i = 0;
-                for (TezOperator storeVertexGroup : storeVertexGroupOps) {
-                    
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
-                    //Set the output key of cloned POStore to that of the 
initial union POStore.
-                    clonedUnionStoreOutputs.get(i).setOutputKey(
-                            storeVertexGroup.getVertexGroupInfo().getStore()
-                                    .getOperatorKey().toString());
-                    
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
-                            storeVertexGroup.getOperatorKey());
-                    tezPlan.connect(pred, storeVertexGroup);
-                }
+            replaceSuccessorInputsAndDisconnect(unionOp, successors, 
unionOutputKeys, newOutputKeys);
 
-                for (TezOperator outputVertexGroup : outputVertexGroupOps) {
-                    
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
-                    tezPlan.connect(pred, outputVertexGroup);
+            //Remove union operator from the plan
+            tezPlan.remove(unionOp);
+        } catch (VisitorException e) {
+            throw e;
+        }  catch (Exception e) {
+            throw new VisitorException(e);
+        }
+
+    }
+
+    /**
+     * Connect the predecessors of the union which are not members of the union
+     * (usually FRJoin replicated table or SkewedJoin sample) to the Split op
+     * which is the only member of the union. Disconnect those predecessors 
from the union.
+     *
+     * Replace the output keys of those predecessors with the split operator
+     * key instead of the union operator key.
+     *
+     * @param unionOp Union operator
+     * @param splitPredOp Split operator which is the only member of the union 
and its predecessor
+     * @param unionPredecessors Predecessors of the union including the split 
operator
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    private void connectUnionNonMemberPredecessorsToSplit(TezOperator unionOp,
+            TezOperator splitPredOp,
+            List<TezOperator> unionPredecessors) throws PlanException, 
VisitorException {
+        String unionOpKey = unionOp.getOperatorKey().toString();
+        OperatorKey splitPredKey = splitPredOp.getOperatorKey();
+        for (TezOperator pred : unionPredecessors) {
+
+            if (!pred.getOperatorKey().equals(splitPredKey)) { //Skip 
splitPredOp which is also a predecessor
+                // Get actual predecessors if predecessor is a vertex group
+                TezOperator predVertexGroup = null;
+                List<TezOperator> actualPreds = new ArrayList<TezOperator>();
+                if (pred.isVertexGroup()) {
+                    predVertexGroup = pred;
+                    for (OperatorKey opKey : pred.getVertexGroupMembers()) {
+                        // There should not be multiple levels of vertex 
group. So no recursion required.
+                        actualPreds.add(tezPlan.getOperator(opKey));
+                    }
+                    tezPlan.disconnect(predVertexGroup, unionOp);
+                    tezPlan.connect(predVertexGroup, splitPredOp);
+                } else {
+                    actualPreds.add(pred);
                 }
 
-                copyOperatorProperties(pred, unionOp);
-                tezPlan.disconnect(pred, unionOp);
-            }
+                for (TezOperator actualPred : actualPreds) {
+                    List<TezOutput> tezOutputs = 
PlanHelper.getPhysicalOperators(actualPred.plan,
+                                    TezOutput.class);
 
-            List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
-            for (TezOutput tezOutput : unionOutputs) {
-                if (tezOutput instanceof POValueOutputTez) {
-                    valueOnlyOutputs.add(tezOutput);
+                    for (TezOutput tezOut : tezOutputs) {
+                        if (ArrayUtils.contains(tezOut.getTezOutputs(), 
unionOpKey)) {
+                            tezOut.replaceOutput(unionOpKey, 
splitPredKey.toString());
+                        }
+                    }
+                    TezEdgeDescriptor edge = 
actualPred.outEdges.remove(unionOp.getOperatorKey());
+                    if (edge == null) {
+                        throw new VisitorException("Edge description is 
empty");
+                    }
+                    actualPred.outEdges.put(splitPredKey, edge);
+                    splitPredOp.inEdges.put(actualPred.getOperatorKey(), edge);
+                    if (predVertexGroup == null) {
+                        // Disconnect FRJoin table/SkewedJoin sample edge to
+                        // union op and connect to POSplit
+                        tezPlan.disconnect(actualPred, unionOp);
+                        tezPlan.connect(actualPred, splitPredOp);
+                    }
                 }
             }
-            // Connect to outputVertexGroupOps
-            // Copy output edges of union -> successor to 
predecessor->successor, vertexgroup -> successor
-            // and connect vertexgroup -> successor in the plan.
-            for (Entry<OperatorKey, TezEdgeDescriptor> entry : 
unionOp.outEdges.entrySet()) {
-                TezOperator succOp = tezPlan.getOperator(entry.getKey());
-                // Case of union followed by union.
-                // unionOp.outEdges will not point to vertex group, but to its 
output.
-                // So find the vertex group if there is one.
-                TezOperator succOpVertexGroup = null;
-                for (TezOperator succ : successors) {
-                    if (succ.isVertexGroup()
-                            && succ.getVertexGroupInfo().getOutput()
-                                    
.equals(succOp.getOperatorKey().toString())) {
-                        succOpVertexGroup = succ;
-                        break;
+        }
+    }
+
+    /**
+     * Connect the split operator to the successors of the union operator and 
update the edges.
+     * Also change the inputs of the successor from the union operator to the 
split operator.
+     *
+     * When a successor is a vertex group, every occurrence of the union key in
+     * its member list is replaced by the split operator key, and the vertex
+     * group's own successors (the actual consumer vertices) are rewired. If the
+     * split operator is then the only distinct member, the vertex group is
+     * removed and the split operator is connected directly to those successors.
+     *
+     * @param unionOp Union operator
+     * @param splitPredOp Split operator which is the only member of the union
+     * @param successors Successors of the union operator; may be null
+     * @throws PlanException from connecting/disconnecting operators in tezPlan
+     * @throws VisitorException if an actual successor has no input edge from
+     *         the union operator
+     */
+    private void connectSplitOpToUnionSuccessors(TezOperator unionOp,
+            TezOperator splitPredOp, List<TezOperator> successors)
+            throws PlanException, VisitorException {
+        String unionOpKey = unionOp.getOperatorKey().toString();
+        String splitPredOpKey = splitPredOp.getOperatorKey().toString();
+        if (successors != null) {
+            for (TezOperator succ : successors) {
+                TezOperator successorVertexGroup = null;
+                boolean removeSuccessorVertexGroup = false;
+                List<TezOperator> actualSuccs = new ArrayList<TezOperator>();
+                if (succ.isVertexGroup()) {
+                    successorVertexGroup = succ;
+                    if (tezPlan.getSuccessors(successorVertexGroup) != null) {
+                        // There should not be multiple levels of vertex 
group. So no recursion required.
+                        
actualSuccs.addAll(tezPlan.getSuccessors(successorVertexGroup));
                     }
-                }
-                TezEdgeDescriptor edge = entry.getValue();
-                // Edge cannot be one to one as it will get input from two or
-                // more union predecessors. Change it to SCATTER_GATHER
-                if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
-                    edge.dataMovementType = DataMovementType.SCATTER_GATHER;
-                    edge.partitionerClass = RoundRobinPartitioner.class;
-                    edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
-                    edge.inputClassName = UnorderedKVInput.class.getName();
-                }
-                TezOperator vertexGroupOp = 
outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
-                for (OperatorKey predKey : 
vertexGroupOp.getVertexGroupMembers()) {
-                    TezOperator pred = tezPlan.getOperator(predKey);
-                    // Keep the output edge directly to successor
-                    // Don't need to keep output edge for vertexgroup
-                    pred.outEdges.put(entry.getKey(), edge);
-                    succOp.inEdges.put(predKey, edge);
-                    if (succOpVertexGroup != null) {
-                        succOpVertexGroup.getVertexGroupMembers().add(predKey);
-                        
succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
-                        // Connect directly to the successor vertex group
-                        tezPlan.disconnect(pred, vertexGroupOp);
-                        tezPlan.connect(pred, succOpVertexGroup);
+                    // Replace every occurrence of the union in the member list
+                    int index = 
succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
+                    while (index > -1) {
+                        succ.getVertexGroupMembers().set(index, 
splitPredOp.getOperatorKey());
+                        index = 
succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
+                    }
+                    tezPlan.disconnect(unionOp, successorVertexGroup);
+                    Set<OperatorKey> uniqueVertexGroupMembers = new 
HashSet<OperatorKey>(succ.getVertexGroupMembers());
+                    if (uniqueVertexGroupMembers.size() == 1) {
+                        //Only splitPredOp is member of the vertex group. Get 
rid of the vertex group
+                        removeSuccessorVertexGroup = true;
+                    } else {
+                        tezPlan.connect(splitPredOp, successorVertexGroup);
                     }
-                }
-                if (succOpVertexGroup != null) {
-                    
succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
-                    
succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
-                    //Discard the new vertex group created
-                    tezPlan.remove(vertexGroupOp);
                 } else {
-                    tezPlan.connect(vertexGroupOp, succOp);
+                    actualSuccs.add(succ);
+                }
+
+                for (TezOperator actualSucc : actualSuccs) {
+                    // Inputs must be rewritten in the actual consumer vertex's
+                    // plan. succ may be a vertex group whose successors are
+                    // the real consumers, so scan actualSucc.plan, not succ.plan.
+                    LinkedList<TezInput> inputs = 
PlanHelper.getPhysicalOperators(actualSucc.plan, TezInput.class);
+                    for (TezInput tezInput : inputs) {
+                        for (String inputKey : tezInput.getTezInputs()) {
+                            if (inputKey.equals(unionOpKey)) {
+                                tezInput.replaceInput(inputKey, 
splitPredOpKey);
+                            }
+                        }
+                    }
+
+                    // Scalar reads (ReadScalarsTez) also carry input keys, both
+                    // in the UDF instance and in its FuncSpec constructor args.
+                    List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(actualSucc.plan, POUserFunc.class);
+                    for (POUserFunc userFunc : userFuncs) {
+                        if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                            TezInput tezInput = (TezInput)userFunc.getFunc();
+                            for (String inputKey : tezInput.getTezInputs()) {
+                                if (inputKey.equals(unionOpKey)) {
+                                    tezInput.replaceInput(inputKey, 
splitPredOpKey);
+                                    
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
+                                }
+                            }
+                        }
+                    }
+
+                    // Move the existing edge descriptor from union -> succ to
+                    // split -> succ so the edge configuration is preserved.
+                    TezEdgeDescriptor edge = 
actualSucc.inEdges.remove(unionOp.getOperatorKey());
+                    if (edge == null) {
+                        throw new VisitorException("Edge description is 
empty");
+                    }
+                    actualSucc.inEdges.put(splitPredOp.getOperatorKey(), edge);
+                    splitPredOp.outEdges.put(actualSucc.getOperatorKey(), 
edge);
+                    if (successorVertexGroup == null || 
removeSuccessorVertexGroup) {
+                        if (removeSuccessorVertexGroup) {
+                            // Changes plan from SplitOp -> Union -> 
VertexGroup - > Successor
+                            // to SplitOp -> Successor
+                            tezPlan.disconnect(successorVertexGroup, 
actualSucc);
+                            tezPlan.remove(successorVertexGroup);
+                        } else {
+                            // Changes plan from SplitOp -> Union -> Successor
+                            // to SplitOp -> Successor
+                            tezPlan.disconnect(unionOp, actualSucc);
+                        }
+                        tezPlan.connect(splitPredOp, actualSucc);
+                    }
                 }
             }
+        }
+    }
+
+    /**
+     * Clone plan of union and merge it into the predecessor operator
+     *
+     * If the predecessor plan ends in a POSplit, the union plan is merged into
+     * the split sub-plan that feeds the union instead of the top-level plan.
+     * The predecessor's leaf is removed and the cloned union plan's first root
+     * is connected in its place (unless the plan became empty after removal).
+     *
+     * @param unionOp Union operator
+     * @param predOp Predecessor operator of union to which union plan should 
be merged to
+     * @return the cloned union plan that was merged into the predecessor
+     * @throws VisitorException wrapping any exception raised while cloning or
+     *         merging the plans
+     */
+    private PhysicalPlan cloneAndMergeUnionPlan(TezOperator unionOp, 
TezOperator predOp) throws VisitorException {
+        try {
+            PhysicalPlan predPlan = predOp.plan;
+            // NOTE(review): assumes the predecessor plan has a single leaf - confirm
+            PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
+            // If the leaf is a POSplit, work on the split sub-plan feeding the union
+            if (predLeaf instanceof POSplit) {
+                // Find the subPlan that connects to the union operator
+                predPlan = getUnionPredPlanFromSplit(predPlan, 
unionOp.getOperatorKey().toString());
+                predLeaf = predPlan.getLeaves().get(0);
+            }
+            PhysicalPlan clonePlan = unionOp.plan.clone();
+
+            // Remove POValueOutputTez from predecessor leaf
+            predPlan.remove(predLeaf);
+            // If nothing is left, the cloned plan is merged without a connecting edge
+            boolean isEmptyPlan = predPlan.isEmpty();
+            if (!isEmptyPlan) {
+                predLeaf = predPlan.getLeaves().get(0);
+            }
+            predPlan.merge(clonePlan);
+            if (!isEmptyPlan) {
+                predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+            }
+            return clonePlan;
         } catch (Exception e) {
             throw new VisitorException(e);
         }
+    }
+
+    /**
+     * Connects the unionOp predecessor to the store vertex groups and the 
output vertex groups
+     * and disconnects it from the unionOp.
+     *
+     * @param unionOp Union operator
+     * @param pred Predecessor of union which will be made part of the vertex 
group
+     * @param predClonedUnionPlan Cloned plan of the union merged to the 
predecessor
+     * @param storeVertexGroupOps Store vertex groups to connect to
+     * @param outputVertexGroupOps Tez LogicalOutput vertex groups to connect 
to
+     * @throws VisitorException e.g. from copyOperatorProperties when the
+     *         predecessor and union have conflicting sample operators
+     * @throws PlanException from connecting/disconnecting operators in tezPlan
+     */
+    public void connectPredecessorsToVertexGroups(TezOperator unionOp,
+            TezOperator pred, PhysicalPlan predClonedUnionPlan,
+            TezOperator[] storeVertexGroupOps,
+            TezOperator[] outputVertexGroupOps) throws 
VisitorException,PlanException {
+
+        //Clone changes the operator keys
+        List<POStoreTez> clonedUnionStoreOutputs = 
PlanHelper.getPhysicalOperators(predClonedUnionPlan, POStoreTez.class);
+
+        // Connect predecessor to the storeVertexGroups
+        // NOTE(review): pairs the i-th cloned POStoreTez with the i-th store
+        // vertex group, so this assumes both are in the same order - confirm.
+        // The previous inline version also reset each cloned store's output key
+        // to the original union store key; confirm that is handled elsewhere.
+        int i = 0;
+        for (TezOperator storeVertexGroup : storeVertexGroupOps) {
+            
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+            
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
+                    storeVertexGroup.getOperatorKey());
+            tezPlan.connect(pred, storeVertexGroup);
+        }
+
+        for (TezOperator outputVertexGroup : outputVertexGroupOps) {
+            
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+            tezPlan.connect(pred, outputVertexGroup);
+        }
 
+        // The predecessor now carries the union's plan, so it takes over the
+        // union's operator properties before being detached from it.
+        copyOperatorProperties(pred, unionOp);
+        tezPlan.disconnect(pred, unionOp);
+    }
+
+    /**
+     * Connect vertexgroup operator to successor operator in the plan.
+     *
+     * Copy the output edge between union operator and successor to between
+     * predecessors and successor. Predecessor output key and output edge 
points
+     * to successor so that we have all the edge configuration, but they are
+     * connected to the vertex group in the plan.
+     *
+     * @param unionOp Union operator
+     * @param successors Successors of the union operator
+     * @param unionOutputKeys Output keys of union
+     * @param outputVertexGroupOps  Tez LogicalOutput vertex groups 
corresponding to the output keys
+     *
+     * @throws PlanException from connecting/disconnecting operators in tezPlan
+     */
+    private void connectVertexGroupsToSuccessors(TezOperator unionOp,
+            List<TezOperator> successors, List<String> unionOutputKeys,
+            TezOperator[] outputVertexGroupOps) throws PlanException {
+        // Connect to outputVertexGroupOps
+        for (Entry<OperatorKey, TezEdgeDescriptor> entry : 
unionOp.outEdges.entrySet()) {
+            TezOperator succOp = tezPlan.getOperator(entry.getKey());
+            // Case of union followed by union.
+            // unionOp.outEdges will not point to vertex group, but to its 
output.
+            // So find the vertex group if there is one.
+            TezOperator succOpVertexGroup = null;
+            for (TezOperator succ : successors) {
+                if (succ.isVertexGroup()
+                        && succ.getVertexGroupInfo().getOutput()
+                                .equals(succOp.getOperatorKey().toString())) {
+                    succOpVertexGroup = succ;
+                    break;
+                }
+            }
+            TezEdgeDescriptor edge = entry.getValue();
+            // Edge cannot be one to one as it will get input from two or
+            // more union predecessors. Change it to SCATTER_GATHER
+            if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
+                edge.dataMovementType = DataMovementType.SCATTER_GATHER;
+                edge.partitionerClass = RoundRobinPartitioner.class;
+                edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
+                edge.inputClassName = UnorderedKVInput.class.getName();
+            }
+            // outputVertexGroupOps is indexed in parallel with unionOutputKeys
+            TezOperator vertexGroupOp = 
outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
+            // The same edge descriptor instance is shared by every member
+            for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+                TezOperator pred = tezPlan.getOperator(predKey);
+                // Keep the output edge directly to successor
+                // Don't need to keep output edge for vertexgroup
+                pred.outEdges.put(entry.getKey(), edge);
+                succOp.inEdges.put(predKey, edge);
+                if (succOpVertexGroup != null) {
+                    succOpVertexGroup.getVertexGroupMembers().add(predKey);
+                    succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
+                    // Connect directly to the successor vertex group
+                    tezPlan.disconnect(pred, vertexGroupOp);
+                    tezPlan.connect(pred, succOpVertexGroup);
+                }
+            }
+            if (succOpVertexGroup != null) {
+                // The union is replaced by its predecessors in the downstream
+                // vertex group, so drop the union from its members and inputs.
+                
succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+                
succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
+                //Discard the new vertex group created
+                tezPlan.remove(vertexGroupOp);
+            } else {
+                tezPlan.connect(vertexGroupOp, succOp);
+            }
+        }
+    }
+
+    private void replaceSuccessorInputsAndDisconnect(TezOperator unionOp,
+            List<TezOperator> successors,
+            List<String> unionOutputKeys,
+            String[] newOutputKeys)
+            throws VisitorException {
         if (successors != null) {
+            String unionOpKey = unionOp.getOperatorKey().toString();
             // Successor inputs should now point to the vertex groups.
             for (TezOperator succ : successors) {
                 LinkedList<TezInput> inputs = 
PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
@@ -271,16 +530,27 @@ public class UnionOptimizer extends TezO
                         }
                     }
                 }
+
+                List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(succ.plan, POUserFunc.class);
+                for (POUserFunc userFunc : userFuncs) {
+                    if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                        TezInput tezInput = (TezInput)userFunc.getFunc();
+                        for (String inputKey : tezInput.getTezInputs()) {
+                            if (inputKey.equals(unionOpKey)) {
+                                tezInput.replaceInput(inputKey,
+                                        
newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]);
+                                
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
+                            }
+                        }
+                    }
+                }
+
                 tezPlan.disconnect(unionOp, succ);
             }
         }
-
-        //Remove union operator from the plan
-        tezPlan.remove(unionOp);
-
     }
 
-    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) 
{
+    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) 
throws VisitorException {
         pred.UDFs.addAll(unionOp.UDFs);
         pred.scalars.addAll(unionOp.scalars);
         // Copy only map side properties. For eg: crossKeys.
@@ -292,6 +562,17 @@ public class UnionOptimizer extends TezO
             }
         }
         pred.copyFeatures(unionOp, Arrays.asList(new 
OPER_FEATURE[]{OPER_FEATURE.UNION}));
+
+        // For skewed join right input
+        if (unionOp.getSampleOperator() !=  null) {
+            if (pred.getSampleOperator() == null) {
+                pred.setSampleOperator(unionOp.getSampleOperator());
+            } else if 
(!pred.getSampleOperator().equals(unionOp.getSampleOperator())) {
+                throw new VisitorException("Conflicting sample operators "
+                        + pred.getSampleOperator().toString() + " and "
+                        + unionOp.getSampleOperator().toString());
+            }
+        }
     }
 
     public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, 
String unionOpKey) throws VisitorException {

Modified: pig/trunk/test/e2e/pig/tests/multiquery.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/multiquery.conf?rev=1671973&r1=1671972&r2=1671973&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/trunk/test/e2e/pig/tests/multiquery.conf Tue Apr  7 23:21:01 2015
@@ -52,6 +52,21 @@
 # - _TEST_ Streaming with multiple stores.
 # - _TEST_ Streaming in demux.
 # - _TEST_ Streaming in nested demux.
+# MultiQuery_Union (Also refer to Union in nightly.conf)
+# - _TEST_ Multiple levels of union with join
+# - _TEST_ Union with replicate join left table part of split
+# - _TEST_ Union with replicate join right table part of split
+# - _TEST_ Union with skewed join left table part of split
+# - _TEST_ Union with skewed join right table part of split
+# - _TEST_ Union with group by + combiner
+# - _TEST_ Union with group by + secondary key partitioner
+# - _TEST_ Union with order by
+# MultiQuery_Self
+# - _TEST_ Self cross
+# - _TEST_ Self cogroup
+# - _TEST_ Three way join (two self)
+# - _TEST_ Self replicate join
+# - _TEST_ Self skewed join
 
 
 $cfg = {
@@ -554,7 +569,168 @@ $cfg = {
             },
             ] # end of tests
         },
+        
+        {
+        'name' => 'MultiQuery_Union',
+        'floatpostprocess' => 1,
+        'delimiter' => '    ',
+        'tests' => [
+            { 
+            # Multiple levels of union + join
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa:float);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, 
age, gpa:float);
+c = filter a by gpa >= 4;
+c1 = foreach c generate *;
+c2 = foreach c generate *;
+c3 = union c1, c2;
+d = filter a by gpa < 4;
+d1 = foreach d generate *;
+d2 = foreach d generate *;
+d3 = union d1, d2;
+a1 = union c3, d3;
+e = join a1 by name, b by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Union + Replicate Join left
+            'num' => 2,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, 
contributions);
+e = join c by name, d by name using 'replicated';
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Union + Replicate Join right
+            'num' => 3,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, 
contributions);
+e = join d by name, c by name using 'replicated';
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Union + Skewed Join left
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, 
contributions);
+e = join c by name, d by name using 'skewed' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Union + Skewed Join right
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, 
contributions);
+e = join d by name, c by name using 'skewed' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            },
+            { 
+            # Union + Groupby + Combiner
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = group c by name;
+e = foreach d generate group, SUM(c.age);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Union + Groupby + Secondary key partitioner
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = group c by name;
+e = foreach d { f = order c by $1,$2; generate group, f; };
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Union + Orderby
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = order c by name PARALLEL 3;
+store d into ':OUTPATH:';\,
+            'sortArgs' => ['-t', '     ', '-k', '1,1'],
+            },
+            ] # end of tests
+        },
+        
+        {
+        'name' => 'MultiQuery_Self',
+        'floatpostprocess' => 1,
+        'delimiter' => '    ',
+        'tests' => [
+            # Self cross
+            {
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa <= 0.5;
+d = filter a by gpa >= 3.5 and gpa < 3.9;
+e = filter a by gpa > 0.5 and gpa < 1;
+f = CROSS b, c PARALLEL 3;
+g = CROSS d, e PARALLEL 4;
+store f into ':OUTPATH:.1';
+store g into ':OUTPATH:.2';\,
+            },
+            {
+            # Self cogroup
+            'num' => 2,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = cogroup c by name, b by name;
+e = foreach d generate flatten(c), flatten(b);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Three way join (two self)
+            'num' => 3,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, 
contributions);
+e = join b by name, c by name, d by name PARALLEL 2;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Self join replicated
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = join c by name, b by name using 'replicated';
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join skewed
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, 
age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa < 2;
+d = join c by name, b by name using 'skewed' PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            ] # end of tests
+        },
 
     ] # end of groups
 }
-;
\ No newline at end of file
+;


Reply via email to