Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Fri Feb 24 08:19:42 2017
@@ -44,6 +44,9 @@ public class POPoissonSample extends Phy
 
     private transient boolean initialized;
 
+    // num of rows skipped so far
+    private transient int numSkipped;
+
     // num of rows sampled so far
     private transient int numRowsSampled;
 
@@ -89,6 +92,7 @@ public class POPoissonSample extends Phy
     @Override
     public Result getNextTuple() throws ExecException {
         if (!initialized) {
+            numSkipped = 0;
             numRowsSampled = 0;
             avgTupleMemSz = 0;
             rowNum = 0;
@@ -134,7 +138,7 @@ public class POPoissonSample extends Phy
         }
 
         // skip tuples
-        for (long numSkipped  = 0; numSkipped < skipInterval; numSkipped++) {
+        while (numSkipped < skipInterval) {
             res = processInput();
             if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
@@ -148,6 +152,7 @@ public class POPoissonSample extends Phy
                 return res;
             }
             rowNum++;
+            numSkipped++;
         }
 
         // skipped enough, get new sample
@@ -173,6 +178,8 @@ public class POPoissonSample extends Phy
 
             rowNum++;
             newSample = res;
+            // reset skipped
+            numSkipped = 0;
             return currentSample;
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Fri Feb 24 08:19:42 2017
@@ -125,7 +125,7 @@ public class POReservoirSample extends P
                 }
 
                 // collect samples until input is exhausted
-                int rand = randGen.nextInt(rowProcessed);
+                int rand = randGen.nextInt(rowProcessed + 1);
                 if (rand < numSamples) {
                     samples[rand] = res;
                 }
@@ -133,8 +133,13 @@ public class POReservoirSample extends P
             }
         }
 
-        if (this.parentPlan.endOfAllInput && res.returnStatus == 
POStatus.STATUS_EOP) {
-            sampleCollectionDone = true;
+        if (res.returnStatus == POStatus.STATUS_EOP) {
+            if (this.parentPlan.endOfAllInput) {
+                sampleCollectionDone = true;
+            } else {
+                // In case of Split can get EOP in between.
+                return res;
+            }
         }
 
         return getSample();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Fri Feb 24 08:19:42 2017
@@ -51,13 +51,13 @@ public class Packager implements Illustr
     protected DataBag[] bags;
 
     public static enum PackageType {
-        GROUP, JOIN
+        GROUP, JOIN, BLOOMJOIN
     };
 
     protected transient Illustrator illustrator = null;
 
     // The key being worked on
-    Object key;
+    protected Object key;
 
     // marker to indicate if key is a tuple
     protected boolean isKeyTuple = false;
@@ -65,7 +65,7 @@ public class Packager implements Illustr
     protected boolean isKeyCompound = false;
 
     // key's type
-    byte keyType;
+    protected byte keyType;
 
     // The number of inputs to this
     // co-group. 0 indicates a distinct, which means there will only be a

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Fri Feb 24 08:19:42 2017
@@ -60,7 +60,7 @@ public class StoreFuncDecorator {
 
     private boolean allowErrors() {
         return UDFContext.getUDFContext().getJobConf()
-                .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false);
+                .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, 
false);
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Feb 24 08:19:42 2017
@@ -19,13 +19,14 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -56,6 +58,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
@@ -87,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -108,7 +110,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -174,6 +175,7 @@ public class TezDagBuilder extends TezOp
     private PigContext pc;
     private Configuration globalConf;
     private Configuration pigContextConf;
+    private Configuration shuffleVertexManagerBaseConf;
     private FileSystem fs;
     private long intermediateTaskInputSize;
     private Set<String> inputSplitInDiskVertices;
@@ -191,6 +193,8 @@ public class TezDagBuilder extends TezOp
     private String mapTaskLaunchCmdOpts;
     private String reduceTaskLaunchCmdOpts;
 
+    private boolean disableDAGRecovery = false;
+
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -210,6 +214,10 @@ public class TezDagBuilder extends TezOp
         }
     }
 
+    public boolean shouldDisableDAGRecovery() {
+        return disableDAGRecovery;
+    }
+
     private void initialize(PigContext pc) throws IOException {
 
         this.globalConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
@@ -217,6 +225,16 @@ public class TezDagBuilder extends TezOp
         this.pigContextConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
         MRToTezHelper.processMRSettings(pigContextConf, globalConf);
 
+        shuffleVertexManagerBaseConf = new Configuration(false);
+        // Only copy tez.shuffle-vertex-manager config to keep payload size 
small
+        Iterator<Entry<String, String>> iter = pigContextConf.iterator();
+        while (iter.hasNext()) {
+            Entry<String, String> entry = iter.next();
+            if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) {
+                shuffleVertexManagerBaseConf.set(entry.getKey(), 
entry.getValue());
+            }
+        }
+
         // Add credentials from binary token file and get tokens for namenodes
         // specified in mapreduce.job.hdfs-servers
         SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
@@ -265,7 +283,7 @@ public class TezDagBuilder extends TezOp
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) 
{
             // If tez setting is not defined
             MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
-            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, 
true);
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, 
false);
         }
 
         if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) 
{
@@ -279,7 +297,7 @@ public class TezDagBuilder extends TezOp
 
         try {
             fs = FileSystem.get(globalConf);
-            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, 
FileLocalizer.getTemporaryResourcePath(pc));
+            intermediateTaskInputSize = 
fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
         } catch (Exception e) {
             log.warn("Unable to get the block size for temporary directory, 
defaulting to 128MB", e);
             intermediateTaskInputSize = 134217728L;
@@ -397,7 +415,11 @@ public class TezDagBuilder extends TezOp
                 tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup);
                 POStore store = tezOp.getVertexGroupInfo().getStore();
                 if (store != null) {
-                    vertexGroup.addDataSink(store.getOperatorKey().toString(),
+                    String outputKey = store.getOperatorKey().toString();
+                    if (store instanceof POStoreTez) {
+                        outputKey = ((POStoreTez) store).getOutputKey();
+                    }
+                    vertexGroup.addDataSink(outputKey,
                             
DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
                             
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), 
dag.getCredentials()));
                 }
@@ -441,7 +463,14 @@ public class TezDagBuilder extends TezOp
 
         Configuration conf = new Configuration(pigContextConf);
 
-        if (!combinePlan.isEmpty()) {
+        if (edge.needsDistinctCombiner()) {
+            conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS,
+                    MRCombiner.class.getName());
+            conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
+                    DistinctCombiner.Combine.class.getName());
+            log.info("Setting distinct combiner class between "
+                    + from.getOperatorKey() + " and " + to.getOperatorKey());
+        } else if (!combinePlan.isEmpty()) {
             udfContextSeparator.serializeUDFContextForEdge(conf, from, to, 
UDFType.USERFUNC);
             addCombiner(combinePlan, to, conf, isMergedInput);
         }
