Author: daijy
Date: Sat Apr 18 03:30:19 2015
New Revision: 1674429

URL: http://svn.apache.org/r1674429
Log:
PIG-4434: Improve auto-parallelism for tez

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
    pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java
    pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
    pig/trunk/test/org/apache/pig/test/Util.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
    pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java
    pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat Apr 18 03:30:19 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4434: Improve auto-parallelism for tez (daijy)
+
 PIG-4495: Better multi-query planning in case of multiple edges (rohini)
 
 PIG-3294: Allow Pig use Hive UDFs (daijy)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Sat Apr 18 03:30:19 2015
@@ -566,10 +566,11 @@ hcat.bin=/usr/local/hcat/bin/hcat
 #
 # opt.fetch=true
 
-# Enable auto parallelism in tez. This should be used by default unless
-# you encounter some bug in automatic parallelism. If set to false, use 1 as
-# default parallelism
+# Enable auto/grace parallelism in tez. These should be used by default unless
+# you encounter a bug in automatic parallelism. If pig.tez.auto.parallelism is
+# set to false, 1 is used as the default parallelism.
 pig.tez.auto.parallelism=true
+pig.tez.grace.parallelism=true
 
 ###########################################################################
 #

Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Sat Apr 18 03:30:19 2015
@@ -92,6 +92,6 @@ mockito.version=1.8.4
 jansi.version=1.9
 asm.version=3.3.1
 snappy-java.version=1.1.0.1
