Author: rohini
Date: Thu Jun 25 20:59:31 2015
New Revision: 1687646

URL: http://svn.apache.org/r1687646
Log:
PIG-4574: Eliminate identity vertex for order by and skewed join right after LOAD (rohini)

Added:
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jun 25 20:59:31 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4574: Eliminate identity vertex for order by and skewed join right after LOAD (rohini)
+
 PIG-4365: TOP udf should implement Accumulator interface (eyal via rohini)
 
 PIG-4570: Allow AvroStorage to use a class for the schema (pmazak via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Thu Jun 25 20:59:31 2015
@@ -1485,10 +1485,19 @@ public class TezCompiler extends PhyPlan
             POPoissonSample poSample = new POPoissonSample(new 
OperatorKey(scope,nig.getNextNodeId(scope)),
                     -1, sampleRate, heapPerc, totalMemory);
 
-            TezOperator prevOp = compiledInputs[0];
-            prevOp.plan.addAsLeaf(lrTez);
-            prevOp.plan.addAsLeaf(poSample);
-            prevOp.markSampler();
+            TezOperator samplerOper = compiledInputs[0];
+            boolean writeDataForPartitioner = 
shouldWriteDataForPartitioner(samplerOper);
+
+            PhysicalPlan partitionerPlan = null;
+            if (writeDataForPartitioner) {
+                samplerOper.plan.addAsLeaf(lrTez);
+            } else {
+                partitionerPlan = samplerOper.plan.clone();
+                partitionerPlan.addAsLeaf(lrTez);
+            }
+
+            samplerOper.plan.addAsLeaf(poSample);
+            samplerOper.markSampler();
 
             MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = 
op.getJoinPlans();
             List<PhysicalOperator> l = plan.getPredecessors(op);
@@ -1532,9 +1541,9 @@ public class TezCompiler extends PhyPlan
             // This foreach will pick the sort key columns from the 
POPoissonSample output
             POForEach nfe1 = new POForEach(new 
OperatorKey(scope,nig.getNextNodeId(scope)),
                     -1, eps1, flat1);
-            prevOp.plan.addAsLeaf(nfe1);
-            prevOp.plan.addAsLeaf(lrTezSample);
-            prevOp.setClosed(true);
+            samplerOper.plan.addAsLeaf(nfe1);
+            samplerOper.plan.addAsLeaf(lrTezSample);
+            samplerOper.setClosed(true);
 
             int rp = op.getRequestedParallelism();
             if (rp == -1) {
@@ -1557,10 +1566,9 @@ public class TezCompiler extends PhyPlan
 
             compiledInputs = new TezOperator[] {joinInputs[0]};
 
-            blocking();
-
-            // Add a POIdentityInOutTez to the joinJobs[0] which is a 
partition vertex.
-            // It just partitions the data from first vertex based on the 
quantiles from sample vertex.
+            // Add a partitioner vertex that partitions the data based on the 
quantiles from sample vertex.
+            curTezOp = getTezOp();
+            tezPlan.add(curTezOp);
             joinJobs[0] = curTezOp;
 
             try {
@@ -1578,15 +1586,38 @@ public class TezCompiler extends PhyPlan
             }
             lrTez.setKeyType(type);
             lrTez.setPlans(groups);
-            lrTez.setSkewedJoin(true);
             lrTez.setResultType(DataType.TUPLE);
 
-            POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
-                    OperatorKey.genOpKey(scope), lrTez);
-            identityInOutTez.setInputKey(prevOp.getOperatorKey().toString());
-            joinJobs[0].plan.addAsLeaf(identityInOutTez);
+            POLocalRearrangeTez partitionerLR = null;
+            if (!writeDataForPartitioner) {
+                // Read input from hdfs again
+                joinJobs[0].plan = partitionerPlan;
+                partitionerLR = lrTez;
+                lrTez.setSkewedJoin(true);
+            } else {
+                // Add a POIdentityInOutTez which just passes data through 
from sampler vertex
+                partitionerLR = new POIdentityInOutTez(
+                        OperatorKey.genOpKey(scope),
+                        lrTez,
+                        samplerOper.getOperatorKey().toString());
+                partitionerLR.setSkewedJoin(true);
+                joinJobs[0].plan.addAsLeaf(partitionerLR);
+
+                // Connect the sampler vertex to the partitioner vertex
+                lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
+                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, 
samplerOper, joinJobs[0]);
+                // TODO: PIG-3775 unsorted shuffle
+                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+                edge.outputClassName = UnorderedKVOutput.class.getName();
+                edge.inputClassName = UnorderedKVInput.class.getName();
+                // If prevOp.requestedParallelism changes based on no. of 
input splits
+                // it will reflect for joinJobs[0] so that 1-1 edge will work.
+                joinJobs[0].setRequestedParallelismByReference(samplerOper);
+            }
             joinJobs[0].setClosed(true);
             joinJobs[0].markSampleBasedPartitioner();
+            joinJobs[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings());
+
             rearrangeOutputs[0] = joinJobs[0];
 
             compiledInputs = new TezOperator[] {joinInputs[1]};
@@ -1663,23 +1694,11 @@ public class TezCompiler extends PhyPlan
             fe.visit(this);
 
             // Connect vertices
-            lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
             
lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
-            
identityInOutTez.setOutputKey(joinJobs[2].getOperatorKey().toString());
+            
partitionerLR.setOutputKey(joinJobs[2].getOperatorKey().toString());
             pr.setOutputKey(joinJobs[2].getOperatorKey().toString());
 
-            TezEdgeDescriptor edge = 
joinJobs[0].inEdges.get(prevOp.getOperatorKey());
-            joinJobs[0].setUseMRMapSettings(prevOp.isUseMRMapSettings());
-            // TODO: Convert to unsorted shuffle after TEZ-661
-            // Use 1-1 edge
-            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-            edge.outputClassName = UnorderedKVOutput.class.getName();
-            edge.inputClassName = UnorderedKVInput.class.getName();
-            // If prevOp.requestedParallelism changes based on no. of input 
splits
-            // it will reflect for joinJobs[0] so that 1-1 edge will work.
-            joinJobs[0].setRequestedParallelismByReference(prevOp);
-
-            TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
+            TezCompilerUtil.connect(tezPlan, samplerOper, sampleJobPair.first);
 
             POValueOutputTez sampleOut = (POValueOutputTez) 
sampleJobPair.first.plan.getLeaves().get(0);
             for (int i = 0; i <= 2; i++) {
@@ -1687,16 +1706,16 @@ public class TezCompiler extends PhyPlan
                     // We need to send sample to left relation partitioner 
vertex, right relation load vertex,
                     // and join vertex (IsFirstReduceOfKey in join vertex need 
sample file as well)
                     joinJobs[i].setSampleOperator(sampleJobPair.first);