@@ -450,7 +479,7 @@ public class TezDagBuilder extends TezOp
                 POLocalRearrangeTez.class);
 
         for (POLocalRearrangeTez lr : lrs) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
                 setIntermediateOutputKeyValue(keyType, conf, to, 
lr.isConnectedToPackage(), isMergedInput);
                 // In case of secondary key sort, main key type is the actual 
key type
@@ -479,7 +508,8 @@ public class TezDagBuilder extends TezOp
 
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
         conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
-        conf.set("pig.pigContext", serializedPigContext);
+        conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, 
pc.getExecType().isLocal());
+        conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, 
ObjectSerializer.serialize(pc.getLog4jProperties()));
         conf.set("udf.import.list", serializedUDFImportList);
 
         if(to.isGlobalSort() || to.isLimitAfterSort()){
@@ -510,26 +540,36 @@ public class TezDagBuilder extends TezOp
 
         UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
         out.setUserPayload(payLoad);
+        in.setUserPayload(payLoad);
 
+        // Remove combiner and reset payload
         if (!combinePlan.isEmpty()) {
             boolean noCombineInReducer = false;
+            boolean noCombineInMapper = edge.getCombinerInMap() == null ? 
false : !edge.getCombinerInMap();
             String reducerNoCombiner = 
globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER);
-            if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) 
{
+            if (edge.getCombinerInReducer() != null) {
+                noCombineInReducer = !edge.getCombinerInReducer();
+            } else if (reducerNoCombiner == null || 
reducerNoCombiner.equals("auto")) {
                 noCombineInReducer = 
TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan);
             } else {
                 noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner);
             }
-            if (noCombineInReducer) {
+            if (noCombineInReducer || noCombineInMapper) {
                 log.info("Turning off combiner in reducer vertex " + 
to.getOperatorKey() + " for edge from " + from.getOperatorKey());
                 conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
                 conf.unset(MRJobConfig.COMBINE_CLASS_ATTR);
                 conf.unset("pig.combinePlan");
                 conf.unset("pig.combine.package");
                 conf.unset("pig.map.keytype");
-                payLoad = TezUtils.createUserPayloadFromConf(conf);
+                UserPayload payLoadWithoutCombiner = 
TezUtils.createUserPayloadFromConf(conf);
+                if (noCombineInMapper) {
+                    out.setUserPayload(payLoadWithoutCombiner);
+                }
+                if (noCombineInReducer) {
+                    in.setUserPayload(payLoadWithoutCombiner);
+                }
             }
         }
-        in.setUserPayload(payLoad);
 
         if (edge.dataMovementType!=DataMovementType.BROADCAST && 
to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && 
(to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
@@ -593,6 +633,8 @@ public class TezDagBuilder extends TezOp
         setOutputFormat(job);
         payloadConf.set("udf.import.list", serializedUDFImportList);
         payloadConf.set("exectype", "TEZ");
+        payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, 
pc.getExecType().isLocal());
+        payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, 
ObjectSerializer.serialize(pc.getLog4jProperties()));
 
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -611,11 +653,7 @@ public class TezDagBuilder extends TezOp
             payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
             payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
             inputPayLoad = new Configuration(payloadConf);
-            if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() 
instanceof DefaultIndexableLoader) {
-                inputPayLoad.set("pig.pigContext", serializedPigContext);
-            }
         }
-        payloadConf.set("pig.pigContext", serializedPigContext);
 
         if (tezOp.getSampleOperator() != null) {
             payloadConf.set(PigProcessor.SAMPLE_VERTEX, 
tezOp.getSampleOperator().getOperatorKey().toString());
@@ -689,7 +727,7 @@ public class TezDagBuilder extends TezOp
                             PlanHelper.getPhysicalOperators(pred.plan, 
POLocalRearrangeTez.class);
                     for (POLocalRearrangeTez lr : lrs) {
                         if (lr.isConnectedToPackage()
-                                && 
lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
+                                && 
lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int) lr.getIndex(), 
inputKey);
                             if (isVertexGroup) {
                                 isMergedInput = true;
@@ -772,9 +810,25 @@ public class TezDagBuilder extends TezOp
 
         String vmPluginName = null;
         Configuration vmPluginConf = null;
+        boolean containScatterGather = false;
+        boolean containCustomPartitioner = false;
+        for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+            if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+                containScatterGather = true;
+            }
+            if (edge.partitionerClass != null) {
+                containCustomPartitioner = true;
+            }
+        }
+
+        if(containScatterGather) {
+            vmPluginName = ShuffleVertexManager.class.getName();
+            vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
+        }
 
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
+            boolean autoParallelism = false;
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
                 if (tezOp.getVertexParallelism()==-1 && (
                         tezOp.isGlobalSort() 
&&getPlan().getPredecessors(tezOp).size()==1||
@@ -783,33 +837,12 @@ public class TezDagBuilder extends TezOp
                     // to decrease/increase parallelism of sorting vertex 
dynamically
                     // based on the numQuantiles calculated by sample 
aggregation vertex
                     vmPluginName = 
PartitionerDefinedVertexManager.class.getName();
+                    autoParallelism = true;
                     log.info("Set VertexManagerPlugin to 
PartitionerDefinedParallelismVertexManager for vertex " + 
tezOp.getOperatorKey().toString());
                 }
             } else {
-                boolean containScatterGather = false;
-                boolean containCustomPartitioner = false;
-                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
-                    if (edge.dataMovementType == 
DataMovementType.SCATTER_GATHER) {
-                        containScatterGather = true;
-                    }
-                    if (edge.partitionerClass!=null) {
-                        containCustomPartitioner = true;
-                    }
-                }
                 if (containScatterGather && !containCustomPartitioner) {
-                    vmPluginConf = (vmPluginConf == null) ? new 
Configuration(pigContextConf) : vmPluginConf;
-                    // Use auto-parallelism feature of ShuffleVertexManager to 
dynamically
-                    // reduce the parallelism of the vertex
-                    if 
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
-                            && 
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
-                        vmPluginName = 
PigGraceShuffleVertexManager.class.getName();
-                        tezOp.setUseGraceParallelism(true);
-                        vmPluginConf.set("pig.tez.plan", 
getSerializedTezPlan());
-                        vmPluginConf.set("pig.pigContext", 
serializedPigContext);
-                    } else {
-                        vmPluginName = ShuffleVertexManager.class.getName();
-                    }
-                    
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
 true);