-tez.version=0.6.0
+tez.version=0.7.0-SNAPSHOT
 parquet-pig-bundle.version=1.2.3
 snappy.version=0.2

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Sat Apr 18 03:30:19 2015
@@ -117,6 +117,10 @@ public class PigConfiguration {
      * This key is used to configure auto parallelism in tez. Default is true.
      */
     public static final String PIG_TEZ_AUTO_PARALLELISM = 
"pig.tez.auto.parallelism";
+    /**
+     * This key is used to configure grace parallelism in tez. Default is true.
+     */
+    public static final String PIG_TEZ_GRACE_PARALLELISM = 
"pig.tez.grace.parallelism";
 
     /**
      * This key is used to configure compression for the pig input splits which

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat Apr 18 03:30:19 2015
@@ -113,6 +113,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
@@ -392,6 +393,7 @@ public class TezDagBuilder extends TezOp
         }
 
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+        conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
         conf.set("udf.import.list",
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
@@ -437,6 +439,11 @@ public class TezDagBuilder extends TezOp
                     edge.dataSourceType, edge.schedulingType, out, in);
             }
 
+        if (to.isUseGraceParallelism()) {
+            // Put datamovement to null to prevent vertex "to" from starting. 
It will be started by PigGraceShuffleVertexManager
+            return EdgeProperty.create((EdgeManagerPluginDescriptor)null, 
edge.dataSourceType,
+                    edge.schedulingType, out, in);
+        }
         return EdgeProperty.create(edge.dataMovementType, edge.dataSourceType,
                 edge.schedulingType, out, in);
     }
@@ -458,6 +465,7 @@ public class TezDagBuilder extends TezOp
         conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
                 PigCombiner.Combine.class.getName());
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+        conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
         conf.set("udf.import.list",
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
@@ -515,6 +523,7 @@ public class TezDagBuilder extends TezOp
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
         payloadConf.set("exectype", "TEZ");
         payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+        payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
         payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
                 PigInputFormat.class, InputFormat.class);
 
@@ -657,9 +666,77 @@ public class TezDagBuilder extends TezOp
         UserPayload userPayload = 
TezUtils.createUserPayloadFromConf(payloadConf);
         
procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(),
 payloadConf));
 
-        Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), 
procDesc, tezOp.getVertexParallelism(),
-                tezOp.isUseMRMapSettings() ? 
MRHelpers.getResourceForMRMapper(globalConf) : 
MRHelpers.getResourceForMRReducer(globalConf));
+        String vmPluginName = null;
+        Configuration vmPluginConf = null;
+
+        // Set the right VertexManagerPlugin
+        if (tezOp.getEstimatedParallelism() != -1) {
+            if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
+                // Set VertexManagerPlugin to PartitionerDefinedVertexManager, 
which is able
+                // to decrease/increase parallelism of sorting vertex 
dynamically
+                // based on the numQuantiles calculated by sample aggregation 
vertex
+                vmPluginName = PartitionerDefinedVertexManager.class.getName();
+                log.info("Set VertexManagerPlugin to 
PartitionerDefinedParallelismVertexManager for vertex " + 
tezOp.getOperatorKey().toString());
+            } else {
+                boolean containScatterGather = false;
+                boolean containCustomPartitioner = false;
+                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+                    if (edge.dataMovementType == 
DataMovementType.SCATTER_GATHER) {
+                        containScatterGather = true;
+                    }
+                    if (edge.partitionerClass!=null) {
+                        containCustomPartitioner = true;
+                    }
+                }
+                if (containScatterGather && !containCustomPartitioner) {
+                    // Use auto-parallelism feature of ShuffleVertexManager to 
dynamically
+                    // reduce the parallelism of the vertex
+                    if 
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+                            && 
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
+                        vmPluginName = 
PigGraceShuffleVertexManager.class.getName();
+                        tezOp.setUseGraceParallelism(true);
+                    } else {
+                        vmPluginName = ShuffleVertexManager.class.getName();
+                    }
+                    vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+                    
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
 true);
+                    vmPluginConf.set("pig.tez.plan", 
ObjectSerializer.serialize(getPlan()));
+                    vmPluginConf.set("pig.pigContext", 
ObjectSerializer.serialize(pc));
+                    if (stores.size() <= 0) {
+                        // Intermediate reduce. Set the bytes per reducer to 
be block size.
+                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+                                        intermediateTaskInputSize);
+                    } else if 
(vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
+                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
+                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+                                
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                        
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+                    }
+                    log.info("Set auto parallelism for vertex " + 
tezOp.getOperatorKey().toString());
+                }
+            }
+        }
+        if (tezOp.isLimit() && (vmPluginName == null || 
vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
+                vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
+            if 
(tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName()))
 {
+                // Setting SRC_FRACTION to 0.00001 so that even if there are 
100K source tasks,
+                // limit job starts when 1 source task finishes.
+                // If limit is part of a group by or join because their 
parallelism is 1,
+                // we should leave the configuration with the defaults.
+                vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
 "0.00001");
+                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
 "0.00001");
+                log.info("Set " + 
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 
for limit vertex " + tezOp.getOperatorKey().toString());
+            }
+        }
 
+        int parallel = tezOp.getVertexParallelism();
+        if (tezOp.isUseGraceParallelism()) {
+            parallel = -1;
+        }
+        Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), 
procDesc, parallel,
+                tezOp.isUseMRMapSettings() ? 
MRHelpers.getResourceForMRMapper(globalConf) : 
MRHelpers.getResourceForMRReducer(globalConf));
         Map<String, String> taskEnv = new HashMap<String, String>();
         MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, 
tezOp.isUseMRMapSettings());
         vertex.setTaskEnvironment(taskEnv);
@@ -772,6 +849,7 @@ public class TezDagBuilder extends TezOp
                         new DataSinkDescriptor(storeOutDescriptor,
                         
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
                         dag.getCredentials()));
+                uniqueStoreOutputs.add(outputKey);
             }
         }
 
@@ -783,62 +861,6 @@ public class TezDagBuilder extends TezOp
             new PigOutputFormat().checkOutputSpecs(job);
         }
 
-        String vmPluginName = null;
-        Configuration vmPluginConf = null;
-
-        // Set the right VertexManagerPlugin
-        if (tezOp.getEstimatedParallelism() != -1) {
-            if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
-                // Set VertexManagerPlugin to PartitionerDefinedVertexManager, 
which is able
-                // to decrease/increase parallelism of sorting vertex 
dynamically
-                // based on the numQuantiles calculated by sample aggregation 
vertex
-                vmPluginName = PartitionerDefinedVertexManager.class.getName();
-                log.info("Set VertexManagerPlugin to 
PartitionerDefinedParallelismVertexManager for vertex " + 
tezOp.getOperatorKey().toString());
-            } else {
-                boolean containScatterGather = false;
-                boolean containCustomPartitioner = false;
-                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
-                    if (edge.dataMovementType == 
DataMovementType.SCATTER_GATHER) {
-                        containScatterGather = true;
-                    }
-                    if (edge.partitionerClass!=null) {
-                        containCustomPartitioner = true;
-                    }
-                }
-                if (containScatterGather && !containCustomPartitioner) {
-                    // Use auto-parallelism feature of ShuffleVertexManager to 
dynamically
-                    // reduce the parallelism of the vertex
-                    vmPluginName = ShuffleVertexManager.class.getName();
-                    vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
-                    
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
 true);
-                    if (stores.size() <= 0) {
-                        // Intermediate reduce. Set the bytes per reducer to 
be block size.
-                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                        intermediateTaskInputSize);
-                    } else if 
(vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
-                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
-                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                        
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
-                    }
-                    log.info("Set auto parallelism for vertex " + 
tezOp.getOperatorKey().toString());
-                }
-            }
-        }
-        if (tezOp.isLimit() && (vmPluginName == null || 
vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
-            if 
(tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName()))
 {
-                // Setting SRC_FRACTION to 0.00001 so that even if there are 
100K source tasks,
-                // limit job starts when 1 source task finishes.
-                // If limit is part of a group by or join because their 
parallelism is 1,
-                // we should leave the configuration with the defaults.
-                vmPluginName = ShuffleVertexManager.class.getName();
-                vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
-                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
 "0.00001");
-                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
 "0.00001");
-                log.info("Set " + 
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 
for limit vertex " + tezOp.getOperatorKey().toString());
-            }
-        }
         // else if(tezOp.isLimitAfterSort())
         // TODO: PIG-4049 If standalone Limit we need a new VertexManager or 
new input
         // instead of ShuffledMergedInput. For limit part of the sort (order 
by parallel 1) itself

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Sat Apr 18 03:30:19 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.plan;
 
+import java.io.Serializable;
+
 import org.apache.hadoop.mapreduce.Partitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -28,7 +30,7 @@ import org.apache.tez.runtime.library.ou
 /**
  * Descriptor for Tez edge. It holds combine plan as well as edge properties.
  */
-public class TezEdgeDescriptor {
+public class TezEdgeDescriptor implements Serializable {
     // Combiner runs on both input and output of Tez edge.
     public PhysicalPlan combinePlan;
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Sat Apr 18 03:30:19 2015
@@ -51,11 +51,11 @@ public class TezOperPlan extends Operato
 
     private static final long serialVersionUID = 1L;
 
-    private Map<String, Path> extraResources = new HashMap<String, Path>();
+    private transient Map<String, Path> extraResources = new HashMap<String, 
Path>();
 
     private int estimatedTotalParallelism = -1;
 
-    private Credentials creds;
+    private transient Credentials creds;
 
     public TezOperPlan() {
         creds = new Credentials();
@@ -242,5 +242,47 @@ public class TezOperPlan extends Operato
             super.remove(node);
         }
     }
+
+    // Returns the distinct grandparents of op, i.e. the predecessors of op's
+    // predecessors. Used by PigGraceShuffleVertexManager; TezDagBuilder also
+    // enables grace parallelism for op only when this list is non-empty.
+    // Grandparents that are also direct parents of op are excluded (a is both a
+    // parent and a grandparent of c in the diagram below, so a is dropped):
+    //    a   ->    c
+    //      \  b  /
+    //
+    // The result is empty when op has no predecessors, when any parent or
+    // grandparent is a vertex group, or when any parent has no predecessors.
+    public static List<TezOperator> 
getGrandParentsForGraceParallelism(TezOperPlan tezPlan, TezOperator op) {
+        List<TezOperator> grandParents = new ArrayList<TezOperator>();
+        List<TezOperator> preds = tezPlan.getPredecessors(op);
+        if (preds != null) {
+            for (TezOperator pred : preds) {
+                // A vertex-group parent disqualifies op: return an empty list.
+                if (pred.isVertexGroup()) {
+                    grandParents.clear();
+                    return grandParents;
+                }
+                List<TezOperator> predPreds = tezPlan.getPredecessors(pred);
+                if (predPreds!=null) {
+                    for (TezOperator predPred : predPreds) {
+                        // A vertex-group grandparent disqualifies op as well.
+                        if (predPred.isVertexGroup()) {
+                            grandParents.clear();
+                            return grandParents;
+                        }
+                        // Keep each grandparent once, even if reached via several parents.
+                        if (!grandParents.contains(predPred)) {
+                            grandParents.add(predPred);
+                        }
+                    }
+                } else {
+                    // A parent without predecessors also disqualifies op; after the
+                    // break the list is empty, so an empty list is returned below.
+                    grandParents.clear();
+                    break;
+                }
+            }
+
+            // Drop grandparents that are themselves direct parents of op.
+            if (!grandParents.isEmpty()) {
+                for (TezOperator pred : preds) {
+                    if (grandParents.contains(pred)) {
+                        grandParents.remove(pred);
+                    }
+                }
+            }
+        }
+        return grandParents;
+    }
 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Sat Apr 18 03:30:19 2015
@@ -18,9 +18,11 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,6 +32,8 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator.TezParallelismFactorVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.Operator;
@@ -50,7 +54,11 @@ public class TezOperator extends Operato
     private static final long serialVersionUID = 1L;
 
     // Processor pipeline
-    public PhysicalPlan plan;
+    // Note TezOperator needs to be serialized and de-serialized to
+    // be used in PigGraceShuffleVertexManager, some fields are either
+    // big, or not serializable, and not in use in 
PigGraceShuffleVertexManager,
+    // mark them as transient: plan, vertexGroupInfo, inputSplitInfo
+    public transient PhysicalPlan plan;
 
     // Descriptors for out-bound edges.
     public Map<OperatorKey, TezEdgeDescriptor> outEdges;
@@ -133,6 +141,12 @@ public class TezOperator extends Operato
 
     private boolean useMRMapSettings = false;
 
+    private boolean useGraceParallelism = false;
+
+    private double parallelismFactor = -1;
+
+    private LinkedList<POStore> stores = null;
+
     // Types of blocking operators. For now, we only support the following 
ones.
     public static enum OPER_FEATURE {
         // Indicate if this job is a merge indexer
@@ -170,16 +184,16 @@ public class TezOperator extends Operato
 
     private List<OperatorKey> vertexGroupMembers;
     // For union
-    private VertexGroupInfo vertexGroupInfo;
+    private transient VertexGroupInfo vertexGroupInfo;
     // Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
     private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
 
-    public static class LoaderInfo {
+    public static class LoaderInfo implements Serializable {
         private List<POLoad> loads = null;
         private ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         private ArrayList<String> inpSignatureLists = new ArrayList<String>();
         private ArrayList<Long> inpLimits = new ArrayList<Long>();
-        private InputSplitInfo inputSplitInfo = null;
+        private transient InputSplitInfo inputSplitInfo = null;
         public List<POLoad> getLoads() {
             return loads;
         }
@@ -497,7 +511,7 @@ public class TezOperator extends Operato
     public String toString() {
         StringBuilder sb = new StringBuilder(name() + ":\n");
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        if (!plan.isEmpty()) {
+        if (plan!=null && !plan.isEmpty()) {
             plan.explain(baos);
             String mp = new String(baos.toByteArray());
             sb.append(shiftStringByTabs(mp, "|   "));
@@ -602,6 +616,29 @@ public class TezOperator extends Operato
         return loaderInfo;
     }
 
+    /**
+     * Marks whether this vertex is managed by PigGraceShuffleVertexManager.
+     * When set, TezDagBuilder creates the vertex with parallelism -1 and builds
+     * its incoming edges with a null EdgeManagerPluginDescriptor.
+     *
+     * @param useGraceParallelism true to use grace parallelism for this vertex
+     */
+    public void setUseGraceParallelism(boolean useGraceParallelism) {
+        this.useGraceParallelism = useGraceParallelism;
+    }
+    /**
+     * @return true if this vertex has been marked to use
+     *         PigGraceShuffleVertexManager (false by default)
+     */
+    public boolean isUseGraceParallelism() {
+        return useGraceParallelism;
+    }
+
+    /**
+     * Returns the parallelism factor of this vertex, as computed by a
+     * TezParallelismFactorVisitor over its physical plan.
+     * <p>
+     * The value is computed on the first call and cached; -1 is the
+     * "not yet computed" marker, so a factor of -1 would be recomputed on every
+     * call. The cache field is not transient, so a value computed before this
+     * operator is serialized is carried along to the deserialized copy.
+     * NOTE(review): {@code plan} is transient and presumably null on a
+     * deserialized copy; if the factor was not computed before serialization,
+     * the visitor would be constructed with a null plan. Confirm callers
+     * populate it first.
+     *
+     * @return the cached or freshly computed parallelism factor
+     * @throws VisitorException propagated from walking the physical plan
+     */
+    public double getParallelismFactor() throws VisitorException {
+        if (parallelismFactor == -1) {
+            TezParallelismFactorVisitor parallelismFactorVisitor = new 
TezParallelismFactorVisitor(plan, getOperatorKey().toString());
+            parallelismFactorVisitor.visit();
+            parallelismFactor = parallelismFactorVisitor.getFactor();
+        }
+        return parallelismFactor;
+    }
+
+    /**
+     * Returns the POStore operators found in this vertex's physical plan.
+     * <p>
+     * The list is looked up on the first call and cached. Later calls return
+     * the same mutable list instance, and nothing here refreshes it if the plan
+     * changes afterwards. The cache field is not transient, so it is carried
+     * along when this operator is serialized.
+     * NOTE(review): {@code plan} is transient; a deserialized copy whose cache
+     * was never populated would look up stores in a (presumably null) plan.
+     *
+     * @return the POStore operators of the plan (may be empty)
+     * @throws VisitorException propagated from walking the physical plan
+     */
+    public LinkedList<POStore> getStores() throws VisitorException {
+        if (stores == null) {
+            stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+        }
+        return stores;
+    }
+
     public static class VertexGroupInfo {
 
         private List<OperatorKey> inputKeys;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Sat Apr 18 03:30:19 2015
@@ -169,6 +169,10 @@ public class LoaderProcessor extends Tez
             // Not using MRInputAMSplitGenerator because delegation tokens are
             // fetched in FileInputFormat
             
tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf,
 false, 0));
+            // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input
+            // splits can be moved to if(loads) block below
+            int parallelism = 
tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
+            tezOp.setRequestedParallelism(parallelism);
         }
         return lds;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Sat Apr 18 03:30:19 2015
@@ -27,7 +27,6 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -82,15 +81,15 @@ public class ParallelismSetter extends T
             // splits
             int parallelism = -1;
             boolean intermediateReducer = false;
-            LinkedList<POStore> stores = 
PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
+            LinkedList<POStore> stores = tezOp.getStores();
             if (stores.size() <= 0) {
                 intermediateReducer = true;
             }
             if (tezOp.getLoaderInfo().getLoads() != null && 
tezOp.getLoaderInfo().getLoads().size() > 0) {
-                // TODO: Can be set to -1 if TEZ-601 gets fixed and getting 
input
-                // splits can be moved to if(loads) block below
-                parallelism = 
tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
-                tezOp.setRequestedParallelism(parallelism);
+                // requestedParallelism of Loader vertex is handled in 
LoaderProcessor
+                // propogate to vertexParallelism
+                tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
+                return;
             } else {
                 int prevParallelism = -1;
                 boolean isOneToOneParallelism = false;
@@ -107,7 +106,12 @@ public class ParallelismSetter extends T
                                     + tezOp.getOperatorKey().toString() + " 
are not equal");
                         }
                         
tezOp.setRequestedParallelism(pred.getRequestedParallelism());
-                        
tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
+                        // If tezOp.estimatedParallelism already set, don't 
override
+                        // The only case is in PigGraceShuffleVertexManager, 
which
+                        // set the estimated parallelism according to the 
output data size of the node
+                        if (tezOp.getEstimatedParallelism()==-1) {
+                            
tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
+                        }
                         isOneToOneParallelism = true;
                         incrementTotalParallelism(tezOp, parallelism);
                         parallelism = -1;
@@ -159,7 +163,7 @@ public class ParallelismSetter extends T
                                 for (TezOperator pred : 
mPlan.getPredecessors(tezOp)) {
                                     if (pred.isSampleBasedPartitioner()) {
                                         for (TezOperator partitionerPred : 
mPlan.getPredecessors(pred)) {
-                                            if 
(partitionerPred.isSampleAggregation()) {
+                                            if 
(partitionerPred.isSampleAggregation() && partitionerPred.plan!=null) {
                                                 LOG.debug("Updating 
parallelism constant value to " + parallelism + " in " + partitionerPred.plan);
                                                 ParallelConstantVisitor 
visitor =
                                                         new 
ParallelConstantVisitor(partitionerPred.plan, parallelism);

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1674429&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
 Sat Apr 18 03:30:19 2015
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
+
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Plan visitor that resets the estimated parallelism of every vertex in a
+ * {@link TezOperPlan} to -1, the value {@link ParallelismSetter} treats as
+ * "not estimated yet", so that the estimates can be recomputed.
+ */
+public class TezEstimatedParallelismClearer extends TezOpPlanVisitor {
+
+    /** Creates a clearer that walks {@code tezPlan} with a DependencyOrderWalker. */
+    public TezEstimatedParallelismClearer(TezOperPlan tezPlan) {
+        super(tezPlan, new DependencyOrderWalker<TezOperator, TezOperPlan>(tezPlan));
+    }
+
+    /** Discards any estimate previously recorded on {@code vertex}. */
+    @Override
+    public void visitTezOp(TezOperator vertex) throws VisitorException {
+        final int notEstimated = -1;
+        vertex.setEstimatedParallelism(notEstimated);
+    }
+}

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 Sat Apr 18 03:30:19 2015
@@ -120,10 +120,8 @@ public class TezOperDependencyParallelis
 
                 //For cases like Union we can just limit to sum of pred 
vertices parallelism
                 boolean applyFactor = !tezOper.isUnion();
-                if (pred.plan!=null && applyFactor) { // pred.plan can be null 
if it is a VertexGroup
-                    TezParallelismFactorVisitor parallelismFactorVisitor = new 
TezParallelismFactorVisitor(pred.plan, tezOper.getOperatorKey().toString());
-                    parallelismFactorVisitor.visit();
-                    predParallelism = predParallelism * 
parallelismFactorVisitor.getFactor();
+                if (!pred.isVertexGroup() && applyFactor) {
+                    predParallelism = predParallelism * 
pred.getParallelismFactor();
                 }
                 estimatedParallelism += predParallelism;
             }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
 Sat Apr 18 03:30:19 2015
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -68,7 +70,7 @@ public class PartitionerDefinedVertexMan
     }
 
     @Override
-    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) 
throws Exception {
         // There could be multiple partition vertex sending VertexManagerEvent
         // Only need to setVertexParallelism once
         if (isParallelismSet) {
@@ -86,14 +88,14 @@ public class PartitionerDefinedVertexMan
             if (dynamicParallelism!=currentParallelism) {
                 LOG.info("Pig Partitioner Defined Vertex Manager: reset 
parallelism to " + dynamicParallelism
                         + " from " + currentParallelism);
-                Map<String, EdgeManagerPluginDescriptor> edgeManagers =
-                    new HashMap<String, EdgeManagerPluginDescriptor>();
-                for(String vertex : 
getContext().getInputVertexEdgeProperties().keySet()) {
-                    EdgeManagerPluginDescriptor edgeManagerDescriptor =
-                            
EdgeManagerPluginDescriptor.create(ScatterGatherEdgeManager.class.getName());
-                    edgeManagers.put(vertex, edgeManagerDescriptor);
+                Map<String, EdgeProperty> edgeManagers = new HashMap<String, 
EdgeProperty>();
+                for(Map.Entry<String,EdgeProperty> entry : 
getContext().getInputVertexEdgeProperties().entrySet()) {
+                    EdgeProperty edge = entry.getValue();
+                    edge = 
EdgeProperty.create(DataMovementType.SCATTER_GATHER, edge.getDataSourceType(), 
edge.getSchedulingType(),
+                            edge.getEdgeSource(), edge.getEdgeDestination());
+                    edgeManagers.put(entry.getKey(), edge);
                 }
-                getContext().setVertexParallelism(dynamicParallelism, null, 
edgeManagers, null);
+                getContext().reconfigureVertex(dynamicParallelism, null, 
edgeManagers);
             }
         }
     }

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1674429&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
 Sat Apr 18 03:30:19 2015
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link ShuffleVertexManager} that postpones choosing this vertex's
+ * parallelism until all predecessors of one of its parent vertices (this
+ * vertex's grandparents) have succeeded. The actual output sizes of the
+ * finished grandparents then replace their estimates, {@link ParallelismSetter}
+ * is re-run over the deserialized plan, and this vertex is reconfigured with the
+ * resulting estimated parallelism. ShuffleVertexManager's own auto-parallelism
+ * may still reduce it further at runtime.
+ */
+public class PigGraceShuffleVertexManager extends ShuffleVertexManager {
+
+    private TezOperPlan tezPlan;
+    private List<String> grandParents = new ArrayList<String>();
+    private List<String> finishedGrandParents = new ArrayList<String>();
+    private long bytesPerTask;
+    private Configuration conf;
+    private PigContext pc;
+    private int thisParallelism = -1;
+    private boolean parallelismSet = false;
+
+    private static final Log LOG = LogFactory.getLog(PigGraceShuffleVertexManager.class);
+
+    public PigGraceShuffleVertexManager(VertexManagerPluginContext context) {
+        super(context);
+    }
+
+    /**
+     * Deserializes the Pig plan and context from the vertex user payload,
+     * clears the estimated parallelism of every vertex in that plan, and
+     * registers for SUCCEEDED notifications from this vertex's grandparents.
+     */
+    @Override
+    public synchronized void initialize() {
+        try {
+            conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+            bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+            pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+            tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan"));
+            TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan);
+            try {
+                clearer.visit();
+            } catch (VisitorException e) {
+                throw new TezUncheckedException(e);
+            }
+            TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
+
+            // Collect grandparents of the vertex
+            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() {
+                public String apply(TezOperator op) { return op.getOperatorKey().toString(); }
+            };
+            // Lists.transform returns a lazy view that re-applies the function on each
+            // access; copy it once because contains() runs on every state update
+            grandParents = new ArrayList<String>(Lists.transform(
+                    TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString));
+        } catch (IOException e) {
+            throw new TezUncheckedException(e);
+        }
+
+        // Register notification for grandparents
+        for (String grandParent : grandParents) {
+            getContext().registerForVertexStateUpdates(grandParent, EnumSet.of(VertexState.SUCCEEDED));
+        }
+        super.initialize();
+    }
+
+    /**
+     * Records a finished grandparent. Once every predecessor of some parent has
+     * finished, re-estimates the finished grandparents of parents that have no
+     * requested parallelism from their actual output, re-runs
+     * {@link ParallelismSetter} and reconfigures this vertex; afterwards further
+     * updates are only forwarded to {@link ShuffleVertexManager}.
+     */
+    @Override
+    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+        super.onVertexStateUpdated(stateUpdate);
+        if (parallelismSet) {
+            return;
+        }
+        String vertexName = stateUpdate.getVertexName();
+        if (grandParents.contains(vertexName) && !finishedGrandParents.contains(vertexName)) {
+            finishedGrandParents.add(vertexName);
+        }
+
+        TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
+        List<TezOperator> preds = predecessorsOf(op);
+
+        // Nothing to decide until one of the parents is about to start
+        if (!anyPredAboutToStart(preds)) {
+            return;
+        }
+
+        // Re-estimate finished grandparents from their actual output size
+        for (TezOperator pred : preds) {
+            if (pred.getRequestedParallelism() == -1) {
+                for (TezOperator predPred : predecessorsOf(pred)) {
+                    if (finishedGrandParents.contains(predPred.getOperatorKey().toString())) {
+                        estimateFromActualOutput(predPred, pred);
+                    }
+                }
+            }
+        }
+        try {
+            ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
+            parallelismSetter.visit();
+            thisParallelism = op.getEstimatedParallelism();
+        } catch (IOException e) {
+            throw new TezUncheckedException(e);
+        }
+        try {
+            getContext().reconfigureVertex(thisParallelism, null, scatterGatherInputEdges());
+        } catch (TezException e) {
+            throw new TezUncheckedException(e);
+        }
+        parallelismSet = true;
+        LOG.info("Initialize parallelism for " + getContext().getVertexName() + " to " + thisParallelism);
+    }
+
+    /**
+     * Returns true, after logging it, as soon as some vertex in {@code preds}
+     * has all of its predecessors among the finished grandparents.
+     */
+    private boolean anyPredAboutToStart(List<TezOperator> preds) {
+        for (TezOperator pred : preds) {
+            boolean predAboutToStart = true;
+            for (TezOperator predPred : predecessorsOf(pred)) {
+                if (!finishedGrandParents.contains(predPred.getOperatorKey().toString())) {
+                    predAboutToStart = false;
+                    break;
+                }
+            }
+            if (predAboutToStart) {
+                LOG.info("All predecessors for " + pred.getOperatorKey().toString() + " are finished, time to " +
+                        "set parallelism for " + getContext().getVertexName());
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sets {@code grandParent}'s estimated parallelism from the bytes it actually
+     * wrote to {@code parent}, at {@code bytesPerTask} bytes per task.
+     */
+    private void estimateFromActualOutput(TezOperator grandParent, TezOperator parent) {
+        String grandParentName = grandParent.getOperatorKey().toString();
+        // We shall get precise output size since all those nodes are finished
+        long outputSize = getContext().getVertexStatistics(grandParentName)
+                .getOutputStatistics(parent.getOperatorKey().toString()).getDataSize();
+        // At least one task: an empty output would otherwise produce parallelism 0
+        int desiredNumReducers = Math.max(1, (int)Math.ceil((double)outputSize/bytesPerTask));
+        grandParent.setEstimatedParallelism(desiredNumReducers);
+        LOG.info(getContext().getVertexName() +  ": Grandparent " + grandParentName +
+                " finished with actual output " + outputSize + " (desired parallelism " + desiredNumReducers + ")");
+    }
+
+    /** Copies each input edge as SCATTER_GATHER with the same data source, scheduling, source and destination. */
+    private Map<String, EdgeProperty> scatterGatherInputEdges() {
+        Map<String, EdgeProperty> edgeManagers = new HashMap<String, EdgeProperty>();
+        for (Map.Entry<String, EdgeProperty> entry : getContext().getInputVertexEdgeProperties().entrySet()) {
+            EdgeProperty edge = entry.getValue();
+            edgeManagers.put(entry.getKey(), EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                    edge.getDataSourceType(), edge.getSchedulingType(),
+                    edge.getEdgeSource(), edge.getEdgeDestination()));
+        }
+        return edgeManagers;
+    }
+
+    /** Returns the predecessors of {@code op}; the plan may return null, mapped here to an empty list. */
+    private List<TezOperator> predecessorsOf(TezOperator op) {
+        List<TezOperator> preds = tezPlan.getPredecessors(op);
+        return preds == null ? Collections.<TezOperator>emptyList() : preds;
+    }
+}

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 Sat Apr 18 03:30:19 2015
@@ -198,7 +198,7 @@ public class TezCompilerUtil {
      */
     static public boolean isIntermediateReducer(TezOperator tezOper) throws 
VisitorException {
         boolean intermediateReducer = false;
-        LinkedList<POStore> stores = 
PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class);
+        LinkedList<POStore> stores = tezOper.getStores();
         // Not map and not final reducer
         if (stores.size() <= 0 &&
                 (tezOper.getLoaderInfo().getLoads() == null || 
tezOper.getLoaderInfo().getLoads().size() <= 0)) {

Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/OperatorKey.java Sat Apr 18 03:30:19 
2015
@@ -89,5 +89,23 @@ public class OperatorKey implements Seri
             NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
 
+    /**
+     * Parses a key from its string form {@code <scope>-<id>}, e.g. {@code scope-52}.
+     * The scope is the text before the first '-' and the id is the text after it.
+     *
+     * @param op the string form of the key
+     * @return a new OperatorKey with the parsed scope and id
+     * @throws IllegalArgumentException if {@code op} has no '-' separator, or
+     *         (as a NumberFormatException) if the id part is not a valid long
+     */
+    static public OperatorKey fromString(String op) {
+        int sep = op.indexOf('-');
+        if (sep < 0) {
+            throw new IllegalArgumentException("Invalid OperatorKey string, expected <scope>-<id>: " + op);
+        }
+        String scope = op.substring(0, sep);
+        long id = Long.parseLong(op.substring(sep + 1));
+        return new OperatorKey(scope, id);
+    }
 
 }

Modified: 
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
 (original)
+++ 
pig/trunk/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
 Sat Apr 18 03:30:19 2015
@@ -56,7 +56,7 @@ public class TestImplicitSplitOnTuple {
                 "D2 = FOREACH tuplified GENERATE tuplify.memberId as memberId, 
tuplify.shopId as shopId, score AS score;"+
                 "J = JOIN D1 By shopId, D2 by shopId;"+
                 "K = FOREACH J GENERATE D1::memberId AS member_id1, 
D2::memberId AS member_id2, D1::shopId as shop;"+
-                "L = ORDER K by shop;"+
+                "L = ORDER K by shop, member_id1, member_id2;"+
                 "STORE L into 'output' using mock.Storage;");
         List<Tuple> list = data.get("output");
         assertEquals("list: "+list, 20, list.size());

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Sat Apr 18 03:30:19 2015
@@ -1359,11 +1359,13 @@ public class Util {
         }
     }
 
-    public static void createLogAppender(Class clazz, String appenderName, 
Writer writer) {
-        Logger logger = Logger.getLogger(clazz);
-        WriterAppender writerAppender = new WriterAppender(new 
PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
-        writerAppender.setName(appenderName);
-        logger.addAppender(writerAppender);
+    /** Attaches a WriterAppender named appenderName, writing to writer, to each class's logger. */
+    public static void createLogAppender(String appenderName, Writer writer, Class...clazzes) {
+        for (int i = 0; i < clazzes.length; i++) {
+            WriterAppender appender = new WriterAppender(new PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
+            appender.setName(appenderName);
+            Logger.getLogger(clazzes[i]).addAppender(appender);
+        }
     }
 
     public static void removeLogAppender(Class clazz, String appenderName) {

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Sat Apr 18 
03:30:19 2015
@@ -307,7 +307,7 @@ public class TestTezAutoParallelism {
         PigServer.resetScope();
         StringWriter writer = new StringWriter();
         // When there is a combiner operation involved user specified 
parallelism is overriden
-        Util.createLogAppender(ParallelismSetter.class, 
"testIncreaseIntermediateParallelism", writer);
+        Util.createLogAppender("testIncreaseIntermediateParallelism", writer, 
ParallelismSetter.class);
         try {
             
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
             
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "4000");

Added: pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java?rev=1674429&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java (added)
+++ pig/trunk/test/org/apache/pig/tez/TestTezGraceParallelism.java Sat Apr 18 
03:30:19 2015
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Pattern;
+
+import org.apache.pig.PigServer;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.test.Util;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestTezGraceParallelism {
+    private static PigServer pigServer;
+    private static final String INPUT_FILE1 = TestTezGraceParallelism.class.getName() + "_1";
+    private static final String INPUT_FILE2 = TestTezGraceParallelism.class.getName() + "_2";
+    private static final String INPUT_DIR = Util.getTestDirectory(TestTezGraceParallelism.class);
+
+    @Before
+    public void setUp() throws Exception {
+        pigServer = new PigServer(Util.getLocalTestMode()); // new local-mode server for each test
+    }
+
+    private static void createFiles() throws IOException {
+        new File(INPUT_DIR).mkdirs();
+        String boyNames[] = {"Noah", "Liam", "Jacob", "Mason", "William",
+                "Ethan", "Michael", "Alexander", "Jayden", "Daniel"};
+        String girlNames[] = {"Sophia", "Emma", "Olivia", "Isabella", "Ava",
+                "Mia", "Emily", "Abigail", "Madison", "Elizabeth"};
+        String names[] = new String[boyNames.length + girlNames.length];
+        System.arraycopy(boyNames, 0, names, 0, boyNames.length);
+        System.arraycopy(girlNames, 0, names, boyNames.length, girlNames.length);
+
+        // INPUT_FILE1: 1000 "name<TAB>age" rows; the fixed seed keeps expected results stable
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1));
+        try {
+            Random rand = new Random(1);
+            for (int i=0;i<1000;i++) {
+                w.println(names[rand.nextInt(names.length)] + "\t" + rand.nextInt(18));
+            }
+        } finally {
+            w.close(); // release the file even if writing fails
+        }
+        // INPUT_FILE2: one "name<TAB>gender" row per name
+        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
+        try {
+            for (String name : boyNames) {
+                w.println(name + "\t" + "M");
+            }
+            for (String name : girlNames) {
+                w.println(name + "\t" + "F");
+            }
+        } finally {
+            w.close();
+        }
+    }
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        createFiles(); // input files are shared by every test in this class
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws Exception {
+        deleteFiles();
+    }
+
+    private static void deleteFiles() {
+        Util.deleteDirectory(new File(INPUT_DIR)); // drop the generated input directory
+    }
+
+    @Test
+    public void testDecreaseParallelism() throws IOException {
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter logWriter = new StringWriter();
+        Util.createLogAppender("testDecreaseParallelism", logWriter, PigGraceShuffleVertexManager.class, ShuffleVertexManager.class);
+        try {
+            // Plan shape (vertex ids):
+            //   47 \
+            //        -> 49(join) -> 52(distinct) -> 61(group)
+            //   48 /
+            // Compile-time parallelism:
+            //   47(1) \
+            //           -> 49(2) -> 52(20) -> 61(200)
+            //   48(1) /
+            // When 49 finishes, its actual output only justifies parallelism 1, so
+            // 61 is initialized with 100 instead of 200; ShuffleVertexManager then
+            // further reduces 49, 52 and 61 to 1 at runtime.
+            pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+            pigServer.registerQuery("C = join A by name, B by name;");
+            pigServer.registerQuery("D = foreach C generate A::name as name, A::age as age, gender;");
+            pigServer.registerQuery("E = distinct D;");
+            pigServer.registerQuery("F = group E by gender;");
+            pigServer.registerQuery("G = foreach F generate group as gender, SUM(E.age);");
+            Iterator<Tuple> result = pigServer.openIterator("G");
+            List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(
+                    new String[] {"('F',1349L)", "('M',1373L)"});
+            Util.checkQueryOutputsAfterSort(result, expected);
+            String log = logWriter.toString();
+            assertTrue(log.contains("Initialize parallelism for scope-52 to 20"));
+            assertTrue(log.contains("Initialize parallelism for scope-61 to 100"));
+            assertTrue(log.contains("Reduce auto parallelism for vertex: scope-49 to 1 from 2"));
+            assertTrue(log.contains("Reduce auto parallelism for vertex: scope-52 to 1 from 20"));
+            assertTrue(log.contains("Reduce auto parallelism for vertex: scope-61 to 1 from 100"));
+        } finally {
+            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testDecreaseParallelism");
+            Util.removeLogAppender(ShuffleVertexManager.class, "testDecreaseParallelism");
+        }
+    }
+
+    @Test
+    public void testIncreaseParallelism() throws IOException{
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testIncreaseParallelism", writer, new Class[]{PigGraceShuffleVertexManager.class, ShuffleVertexManager.class});
+        try {
+            // DAG: 35 \             /  46(sample)   \
+            //           -> 37(order)        ->    56(order) -> 58(order) -> 64(distinct)
+            //      36 /
+            // Parallelism at compile time:
+            // DAG: 35(1) \          /  46(1)   \
+            //              -> 37(2)     ->    56(-1) -> 58(-1) -> 64(20)
+            //      36(1) /
+            // However, when 56 finishes, its actual output needs parallelism 5,
+            // so the parallelism for 64 is initialized to 50 based on this.
+            // At runtime, ShuffleVertexManager then further reduces the parallelism from 50.
+            pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+            pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("B = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+            pigServer.registerQuery("C = join A by 1, B by 1;");
+            pigServer.registerQuery("D = foreach C generate A::name as name, A::age as age, gender;");
+            pigServer.registerQuery("E = order D by name;");
+            pigServer.registerQuery("F = distinct E;");
+            Iterator<Tuple> iter = pigServer.openIterator("F");
+            int count = 0;
+            while (iter.hasNext()) {
+                iter.next();
+                count++;
+            }
+            assertEquals(644, count);
+            String log = writer.toString();
+            assertTrue(log, log.contains("Initialize parallelism for scope-64 to 50"));
+            // Which task finishes first is nondeterministic, so only the starting value (50) is fixed
+            assertTrue(log, Pattern.compile("Reduce auto parallelism for vertex: scope-64 to \\d+ from 50").matcher(log).find());
+        } finally {
+            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testIncreaseParallelism");
+            Util.removeLogAppender(ShuffleVertexManager.class, "testIncreaseParallelism");
+        }
+    }
+
+    @Test
+    public void testJoinWithDifferentDepth() throws IOException{
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testJoinWithDifferentDepth", writer, PigGraceShuffleVertexManager.class);
+        try {
+            // DAG:     /  51(sample) \
+            //      42        ->        61(order) -> 63(order) -> 69(distinct) \
+            //                                                                   -> 80(join)
+            //                                             78  -> 79(group)    /
+            // The join(80) has two inputs: 69 with a deeper pipeline, 79 with a shallower one.
+            // This test makes sure 79 can start early (by invoking 80.setParallelism)
+            // without waiting for 63 to complete.
+            pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+            pigServer.registerQuery("B = order A by name;");
+            pigServer.registerQuery("C = distinct B;");
+            pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("E = group D by name;");
+            pigServer.registerQuery("F = foreach E generate group as name, AVG(D.age) as avg_age;");
+            pigServer.registerQuery("G = join C by name, F by name;");
+            Iterator<Tuple> iter = pigServer.openIterator("G");
+            int count = 0;
+            while (iter.hasNext()) {
+                iter.next();
+                count++;
+            }
+            // JUnit's assertEquals takes (expected, actual).
+            assertEquals(20, count);
+            assertTrue(writer.toString().contains("All predecessors for scope-79 are finished, time to set parallelism for scope-80"));
+            assertTrue(writer.toString().contains("Initialize parallelism for scope-80 to 101"));
+        } finally {
+            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testJoinWithDifferentDepth");
+        }
+    }
+
+    @Test
+    public void testJoinWithDifferentDepth2() throws IOException{
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        Util.createLogAppender("testJoinWithDifferentDepth2", writer, PigGraceShuffleVertexManager.class);
+        try {
+            // DAG:     /  40(sample) \
+            //      31        ->        50(order) -> 52(order) -> 58(distinct) \
+            //                                                                   -> 68(join)
+            //                                                           67    /
+            // The join(68) should start immediately. We will not use PigGraceShuffleVertexManager
+            // in this case, so nothing about scope-68 should appear in its log.
+            pigServer.registerQuery("A = load '" + INPUT_DIR + "/" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+            pigServer.registerQuery("B = order A by name;");
+            pigServer.registerQuery("C = distinct B;");
+            pigServer.registerQuery("D = load '" + INPUT_DIR + "/" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("E = join C by name, D by name;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            int count = 0;
+            while (iter.hasNext()) {
+                iter.next();
+                count++;
+            }
+            // JUnit's assertEquals takes (expected, actual).
+            assertEquals(1000, count);
+            assertFalse(writer.toString().contains("scope-68"));
+        } finally {
+            Util.removeLogAppender(PigGraceShuffleVertexManager.class, "testJoinWithDifferentDepth2");
+        }
+    }
+}

Modified: pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java Sat Apr 18 03:30:19 2015
@@ -238,8 +238,9 @@ public class TestTezJobControlCompiler {
                 + "store e into 'output';";
         Pair<TezOperPlan, DAG> compiledPlan = compile(query);
         TezOperator leafOper = compiledPlan.first.getLeaves().get(0);
+        assertTrue(leafOper.isUseGraceParallelism());
         Vertex leafVertex = 
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
-        assertEquals(leafVertex.getParallelism(), 70);
+        assertEquals(leafVertex.getParallelism(), -1);
     }
 
     @Test
@@ -271,8 +272,9 @@ public class TestTezJobControlCompiler {
         List<TezOperator> leaves = compiledPlan.first.getLeaves();
         Collections.sort(leaves);
         TezOperator leafOper = leaves.get(1);
+        assertTrue(leafOper.isUseGraceParallelism());
         Vertex leafVertex = 
compiledPlan.second.getVertex(leafOper.getOperatorKey().toString());
-        assertEquals(leafVertex.getParallelism(), 7);
+        assertEquals(leafVertex.getParallelism(), -1);
     }
 
     @Test

Modified: pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java?rev=1674429&r1=1674428&r2=1674429&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezLauncher.java Sat Apr 18 03:30:19 2015
@@ -20,12 +20,17 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+import java.util.Iterator;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigServer;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
@@ -56,8 +61,8 @@ public class TestTezLauncher {
     };
 
     private static final String OUTPUT_FILE = "TestTezLauncherOutput";
-    private static final String[] OUTPUT_RECORDS = {
-        "all\t{(apple),(pear),(pear),(strawberry),(orange)}"
+    private static final String[] OUTPUT_RECORDS = new String[] {
+        "(apple)", "(pear)", "(pear)", "(strawberry)", "(orange)"
     };
 
     @BeforeClass
@@ -96,16 +101,25 @@ public class TestTezLauncher {
         PigStats pigStats = launcher.launchPig(pp, "testRun1", pc);
         assertTrue(pigStats.isSuccessful());
 
-        String[] output = Util.readOutput(cluster.getFileSystem(), 
OUTPUT_FILE);
-        for (int i = 0; i < output.length; i++) {
-            assertEquals(OUTPUT_RECORDS[i], output[i]);
-        }
-
         assertEquals(1, pigStats.getInputStats().size());
         assertEquals(INPUT_FILE, pigStats.getInputStats().get(0).getName());
 
         assertEquals(1, pigStats.getOutputStats().size());
         assertEquals(OUTPUT_FILE, pigStats.getOutputStats().get(0).getName());
+
+        query = "m = load '" + OUTPUT_FILE + "' as (a:chararray, 
b:{(y:chararray)});";
+        pigServer = new PigServer(pc);
+        pigServer.registerQuery(query);
+        Iterator<Tuple> iter = pigServer.openIterator("m");
+        Tuple result = iter.next();
+        assertEquals(result.get(0).toString(), "all");
+        Iterator<Tuple> innerIter = ((DataBag)result.get(1)).iterator();
+        int count = 0;
+        while (innerIter.hasNext()) {
+            
assertTrue(Arrays.asList(OUTPUT_RECORDS).contains(innerIter.next().toString()));
+            count++;
+        }
+        assertEquals(count, OUTPUT_RECORDS.length);
     }
 
     @Test


Reply via email to