-    
+
                     // Configure broadcast edges for distribution map
-                    edge = TezCompilerUtil.connect(tezPlan, 
sampleJobPair.first, joinJobs[i]);
+                    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, 
sampleJobPair.first, joinJobs[i]);
                     TezCompilerUtil.configureValueOnlyTupleOutput(edge, 
DataMovementType.BROADCAST);
                     
sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
                 }
 
                 // Configure skewed partitioner for join
                 if (i != 2) {
-                    edge = 
joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
+                    TezEdgeDescriptor edge = 
joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
                     edge.partitionerClass = SkewedPartitionerTez.class;
                 }
             }
@@ -1726,55 +1745,71 @@ public class TezCompiler extends PhyPlan
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
 
-    /**
-     * Force an end to the current vertex with store and sample
-     * @return Tez operator that now is finished with a store.
-     * @throws PlanException
-     */
-    private TezOperator endSingleInputWithStoreAndSample(
-            POSort sort,
-            POLocalRearrangeTez lr,
-            POLocalRearrangeTez lrSample,
-            byte keyType,
-            Pair<POProject, Byte>[] fields) throws PlanException {
-        if(compiledInputs.length>1) {
-            int errCode = 2023;
-            String msg = "Received a multi input plan when expecting only a 
single input one.";
-            throw new PlanException(msg, errCode, PigException.BUG);
+    private boolean shouldWriteDataForPartitioner(TezOperator samplerOper) {
+        // If there are operators other than load and foreach (like filter
+        // split, etc) in the plan, then process and write the data out
+        // and save the cost of processing in the partitioner vertex
+        // Else read from hdfs again save the IO cost of the extra write
+        boolean writeDataForPartitioner = false;
+        if (samplerOper.plan.getRoots().get(0) instanceof POLoad) {
+            for (PhysicalOperator oper : samplerOper.plan) {
+                if (oper instanceof POForEach || oper instanceof POLoad) {
+                    continue;
+                }
+                writeDataForPartitioner = true;
+                break;
+            }
+        } else {
+            writeDataForPartitioner = true;
         }
-        TezOperator oper = compiledInputs[0];
-        if (!oper.isClosed()) {
+        return writeDataForPartitioner;
+    }
 
-            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
-            if (fields == null) {
-                // This is project *
-                PhysicalPlan ep = new PhysicalPlan();
-                POProject prj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
-                prj.setStar(true);
-                prj.setOverloaded(false);
-                prj.setResultType(DataType.TUPLE);
-                ep.add(prj);
-                eps.add(ep);
-            } else {
-                // Attach the sort plans to the local rearrange to get the
-                // projection.
-                eps.addAll(sort.getSortPlans());
-            }
+    /**
+     * Get LocalRearrange for POSort input
+     */
+    private POLocalRearrangeTez getLocalRearrangeForSortInput(POSort sort,
+            byte keyType, Pair<POProject, Byte>[] fields)
+            throws PlanException {
+        POLocalRearrangeTez lr = new 
POLocalRearrangeTez(OperatorKey.genOpKey(scope));
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        if (fields == null) {
+            // This is project *
+            PhysicalPlan ep = new PhysicalPlan();
+            POProject prj = new POProject(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj.setStar(true);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.TUPLE);
+            ep.add(prj);
+            eps.add(ep);
+        } else {
+            // Attach the sort plans to the local rearrange to get the
+            // projection.
+            eps.addAll(sort.getSortPlans());
+        }
+
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            int errCode = 2058;
+            String msg = "Unable to set index on newly created 
POLocalRearrange.";
+            throw new PlanException(msg, errCode, PigException.BUG, e);
+        }
+        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : 
keyType);
+        lr.setPlans(eps);
+        lr.setResultType(DataType.TUPLE);
+        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+        return lr;
+    }
 
-            try {
-                lr.setIndex(0);
-            } catch (ExecException e) {
-                int errCode = 2058;
-                String msg = "Unable to set index on newly created 
POLocalRearrange.";
-                throw new PlanException(msg, errCode, PigException.BUG, e);
-            }
-            lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE 
: keyType);
-            lr.setPlans(eps);
-            lr.setResultType(DataType.TUPLE);
-            lr.addOriginalLocation(sort.getAlias(), 
sort.getOriginalLocations());
+    /**
+     * Add a sampler to the sort input
+     */
+    private POLocalRearrangeTez addSamplingToSortInput(POSort sort, 
TezOperator oper,
+            byte keyType, Pair<POProject, Byte>[] fields) throws PlanException 
{
 
-            lr.setOutputKey(curTezOp.getOperatorKey().toString());
-            oper.plan.addAsLeaf(lr);
+        POLocalRearrangeTez lrSample = 
localRearrangeFactory.create(LocalRearrangeType.NULL);
+        if (!oper.isClosed()) {
 
             List<Boolean> flat1 = new ArrayList<Boolean>();
             List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
@@ -1875,7 +1910,8 @@ public class TezCompiler extends PhyPlan
             String msg = "The current operator is closed. This is unexpected 
while compiling.";
             throw new PlanException(msg, errCode, PigException.BUG);
         }
-        return oper;
+        oper.markSampler();
+        return lrSample;
     }
 
     private Pair<TezOperator,Integer> getOrderbySamplingAggregationJob(
@@ -2112,32 +2148,42 @@ public class TezCompiler extends PhyPlan
 
     private TezOperator[] getSortJobs(
             TezOperator inputOper,
+            PhysicalPlan partitionerPlan,
             POLocalRearrangeTez inputOperRearrange,
             POSort sort,
             byte keyType,
             Pair<POProject, Byte>[] fields) throws PlanException{
+
         TezOperator[] opers = new TezOperator[2];
+
+        // Partitioner Vertex
         TezOperator oper1 = getTezOp();
         tezPlan.add(oper1);
         opers[0] = oper1;
-
-        POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
-                OperatorKey.genOpKey(scope),
-                inputOperRearrange);
-        identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
-        oper1.plan.addAsLeaf(identityInOutTez);
+        POLocalRearrangeTez partitionerLR = null;
+        if (partitionerPlan != null) {
+            // Read from hdfs again
+            oper1.plan = partitionerPlan;
+            partitionerLR = inputOperRearrange;
+        } else {
+            partitionerLR = new POIdentityInOutTez(
+                    OperatorKey.genOpKey(scope),
+                    inputOperRearrange,
+                    inputOper.getOperatorKey().toString());
+            oper1.plan.addAsLeaf(partitionerLR);
+        }
         oper1.setClosed(true);
         oper1.markSampleBasedPartitioner();
 
+        // Global Sort Vertex
         TezOperator oper2 = getTezOp();
+        partitionerLR.setOutputKey(oper2.getOperatorKey().toString());
         oper2.markGlobalSort();
         opers[1] = oper2;
         tezPlan.add(oper2);
 
         long limit = sort.getLimit();
-
         boolean[] sortOrder;
-
         List<Boolean> sortOrderList = sort.getMAscCols();
         if(sortOrderList != null) {
             sortOrder = new boolean[sortOrderList.size()];
@@ -2147,8 +2193,6 @@ public class TezCompiler extends PhyPlan
             oper2.setSortOrder(sortOrder);
         }
 
-        identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
-
         if (limit!=-1) {
             POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
             pkg_c.setPkgr(new LitePackager());
@@ -2228,6 +2272,13 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitSort(POSort op) throws VisitorException {
         try{
+
+            if (compiledInputs.length > 1) {
+                int errCode = 2023;
+                String msg = "Received a multi input plan when expecting only 
a single input one.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+
             Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
             byte keyType = DataType.UNKNOWN;
 
@@ -2242,58 +2293,65 @@ public class TezCompiler extends PhyPlan
                 throw new PlanException(msg, errCode, PigException.BUG, ve);
             }
 
-            POLocalRearrangeTez lr = new 
POLocalRearrangeTez(OperatorKey.genOpKey(scope));
-            POLocalRearrangeTez lrSample = 
localRearrangeFactory.create(LocalRearrangeType.NULL);
+            TezOperator samplerOper = compiledInputs[0];
+            boolean writeDataForPartitioner = 
shouldWriteDataForPartitioner(samplerOper);
 
-            TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, 
lrSample, keyType, fields);
-            prevOper.markSampler();
+            POLocalRearrangeTez lr = getLocalRearrangeForSortInput(op, 
keyType, fields);
+            PhysicalPlan partitionerPlan = null;
+            if (writeDataForPartitioner) {
+                samplerOper.plan.addAsLeaf(lr);
+            } else {
+                partitionerPlan = samplerOper.plan.clone();
+                partitionerPlan.addAsLeaf(lr);
+            }
 
+            // if rp is still -1, let it be, TezParallelismEstimator will set 
it to an estimated rp
             int rp = op.getRequestedParallelism();
             if (rp == -1) {
                 rp = pigContext.defaultParallel;
             }
 
-            // if rp is still -1, let it be, TezParallelismEstimator will set 
it to an estimated rp
+            // Add sampling to sort input. Create a sample aggregation 
operator and connect both
+            POLocalRearrangeTez lrSample = addSamplingToSortInput(op, 
samplerOper, keyType, fields);
             Pair<TezOperator, Integer> quantJobParallelismPair = 
getOrderbySamplingAggregationJob(op, rp);
-            TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, 
fields);
+            TezCompilerUtil.connect(tezPlan, samplerOper, 
quantJobParallelismPair.first);
 
-            TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, 
prevOper, sortOpers[0]);
-            sortOpers[0].setUseMRMapSettings(prevOper.isUseMRMapSettings());
+            // Create the partitioner and the global sort vertices
+            TezOperator[] sortOpers = getSortJobs(samplerOper, 
partitionerPlan, lr, op, keyType, fields);
+            sortOpers[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings());
+
+            if (writeDataForPartitioner) {
+                // Connect the sampler and partitioner vertex
+                lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
+                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, 
samplerOper, sortOpers[0]);
+
+                // Use 1-1 edge
+                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+                edge.outputClassName = UnorderedKVOutput.class.getName();
+                edge.inputClassName = UnorderedKVInput.class.getName();
+                // If prevOper.requestedParallelism changes based on no. of 
input splits
+                // it will reflect for sortOpers[0] so that 1-1 edge will work.
+                sortOpers[0].setRequestedParallelismByReference(samplerOper);
+            }
 