+
                     // For Intermediate reduce, set the bytes per reducer to 
be block size.
                     long bytesPerReducer = intermediateTaskInputSize;
                     // If there are store statements, use 
BYTES_PER_REDUCER_PARAM configured by user.
@@ -818,8 +851,8 @@ public class TezDagBuilder extends TezOp
                     // In Tez, numReducers=(map output size/bytesPerReducer) 
we need lower values to avoid skews in reduce
                     // as map input sizes are mostly always high compared to 
map output.
                     if (stores.size() > 0) {
-                        if 
(vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
-                            bytesPerReducer = vmPluginConf.getLong(
+                        if 
(pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) 
{
+                            bytesPerReducer = pigContextConf.getLong(
                                             
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                                             
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
                         } else if (tezOp.isGroupBy()) {
@@ -828,10 +861,28 @@ public class TezDagBuilder extends TezOp
                             bytesPerReducer = 
SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
                         }
                     }
+
+                    // Use auto-parallelism feature of ShuffleVertexManager to 
dynamically
+                    // reduce the parallelism of the vertex. Use 
PigGraceShuffleVertexManager
+                    // instead of ShuffleVertexManager if 
pig.tez.grace.parallelism is turned on
+                    if 
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+                            && 
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
+                            && tezOp.getCrossKeys() == null) {
+                        vmPluginName = 
PigGraceShuffleVertexManager.class.getName();
+                        tezOp.setUseGraceParallelism(true);
+                        vmPluginConf.set("pig.tez.plan", 
getSerializedTezPlan());
+                        vmPluginConf.set(PigImplConstants.PIG_CONTEXT, 
serializedPigContext);
+                        
vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
bytesPerReducer);
+                    }
+                    
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
 true);
                     
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
 bytesPerReducer);
+                    autoParallelism = true;
                     log.info("Set auto parallelism for vertex " + 
tezOp.getOperatorKey().toString());
                 }
             }
+            if 
(globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY,
 false) && autoParallelism) {
+                disableDAGRecovery = true;
+            }
         }
         if (tezOp.isLimit() && (vmPluginName == null || 
vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
                 vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
@@ -1409,22 +1460,12 @@ public class TezDagBuilder extends TezOp
 
     private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
-        // can be wrapped with LazyOutputFormat provided if it is supported by
-        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+        // can be wrapped with LazyOutputFormat provided if 
PigConfiguration.PIG_OUTPUT_LAZY is set
         if 
("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY)))
 {
-            try {
-                Class<?> clazz = PigContext
-                        
.resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
-                Method method = clazz.getMethod("setOutputFormatClass",
-                        org.apache.hadoop.mapreduce.Job.class, Class.class);
-                method.invoke(null, job, PigOutputFormatTez.class);
-            } catch (Exception e) {
-                job.setOutputFormatClass(PigOutputFormatTez.class);
-                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
-                        + " is set but LazyOutputFormat couldn't be loaded. 
Default PigOutputFormat will be used");
-            }
+            
LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class);
         } else {
             job.setOutputFormatClass(PigOutputFormatTez.class);
         }
     }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.tez.client.TezClient;
@@ -51,7 +56,7 @@ import com.google.common.collect.Maps;
  */
 public class TezJob implements Runnable {
     private static final Log log = LogFactory.getLog(TezJob.class);
-    private Configuration conf;
+    private TezConfiguration conf;
     private EnumSet<StatusGetOpts> statusGetOpts;
     private Map<String, LocalResource> requestAMResources;
     private ApplicationId appId;
@@ -69,31 +74,71 @@ public class TezJob implements Runnable
 
     public TezJob(TezConfiguration conf, DAG dag,
             Map<String, LocalResource> requestAMResources,
-            int estimatedTotalParallelism) throws IOException {
+            TezOperPlan tezPlan) throws IOException {
         this.conf = conf;
         this.dag = dag;
         this.requestAMResources = requestAMResources;
         this.reuseSession = 
conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
         this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+        tezJobConf = new TezJobConfig(tezPlan);
     }
 
     static class TezJobConfig {
 
         private int estimatedTotalParallelism = -1;
+        private int maxOutputsinSingleVertex;
+        private int totalVertices  = 0;
 
-        public TezJobConfig(int estimatedTotalParallelism) {
-            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        public TezJobConfig(TezOperPlan tezPlan) throws VisitorException {
+            this.estimatedTotalParallelism = 
tezPlan.getEstimatedTotalParallelism();
+            MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
+            finder.visit();
+            this.maxOutputsinSingleVertex = 
finder.getMaxOutputsinSingleVertex();
+            this.totalVertices = finder.getTotalVertices();
         }
 
         public int getEstimatedTotalParallelism() {
             return estimatedTotalParallelism;
         }
 
-        public void setEstimatedTotalParallelism(int 
estimatedTotalParallelism) {
-            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        public int getMaxOutputsinSingleVertex() {
+            return maxOutputsinSingleVertex;
         }
 
+        public int getTotalVertices() {
+            return totalVertices;
+        }
+
+    }
+
+    private static class MaxOutputsFinder extends TezOpPlanVisitor {
+
+        private int maxOutputsinSingleVertex  = 1;
+        private int totalVertices  = 0;
+
+        public MaxOutputsFinder(TezOperPlan plan) {
+            super(plan, new DependencyOrderWalker<TezOperator, 
TezOperPlan>(plan));
+        }
+
+        public int getMaxOutputsinSingleVertex() {
+            return maxOutputsinSingleVertex;
+        }
+
+        public int getTotalVertices() {
+            return totalVertices;
+        }
+
+        @Override
+        public void visitTezOp(TezOperator tezOperator) throws 
VisitorException {
+            if (!tezOperator.isVertexGroup()) {
+                totalVertices++;
+                int outputs = tezOperator.outEdges.keySet().size();
+                maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs 
? maxOutputsinSingleVertex : outputs;
+            }
+        }
+
+
+
     }
 
     public DAG getDAG() {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Feb 24 08:19:42 2017
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
@@ -50,11 +52,12 @@ public class TezJobCompiler {
     private static final Log log = LogFactory.getLog(TezJobCompiler.class);
 
     private PigContext pigContext;
-    private TezConfiguration tezConf;
+    private Configuration conf;
+    private boolean disableDAGRecovery;
 
     public TezJobCompiler(PigContext pigContext, Configuration conf) throws 
IOException {
         this.pigContext = pigContext;
-        this.tezConf = new TezConfiguration(conf);
+        this.conf = conf;
     }
 
     public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, 
LocalResource> localResources)
@@ -64,6 +67,7 @@ public class TezJobCompiler {
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, 
tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
         dagBuilder.avoidContainerReuseIfInputSplitInDisk();
+        disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery();
         return tezDag;
     }
 
@@ -85,6 +89,7 @@ public class TezJobCompiler {
         return job;
     }
 
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer 
planContainer)
             throws JobCreationException {
         try {
@@ -107,8 +112,34 @@ public class TezJobCompiler {
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+            // set Tez caller context
+            // Reflection for the following code since it is only available 
since tez 0.8.1:
+            // CallerContext context = 
CallerContext.create(ATSService.CallerContext, 
ATSService.getPigAuditId(pigContext),
+            //     ATSService.EntityType, "");
+            // tezDag.setCallerContext(context);
+            Class callerContextClass = null;
+            try {
+                callerContextClass = 
Class.forName("org.apache.tez.client.CallerContext");
+            } catch (ClassNotFoundException e) {
+                // If pre-Tez 0.8.1, skip setting CallerContext
+            }
+            if (callerContextClass != null) {
+                Method builderBuildMethod = 
callerContextClass.getMethod("create", String.class,
+                        String.class, String.class, String.class);
+                Object context = builderBuildMethod.invoke(null, 
PigATSClient.CALLER_CONTEXT,
+                        PigATSClient.getPigAuditId(pigContext), 
PigATSClient.ENTITY_TYPE, "");
+                Method dagSetCallerContext = 
tezDag.getClass().getMethod("setCallerContext",
+                        context.getClass());
+                dagSetCallerContext.invoke(tezDag, context);
+            }
             log.info("Total estimated parallelism is " + 
tezPlan.getEstimatedTotalParallelism());
-            return new TezJob(tezConf, tezDag, localResources, 
tezPlan.getEstimatedTotalParallelism());
+            TezConfiguration tezConf = new TezConfiguration(conf);
+            if (disableDAGRecovery
+                    && 
tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
+                            TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+                tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, 
false);
+            }
+            return new TezJob(tezConf, tezDag, localResources, tezPlan);
         } catch (Exception e) {
             int errCode = 2017;
             String msg = "Internal error creating job configuration.";

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
 Fri Feb 24 08:19:42 2017
@@ -22,6 +22,7 @@ import java.io.PrintStream;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -166,7 +167,7 @@ public class TezLauncher extends Launche
         tezStats = new TezPigScriptStats(pc);
         PigStats.start(tezStats);
 
-        conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
+        conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
         TezJobCompiler jc = new TezJobCompiler(pc, conf);
         TezPlanContainer tezPlanContainer = compile(php, pc);
 
@@ -174,6 +175,10 @@ public class TezLauncher extends Launche
         tezScriptState.emitInitialPlanNotification(tezPlanContainer);
         tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); 
//number of DAGs to Launch
 
+        boolean stop_on_failure =
+                
Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
+        boolean stoppedOnFailure = false;
+
         TezPlanContainerNode tezPlanContainerNode;
         TezOperPlan tezPlan;
         int processedDAGs = 0;
@@ -252,7 +257,18 @@ public class TezLauncher extends Launche
                     ((tezPlanContainer.size() - 
processedDAGs)/tezPlanContainer.size()) * 100);
             }
             handleUnCaughtException(pc);
-            tezPlanContainer.updatePlan(tezPlan, 
reporter.notifyFinishedOrFailed());
+            boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed();
+            tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded);
+            // if stop_on_failure is enabled, we need to stop immediately when 
any job has failed
+            if (!tezDAGSucceeded) {
+                if (stop_on_failure) {
+                    stoppedOnFailure = true;
+                    break;
+                } else {
+                    log.warn("Ooops! Some job has failed! Specify 
-stop_on_failure if you "
+                            + "want Pig to stop immediately on failure.");
+                }
+            }
         }
 
         tezStats.finish();
@@ -279,6 +295,11 @@ public class TezLauncher extends Launche
             }
         }
 
