Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
 Fri Mar  4 18:17:39 2016
@@ -33,11 +33,9 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -57,13 +55,11 @@ public class POPartitionRearrangeTez ext
     private static final long serialVersionUID = 1L;
 
     private static final Log LOG = 
LogFactory.getLog(POPartitionRearrangeTez.class);
-    private static final TupleFactory tf = TupleFactory.getInstance();
-    private static final BagFactory mBagFactory = BagFactory.getInstance();
 
     // ReducerMap will store the tuple, max reducer index & min reducer index
-    private Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
-    private Integer totalReducers = -1;
-    private boolean inited = false;
+    private transient Map<Object, Pair<Integer, Integer>> reducerMap;
+    private transient Integer totalReducers;
+    private transient boolean inited;
 
     public POPartitionRearrangeTez(OperatorKey k) {
         this(k, -1);
@@ -201,6 +197,8 @@ public class POPartitionRearrangeTez ext
         }
 
         Map<String, Object> distMap = null;
+        totalReducers = -1;
+        reducerMap = Maps.newHashMap();
         if (PigProcessor.sampleMap != null) {
             // We've already collected sampleMap in PigProcessor
             distMap = PigProcessor.sampleMap;
@@ -232,7 +230,7 @@ public class POPartitionRearrangeTez ext
                 if (idxTuple.size() > 3) {
                 // remove the last 2 fields of the tuple, i.e: minIndex
                 // and maxIndex and store it in the reducer map
-                Tuple keyTuple = tf.newTuple();
+                Tuple keyTuple = mTupleFactory.newTuple();
                 for (int i=0; i < idxTuple.size() - 2; i++) {
                     keyTuple.append(idxTuple.get(i));
                 }
@@ -255,4 +253,9 @@ public class POPartitionRearrangeTez ext
         cache.cache(reducerMapCacheKey, reducerMap);
         inited = true;
     }
+
+    @Override
+    public POPartitionRearrangeTez clone() throws CloneNotSupportedException {
+        return (POPartitionRearrangeTez) super.clone();
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
 Fri Mar  4 18:17:39 2016
@@ -50,6 +50,7 @@ public class PORankTez extends PORank im
     private transient KeyValueReader reader;
     private transient Map<Integer, Long> counterOffsets;
     private transient Configuration conf;
+    private transient boolean finished = false;
 
     public PORankTez(PORank copy) {
         super(copy);
@@ -133,6 +134,9 @@ public class PORankTez extends PORank im
 
     @Override
     public Result getNextTuple() throws ExecException {
+        if (finished) {
+            return RESULT_EOP;
+        }
         Result inp = null;
 
         try {
@@ -150,6 +154,7 @@ public class PORankTez extends PORank im
         if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, 
"false"))) {
             this.parentPlan.endOfAllInput = true;
         }
+        finished = true;
         return RESULT_EOP;
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 Fri Mar  4 18:17:39 2016
@@ -50,15 +50,15 @@ public class POShuffleTezLoad extends PO
     private static final long serialVersionUID = 1L;
 
     protected List<String> inputKeys = new ArrayList<String>();
-    protected List<LogicalInput> inputs = new ArrayList<LogicalInput>();
-    protected List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>();
-
-    private boolean[] finished;
-    private boolean[] readOnce;
-
-    private WritableComparator comparator = null;
     private boolean isSkewedJoin = false;
 
+    private transient List<LogicalInput> inputs;
+    private transient List<KeyValuesReader> readers;
+    private transient int numTezInputs;
+    private transient boolean[] finished;
+    private transient boolean[] readOnce;
+    private transient WritableComparator comparator = null;
+    private transient WritableComparator groupingComparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
 
@@ -73,7 +73,7 @@ public class POShuffleTezLoad extends PO
 
     @Override
     public void replaceInput(String oldInputKey, String newInputKey) {
-        if (inputKeys.remove(oldInputKey)) {
+        while (inputKeys.remove(oldInputKey)) {
             inputKeys.add(newInputKey);
         }
     }
@@ -86,33 +86,40 @@ public class POShuffleTezLoad extends PO
     public void attachInputs(Map<String, LogicalInput> inputs, Configuration 
conf)
             throws ExecException {
         this.conf = conf;
-        comparator = (WritableComparator) 
ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+        this.inputs = new ArrayList<LogicalInput>();
+        this.readers = new ArrayList<KeyValuesReader>();
+        this.comparator = (WritableComparator) 
ConfigUtils.getIntermediateInputKeyComparator(conf);
+        this.groupingComparator = (WritableComparator) 
ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
+        this.accumulativeBatchSize = 
AccumulatorOptimizerUtil.getAccumulativeBatchSize();
+
         try {
-            for (String key : inputKeys) {
-                LogicalInput input = inputs.get(key);
-                this.inputs.add(input);
-                this.readers.add((KeyValuesReader)input.getReader());
+            for (String inputKey : inputKeys) {
+                LogicalInput input = inputs.get(inputKey);
+                // 1) Case of self join/cogroup/cross with Split - 
numTezInputs < numInputs/inputKeys
+                //     - Same TezInput will contain multiple indexes in case 
of join
+                // 2) data unioned within Split - inputKeys > 
numInputs/numTezInputs
+                //     - Input key will be repeated, but index would be same 
within a TezInput
+                if (!this.inputs.contains(input)) {
+                    this.inputs.add(input);
+                    this.readers.add((KeyValuesReader)input.getReader());
+                }
             }
 
-            // We need to adjust numInputs because it's possible for both
-            // OrderedGroupedKVInput and non-OrderedGroupedKVInput to be 
attached
-            // to the same vertex. If so, we're only interested in
-            // OrderedGroupedKVInputs. So we ignore the others.
-            this.numInputs = this.inputs.size();
+            this.numInputs = this.pkgr.getKeyInfo().size();
+            this.numTezInputs = this.inputs.size();
 
             readOnce = new boolean[numInputs];
             for (int i = 0; i < numInputs; i++) {
                 readOnce[i] = false;
             }
 
-            finished = new boolean[numInputs];
-            for (int i = 0; i < numInputs; i++) {
+            finished = new boolean[numTezInputs];
+            for (int i = 0; i < numTezInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
         } catch (Exception e) {
             throw new ExecException(e);
         }
-        accumulativeBatchSize = 
AccumulatorOptimizerUtil.getAccumulativeBatchSize();
     }
 
     @Override
@@ -128,17 +135,27 @@ public class POShuffleTezLoad extends PO
             boolean hasData = false;
             Object cur = null;
             PigNullableWritable min = null;
-            int minIndex = -1;
 
             try {
-                for (int i = 0; i < numInputs; i++) {
-                    if (!finished[i]) {
+                if (numTezInputs == 1) {
+                    if (!finished[0]) {
                         hasData = true;
-                        cur = readers.get(i).getCurrentKey();
-                        if (min == null || comparator.compare(min, cur) > 0) {
-                            //Not a deep clone. Writable is referenced.
-                            min = ((PigNullableWritable)cur).clone();
-                            minIndex = i;
+                        cur = readers.get(0).getCurrentKey();
+                        // Just move to the next key without comparison
+                        min = ((PigNullableWritable)cur).clone();
+                    }
+                } else {
+                    for (int i = 0; i < numTezInputs; i++) {
+                        if (!finished[i]) {
+                            hasData = true;
+                            cur = readers.get(i).getCurrentKey();
+                            // TODO: PIG-4652 should compare key bytes instead
+                            // of deserialized objects when using 
BytesComparator
+                            // for faster comparison
+                            if (min == null || comparator.compare(min, cur) > 
0) {
+                                //Not a deep clone. Writable is referenced.
+                                min = ((PigNullableWritable)cur).clone();
+                            }
                         }
                     }
                 }
@@ -153,7 +170,7 @@ public class POShuffleTezLoad extends PO
                 if 
(Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
                     this.parentPlan.endOfAllInput = true;
                 }
-                return new Result(POStatus.STATUS_EOP, null);
+                return RESULT_EOP;
             }
 
             key = pkgr.getKey(min);
@@ -164,7 +181,6 @@ public class POShuffleTezLoad extends PO
                 if (isAccumulative()) {
 
                     buffer.setCurrentKey(min);
-                    buffer.setCurrentKeyIndex(minIndex);
                     for (int i = 0; i < numInputs; i++) {
                         bags[i] = new AccumulativeBag(buffer, i);
                     }
@@ -172,34 +188,45 @@ public class POShuffleTezLoad extends PO
                 } else {
 
                     for (int i = 0; i < numInputs; i++) {
+                        bags[i] = new InternalCachedBag(numInputs);
+                    }
 
-                        DataBag bag = null;
-
-                        if (!finished[i]) {
-                            cur = readers.get(i).getCurrentKey();
-                            // We need to loop in case of Grouping Comparators
-                            while (comparator.compare(min, cur) == 0
-                                    && (!min.isNull() || (min.isNull() && i == 
minIndex))) {
-                                Iterable<Object> vals = 
readers.get(i).getCurrentValues();
-                                bag = bags[i] == null ? new 
InternalCachedBag(numInputs) : bags[i];
-                                for (Object val : vals) {
-                                    NullableTuple nTup = (NullableTuple) val;
-                                    int index = nTup.getIndex();
-                                    Tuple tup = 
pkgr.getValueTuple(keyWritable, nTup, index);
-                                    bag.add(tup);
-                                }
-                                bags[i] = bag;
-                                finished[i] = !readers.get(i).next();
-                                if (finished[i]) {
-                                    break;
-                                }
+                    if (numTezInputs == 1) {
+                        do {
+                            Iterable<Object> vals = 
readers.get(0).getCurrentValues();
+                            for (Object val : vals) {
+                                NullableTuple nTup = (NullableTuple) val;
+                                int index = nTup.getIndex();
+                                Tuple tup = pkgr.getValueTuple(keyWritable, 
nTup, index);
+                                bags[index].add(tup);
+                            }
+                            finished[0] = !readers.get(0).next();
+                            if (finished[0]) {
+                                break;
+                            }
+                            cur = readers.get(0).getCurrentKey();
+                        } while (groupingComparator.compare(min, cur) == 0); 
// We need to loop in case of Grouping Comparators
+                    } else {
+                        for (int i = 0; i < numTezInputs; i++) {
+                            if (!finished[i]) {
                                 cur = readers.get(i).getCurrentKey();
+                                // We need to loop in case of Grouping 
Comparators
+                                while (groupingComparator.compare(min, cur) == 
0) {
+                                    Iterable<Object> vals = 
readers.get(i).getCurrentValues();
+                                    for (Object val : vals) {
+                                        NullableTuple nTup = (NullableTuple) 
val;
+                                        int index = nTup.getIndex();
+                                        Tuple tup = 
pkgr.getValueTuple(keyWritable, nTup, index);
+                                        bags[index].add(tup);
+                                    }
+                                    finished[i] = !readers.get(i).next();
+                                    if (finished[i]) {
+                                        break;
+                                    }
+                                    cur = readers.get(i).getCurrentKey();
+                                }
                             }
                         }
-
-                        if (bag == null) {
-                            bags[i] = new InternalCachedBag(numInputs);
-                        }
                     }
                 }
 
@@ -240,7 +267,6 @@ public class POShuffleTezLoad extends PO
         private int batchSize;
         private List<Tuple>[] bags;
         private PigNullableWritable min;
-        private int minIndex;
         private boolean clearedCurrent = true;
 
         @SuppressWarnings("unchecked")
@@ -261,19 +287,14 @@ public class POShuffleTezLoad extends PO
             clearedCurrent = false;
         }
 
-        public void setCurrentKeyIndex(int curKeyIndex) {
-            this.minIndex = curKeyIndex;
-        }
-
         @Override
         public boolean hasNextBatch() {
             Object cur = null;
             try {
-                for (int i = 0; i < numInputs; i++) {
+                for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
-                        if (comparator.compare(min, cur) == 0
-                                && (!min.isNull() || (min.isNull() && i == 
minIndex))) {
+                        if (groupingComparator.compare(min, cur) == 0) {
                             return true;
                         }
                     }
@@ -292,15 +313,16 @@ public class POShuffleTezLoad extends PO
                 bags[i].clear();
             }
             try {
-                for (int i = 0; i < numInputs; i++) {
+                for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
                         int batchCount = 0;
-                        while (comparator.compare(min, cur) == 0 && 
(!min.isNull() ||
-                                min.isNull() && i==minIndex)) {
+                        while (groupingComparator.compare(min, cur) == 0) {
                             Iterator<Object> iter = 
readers.get(i).getCurrentValues().iterator();
                             while (iter.hasNext() && batchCount < batchSize) {
-                                bags[i].add(pkgr.getValueTuple(keyWritable, 
(NullableTuple) iter.next(), i));
+                                NullableTuple nTup = (NullableTuple) 
iter.next();
+                                int index = nTup.getIndex();
+                                
bags[index].add(pkgr.getValueTuple(keyWritable, nTup, index));
                                 batchCount++;
                             }
                             if (batchCount == batchSize) {
@@ -333,11 +355,10 @@ public class POShuffleTezLoad extends PO
             // early termination of accumulator
             Object cur = null;
             try {
-                for (int i = 0; i < numInputs; i++) {
+                for (int i = 0; i < numTezInputs; i++) {
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
-                        while (comparator.compare(min, cur) == 0 && 
(!min.isNull() ||
-                                min.isNull() && i==minIndex)) {
+                        while (groupingComparator.compare(min, cur) == 0) {
                             finished[i] = !readers.get(i).next();
                             if (finished[i]) {
                                 break;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
 Fri Mar  4 18:17:39 2016
@@ -38,7 +38,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -57,7 +56,6 @@ public class POShuffledValueInputTez ext
     private transient boolean finished = false;
     private transient Iterator<KeyValueReader> readers;
     private transient KeyValueReader currentReader;
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
     private transient Configuration conf;
 
     public POShuffledValueInputTez(OperatorKey k) {
@@ -71,7 +69,7 @@ public class POShuffledValueInputTez ext
 
     @Override
     public void replaceInput(String oldInputKey, String newInputKey) {
-        if (inputKeys.remove(oldInputKey)) {
+        while (inputKeys.remove(oldInputKey)) {
             inputKeys.add(newInputKey);
         }
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
 Fri Mar  4 18:17:39 2016
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -31,28 +32,37 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezTaskConfigurable;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigImplConstants;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.lib.MRReader;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
  * POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput
  */
-public class POSimpleTezLoad extends POLoad implements TezInput {
+public class POSimpleTezLoad extends POLoad implements TezInput, 
TezTaskConfigurable {
 
     private static final long serialVersionUID = 1L;
+
     private String inputKey;
-    private MRInput input;
-    private KeyValueReader reader;
+
+    private transient ProcessorContext processorContext;
+    private transient MRInput input;
+    private transient KeyValueReader reader;
     private transient Configuration conf;
+    private transient boolean finished = false;
+    private transient TezCounter inputRecordCounter;
 
-    public POSimpleTezLoad(OperatorKey k, FileSpec lfile) {
-        super(k, lfile);
+    public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
+        super(k, loader);
     }
 
     @Override
@@ -68,6 +78,12 @@ public class POSimpleTezLoad extends POL
     }
 
     @Override
+    public void initialize(ProcessorContext processorContext)
+            throws ExecException {
+        this.processorContext = processorContext;
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
     }
 
@@ -92,6 +108,22 @@ public class POSimpleTezLoad extends POL
         } catch (IOException e) {
             throw new ExecException(e);
         }
+
+        // Multiple inputs - other broadcast input like replicate join table, 
order by sample.
+        // We use multi input counters to just get MRInput records count.
+        if (inputs.size() > 1) {
+            CounterGroup multiInputGroup = processorContext.getCounters()
+                    .getGroup(MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+            if (multiInputGroup == null) {
+                processorContext.getCounters().addGroup(
+                        MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
+                        MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+            }
+            String name = 
MRPigStatsUtil.getMultiInputsCounterName(super.getLFile().getFileName(), 0);
+            if (name != null) {
+                inputRecordCounter = multiInputGroup.addCounter(name, name, 0);
+            }
+        }
     }
 
     /**
@@ -102,22 +134,28 @@ public class POSimpleTezLoad extends POL
     @Override
     public Result getNextTuple() throws ExecException {
         try {
-            Result res = new Result();
+            if (finished) {
+                return RESULT_EOP;
+            }
             if (!reader.next()) {
-                res.result = null;
-                res.returnStatus = POStatus.STATUS_EOP;
                 // For certain operators (such as STREAM), we could still have 
some work
                 // to do even after seeing the last input. These operators set 
a flag that
                 // says all input has been sent and to run the pipeline one 
more time.
                 if 
(Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
                     this.parentPlan.endOfAllInput = true;
                 }
+                finished = true;
+                return RESULT_EOP;
             } else {
+                Result res = new Result();
                 Tuple next = (Tuple) reader.getCurrentValue();
                 res.result = next;
                 res.returnStatus = POStatus.STATUS_OK;
+                if (inputRecordCounter != null) {
+                    inputRecordCounter.increment(1);
+                }
+                return res;
             }
-            return res;
         } catch (IOException e) {
             throw new ExecException(e);
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 Fri Mar  4 18:17:39 2016
@@ -43,10 +43,13 @@ import org.apache.tez.runtime.library.ap
 public class POStoreTez extends POStore implements TezOutput, 
TezTaskConfigurable {
 
     private static final long serialVersionUID = 1L;
+
+    private String outputKey;
+
+    private transient ProcessorContext processorContext;
     private transient MROutput output;
     private transient KeyValueWriter writer;
-    private String outputKey;
-    private TezCounter outputRecordCounter;
+    private transient TezCounter outputRecordCounter;
 
     public POStoreTez(OperatorKey k) {
         super(k);
@@ -76,19 +79,7 @@ public class POStoreTez extends POStore
     @Override
     public void initialize(ProcessorContext processorContext)
             throws ExecException {
-        if (isMultiStore()) {
-            CounterGroup multiStoreGroup = processorContext.getCounters()
-                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            if (multiStoreGroup == null) {
-                processorContext.getCounters().addGroup(
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            }
-            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
-            if (name != null) {
-                outputRecordCounter = multiStoreGroup.addCounter(name, name, 
0);
-            }
-        }
+        this.processorContext = processorContext;
     }
 
     @Override
@@ -110,6 +101,21 @@ public class POStoreTez
         } catch (IOException e) {
             throw new ExecException(e);
         }
+
+        // Multiple outputs (another store, or shuffle/broadcast outputs): count this store's records separately.
+        if (outputs.size() > 1) {
+            CounterGroup multiStoreGroup = processorContext.getCounters()
+                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            if (multiStoreGroup == null) {
+                multiStoreGroup = processorContext.getCounters().addGroup(
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            }
+            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+            if (name != null) {
+                outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+            }
+        }
     }
 
     @Override
@@ -121,9 +127,10 @@ public class POStoreTez extends POStore
                 if (illustrator == null) {
                     // PigOutputFormat.PigRecordWriter will call 
storeFunc.putNext
                     writer.write(null, res.result);
-                } else
+                } else {
                     illustratorMarkup(res.result, res.result, 0);
-                res = empty;
+                }
+                res = RESULT_EMPTY;
 
                 if (outputRecordCounter != null) {
                     outputRecordCounter.increment(1);
@@ -143,4 +150,12 @@ public class POStoreTez
         return res;
     }
 
+    /**
+     * Returns the operator name, followed by the output key when it differs from this operator's key.
+     */
+    @Override
+    public String name() {
+        return super.name() + (getOperatorKey().toString().equals(outputKey) ? "" : "\t->\t " + outputKey);
+    }
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
 Fri Mar  4 18:17:39 2016
@@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -58,7 +57,6 @@ public class POValueInputTez extends Phy
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean hasNext;
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
 
     public POValueInputTez(OperatorKey k) {
         super(k);
@@ -120,12 +118,10 @@ public class POValueInputTez extends Phy
                     }
                     hasNext = shuffleReader.next();
                 }
-            } else {
-                if (reader.next()) {
-                    Tuple origTuple = (Tuple)reader.getCurrentValue();
-                    Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                    return new Result(POStatus.STATUS_OK, copy);
-                }
+            } else if (reader.next()) {
+                Tuple origTuple = (Tuple) reader.getCurrentValue();
+                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                return new Result(POStatus.STATUS_OK, copy);
             }
             finished = true;
             // For certain operators (such as STREAM), we could still have 
some work

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
 Fri Mar  4 18:17:39 2016
@@ -39,7 +39,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezTaskConfigurable;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalOutput;
@@ -51,8 +50,8 @@ public class POValueOutputTez extends Ph
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(POValueOutputTez.class);
 
-    private static final TupleFactory tupleFactory = 
TupleFactory.getInstance();
-
+    private boolean scalarOutput;
+    private transient Object scalarValue;
     private boolean taskIndexWithRecordIndexAsKey;
     // TODO Change this to outputKey and write only once
     // when a shared edge support is available in Tez
@@ -71,6 +70,14 @@ public class POValueOutputTez extends Ph
         super(k);
     }
 
+    public boolean isScalarOutput() {
+        return scalarOutput;
+    }
+
+    public void setScalarOutput(boolean scalarOutput) {
+        this.scalarOutput = scalarOutput;
+    }
+
     public boolean isTaskIndexWithRecordIndexAsKey() {
         return taskIndexWithRecordIndexAsKey;
     }
@@ -96,8 +103,8 @@ public class POValueOutputTez extends Ph
 
     @Override
     public void replaceOutput(String oldOutputKey, String newOutputKey) {
-        if (outputKeys.remove(oldOutputKey)) {
-            outputKeys.add(oldOutputKey);
+        while (outputKeys.remove(oldOutputKey)) {
+            outputKeys.add(newOutputKey);
         }
     }
 
@@ -149,14 +156,25 @@ public class POValueOutputTez extends Ph
             if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
             }
+            if (scalarOutput) {
+                if (scalarValue == null) {
+                    scalarValue = inp.result;
+                } else {
+                    String msg = "Scalar has more than one row in the output. "
+                            + "1st : " + scalarValue + ", 2nd :"
+                            + inp.result
+                            + " (common cause: \"JOIN\" then \"FOREACH ... 
GENERATE foo.bar\" should be \"foo::bar\" )";
+                    throw new ExecException(msg);
+                }
+            }
+            if (taskIndexWithRecordIndexAsKey) {
+                Tuple tuple = mTupleFactory.newTuple(2);
+                tuple.set(0, taskIndex);
+                tuple.set(1, count++);
+                key = tuple;
+            }
             for (KeyValueWriter writer : writers) {
                 try {
-                    if (taskIndexWithRecordIndexAsKey) {
-                        Tuple tuple = tupleFactory.newTuple(2);
-                        tuple.set(0, taskIndex);
-                        tuple.set(1, count++);
-                        key = tuple;
-                    }
                     writer.write(key, inp.result);
                 } catch (IOException e) {
                     throw new ExecException(e);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
 Fri Mar  4 18:17:39 2016
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
@@ -47,13 +48,28 @@ import org.apache.pig.impl.util.UDFConte
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 
 public class LoaderProcessor extends TezOpPlanVisitor {
-    private Configuration conf;
+
+    private static final Log LOG = LogFactory.getLog(LoaderProcessor.class);
+
+    private TezOperPlan tezOperPlan;
+    private JobConf jobConf;
     private PigContext pc;
-    private static final Log log = LogFactory.getLog(LoaderProcessor.class);
-    public LoaderProcessor(TezOperPlan plan, PigContext pigContext) {
+
+    public LoaderProcessor(TezOperPlan plan, PigContext pigContext) throws 
VisitorException {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+        this.tezOperPlan = plan;
         this.pc = pigContext;
-        this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());;
+        this.jobConf = new 
JobConf(ConfigurationUtil.toConfiguration(pc.getProperties()));
+        // This ensures that the same credentials object is used by reference 
everywhere
+        this.jobConf.setCredentials(tezOperPlan.getCredentials());
+        this.jobConf.setBoolean("mapred.mapper.new-api", true);
+        this.jobConf.setClass("mapreduce.inputformat.class",
+                PigInputFormat.class, InputFormat.class);
+        try {
+            this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
     }
 
     /**
@@ -76,22 +92,25 @@ public class LoaderProcessor extends Tez
         ArrayList<String> inpSignatureLists = new ArrayList<String>();
         ArrayList<Long> inpLimits = new ArrayList<Long>();
 
-        Job job = Job.getInstance(conf);
-        conf = job.getConfiguration();
-        conf.setBoolean("mapred.mapper.new-api", true);
-        conf.setClass("mapreduce.inputformat.class",
-                PigInputFormat.class, InputFormat.class);
-        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
         List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan,
                 POLoad.class);
 
+        Job job = Job.getInstance(jobConf);
+        Configuration conf = job.getConfiguration();
+
         if (lds != null && lds.size() > 0) {
-            for (POLoad ld : lds) {
-                LoadFunc lf = ld.getLoadFunc();
-                lf.setLocation(ld.getLFile().getFileName(), job);
+            if (lds.size() == 1) {
+                for (POLoad ld : lds) {
+                    LoadFunc lf = ld.getLoadFunc();
+                    lf.setLocation(ld.getLFile().getFileName(), job);
 
-                // Store the inp filespecs
-                inp.add(ld.getLFile());
+                    // Store the inp filespecs
+                    inp.add(ld.getLFile());
+                }
+            } else {
+                throw new VisitorException(
+                        "There is more than one load for TezOperator "
+                                + tezOp);
             }
         }
 
@@ -114,7 +133,9 @@ public class LoaderProcessor extends Tez
                 tezOp.plan.remove(ld);
                 // Now add the input handling operator for the Tez backend
                 // TODO: Move this upstream to the PhysicalPlan generation
-                POSimpleTezLoad tezLoad = new 
POSimpleTezLoad(ld.getOperatorKey(), ld.getLFile());
+                POSimpleTezLoad tezLoad = new 
POSimpleTezLoad(ld.getOperatorKey(), ld.getLoadFunc());
+                tezLoad.setLFile(ld.getLFile());
+                tezLoad.setSignature(ld.getSignature());
                 tezLoad.setInputKey(ld.getOperatorKey().toString());
                 tezLoad.copyAliasFrom(ld);
                 tezLoad.setCacheFiles(ld.getCacheFiles());
@@ -127,10 +148,10 @@ public class LoaderProcessor extends Tez
             UDFContext.getUDFContext().serialize(conf);
             conf.set("udf.import.list",
                     
ObjectSerializer.serialize(PigContext.getPackageImportList()));
-            conf.set("pig.inputs", ObjectSerializer.serialize(inp));
-            conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
-            conf.set("pig.inpSignatures", 
ObjectSerializer.serialize(inpSignatureLists));
-            conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+            conf.set(PigInputFormat.PIG_INPUTS, 
ObjectSerializer.serialize(inp));
+            conf.set(PigInputFormat.PIG_INPUT_TARGETS, 
ObjectSerializer.serialize(inpTargets));
+            conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, 
ObjectSerializer.serialize(inpSignatureLists));
+            conf.set(PigInputFormat.PIG_INPUT_LIMITS, 
ObjectSerializer.serialize(inpLimits));
             String tmp;
             long maxCombinedSplitSize = 0;
             if (!tezOp.combineSmallSplits() || 
pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, 
"true").equals("false"))
@@ -139,7 +160,7 @@ public class LoaderProcessor extends Tez
                 try {
                     maxCombinedSplitSize = Long.parseLong(tmp);
                 } catch (NumberFormatException e) {
-                    log.warn("Invalid numeric format for 
pig.maxCombinedSplitSize; use the default maximum combined split size");
+                    LOG.warn("Invalid numeric format for 
pig.maxCombinedSplitSize; use the default maximum combined split size");
                 }
             }
             if (maxCombinedSplitSize > 0)
@@ -150,6 +171,10 @@ public class LoaderProcessor extends Tez
             // Not using MRInputAMSplitGenerator because delegation tokens are
             // fetched in FileInputFormat
             
tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf,
 false, 0));
+            // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input
+            // splits can be moved to if(loads) block below
+            int parallelism = 
tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
+            tezOp.setRequestedParallelism(parallelism);
         }
         return lds;
     }
@@ -159,6 +184,7 @@ public class LoaderProcessor extends Tez
         try {
             tezOp.getLoaderInfo().setLoads(processLoads(tezOp));
         } catch (Exception e) {
+            e.printStackTrace();
             throw new VisitorException(e);
         }
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 Fri Mar  4 18:17:39 2016
@@ -24,17 +24,13 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -42,8 +38,27 @@ import org.apache.pig.impl.plan.ReverseD
 import org.apache.pig.impl.plan.VisitorException;
 
 public class MultiQueryOptimizerTez extends TezOpPlanVisitor {
-    public MultiQueryOptimizerTez(TezOperPlan plan) {
+
+    private boolean unionOptimizerOn;
+    private List<String> unionSupportedStoreFuncs;
+    private List<String> unionUnsupportedStoreFuncs;
+
+    public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn,
+            List<String> unionSupportedStoreFuncs,
+            List<String> unionUnsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, 
TezOperPlan>(plan));
+        this.unionOptimizerOn = unionOptimizerOn;
+        this.unionSupportedStoreFuncs = unionSupportedStoreFuncs;;
+        this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs;
+    }
+
+    private void addAllPredecessors(TezOperator tezOp, List<TezOperator> 
predsList) {
+        if (getPlan().getPredecessors(tezOp) != null) {
+            for (TezOperator pred : getPlan().getPredecessors(tezOp)) {
+                predsList.add(pred);
+                addAllPredecessors(pred, predsList);
+            }
+        }
     }
 
     @Override
@@ -54,20 +69,67 @@ public class MultiQueryOptimizerTez exte
             }
 
             List<TezOperator> splittees = new ArrayList<TezOperator>();
+            Set<TezOperator> mergedNonPackageInputSuccessors = new 
HashSet<TezOperator>();
 
             List<TezOperator> successors = getPlan().getSuccessors(tezOp);
             for (TezOperator successor : successors) {
+                List<TezOperator> predecessors = new 
ArrayList<TezOperator>(getPlan().getPredecessors(successor));
+                predecessors.remove(tezOp);
+                if (!predecessors.isEmpty()) {
+                    // If has other dependency that conflicts with other 
splittees, don't merge into split
+                    // For eg: self replicate join/skewed join
+                    // But if replicate input is from a different operator 
allow it, but ensure
+                    // that we don't have more than one input coming from that 
operator into the split
+
+                    // Check if other splittees or its predecessors (till the 
root) are not present in
+                    // the predecessors (till the root) of this splittee.
+                    // Need to check the whole predecessors hierarchy till 
root as the conflict
+                    // could be multiple levels up
+                    for (TezOperator predecessor : 
getPlan().getPredecessors(successor)) {
+                        if (predecessor != tezOp) {
+                            predecessors.add(predecessor);
+                            addAllPredecessors(predecessor, predecessors);
+                        }
+                    }
+                    List<TezOperator> toMergeSuccPredecessors = new 
ArrayList<TezOperator>(successors);
+                    toMergeSuccPredecessors.remove(successor);
+                    for (TezOperator splittee : splittees) {
+                        for (TezOperator spliteePred : 
getPlan().getPredecessors(splittee)) {
+                            if (spliteePred != tezOp) {
+                                toMergeSuccPredecessors.add(spliteePred);
+                                addAllPredecessors(spliteePred, 
toMergeSuccPredecessors);
+                            }
+                        }
+                    }
+                    if (predecessors.removeAll(toMergeSuccPredecessors)) {
+                        continue;
+                    }
+                }
 
-                // If has other dependency, don't merge into split,
-                if (getPlan().getPredecessors(successor).size()!=1) {
+                // Split contains right input of different skewed joins
+                if (successor.getSampleOperator() != null
+                        && tezOp.getSampleOperator() != null
+                        && !successor.getSampleOperator().equals(
+                                tezOp.getSampleOperator())) {
                     continue;
                 }
 
-                // Detect diamond shape, we cannot merge it into split, since 
Tez
-                // does not handle double edge between vertexes
-                // TODO: PIG-3876 to handle this by writing to same edge
+                // Detect diamond shape into successor operator, we cannot 
merge it into split,
+                // since Tez does not handle double edge between vertexes
+                // Successor could be
+                //    - union operator (if no union optimizer changing it to 
vertex group which supports multiple edges)
+                //    - self replicate join, self skewed join or scalar
+                //    - POPackage (Self hash joins can write to same output 
edge and is handled by POShuffleTezLoad)
                 Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
+                // These successors should not be merged due to diamond shape
+                Set<TezOperator> toNotMergeSuccessors = new 
HashSet<TezOperator>();
+                // These successors can be merged
                 Set<TezOperator> toMergeSuccessors = new 
HashSet<TezOperator>();
+                // These successors (Scalar, POFRJoinTez) can be merged if 
they are the only input.
+                // Only in case of POPackage(POShuffleTezLoad) multiple inputs 
can be handled from a Split
+                Set<TezOperator> nonPackageInputSuccessors = new 
HashSet<TezOperator>();
+                boolean canMerge = true;
+
                 mergedSuccessors.addAll(successors);
                 for (TezOperator splittee : splittees) {
                     if (getPlan().getSuccessors(splittee) != null) {
@@ -75,15 +137,62 @@ public class MultiQueryOptimizerTez exte
                     }
                 }
                 if (getPlan().getSuccessors(successor) != null) {
-                    
toMergeSuccessors.addAll(getPlan().getSuccessors(successor));
+                    for (TezOperator succSuccessor : 
getPlan().getSuccessors(successor)) {
+                        if (succSuccessor.isUnion()) {
+                            if (!(unionOptimizerOn &&
+                                    UnionOptimizer.isOptimizable(succSuccessor,
+                                            unionSupportedStoreFuncs,
+                                            unionUnsupportedStoreFuncs))) {
+                                toNotMergeSuccessors.add(succSuccessor);
+                            } else {
+                                toMergeSuccessors.add(succSuccessor);
+                                List<TezOperator> unionSuccessors = 
getPlan().getSuccessors(succSuccessor);
+                                if (unionSuccessors != null) {
+                                    for (TezOperator unionSuccessor : 
unionSuccessors) {
+                                        if 
(TezCompilerUtil.isNonPackageInput(succSuccessor.getOperatorKey().toString(), 
unionSuccessor)) {
+                                            canMerge = canMerge ? 
nonPackageInputSuccessors.add(unionSuccessor) : false;
+                                        } else {
+                                            
toMergeSuccessors.add(unionSuccessor);
+                                        }
+                                    }
+                                }
+                            }
+                        } else if 
(TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), 
succSuccessor)) {
+                            // Output goes to scalar or POFRJoinTez instead of 
POPackage
+                            // POPackage/POShuffleTezLoad can handle multiple 
inputs from a Split.
+                            // But if input is sent to any other operator like
+                            // scalar, POFRJoinTez then we need to ensure it 
is the only one.
+                            canMerge = canMerge ? 
nonPackageInputSuccessors.add(succSuccessor) : false;
+                        } else {
+                            toMergeSuccessors.add(succSuccessor);
+                        }
+                    }
                 }
-                mergedSuccessors.retainAll(toMergeSuccessors);
+
+                if (canMerge) {
+                    if (!nonPackageInputSuccessors.isEmpty() || 
!mergedNonPackageInputSuccessors.isEmpty()) {
+                        // If a non-POPackage input successor is already 
merged or
+                        // if there is a POPackage and non-POPackage to be 
merged,
+                        // then skip as it will become diamond shape
+                        // For eg: POFRJoinTez+Scalar, 
POFRJoinTez/Scalar+POPackage
+                        if 
(nonPackageInputSuccessors.removeAll(mergedSuccessors)
+                                || 
toMergeSuccessors.removeAll(mergedNonPackageInputSuccessors)
+                                || 
toMergeSuccessors.removeAll(nonPackageInputSuccessors)) {
+                            continue;
+                        }
+                    }
+                } else {
+                    continue;
+                }
+
+                mergedSuccessors.retainAll(toNotMergeSuccessors);
                 if (mergedSuccessors.isEmpty()) { // no shared edge after merge
                     splittees.add(successor);
+                    
mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
                 }
             }
 
-            if (splittees.size()==0) {
+            if (splittees.size() == 0) {
                 return;
             }
 
@@ -136,42 +245,46 @@ public class MultiQueryOptimizerTez exte
         }
     }
 
-    static public void removeSplittee(TezOperPlan plan, TezOperator splitter, 
TezOperator splittee) throws PlanException {
-        if (plan.getSuccessors(splittee)!=null) {
-            List<TezOperator> succs = new ArrayList<TezOperator>();
-            succs.addAll(plan.getSuccessors(splittee));
-            plan.disconnect(splitter, splittee);
+    private void removeSplittee(TezOperPlan plan, TezOperator splitter,
+            TezOperator splittee) throws PlanException, VisitorException {
+
+        plan.disconnect(splitter, splittee);
+
+        String spliteeKey = splittee.getOperatorKey().toString();
+        String splitterKey = splitter.getOperatorKey().toString();
+
+        if (plan.getPredecessors(splittee) != null) {
+            for (TezOperator pred : new 
ArrayList<TezOperator>(plan.getPredecessors(splittee))) {
+                TezEdgeDescriptor edge = 
pred.outEdges.remove(splittee.getOperatorKey());
+                if (edge == null) {
+                    throw new VisitorException("Edge description is empty");
+                }
+                plan.disconnect(pred, splittee);
+                TezCompilerUtil.connectTezOpToNewSuccesor(plan, pred, 
splitter, edge, spliteeKey);
+            }
+        }
+
+        if (plan.getSuccessors(splittee) != null) {
+            List<TezOperator> succs = new 
ArrayList<TezOperator>(plan.getSuccessors(splittee));
+            List<TezOperator> splitterSuccs = plan.getSuccessors(splitter);
             for (TezOperator succTezOperator : succs) {
                 TezEdgeDescriptor edge = 
succTezOperator.inEdges.get(splittee.getOperatorKey());
-
                 splitter.outEdges.remove(splittee.getOperatorKey());
                 succTezOperator.inEdges.remove(splittee.getOperatorKey());
                 plan.disconnect(splittee, succTezOperator);
-                TezCompilerUtil.connect(plan, splitter, succTezOperator, edge);
 
-                try {
-                    List<TezInput> inputs = 
PlanHelper.getPhysicalOperators(succTezOperator.plan, TezInput.class);
-                    for (TezInput input : inputs) {
-                        
input.replaceInput(splittee.getOperatorKey().toString(),
-                                splitter.getOperatorKey().toString());
-                    }
-                    List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(succTezOperator.plan, POUserFunc.class);
-                    for (POUserFunc userFunc : userFuncs) {
-                        if (userFunc.getFunc() instanceof ReadScalarsTez) {
-                            TezInput tezInput = (TezInput)userFunc.getFunc();
-                            
tezInput.replaceInput(splittee.getOperatorKey().toString(),
-                                    splitter.getOperatorKey().toString());
-                            
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
-                        }
-                    }
-                } catch (VisitorException e) {
-                    throw new PlanException(e);
+                // Do not connect again in case of self join/cross/cogroup or 
union
+                if (splitterSuccs == null || 
!splitterSuccs.contains(succTezOperator)) {
+                    TezCompilerUtil.connectTezOpToNewPredecessor(plan, 
succTezOperator, splitter, edge, null);
                 }
 
+                TezCompilerUtil.replaceInput(succTezOperator, spliteeKey, 
splitterKey);
+
                 if (succTezOperator.isUnion()) {
-                    int index = 
succTezOperator.getUnionPredecessors().indexOf(splittee.getOperatorKey());
-                    if (index > -1) {
-                        succTezOperator.getUnionPredecessors().set(index, 
splitter.getOperatorKey());
+                    int index = 
succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey());
+                    while (index > -1) {
+                        succTezOperator.getUnionMembers().set(index, 
splitter.getOperatorKey());
+                        index = 
succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey());
                     }
                 }
             }
@@ -179,7 +292,7 @@ public class MultiQueryOptimizerTez exte
         plan.remove(splittee);
     }
 
-    static public void addSubPlanPropertiesToParent(TezOperator parentOper, 
TezOperator subPlanOper) {
+    private void addSubPlanPropertiesToParent(TezOperator parentOper, 
TezOperator subPlanOper) {
         // Copy only map side properties. For eg: crossKeys.
         // Do not copy reduce side specific properties. For eg: 
useSecondaryKey, segmentBelow, sortOrder, etc
         if (subPlanOper.getCrossKeys() != null) {
@@ -189,6 +302,11 @@ public class MultiQueryOptimizerTez exte
         }
         parentOper.copyFeatures(subPlanOper, null);
 
+        // For skewed join right input
+        if (subPlanOper.getSampleOperator() !=  null) {
+            parentOper.setSampleOperator(subPlanOper.getSampleOperator());
+        }
+
         if (subPlanOper.getRequestedParallelism() > 
parentOper.getRequestedParallelism()) {
             
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 Fri Mar  4 18:17:39 2016
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
 
-import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -26,14 +25,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
@@ -81,25 +77,20 @@ public class ParallelismSetter extends T
             // Can only set parallelism here if the parallelism isn't derived 
from
             // splits
             int parallelism = -1;
-            boolean intermediateReducer = false;
-            LinkedList<POStore> stores = 
PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
-            if (stores.size() <= 0) {
-                intermediateReducer = true;
-            }
             if (tezOp.getLoaderInfo().getLoads() != null && 
tezOp.getLoaderInfo().getLoads().size() > 0) {
-                // TODO: Can be set to -1 if TEZ-601 gets fixed and getting 
input
-                // splits can be moved to if(loads) block below
-                parallelism = 
tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
-                tezOp.setRequestedParallelism(parallelism);
+                // requestedParallelism of Loader vertex is handled in 
LoaderProcessor
+                // propogate to vertexParallelism
+                tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
+                incrementTotalParallelism(tezOp, 
tezOp.getRequestedParallelism());
+                return;
             } else {
                 int prevParallelism = -1;
                 boolean isOneToOneParallelism = false;
-                intermediateReducer = 
TezCompilerUtil.isIntermediateReducer(tezOp);
 
                 for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : 
tezOp.inEdges.entrySet()) {
                     if (entry.getValue().dataMovementType == 
DataMovementType.ONE_TO_ONE) {
                         TezOperator pred = mPlan.getOperator(entry.getKey());
-                        parallelism = pred.getEffectiveParallelism();
+                        parallelism = 
pred.getEffectiveParallelism(pc.defaultParallel);
                         if (prevParallelism == -1) {
                             prevParallelism = parallelism;
                         } else if (prevParallelism != parallelism) {
@@ -107,7 +98,12 @@ public class ParallelismSetter extends T
                                     + tezOp.getOperatorKey().toString() + " 
are not equal");
                         }
                         
tezOp.setRequestedParallelism(pred.getRequestedParallelism());
-                        
tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
+                        // If tezOp.estimatedParallelism already set, don't 
override
+                        // The only case is in PigGraceShuffleVertexManager, 
which
+                        // sets the estimated parallelism according to the 
output data size of the node
+                        if (tezOp.getEstimatedParallelism()==-1) {
+                            
tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
+                        }
                         isOneToOneParallelism = true;
                         incrementTotalParallelism(tezOp, parallelism);
                         parallelism = -1;
@@ -122,7 +118,7 @@ public class ParallelismSetter extends T
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled
-                            && intermediateReducer
+                            && tezOp.isIntermediateReducer()
                             && !tezOp.isDontEstimateParallelism()
                             && tezOp.isOverrideIntermediateParallelism()) {
                         overrideRequestedParallelism = true;
@@ -133,6 +129,9 @@ public class ParallelismSetter extends T
                             // if it is intermediate reducer
                             parallelism = estimator.estimateParallelism(mPlan, 
tezOp, conf);
                             if (overrideRequestedParallelism) {
+                                if (tezOp.getRequestedParallelism() != 
parallelism) {
+                                    LOG.info("Increased requested parallelism 
of " + tezOp.getOperatorKey() + " to " + parallelism);
+                                }
                                 tezOp.setRequestedParallelism(parallelism);
                             } else {
                                 tezOp.setEstimatedParallelism(parallelism);
@@ -141,7 +140,12 @@ public class ParallelismSetter extends T
                             parallelism = tezOp.getEstimatedParallelism();
                         }
                         if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
-                            if (!overrideRequestedParallelism) {
+                            boolean additionalEdge = false;
+                            if (tezOp.isGlobalSort() && 
getPlan().getPredecessors(tezOp).size() != 1 ||
+                                    tezOp.isSkewedJoin() && 
getPlan().getPredecessors(tezOp).size() != 2) {
+                                additionalEdge = true;
+                            }
+                            if (!overrideRequestedParallelism && 
!additionalEdge) {
                                 incrementTotalParallelism(tezOp, parallelism);
                                 // PartitionerDefinedVertexManager will 
determine parallelism.
                                 // So call setVertexParallelism with -1
@@ -156,12 +160,12 @@ public class ParallelismSetter extends T
                                 for (TezOperator pred : 
mPlan.getPredecessors(tezOp)) {
                                     if (pred.isSampleBasedPartitioner()) {
                                         for (TezOperator partitionerPred : 
mPlan.getPredecessors(pred)) {
-                                            if 
(partitionerPred.isSampleAggregation()) {
-                                                LOG.debug("Updating constant 
value to " + parallelism + " in " + partitionerPred.plan);
-                                                LOG.info("Increased requested 
parallelism of " + partitionerPred.getOperatorKey() + " to " + parallelism);
+                                            if 
(partitionerPred.isSampleAggregation() && partitionerPred.plan!=null) {
+                                                LOG.debug("Updating 
parallelism constant value to " + parallelism + " in " + partitionerPred.plan);
                                                 ParallelConstantVisitor 
visitor =
                                                         new 
ParallelConstantVisitor(partitionerPred.plan, parallelism);
                                                 visitor.visit();
+                                                
partitionerPred.setNeedEstimatedQuantile(false);
                                                 break;
                                             }
                                         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
 Fri Mar  4 18:17:39 2016
@@ -56,12 +56,13 @@ public class SecondaryKeyOptimizerTez ex
             return;
         }
 
+        // TODO: PIG-4685: SecondaryKeyOptimizerTez does not optimize cogroup
         // Current code does not handle more than one predecessors
         // even though it is possible. The problem is when we
         // process the first predecessor, we remove the foreach inner
         // operators from the reduce side, and the second predecessor
         // cannot see them
-        if (predecessors.size()>1) {
+        if (predecessors.size() > 1) {
             return;
         }
         TezOperator from = predecessors.get(0);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 Fri Mar  4 18:17:39 2016
@@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configurat
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -42,7 +44,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -56,7 +58,7 @@ import org.apache.tez.dag.api.EdgeProper
  *
  * Since currently it is only possible to reduce the parallelism
  * estimation is exaggerated and will rely on Tez runtime to
- * descrease the parallelism
+ * decrease the parallelism
  */
 public class TezOperDependencyParallelismEstimator implements 
TezParallelismEstimator {
 
@@ -65,6 +67,14 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
 
+    // In most cases distinct does not reduce much.
+    // So keeping it high at 0.9
+    static final double DEFAULT_DISTINCT_FACTOR = 0.9;
+
+    // In most cases aggregation can reduce by a lot.
+    // But keeping at 0.7 to take worst case scenarios into account
+    static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
+
     private PigContext pc;
 
     @Override
@@ -79,15 +89,13 @@ public class TezOperDependencyParallelis
             return -1;
         }
 
-        boolean intermediateReducer = 
TezCompilerUtil.isIntermediateReducer(tezOper);
-
         // TODO: If map opts and reduce opts are same estimate higher 
parallelism
         // for tasks based on the count of number of map tasks else be 
conservative as now
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
         // If parallelism is set explicitly, respect it
-        if (!intermediateReducer && tezOper.getRequestedParallelism()!=-1) {
+        if (!tezOper.isIntermediateReducer() && 
tezOper.getRequestedParallelism()!=-1) {
             return tezOper.getRequestedParallelism();
         }
 
@@ -111,7 +119,7 @@ public class TezOperDependencyParallelis
             // and sample/scalar (does not impact parallelism)
             if 
(entry.getValue().dataMovementType==DataMovementType.SCATTER_GATHER ||
                     
entry.getValue().dataMovementType==DataMovementType.ONE_TO_ONE) {
-                double predParallelism = pred.getEffectiveParallelism();
+                double predParallelism = 
pred.getEffectiveParallelism(pc.defaultParallel);
                 if (predParallelism==-1) {
                     throw new IOException("Cannot estimate parallelism for " + 
tezOper.getOperatorKey().toString()
                             + ", effective parallelism for predecessor " + 
tezOper.getOperatorKey().toString()
@@ -120,10 +128,8 @@ public class TezOperDependencyParallelis
 
                 //For cases like Union we can just limit to sum of pred 
vertices parallelism
                 boolean applyFactor = !tezOper.isUnion();
-                if (pred.plan!=null && applyFactor) { // pred.plan can be null 
if it is a VertexGroup
-                    TezParallelismFactorVisitor parallelismFactorVisitor = new 
TezParallelismFactorVisitor(pred.plan, tezOper.getOperatorKey().toString());
-                    parallelismFactorVisitor.visit();
-                    predParallelism = predParallelism * 
parallelismFactorVisitor.getFactor();
+                if (!pred.isVertexGroup() && applyFactor) {
+                    predParallelism = predParallelism * 
pred.getParallelismFactor(tezOper);
                 }
                 estimatedParallelism += predParallelism;
             }
@@ -131,7 +137,7 @@ public class TezOperDependencyParallelis
 
         int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
 
-        if (intermediateReducer && 
tezOper.isOverrideIntermediateParallelism()) {
+        if (tezOper.isIntermediateReducer() && 
tezOper.isOverrideIntermediateParallelism()) {
             // Estimated reducers should not be more than the configured limit
             roundedEstimatedParallelism = 
Math.min(roundedEstimatedParallelism, maxTaskCount);
             int userSpecifiedParallelism = pc.defaultParallel;
@@ -150,6 +156,12 @@ public class TezOperDependencyParallelis
             roundedEstimatedParallelism = 
Math.min(roundedEstimatedParallelism, maxTaskCount);
         }
 
+        if (roundedEstimatedParallelism == 0) {
+            throw new IOException("Estimated parallelism for "
+                    + tezOper.getOperatorKey().toString()
+                    + " is 0 which is unexpected");
+        }
+
         return roundedEstimatedParallelism;
     }
 
@@ -157,7 +169,7 @@ public class TezOperDependencyParallelis
         List<TezOperator> preds = plan.getPredecessors(tezOper);
         for (TezOperator pred : preds) {
             if (pred.isVertexGroup()) {
-                for (OperatorKey unionPred : pred.getUnionPredecessors()) {
+                for (OperatorKey unionPred : pred.getVertexGroupMembers()) {
                     if (unionPred.toString().equals(inputKey)) {
                         return plan.getOperator(unionPred);
                     }
@@ -174,9 +186,24 @@ public class TezOperDependencyParallelis
     public static class TezParallelismFactorVisitor extends PhyPlanVisitor {
         private double factor = 1;
         private String outputKey;
-        public TezParallelismFactorVisitor(PhysicalPlan plan, String 
outputKey) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(plan));
-            this.outputKey = outputKey;
+        private TezOperator tezOp;
+
+        public TezParallelismFactorVisitor(TezOperator tezOp, TezOperator 
successor) {
+            super(tezOp.plan, new DepthFirstWalker<PhysicalOperator, 
PhysicalPlan>(tezOp.plan));
+            this.tezOp = tezOp;
+            this.outputKey = tezOp.getOperatorKey().toString();
+
+            if (successor != null) {
+                // Map side combiner
+                TezEdgeDescriptor edge = 
tezOp.outEdges.get(successor.getOperatorKey());
+                if (!edge.combinePlan.isEmpty()) {
+                    if (successor.isDistinct()) {
+                        factor = DEFAULT_DISTINCT_FACTOR;
+                    } else {
+                        factor = DEFAULT_AGGREGATION_FACTOR;
+                    }
+                }
+            }
         }
 
         @Override
@@ -194,11 +221,17 @@ public class TezOperDependencyParallelis
         @Override
         public void visitPOForEach(POForEach nfe) throws VisitorException {
             List<Boolean> flattens = nfe.getToBeFlattened();
+            List<PhysicalPlan> inputPlans = nfe.getInputPlans();
             boolean containFlatten = false;
-            for (boolean flatten : flattens) {
-                if (flatten) {
-                    containFlatten = true;
-                    break;
+            for (int i = 0; i < flattens.size(); i++) {
+                if (flattens.get(i)) {
+                    PhysicalPlan inputPlan = inputPlans.get(i);
+                    PhysicalOperator root = inputPlan.getRoots().get(0);
+                    if (root instanceof POProject
+                            && root.getResultType() == DataType.BAG) {
+                        containFlatten = true;
+                        break;
+                    }
                 }
             }
             if (containFlatten) {
@@ -226,6 +259,12 @@ public class TezOperDependencyParallelis
             // JoinPackager is equivalent to a foreach flatten after shuffle
             if (pkg.getPkgr() instanceof JoinPackager) {
                 factor *= DEFAULT_FLATTEN_FACTOR;
+            } else if (pkg.getPkgr() instanceof CombinerPackager) {
+                if (tezOp.isDistinct()) {
+                    factor *= DEFAULT_DISTINCT_FACTOR;
+                } else {
+                    factor *= DEFAULT_AGGREGATION_FACTOR;
+                }
             }
         }
 


Reply via email to