-            // Use 1-1 edge
-            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-            edge.outputClassName = UnorderedKVOutput.class.getName();
-            edge.inputClassName = UnorderedKVInput.class.getName();
-            // If prevOper.requestedParallelism changes based on no. of input 
splits
-            // it will reflect for sortOpers[0] so that 1-1 edge will work.
-            sortOpers[0].setRequestedParallelismByReference(prevOper);
-            if (rp==-1) {
+            if (rp == -1) {
                 quantJobParallelismPair.first.setNeedEstimatedQuantile(true);
             }
+            quantJobParallelismPair.first.setSortOperator(sortOpers[1]);
             
sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
 
-            /*
-            // TODO: Convert to unsorted shuffle after TEZ-661
-            // edge.outputClassName = UnorderedKVOutput.class.getName();
-            // edge.inputClassName = UnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
-            
sortOpers[0].setRequestedParallelism(quantJobParallelismPair.second);
-            
sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
-            */
-
-            TezCompilerUtil.connect(tezPlan, prevOper, 
quantJobParallelismPair.first);
-            lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
-            
lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
-
-            edge = TezCompilerUtil.connect(tezPlan, 
quantJobParallelismPair.first, sortOpers[0]);
+            // Broadcast the sample to Partitioner vertex
+            TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, 
quantJobParallelismPair.first, sortOpers[0]);
             TezCompilerUtil.configureValueOnlyTupleOutput(edge, 
DataMovementType.BROADCAST);
             POValueOutputTez sampleOut = 
(POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
             sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString());
             sortOpers[0].setSampleOperator(quantJobParallelismPair.first);
 
+            
lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
+
+            // Connect the Partitioner and Global Sort vertex
             edge = TezCompilerUtil.connect(tezPlan, sortOpers[0], 
sortOpers[1]);
             edge.partitionerClass = WeightedRangePartitionerTez.class;
-
             curTezOp = sortOpers[1];
 
             // TODO: Review sort udf
@@ -2301,7 +2359,6 @@ public class TezCompiler extends PhyPlan
 //                
curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
 //                curTezOp.isUDFComparatorUsed = true;
 //            }
-            quantJobParallelismPair.first.setSortOperator(sortOpers[1]);
 
             // If Order by followed by Limit and parallelism of order by is 
not 1
             // add a new vertex for Limit with parallelism 1.

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Thu Jun 25 20:59:31 2015
@@ -58,9 +58,10 @@ public class POIdentityInOutTez extends
     private transient boolean shuffleInput;
     private transient boolean finished = false;
 