+        if (stoppedOnFailure) {
+            throw new ExecException("Stopping execution on job failure with 
-stop_on_failure option", 6017,
+                    PigException.REMOTE_ENVIRONMENT);
+        }
+
         return tezStats;
     }
 
@@ -402,9 +423,11 @@ public class TezLauncher extends Launche
         TezCompiler comp = new TezCompiler(php, pc);
         comp.compile();
         TezPlanContainer planContainer = comp.getPlanContainer();
-        for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer
-                .getKeys().entrySet()) {
-            TezOperPlan tezPlan = entry.getValue().getTezOperPlan();
+        // Doing a sort so that test plan printed remains same between jdk7 
and jdk8
+        List<OperatorKey> opKeys = new 
ArrayList<>(planContainer.getKeys().keySet());
+        Collections.sort(opKeys);
+        for (OperatorKey opKey : opKeys) {
+            TezOperPlan tezPlan = 
planContainer.getOperator(opKey).getTezOperPlan();
             optimize(tezPlan, pc);
         }
         return planContainer;
@@ -499,7 +522,7 @@ public class TezLauncher extends Launche
 
     @Override
     public void killJob(String jobID, Configuration conf) throws 
BackendException {
-        if (runningJob != null && runningJob.getApplicationId().toString() == 
jobID) {
+        if (runningJob != null && 
runningJob.getApplicationId().toString().equals(jobID)) {
             try {
                 runningJob.killJob();
             } catch (Exception e) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
 Fri Feb 24 08:19:42 2017
@@ -39,6 +39,8 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class TezResourceManager {
     private static TezResourceManager instance = null;
     private boolean inited = false;
@@ -59,6 +61,7 @@ public class TezResourceManager {
     /**
      * This method is only used by test code to reset state.
      */
+    @VisibleForTesting
     public static void dropInstance() {
         instance = null;
     }
@@ -66,7 +69,7 @@ public class TezResourceManager {
     public void init(PigContext pigContext, Configuration conf) throws 
IOException {
         if (!inited) {
             this.resourcesDir = 
FileLocalizer.getTemporaryResourcePath(pigContext);
-            this.remoteFs = FileSystem.get(conf);
+            this.remoteFs = resourcesDir.getFileSystem(conf);
             this.conf = conf;
             this.pigContext = pigContext;
             this.inited = true;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
 Fri Feb 24 08:19:42 2017
@@ -18,7 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -29,9 +31,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.client.TezAppMasterStatus;
@@ -46,13 +50,13 @@ public class TezSessionManager {
     private static final Log log = LogFactory.getLog(TezSessionManager.class);
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
+        Utils.addShutdownHookWithPriority(new Runnable() {
 
             @Override
             public void run() {
                 TezSessionManager.shutdown();
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
     }
 
     private static ReentrantReadWriteLock sessionPoolLock = new 
ReentrantReadWriteLock();
@@ -61,11 +65,17 @@ public class TezSessionManager {
     private TezSessionManager() {
     }
 
-    public static class SessionInfo {
-        SessionInfo(TezClient session, Map<String, LocalResource> resources) {
+    private static class SessionInfo {
+
+        public SessionInfo(TezClient session, TezConfiguration config, 
Map<String, LocalResource> resources) {
             this.session = session;
+            this.config = config;
             this.resources = resources;
         }
+
+        public TezConfiguration getConfig() {
+            return config;
+        }
         public Map<String, LocalResource> getResources() {
             return resources;
         }
@@ -77,20 +87,23 @@ public class TezSessionManager {
         }
         private TezClient session;
         private Map<String, LocalResource> resources;
+        private TezConfiguration config;
         private boolean inUse = false;
     }
 
     private static List<SessionInfo> sessionPool = new 
ArrayList<SessionInfo>();
 
-    private static SessionInfo createSession(Configuration conf,
+    private static SessionInfo createSession(TezConfiguration amConf,
             Map<String, LocalResource> requestedAMResources, Credentials creds,
             TezJobConfig tezJobConf) throws TezException, IOException,
             InterruptedException {
-        TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        MRToTezHelper.translateMRSettingsForTezAM(amConf);
         TezScriptState ss = TezScriptState.get();
         ss.addDAGSettingsToConf(amConf);
-        adjustAMConfig(amConf, tezJobConf);
-        String jobName = conf.get(PigContext.JOB_NAME, "pig");
+        if (amConf.getBoolean(PigConfiguration.PIG_TEZ_CONFIGURE_AM_MEMORY, 
true)) {
+            adjustAMConfig(amConf, tezJobConf);
+        }
+        String jobName = amConf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, 
requestedAMResources, creds);
         try {
             tezClient.start();
@@ -104,12 +117,10 @@ public class TezSessionManager {
             tezClient.stop();
             throw new RuntimeException(e);
         }
-        return new SessionInfo(tezClient, requestedAMResources);
+        return new SessionInfo(tezClient, amConf, requestedAMResources);
     }
 
     private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig 
tezJobConf) {
-        int requiredAMMaxHeap = -1;
-        int requiredAMResourceMB = -1;
         String amLaunchOpts = amConf.get(
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
@@ -122,8 +133,10 @@ public class TezSessionManager {
 
             // Need more room for native memory/virtual address space
             // when close to 4G due to 32-bit jvm 4G limit
-            int minAMMaxHeap = 3200;
-            int minAMResourceMB = 4096;
+            int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200;
+            int maxAMResourceMB = 4096;
+            int requiredAMResourceMB = maxAMResourceMB;
+            int requiredAMMaxHeap = maxAMHeap;
 
             // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
             // Increment container size by 512 mb for every additional 5K 
tasks.
@@ -135,22 +148,38 @@ public class TezSessionManager {
             //     5000 and above  - 1024Xmx, 1536 (512 native memory)
             for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
                 if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) {
-                    requiredAMMaxHeap = minAMMaxHeap;
-                    requiredAMResourceMB = minAMResourceMB;
                     break;
                 }
-                minAMResourceMB = minAMResourceMB - 512;
-                minAMMaxHeap = minAMResourceMB - 512;
+                requiredAMResourceMB = requiredAMResourceMB - 512;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
+            }
+
+            if (tezJobConf.getTotalVertices() > 30) {
+                //Add 512 mb per 30 vertices
+                int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30);
+                requiredAMResourceMB = requiredAMResourceMB + additionaMem;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
+            }
+
+            if (tezJobConf.getMaxOutputsinSingleVertex() > 10) {
+                //Add 256 mb per 5 outputs if a vertex has more than 10 outputs
+                int additionaMem = 256 * 
(tezJobConf.getMaxOutputsinSingleVertex() / 5);
+                requiredAMResourceMB = requiredAMResourceMB + additionaMem;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
             }
 
+            requiredAMResourceMB = Math.min(maxAMResourceMB, 
requiredAMResourceMB);
+            requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap);
+
             if (requiredAMResourceMB > -1 && configuredAMResourceMB < 
requiredAMResourceMB) {
                 amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 
requiredAMResourceMB);
                 log.info("Increasing "
                         + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
                         + configuredAMResourceMB + " to "
                         + requiredAMResourceMB
-                        + " as the number of total estimated tasks is "
-                        + tezJobConf.getEstimatedTotalParallelism());
+                        + " as total estimated tasks = " + 
tezJobConf.getEstimatedTotalParallelism()
+                        + ", total vertices = " + tezJobConf.getTotalVertices()
+                        + ", max outputs = " + 
tezJobConf.getMaxOutputsinSingleVertex());
 
                 if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < 
requiredAMMaxHeap) {
                     amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
@@ -158,8 +187,9 @@ public class TezSessionManager {
                     log.info("Increasing Tez AM Heap Size from "
                             + configuredAMMaxHeap + "M to "
                             + requiredAMMaxHeap
-                            + "M as the number of total estimated tasks is "
-                            + tezJobConf.getEstimatedTotalParallelism());
+                            + "M as total estimated tasks = " + 
tezJobConf.getEstimatedTotalParallelism()
+                            + ", total vertices = " + 
tezJobConf.getTotalVertices()
+                            + ", max outputs = " + 
tezJobConf.getMaxOutputsinSingleVertex());
                     log.info("Value of " + 
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now "
                             + 
amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
                 }
@@ -178,7 +208,22 @@ public class TezSessionManager {
         return true;
     }
 
-    static TezClient getClient(Configuration conf, Map<String, LocalResource> 
requestedAMResources,
+    private static boolean validateSessionConfig(SessionInfo currentSession,
+            Configuration newSessionConfig)
+            throws TezException, IOException {
+        // If DAG recovery is disabled for one and enabled for another, do not 
reuse
+        if (currentSession.getConfig().getBoolean(
+                    TezConfiguration.DAG_RECOVERY_ENABLED,
+                    TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)
+                != newSessionConfig.getBoolean(
+                        TezConfiguration.DAG_RECOVERY_ENABLED,
+                        TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) {
+            return false;
+        }
+        return true;
+    }
+
+    static TezClient getClient(TezConfiguration conf, Map<String, 
LocalResource> requestedAMResources,
             Credentials creds, TezJobConfig tezJobConf) throws TezException, 
IOException, InterruptedException {
         List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
         SessionInfo newSession = null;
@@ -196,7 +241,8 @@ public class TezSessionManager {
                         sessionsToRemove.add(sessionInfo);
                     } else if (!sessionInfo.inUse
                             && appMasterStatus.equals(TezAppMasterStatus.READY)
-                            && 
validateSessionResources(sessionInfo,requestedAMResources)) {
+                            && 
validateSessionResources(sessionInfo,requestedAMResources)
+                            && validateSessionConfig(sessionInfo, conf)) {
                         sessionInfo.inUse = true;
                         return sessionInfo.session;
                     }
@@ -253,6 +299,11 @@ public class TezSessionManager {
                 synchronized (sessionInfo) {
                     if (sessionInfo.session == session) {
                         log.info("Stopping Tez session " + session);
+                        String timeStamp = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss")
+                                    .format(Calendar.getInstance().getTime());
+                        System.err.println(timeStamp + " Shutting down Tez 
session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + 
session.getAppMasterApplicationId());
                         session.stop();
                         sessionToRemove = sessionInfo;
                         break;
@@ -279,19 +330,30 @@ public class TezSessionManager {
             shutdown = true;
             for (SessionInfo sessionInfo : sessionPool) {
                 synchronized (sessionInfo) {
+                    TezClient session = sessionInfo.session;
                     try {
-                        if (sessionInfo.session.getAppMasterStatus().equals(
+                        String timeStamp = new SimpleDateFormat(
+                                "yyyy-MM-dd 
HH:mm:ss").format(Calendar.getInstance().getTime());
+                        if (session.getAppMasterStatus().equals(
                                 TezAppMasterStatus.SHUTDOWN)) {
                             log.info("Tez session is already shutdown "
-                                    + sessionInfo.session);
+                                    + session);
+                            System.err.println(timeStamp
+                                    + " Tez session is already shutdown " + 
session
+                                    + ", sessionName=" + 
session.getClientName()
+                                    + ", applicationId=" + 
session.getAppMasterApplicationId());
                             continue;
                         }
-                        log.info("Shutting down Tez session "
-                                + sessionInfo.session);
-                        sessionInfo.session.stop();
+                        log.info("Shutting down Tez session " + session);
+                        // Since hadoop calls 
org.apache.log4j.LogManager.shutdown();
+                        // the log.info message is not displayed with shutdown 
hook in Oozie
+                        System.err.println(timeStamp + " Shutting down Tez 
session "
+                                + ", sessionName=" + session.getClientName()
+                                + ", applicationId=" + 
session.getAppMasterApplicationId());
+                        session.stop();
                     } catch (Exception e) {
                         log.error("Error shutting down Tez session "
-                                + sessionInfo.session, e);
+                                + session, e);
                     }
                 }
             }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 Fri Feb 24 08:19:42 2017
@@ -32,10 +32,12 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.hash.Hash;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
@@ -44,8 +46,10 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -82,7 +86,10 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
@@ -110,6 +117,7 @@ import org.apache.pig.impl.builtin.GetMe
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.Operator;
@@ -167,6 +175,10 @@ public class TezCompiler extends PhyPlan
 
     private Map<PhysicalOperator, TezOperator> phyToTezOpMap;
 
+    // Contains the inputs to operator like join, with the list maintaining the
+    // same order of join from left to right
+    private Map<TezOperator, List<TezOperator>> inputsMap;
+
     public static final String USER_COMPARATOR_MARKER = 
"user.comparator.func:";
     public static final String FILE_CONCATENATION_THRESHOLD = 
"pig.files.concatenation.threshold";
     public static final String OPTIMISTIC_FILE_CONCATENATION = 
"pig.optimistic.files.concatenation";
@@ -175,6 +187,8 @@ public class TezCompiler extends PhyPlan
     private boolean optimisticFileConcatenation = false;
     private List<String> readOnceLoadFuncs = null;
 
+    private Configuration conf;
+
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
     public TezCompiler(PhysicalPlan plan, PigContext pigContext)
@@ -184,6 +198,7 @@ public class TezCompiler extends PhyPlan
         this.pigContext = pigContext;
 
         pigProperties = pigContext.getProperties();
+        conf = ConfigurationUtil.toConfiguration(pigProperties, false);
         splitsSeen = Maps.newHashMap();
         tezPlan = new TezOperPlan();
         nig = NodeIdGenerator.getGenerator();
@@ -197,6 +212,7 @@ public class TezCompiler extends PhyPlan
         scope = roots.get(0).getOperatorKey().getScope();
         localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig);
         phyToTezOpMap = Maps.newHashMap();
+        inputsMap = Maps.newHashMap();
 
         fileConcatenationThreshold = Integer.parseInt(pigProperties
                 .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
@@ -655,15 +671,8 @@ public class TezCompiler extends PhyPlan
             blocking();
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), 
curTezOp);
 
-            // Add the DISTINCT plan as the combine plan. In MR Pig, the 
combiner is implemented
-            // with a global variable and a specific DistinctCombiner class. 
This seems better.
-            PhysicalPlan combinePlan = 
curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan;
-            addDistinctPlan(combinePlan, 1);
-
-            POLocalRearrangeTez clr = localRearrangeFactory.create();
-            clr.setOutputKey(curTezOp.getOperatorKey().toString());
-            clr.setDistinct(true);
-            combinePlan.addAsLeaf(clr);
+            TezEdgeDescriptor edge = 
curTezOp.inEdges.get(lastOp.getOperatorKey());
+            edge.setNeedsDistinctCombiner(true);
 
             curTezOp.markDistinct();
             addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
@@ -856,6 +865,7 @@ public class TezCompiler extends PhyPlan
             } else {
                 curTezOp.plan.addAsLeaf(op);
             }
+            phyToTezOpMap.put(op, curTezOp);
 
         } catch (Exception e) {
             int errCode = 2034;
@@ -900,6 +910,7 @@ public class TezCompiler extends PhyPlan
     public void visitGlobalRearrange(POGlobalRearrange op) throws 
VisitorException {
         try {
             blocking();
+            inputsMap.put(curTezOp, new 
ArrayList<>(Arrays.asList(compiledInputs)));
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), 
curTezOp);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             if (op.isCross()) {
@@ -1088,7 +1099,7 @@ public class TezCompiler extends PhyPlan
         indexerTezOp.setDontEstimateParallelism(true);
 
         POStore st = TezCompilerUtil.getStore(scope, nig);
-        FileSpec strFile = getTempFileSpec();
+        FileSpec strFile = getTempFileSpec(pigContext);
         st.setSFile(strFile);
         indexAggrOper.plan.addAsLeaf(st);
         indexAggrOper.setClosed(true);
@@ -1255,7 +1266,7 @@ public class TezCompiler extends PhyPlan
                 rightTezOprAggr.setDontEstimateParallelism(true);
 
                 POStore st = TezCompilerUtil.getStore(scope, nig);
-                FileSpec strFile = getTempFileSpec();
+                FileSpec strFile = getTempFileSpec(pigContext);
                 st.setSFile(strFile);
                 rightTezOprAggr.plan.addAsLeaf(st);
                 rightTezOprAggr.setClosed(true);
@@ -1346,6 +1357,9 @@ public class TezCompiler extends PhyPlan
                 } else if (op.getNumInps() > 1) {
                     curTezOp.markCogroup();
                 }
+            } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) 
{
+                curTezOp.markRegularJoin();
+                addBloomToJoin(op, curTezOp);
             }
         } catch (Exception e) {
             int errCode = 2034;
@@ -1354,6 +1368,132 @@ public class TezCompiler extends PhyPlan
         }
     }
 
+    private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws 
PlanException {
+
+        List<TezOperator> inputs = inputsMap.get(curTezOp);
+        TezOperator buildBloomOp;
+        List<TezOperator> applyBloomOps = new ArrayList<>();
+
+        String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, 
POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY);
+        boolean createBloomInMap = "map".equals(strategy);
+        if (!createBloomInMap && !strategy.equals("reduce")) {
+            throw new PlanException(new IllegalArgumentException(
+                    "Invalid value for "
+                            + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " -  "
+                            + strategy + ". Valid values are map and reduce"));
+        }
+        int numHash = 
conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, 
POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS);
+        int vectorSizeBytes =  
conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, 
POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES);
+        int numBloomFilters = 
POBuildBloomRearrangeTez.getNumBloomFilters(conf);
+        int hashType = 
Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, 
POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE));
+
+        // We build bloom of the right most input and apply the bloom filter 
on the left inputs by default.
+        // But in case of left outer join we build bloom of the left input and 
use it on the right input
+        boolean[] inner = op.getPkgr().getInner();
+        boolean skipNullKeys = true;
+        if (inner[inner.length - 1]) {  // inner has from right to left while 
inputs has from left to right
+            buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is 
built from right most input
+            for (int i = 0; i < (inner.length - 1); i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+            skipNullKeys = inner[0];
+        } else {
+            // Left outer join
+            skipNullKeys = false;
+            buildBloomOp = inputs.get(0); // Bloom filter is built from left 
most input
+            for (int i = 1; i < inner.length; i++) {
+                applyBloomOps.add(inputs.get(i));
+            }
+        }
+
+        // Add BuildBloom operator to the input
+        POLocalRearrangeTez lr = (POLocalRearrangeTez) 
buildBloomOp.plan.getLeaves().get(0);
+        POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, 
createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType);
+        bbr.setSkipNullKeys(skipNullKeys);
+        buildBloomOp.plan.remove(lr);
+        buildBloomOp.plan.addAsLeaf(bbr);
+
+        // Add a new reduce vertex that will construct the final bloom filter
+        //    - by combining the bloom filters from the buildBloomOp input 
tasks in the map strategy
+        //    - or directly from the keys from the buildBloomOp input tasks in 
the reduce strategy
+        TezOperator combineBloomOp = getTezOp();
+        tezPlan.add(combineBloomOp);
+        combineBloomOp.markBuildBloom();
+        // Explicitly set the parallelism for the new vertex to number of 
bloom filters.
+        // Auto parallelism will bring it down based on the actual output size
+        combineBloomOp.setEstimatedParallelism(numBloomFilters);
+        // We don't want parallelism to be changed during the run by grace 
auto parallelism
+        // It will take the whole input size and estimate way higher
+        combineBloomOp.setDontEstimateParallelism(true);
+
+        String combineBloomOpKey = combineBloomOp.getOperatorKey().toString();
+        TezEdgeDescriptor edge = new TezEdgeDescriptor();
+        TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge);
+        bbr.setBloomOutputKey(combineBloomOpKey);
+
+
+        POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
+        pkg.setNumInps(1);
+        BloomPackager pkgr = new BloomPackager(createBloomInMap, 
vectorSizeBytes, numHash, hashType);;
+        pkgr.setKeyType(DataType.INTEGER);
+        pkg.setPkgr(pkgr);
+        POValueOutputTez combineBloomOutput = new 
POValueOutputTez(OperatorKey.genOpKey(scope));
+        combineBloomOp.plan.addAsLeaf(pkg);
+        combineBloomOp.plan.addAsLeaf(combineBloomOutput);
+
+        
edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+        
edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+
+        // Add combiner as well.
+        POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, 
vectorSizeBytes, numHash, hashType);
+        combinerPkgr.setCombiner(true);
+        combinerPkgr.setKeyType(DataType.INTEGER);
+        pkg_c.setPkgr(combinerPkgr);
+        pkg_c.setNumInps(1);
+        edge.combinePlan.addAsLeaf(pkg_c);
+        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+        prjKey.setResultType(DataType.INTEGER);
+        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(prjKey);
+        clrInps.add(pp);
+        POLocalRearrangeTez clr = localRearrangeFactory.create(0, 
LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+        clr.setOutputKey(combineBloomOpKey);
+        edge.combinePlan.addAsLeaf(clr);
+
+        if (createBloomInMap) {
+            // No combiner needed on map as there will be only one bloom 
filter per map for each partition
+            // In the reducer, the bloom filters will be combined with same 
logic of reduce in BloomPackager
+            edge.setCombinerInMap(false);
+            edge.setCombinerInReducer(true);
+        } else {
+            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+            // Do distinct of the keys on the map side to reduce data sent to 
reducers.
+            // In case of reduce, not adding a combiner and doing the distinct 
during reduce itself.
+            // If needed one can be added later
+            edge.setCombinerInMap(true);
+            edge.setCombinerInReducer(false);
+        }
+
+        // Broadcast the final bloom filter to other inputs
+        for (TezOperator applyBloomOp : applyBloomOps) {
+            applyBloomOp.markFilterBloom();
+            lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0);
+            POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, 
numBloomFilters);
+            applyBloomOp.plan.remove(lr);
+            applyBloomOp.plan.addAsLeaf(bfr);
+            bfr.setInputKey(combineBloomOpKey);
+            edge = new TezEdgeDescriptor();
+            
edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+            
edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, 
DataMovementType.BROADCAST);
+            TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, 
edge);
+            
combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString());
+        }
+
+    }
+
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{
@@ -1513,7 +1653,7 @@ public class TezCompiler extends PhyPlan
 
             for (int i=0; i<transformPlans.size(); i++) {
                 eps1.add(transformPlans.get(i));
-                flat1.add(true);
+                flat1.add(i == transformPlans.size() - 1 ? true : false);
             }
 
             // This foreach will pick the sort key columns from the 
POPoissonSample output
@@ -1722,7 +1862,7 @@ public class TezCompiler extends PhyPlan
      * @return
      * @throws IOException
      */
-    private FileSpec getTempFileSpec() throws IOException {
+    public static FileSpec getTempFileSpec(PigContext pigContext) throws 
IOException {
         return new 
FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
 Fri Feb 24 08:19:42 2017
@@ -31,8 +31,13 @@ import org.apache.tez.runtime.library.ou
  * Descriptor for Tez edge. It holds combine plan as well as edge properties.
  */
 public class TezEdgeDescriptor implements Serializable {
-    // Combiner runs on both input and output of Tez edge.
-    transient public PhysicalPlan combinePlan;
+
+    public transient PhysicalPlan combinePlan;
+    private boolean needsDistinctCombiner;
+    // Combiner runs on both input and output of Tez edge by default
+    // It can be configured to run only in output(map) or input(reduce)
+    private Boolean combinerInMap;
+    private Boolean combinerInReducer;
 
     public String inputClassName;
     public String outputClassName;
@@ -65,6 +70,30 @@ public class TezEdgeDescriptor implement
         dataMovementType = DataMovementType.SCATTER_GATHER;
     }
 
+    public boolean needsDistinctCombiner() {
+        return needsDistinctCombiner;
+    }
+
+    public void setNeedsDistinctCombiner(boolean nic) {
+        needsDistinctCombiner = nic;
+    }
+
+    public Boolean getCombinerInMap() {
+        return combinerInMap;
+    }
+
+    public void setCombinerInMap(Boolean combinerInMap) {
+        this.combinerInMap = combinerInMap;
+    }
+
+    public Boolean getCombinerInReducer() {
+        return combinerInReducer;
+    }
+
+    public void setCombinerInReducer(Boolean combinerInReducer) {
+        this.combinerInReducer = combinerInReducer;
+    }
+
     public boolean isUseSecondaryKey() {
         return useSecondaryKey;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
 Fri Feb 24 08:19:42 2017
@@ -25,8 +25,9 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -217,8 +218,12 @@ public class TezOperPlan extends Operato
             newPlan.add(node);
         }
 
-        Set<Pair<TezOperator, TezOperator>> toReconnect = new 
HashSet<Pair<TezOperator, TezOperator>>();
-        for (TezOperator from : mFromEdges.keySet()) {
+        // Using a LinkedHashSet and doing a sort so that
+        // test plan printed remains same between jdk7 and jdk8
+        Set<Pair<TezOperator, TezOperator>> toReconnect = new 
LinkedHashSet<Pair<TezOperator, TezOperator>>();
+        List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet());
+        Collections.sort(fromEdges);
+        for (TezOperator from : fromEdges) {
             List<TezOperator> tos = mFromEdges.get(from);
             for (TezOperator to : tos) {
                 if (list.contains(from) || list.contains(to)) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 Fri Feb 24 08:19:42 2017
@@ -181,7 +181,11 @@ public class TezOperator extends Operato
         // Indicate if this job is a native job
         NATIVE,
         // Indicate if this job does rank counter
-        RANK_COUNTER;
+        RANK_COUNTER,
+        // Indicate if this job constructs bloom filter
+        BUILDBLOOM,
+        // Indicate if this job applies bloom filter
+        FILTERBLOOM;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -235,6 +239,7 @@ public class TezOperator extends Operato
     }
 
     private LoaderInfo loaderInfo = new LoaderInfo();
+    private long totalInputFilesSize = -1;
 
     public TezOperator(OperatorKey k) {
         super(k);
@@ -452,6 +457,22 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
     }
 
+    public boolean isBuildBloom() {
+        return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public void markBuildBloom() {
+        feature.set(OPER_FEATURE.BUILDBLOOM.ordinal());
+    }
+
+    public boolean isFilterBloom() {
+        return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
+    public void markFilterBloom() {
+        feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> 
excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {
@@ -651,6 +672,14 @@ public class TezOperator extends Operato
         return loaderInfo;
     }
 
+    public long getTotalInputFilesSize() {
+        return totalInputFilesSize;
+    }
+
+    public void setTotalInputFilesSize(long totalInputFilesSize) {
+        this.totalInputFilesSize = totalInputFilesSize;
+    }
+
     public void setUseGraceParallelism(boolean useGraceParallelism) {
         this.useGraceParallelism = useGraceParallelism;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
 Fri Feb 24 08:19:42 2017
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -161,7 +162,7 @@ public class TezPOPackageAnnotator exten
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws 
VisitorException {
             POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
-            if (!(lr.isConnectedToPackage() && 
lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) {
+            if (!(lr.isConnectedToPackage() && 
lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) {
                 return;
             }
             loRearrangeFound++;
@@ -180,7 +181,9 @@ public class TezPOPackageAnnotator exten
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, 
Integer>>>();
 
-            Integer index = Integer.valueOf(lrearrange.getIndex());
+            // For BloomPackager there is only one input, but the
+            // POBuildBloomRearrangeTez index is that of the join's index and 
can be non-zero
+            Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : 
Integer.valueOf(lrearrange.getIndex());
             if(keyInfo.get(index) != null) {
                 if (isPOSplit) {
                     // Case of POSplit having more than one input in case of 
self join or union
@@ -197,12 +200,20 @@ public class TezPOPackageAnnotator exten
 
             }
 
-            keyInfo.put(index,
-                    new Pair<Boolean, Map<Integer, Integer>>(
-                            lrearrange.isProjectStar(), 
lrearrange.getProjectedColsMap()));
-            pkg.getPkgr().setKeyInfo(keyInfo);
-            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
-            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            if (pkg.getPkgr() instanceof BloomPackager ) {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                Boolean.FALSE, new HashMap<Integer, 
Integer>()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+            } else {
+                keyInfo.put(index,
+                        new Pair<Boolean, Map<Integer, Integer>>(
+                                lrearrange.isProjectStar(), 
lrearrange.getProjectedColsMap()));
+                pkg.getPkgr().setKeyInfo(keyInfo);
+                pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+                pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
+            }
+
         }
 
         /**


Reply via email to