-    public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange) {
+    public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, 
String inputKey) {
         super(inputRearrange);
         this.mKey = k;
+        this.inputKey = inputKey;
     }
 
     public void setInputKey(String inputKey) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Thu Jun 25 20:59:31 2015
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -144,6 +145,13 @@ public class POLocalRearrangeTez extends
                     // assign the tuple to its slot in the projection.
                     key.setIndex(index);
                     val.setIndex(index);
+                    if (isSkewedJoin) {
+                        // Wrap into a NullablePartitionWritable to match the 
key
+                        // of the right table from POPartitionRearrangeTez for 
the skewed join
+                        NullablePartitionWritable wrappedKey = new 
NullablePartitionWritable(key);
+                        wrappedKey.setPartition(-1);
+                        key = wrappedKey;
+                    }
                     writer.write(key, val);
                 } else {
                     illustratorMarkup(res.result, res.result, 0);

Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Limit-2.gld Thu Jun 25 20:59:31 2015
@@ -4,86 +4,92 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-12    ->      Tez vertex scope-22,Tez vertex scope-33,
-Tez vertex scope-22    ->      Tez vertex scope-33,
-Tez vertex scope-33    ->      Tez vertex scope-35,
-Tez vertex scope-35    ->      Tez vertex scope-46,
-Tez vertex scope-46
+Tez vertex scope-12    ->      Tez vertex scope-28,
+Tez vertex scope-28    ->      Tez vertex scope-39,
+Tez vertex scope-39    ->      Tez vertex scope-40,
+Tez vertex scope-40    ->      Tez vertex scope-51,
+Tez vertex scope-51
 
 Tez vertex scope-12
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-15        ->       scope-22
+Local Rearrange[tuple]{tuple}(false) - scope-21        ->       scope-28
 |   |
-|   Constant(DummyVal) - scope-14
+|   Constant(DummyVal) - scope-20
 |
-|---New For Each(false,false,true)[tuple] - scope-21
+|---New For Each(false,false,true)[tuple] - scope-27
     |   |
     |   Project[int][0] - scope-8
     |   |
     |   Project[int][1] - scope-9
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-20
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-26
     |   |
-    |   |---Project[tuple][*] - scope-19
+    |   |---Project[tuple][*] - scope-25
     |
-    |---ReservoirSample - scope-18
+    |---ReservoirSample - scope-24
         |
-        |---b: Local Rearrange[tuple]{tuple}(false) - scope-13 ->       
scope-33
+        |---a: New For Each(false,false)[bag] - scope-7
             |   |
-            |   Project[int][0] - scope-8
+            |   Cast[int] - scope-2
             |   |
-            |   Project[int][1] - scope-9
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[int] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
             |
-            |---a: New For Each(false,false)[bag] - scope-7
-                |   |
-                |   Cast[int] - scope-2
-                |   |
-                |   |---Project[bytearray][0] - scope-1
-                |   |
-                |   Cast[int] - scope-5
-                |   |
-                |   |---Project[bytearray][1] - scope-4
-                |
-                |---a: 
Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-22
+            |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - 
scope-0
+Tez vertex scope-28
 # Plan on vertex
-POValueOutputTez - scope-32    ->       [scope-33]
+POValueOutputTez - scope-38    ->       [scope-39]
 |
-|---New For Each(false)[tuple] - scope-31
+|---New For Each(false)[tuple] - scope-37
     |   |
-    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
 - scope-30
+    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
 - scope-36
     |   |
-    |   |---Project[tuple][*] - scope-29
+    |   |---Project[tuple][*] - scope-35
     |
-    |---New For Each(false,false)[tuple] - scope-28
+    |---New For Each(false,false)[tuple] - scope-34
         |   |
-        |   Constant(-1) - scope-27
+        |   Constant(-1) - scope-33
         |   |
-        |   Project[bag][1] - scope-24
+        |   Project[bag][1] - scope-30
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-23
-Tez vertex scope-33
+        |---Package(Packager)[tuple]{bytearray} - scope-29
+Tez vertex scope-39
 # Plan on vertex
-POIdentityInOutTez - scope-34  <-       scope-12       ->       scope-35
+b: Local Rearrange[tuple]{tuple}(false) - scope-13     ->       scope-40
 |   |
 |   Project[int][0] - scope-8
 |   |
 |   Project[int][1] - scope-9
-Tez vertex scope-35
+|
+|---a: New For Each(false,false)[bag] - scope-19
+    |   |
+    |   Cast[int] - scope-16
+    |   |
+    |   |---Project[bytearray][0] - scope-15
+    |   |
+    |   Cast[int] - scope-18
+    |   |
+    |   |---Project[bytearray][1] - scope-17
+    |
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-14
+Tez vertex scope-40
 # Plan on vertex
-POValueOutputTez - scope-45    ->       [scope-46]
+POValueOutputTez - scope-50    ->       [scope-51]
 |
-|---Limit - scope-44
+|---Limit - scope-49
     |
-    |---New For Each(true)[tuple] - scope-43
+    |---New For Each(true)[tuple] - scope-48
         |   |
-        |   Project[bag][1] - scope-42
+        |   Project[bag][1] - scope-47
         |
-        |---Package(LitePackager)[tuple]{tuple} - scope-41
-Tez vertex scope-46
+        |---Package(LitePackager)[tuple]{tuple} - scope-46
+Tez vertex scope-51
 # Plan on vertex
 c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-11
 |
-|---Limit - scope-48
+|---Limit - scope-53
     |
-    |---POValueInputTez - scope-47     <-       scope-35
+    |---POValueInputTez - scope-52     <-       scope-40

Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld (original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld Thu Jun 25 20:59:31 2015
@@ -4,70 +4,78 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-11    ->      Tez vertex scope-20,Tez vertex scope-30,
-Tez vertex scope-20    ->      Tez vertex scope-30,
-Tez vertex scope-30    ->      Tez vertex scope-32,
-Tez vertex scope-32
+Tez vertex scope-11    ->      Tez vertex scope-26,
+Tez vertex scope-26    ->      Tez vertex scope-36,
+Tez vertex scope-36    ->      Tez vertex scope-37,
+Tez vertex scope-37
 
 Tez vertex scope-11
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-14        ->       scope-20
+Local Rearrange[tuple]{tuple}(false) - scope-20        ->       scope-26
 |   |
-|   Constant(DummyVal) - scope-13
+|   Constant(DummyVal) - scope-19
 |
-|---New For Each(false,true)[tuple] - scope-19
+|---New For Each(false,true)[tuple] - scope-25
     |   |
     |   Project[int][0] - scope-8
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-18
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-24
     |   |
-    |   |---Project[tuple][*] - scope-17
+    |   |---Project[tuple][*] - scope-23
     |
-    |---ReservoirSample - scope-16
+    |---ReservoirSample - scope-22
         |
-        |---b: Local Rearrange[tuple]{int}(false) - scope-12   ->       
scope-30
+        |---a: New For Each(false,false)[bag] - scope-7
             |   |
-            |   Project[int][0] - scope-8
+            |   Cast[int] - scope-2
+            |   |
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[int] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
             |
-            |---a: New For Each(false,false)[bag] - scope-7
-                |   |
-                |   Cast[int] - scope-2
-                |   |
-                |   |---Project[bytearray][0] - scope-1
-                |   |
-                |   Cast[int] - scope-5
-                |   |
-                |   |---Project[bytearray][1] - scope-4
-                |
-                |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
-Tez vertex scope-20
+            |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
+Tez vertex scope-26
 # Plan on vertex
-POValueOutputTez - scope-29    ->       [scope-30]
+POValueOutputTez - scope-35    ->       [scope-36]
 |
-|---New For Each(false)[tuple] - scope-28
+|---New For Each(false)[tuple] - scope-34
     |   |
-    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
 - scope-27
+    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
 - scope-33
     |   |
-    |   |---Project[tuple][*] - scope-26
+    |   |---Project[tuple][*] - scope-32
     |
-    |---New For Each(false,false)[tuple] - scope-25
+    |---New For Each(false,false)[tuple] - scope-31
         |   |
-        |   Constant(-1) - scope-24
+        |   Constant(-1) - scope-30
         |   |
-        |   Project[bag][1] - scope-22
+        |   Project[bag][1] - scope-28
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-21
-Tez vertex scope-30
+        |---Package(Packager)[tuple]{bytearray} - scope-27
+Tez vertex scope-36
 # Plan on vertex
-POIdentityInOutTez - scope-31  <-       scope-11       ->       scope-32
+b: Local Rearrange[tuple]{int}(false) - scope-12       ->       scope-37
 |   |
 |   Project[int][0] - scope-8
-Tez vertex scope-32
+|
+|---a: New For Each(false,false)[bag] - scope-18
+    |   |
+    |   Cast[int] - scope-15
+    |   |
+    |   |---Project[bytearray][0] - scope-14
+    |   |
+    |   Cast[int] - scope-17
+    |   |
+    |   |---Project[bytearray][1] - scope-16
+    |
+    |---a: Load(file:///tmp/input:PigStorage(',')) - scope-13
+Tez vertex scope-37
 # Plan on vertex
 b: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-10
 |
-|---New For Each(true)[tuple] - scope-35
+|---New For Each(true)[tuple] - scope-40
     |   |
-    |   Project[bag][1] - scope-34
+    |   Project[bag][1] - scope-39
     |
-    |---Package(LitePackager)[tuple]{int} - scope-33
+    |---Package(LitePackager)[tuple]{int} - scope-38

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld?rev=1687646&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld 
(added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld 
Thu Jun 25 20:59:31 2015
@@ -0,0 +1,81 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-15    ->      Tez vertex scope-24,Tez vertex scope-34,
+Tez vertex scope-24    ->      Tez vertex scope-34,
+Tez vertex scope-34    ->      Tez vertex scope-36,
+Tez vertex scope-36
+
+Tez vertex scope-15
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-18        ->       scope-24
+|   |
+|   Constant(DummyVal) - scope-17
+|
+|---New For Each(false,true)[tuple] - scope-23
+    |   |
+    |   Project[int][0] - scope-12
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-22
+    |   |
+    |   |---Project[tuple][*] - scope-21
+    |
+    |---ReservoirSample - scope-20
+        |
+        |---c: Local Rearrange[tuple]{int}(false) - scope-16   ->       
scope-34
+            |   |
+            |   Project[int][0] - scope-12
+            |
+            |---b: Filter[bag] - scope-8
+                |   |
+                |   Equal To[boolean] - scope-11
+                |   |
+                |   |---Project[int][0] - scope-9
+                |   |
+                |   |---Constant(1) - scope-10
+                |
+                |---a: New For Each(false,false)[bag] - scope-7
+                    |   |
+                    |   Cast[int] - scope-2
+                    |   |
+                    |   |---Project[bytearray][0] - scope-1
+                    |   |
+                    |   Cast[int] - scope-5
+                    |   |
+                    |   |---Project[bytearray][1] - scope-4
+                    |
+                    |---a: Load(file:///tmp/input:PigStorage(',')) - scope-0
+Tez vertex scope-24
+# Plan on vertex
+POValueOutputTez - scope-33    ->       [scope-34]
+|
+|---New For Each(false)[tuple] - scope-32
+    |   |
+    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez)[tuple]
 - scope-31
+    |   |
+    |   |---Project[tuple][*] - scope-30
+    |
+    |---New For Each(false,false)[tuple] - scope-29
+        |   |
+        |   Constant(-1) - scope-28
+        |   |
+        |   Project[bag][1] - scope-26
+        |
+        |---Package(Packager)[tuple]{bytearray} - scope-25
+Tez vertex scope-34
+# Plan on vertex
+POIdentityInOutTez - scope-35  <-       scope-15       ->       scope-36
+|   |
+|   Project[int][0] - scope-12
+Tez vertex scope-36
+# Plan on vertex
+c: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-14
+|
+|---New For Each(true)[tuple] - scope-39
+    |   |
+    |   Project[bag][1] - scope-38
+    |
+    |---Package(LitePackager)[tuple]{int} - scope-37

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld 
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-1.gld 
Thu Jun 25 20:59:31 2015
@@ -4,68 +4,76 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-27    ->      Tez vertex scope-36,Tez vertex scope-46,
-Tez vertex scope-36    ->      Tez vertex scope-28,Tez vertex scope-46,
-Tez vertex scope-46    ->      Tez vertex scope-50,
-Tez vertex scope-28    ->      Tez vertex scope-50,
-Tez vertex scope-50
+Tez vertex scope-27    ->      Tez vertex scope-42,
+Tez vertex scope-42    ->      Tez vertex scope-28,Tez vertex scope-52,
+Tez vertex scope-52    ->      Tez vertex scope-55,
+Tez vertex scope-28    ->      Tez vertex scope-55,
+Tez vertex scope-55
 
 Tez vertex scope-27
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-31        ->       scope-36
+Local Rearrange[tuple]{tuple}(false) - scope-31        ->       scope-42
 |   |
 |   Constant(DummyVal) - scope-30
 |
-|---New For Each(true,true)[tuple] - scope-35
+|---New For Each(true,true)[tuple] - scope-41
     |   |
     |   Project[int][0] - scope-16
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-34
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-40
     |   |
-    |   |---Project[tuple][*] - scope-33
+    |   |---Project[tuple][*] - scope-39
     |
     |---PoissonSample - scope-32
         |
-        |---Local Rearrange[tuple]{int}(false) - scope-29      ->       
scope-46
+        |---a: New For Each(false,false)[bag] - scope-7
             |   |
-            |   Project[int][0] - scope-16
+            |   Cast[int] - scope-2
+            |   |
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[int] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
             |
-            |---a: New For Each(false,false)[bag] - scope-7
-                |   |
-                |   Cast[int] - scope-2
-                |   |
-                |   |---Project[bytearray][0] - scope-1
-                |   |
-                |   Cast[int] - scope-5
-                |   |
-                |   |---Project[bytearray][1] - scope-4
-                |
-                |---a: 
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-36
+            |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) 
- scope-0
+Tez vertex scope-42
 # Plan on vertex
-POValueOutputTez - scope-45    ->       [scope-28, scope-46]
+POValueOutputTez - scope-51    ->       [scope-28, scope-52]
 |
-|---New For Each(false)[tuple] - scope-44
+|---New For Each(false)[tuple] - scope-50
     |   |
-    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
 - scope-43
+    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
 - scope-49
     |   |
-    |   |---Project[tuple][*] - scope-42
+    |   |---Project[tuple][*] - scope-48
     |
-    |---New For Each(false,false)[tuple] - scope-41
+    |---New For Each(false,false)[tuple] - scope-47
         |   |
-        |   Constant(-1) - scope-40
+        |   Constant(-1) - scope-46
         |   |
-        |   Project[bag][1] - scope-38
+        |   Project[bag][1] - scope-44
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-37
-Tez vertex scope-46
+        |---Package(Packager)[tuple]{bytearray} - scope-43
+Tez vertex scope-52
 # Plan on vertex
-POIdentityInOutTez - scope-47  <-       scope-27       ->       scope-50
+Local Rearrange[tuple]{int}(false) - scope-29  ->       scope-55
 |   |
 |   Project[int][0] - scope-16
+|
+|---a: New For Each(false,false)[bag] - scope-38
+    |   |
+    |   Cast[int] - scope-35
+    |   |
+    |   |---Project[bytearray][0] - scope-34
+    |   |
+    |   Cast[int] - scope-37
+    |   |
+    |   |---Project[bytearray][1] - scope-36
+    |
+    |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - 
scope-33
 Tez vertex scope-28
 # Plan on vertex
-Partition Rearrange[tuple]{int}(false) - scope-48      ->       scope-50
+Partition Rearrange[tuple]{int}(false) - scope-53      ->       scope-55
 |   |
 |   Project[int][0] - scope-17
 |
@@ -80,7 +88,7 @@ Partition Rearrange[tuple]{int}(false) -
     |   |---Project[bytearray][1] - scope-12
     |
     |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
-Tez vertex scope-50
+Tez vertex scope-55
 # Plan on vertex
 d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-26
 |
@@ -92,10 +100,10 @@ d: Store(file:///tmp/output:org.apache.p
     |   |
     |   Project[int][3] - scope-23
     |
-    |---New For Each(true,true)[tuple] - scope-54
+    |---New For Each(true,true)[tuple] - scope-59
         |   |
-        |   Project[bag][1] - scope-52
+        |   Project[bag][1] - scope-57
         |   |
-        |   Project[bag][2] - scope-53
+        |   Project[bag][2] - scope-58
         |
-        |---Package(Packager)[tuple]{int} - scope-51
+        |---Package(Packager)[tuple]{int} - scope-56

Added: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld?rev=1687646&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld 
(added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld 
Thu Jun 25 20:59:31 2015
@@ -0,0 +1,109 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-31    ->      Tez vertex scope-40,Tez vertex scope-50,
+Tez vertex scope-40    ->      Tez vertex scope-32,Tez vertex scope-50,
+Tez vertex scope-50    ->      Tez vertex scope-54,
+Tez vertex scope-32    ->      Tez vertex scope-54,
+Tez vertex scope-54
+
+Tez vertex scope-31
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-35        ->       scope-40
+|   |
+|   Constant(DummyVal) - scope-34
+|
+|---New For Each(true,true)[tuple] - scope-39
+    |   |
+    |   Project[int][0] - scope-20
+    |   |
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-38
+    |   |
+    |   |---Project[tuple][*] - scope-37
+    |
+    |---PoissonSample - scope-36
+        |
+        |---Local Rearrange[tuple]{int}(false) - scope-33      ->       
scope-50
+            |   |
+            |   Project[int][0] - scope-20
+            |
+            |---a: Filter[bag] - scope-8
+                |   |
+                |   Equal To[boolean] - scope-11
+                |   |
+                |   |---Project[int][0] - scope-9
+                |   |
+                |   |---Constant(1) - scope-10
+                |
+                |---a: New For Each(false,false)[bag] - scope-7
+                    |   |
+                    |   Cast[int] - scope-2
+                    |   |
+                    |   |---Project[bytearray][0] - scope-1
+                    |   |
+                    |   Cast[int] - scope-5
+                    |   |
+                    |   |---Project[bytearray][1] - scope-4
+                    |
+                    |---a: 
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
+Tez vertex scope-40
+# Plan on vertex
+POValueOutputTez - scope-49    ->       [scope-32, scope-50]
+|
+|---New For Each(false)[tuple] - scope-48
+    |   |
+    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
 - scope-47
+    |   |
+    |   |---Project[tuple][*] - scope-46
+    |
+    |---New For Each(false,false)[tuple] - scope-45
+        |   |
+        |   Constant(-1) - scope-44
+        |   |
+        |   Project[bag][1] - scope-42
+        |
+        |---Package(Packager)[tuple]{bytearray} - scope-41
+Tez vertex scope-50
+# Plan on vertex
+POIdentityInOutTez - scope-51  <-       scope-31       ->       scope-54
+|   |
+|   Project[int][0] - scope-20
+Tez vertex scope-32
+# Plan on vertex
+Partition Rearrange[tuple]{int}(false) - scope-52      ->       scope-54
+|   |
+|   Project[int][0] - scope-21
+|
+|---b: New For Each(false,false)[bag] - scope-19
+    |   |
+    |   Cast[int] - scope-14
+    |   |
+    |   |---Project[bytearray][0] - scope-13
+    |   |
+    |   Cast[int] - scope-17
+    |   |
+    |   |---Project[bytearray][1] - scope-16
+    |
+    |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - 
scope-12
+Tez vertex scope-54
+# Plan on vertex
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-30
+|
+|---d: New For Each(false,false,false)[bag] - scope-29
+    |   |
+    |   Project[int][0] - scope-23
+    |   |
+    |   Project[int][1] - scope-25
+    |   |
+    |   Project[int][3] - scope-27
+    |
+    |---New For Each(true,true)[tuple] - scope-58
+        |   |
+        |   Project[bag][1] - scope-56
+        |   |
+        |   Project[bag][2] - scope-57
+        |
+        |---Package(Packager)[tuple]{int} - scope-55

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16-OPTOFF.gld
 Thu Jun 25 20:59:31 2015
@@ -4,70 +4,78 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-30    ->      Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48    ->      Tez vertex scope-37,Tez vertex scope-58,
-Tez vertex scope-58    ->      Tez vertex scope-62,
+Tez vertex scope-30    ->      Tez vertex scope-54,
+Tez vertex scope-54    ->      Tez vertex scope-37,Tez vertex scope-64,
+Tez vertex scope-64    ->      Tez vertex scope-67,
 Tez vertex scope-31    ->      Tez vertex scope-35,Tez vertex scope-37,
 Tez vertex scope-35    ->      Tez vertex scope-37,
-Tez vertex scope-37    ->      Tez vertex scope-62,
-Tez vertex scope-62
+Tez vertex scope-37    ->      Tez vertex scope-67,
+Tez vertex scope-67
 
 Tez vertex scope-30
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-43        ->       scope-48
+Local Rearrange[tuple]{tuple}(false) - scope-43        ->       scope-54
 |   |
 |   Constant(DummyVal) - scope-42
 |
-|---New For Each(true,true)[tuple] - scope-47
+|---New For Each(true,true)[tuple] - scope-53
     |   |
     |   Project[int][0] - scope-26
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-46
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-52
     |   |
-    |   |---Project[tuple][*] - scope-45
+    |   |---Project[tuple][*] - scope-51
     |
     |---PoissonSample - scope-44
         |
-        |---Local Rearrange[tuple]{int}(false) - scope-41      ->       
scope-58
+        |---d: New For Each(false,false)[bag] - scope-7
             |   |
-            |   Project[int][0] - scope-26
+            |   Cast[int] - scope-2
+            |   |
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[chararray] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
             |
-            |---d: New For Each(false,false)[bag] - scope-7
-                |   |
-                |   Cast[int] - scope-2
-                |   |
-                |   |---Project[bytearray][0] - scope-1
-                |   |
-                |   Cast[chararray] - scope-5
-                |   |
-                |   |---Project[bytearray][1] - scope-4
-                |
-                |---d: 
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-48
-# Plan on vertex
-POValueOutputTez - scope-57    ->       [scope-37, scope-58]
+            |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) 
- scope-0
+Tez vertex scope-54
+# Plan on vertex
+POValueOutputTez - scope-63    ->       [scope-37, scope-64]
 |
-|---New For Each(false)[tuple] - scope-56
+|---New For Each(false)[tuple] - scope-62
     |   |
-    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
 - scope-55
+    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
 - scope-61
     |   |
-    |   |---Project[tuple][*] - scope-54
+    |   |---Project[tuple][*] - scope-60
     |
-    |---New For Each(false,false)[tuple] - scope-53
+    |---New For Each(false,false)[tuple] - scope-59
         |   |
-        |   Constant(-1) - scope-52
+        |   Constant(-1) - scope-58
         |   |
-        |   Project[bag][1] - scope-50
+        |   Project[bag][1] - scope-56
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-49
-Tez vertex scope-58
+        |---Package(Packager)[tuple]{bytearray} - scope-55
+Tez vertex scope-64
 # Plan on vertex
-POIdentityInOutTez - scope-59  <-       scope-30       ->       scope-62
+Local Rearrange[tuple]{int}(false) - scope-41  ->       scope-67
 |   |
 |   Project[int][0] - scope-26
+|
+|---d: New For Each(false,false)[bag] - scope-50
+    |   |
+    |   Cast[int] - scope-47
+    |   |
+    |   |---Project[bytearray][0] - scope-46
+    |   |
+    |   Cast[chararray] - scope-49
+    |   |
+    |   |---Project[bytearray][1] - scope-48
+    |
+    |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - 
scope-45
 Tez vertex scope-31
 # Plan on vertex
-a: Split - scope-68
+a: Split - scope-73
 |   |
 |   POValueOutputTez - scope-39        ->       [scope-37]
 |   |
@@ -99,19 +107,19 @@ POValueOutputTez - scope-40        ->       [scope-3
     |---POValueInputTez - scope-36     <-       scope-31
 Tez vertex scope-37
 # Plan on vertex
-Partition Rearrange[tuple]{int}(false) - scope-60      ->       scope-62
+Partition Rearrange[tuple]{int}(false) - scope-65      ->       scope-67
 |   |
 |   Project[int][0] - scope-27
 |
 |---POShuffledValueInputTez - scope-38 <-       [scope-31, scope-35]
-Tez vertex scope-62
+Tez vertex scope-67
 # Plan on vertex
 e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-29
 |
-|---New For Each(true,true)[tuple] - scope-66
+|---New For Each(true,true)[tuple] - scope-71
     |   |
-    |   Project[bag][1] - scope-64
+    |   Project[bag][1] - scope-69
     |   |
-    |   Project[bag][2] - scope-65
+    |   Project[bag][2] - scope-70
     |
-    |---Package(Packager)[tuple]{int} - scope-63
+    |---Package(Packager)[tuple]{int} - scope-68

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld 
(original)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-16.gld 
Thu Jun 25 20:59:31 2015
@@ -4,76 +4,84 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-30    ->      Tez vertex scope-48,Tez vertex scope-58,
-Tez vertex scope-48    ->      Tez vertex scope-31,Tez vertex scope-58,
-Tez vertex scope-58    ->      Tez vertex scope-62,
-Tez vertex scope-31    ->      Tez vertex scope-62,
-Tez vertex scope-62
+Tez vertex scope-30    ->      Tez vertex scope-54,
+Tez vertex scope-54    ->      Tez vertex scope-31,Tez vertex scope-64,
+Tez vertex scope-64    ->      Tez vertex scope-67,
+Tez vertex scope-31    ->      Tez vertex scope-67,
+Tez vertex scope-67
 
 Tez vertex scope-30
 # Plan on vertex
-Local Rearrange[tuple]{tuple}(false) - scope-43        ->       scope-48
+Local Rearrange[tuple]{tuple}(false) - scope-43        ->       scope-54
 |   |
 |   Constant(DummyVal) - scope-42
 |
-|---New For Each(true,true)[tuple] - scope-47
+|---New For Each(true,true)[tuple] - scope-53
     |   |
     |   Project[int][0] - scope-26
     |   |
-    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-46
+    |   POUserFunc(org.apache.pig.impl.builtin.GetMemNumRows)[tuple] - scope-52
     |   |
-    |   |---Project[tuple][*] - scope-45
+    |   |---Project[tuple][*] - scope-51
     |
     |---PoissonSample - scope-44
         |
-        |---Local Rearrange[tuple]{int}(false) - scope-41      ->       
scope-58
+        |---d: New For Each(false,false)[bag] - scope-7
             |   |
-            |   Project[int][0] - scope-26
+            |   Cast[int] - scope-2
+            |   |
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[chararray] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
             |
-            |---d: New For Each(false,false)[bag] - scope-7
-                |   |
-                |   Cast[int] - scope-2
-                |   |
-                |   |---Project[bytearray][0] - scope-1
-                |   |
-                |   Cast[chararray] - scope-5
-                |   |
-                |   |---Project[bytearray][1] - scope-4
-                |
-                |---d: 
Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex scope-48
+            |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) 
- scope-0
+Tez vertex scope-54
 # Plan on vertex
-POValueOutputTez - scope-57    ->       [scope-31, scope-58]
+POValueOutputTez - scope-63    ->       [scope-31, scope-64]
 |
-|---New For Each(false)[tuple] - scope-56
+|---New For Each(false)[tuple] - scope-62
     |   |
-    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
 - scope-55
+    |   
POUserFunc(org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez)[tuple]
 - scope-61
     |   |
-    |   |---Project[tuple][*] - scope-54
+    |   |---Project[tuple][*] - scope-60
     |
-    |---New For Each(false,false)[tuple] - scope-53
+    |---New For Each(false,false)[tuple] - scope-59
         |   |
-        |   Constant(-1) - scope-52
+        |   Constant(-1) - scope-58
         |   |
-        |   Project[bag][1] - scope-50
+        |   Project[bag][1] - scope-56
         |
-        |---Package(Packager)[tuple]{bytearray} - scope-49
-Tez vertex scope-58
+        |---Package(Packager)[tuple]{bytearray} - scope-55
+Tez vertex scope-64
 # Plan on vertex
-POIdentityInOutTez - scope-59  <-       scope-30       ->       scope-62
+Local Rearrange[tuple]{int}(false) - scope-41  ->       scope-67
 |   |
 |   Project[int][0] - scope-26
+|
+|---d: New For Each(false,false)[bag] - scope-50
+    |   |
+    |   Cast[int] - scope-47
+    |   |
+    |   |---Project[bytearray][0] - scope-46
+    |   |
+    |   Cast[chararray] - scope-49
+    |   |
+    |   |---Project[bytearray][1] - scope-48
+    |
+    |---d: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - 
scope-45
 Tez vertex scope-31
 # Plan on vertex
-a: Split - scope-68
+a: Split - scope-73
 |   |
-|   Partition Rearrange[tuple]{int}(false) - scope-69  ->       scope-62
+|   Partition Rearrange[tuple]{int}(false) - scope-74  ->       scope-67
 |   |   |
-|   |   Project[int][0] - scope-70
+|   |   Project[int][0] - scope-75
 |   |
-|   Partition Rearrange[tuple]{int}(false) - scope-71  ->       scope-62
+|   Partition Rearrange[tuple]{int}(false) - scope-76  ->       scope-67
 |   |   |
-|   |   Project[int][0] - scope-72
+|   |   Project[int][0] - scope-77
 |   |
 |   |---b: Filter[bag] - scope-21
 |       |   |
@@ -94,14 +102,14 @@ a: Split - scope-68
     |   |---Project[bytearray][1] - scope-12
     |
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-8
-Tez vertex scope-62
+Tez vertex scope-67
 # Plan on vertex
 e: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-29
 |
-|---New For Each(true,true)[tuple] - scope-66
+|---New For Each(true,true)[tuple] - scope-71
     |   |
-    |   Project[bag][1] - scope-64
+    |   Project[bag][1] - scope-69
     |   |
-    |   Project[bag][2] - scope-65
+    |   Project[bag][2] - scope-70
     |
-    |---Package(Packager)[tuple]{int} - scope-63
+    |---Package(Packager)[tuple]{int} - scope-68

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Thu Jun 25 
20:59:31 2015
@@ -309,7 +309,7 @@ public class TestTezAutoParallelism {
                 + "STORE E into '" + outputDir + "/finalout';";
         String log = testIncreaseIntermediateParallelism(script, outputDir, 
true);
         // Parallelism of C should be increased
-        assertTrue(log.contains("Increased requested parallelism of scope-54 
to 4"));
+        assertTrue(log.contains("Increased requested parallelism of scope-59 
to 4"));
         assertEquals(1, StringUtils.countMatches(log, "Increased requested 
parallelism"));
     }
 

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1687646&r1=1687645&r2=1687646&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Thu Jun 25 20:59:31 
2015
@@ -194,6 +194,19 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testSkewedJoinFilter() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "a = filter a by x == 1;" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = join a by x, b by x using 'skewed';" +
+                "d = foreach c generate a::x as x, y, z;" +
+                "store d into 'file:///tmp/output';";
+
+        run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-SkewJoin-2.gld");
+    }
+
+    @Test
     public void testLimit() throws Exception {
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:int);" +
@@ -312,6 +325,17 @@ public class TestTezCompiler {
         run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-1.gld");
     }
 
+    @Test
+    public void testOrderByWithFilter() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input' using PigStorage(',') as (x:int, 
y:int);" +
+                "b = filter a by x == 1;" +
+                "c = order b by x;" +
+                "STORE c INTO 'file:///tmp/output';";
+
+        run(query, 
"test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Order-2.gld");
+    }
+
     // PIG-3759, PIG-3781
     // Combiner should not be added in case of co-group
     @Test
@@ -545,6 +569,8 @@ public class TestTezCompiler {
 
     @Test
     public void testUnionSkewedJoin() throws Exception {
+        // TODO: PIG-4574 optimization needs to be done for this as well.
+        // Requires changes in UnionOptimizer
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +
@@ -561,6 +587,8 @@ public class TestTezCompiler {
 
     @Test
     public void testUnionOrderby() throws Exception {
+        // TODO: PIG-4574 optimization needs to be done for this as well.
+        // Requires changes in UnionOptimizer
         String query =
                 "a = load 'file:///tmp/input' as (x:int, y:chararray);" +
                 "b = load 'file:///tmp/input' as (y:chararray, x:int);" +


Reply via email to