Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java Tue Jan 27 02:27:45 2015
@@ -18,14 +18,18 @@
 
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
+import java.util.Properties;
 import java.util.UUID;
 
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.dag.api.TezConfiguration;
 
 public class TezExecutionEngine extends HExecutionEngine {
 
@@ -45,4 +49,11 @@ public class TezExecutionEngine extends
     public PigStats instantiatePigStats() {
         return new TezPigScriptStats(pigContext);
     }
+
+    @Override
+    public JobConf getExecConf(Properties properties) throws ExecException {
+        JobConf jc = super.getExecConf(properties);
+        jc.addResource(TezConfiguration.TEZ_SITE_XML);
+        return jc;
+    }
 }

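The getExecConf() override above relies on Hadoop's standard configuration layering: Configuration.addResource() merges one more XML file into the same JobConf, with later resources overriding earlier non-final properties. A minimal standalone sketch of that mechanism (TezSiteDemo and the probed property name are illustrative, not Pig code; TezConfiguration.TEZ_SITE_XML is the constant "tez-site.xml"):

    import org.apache.hadoop.mapred.JobConf;

    public class TezSiteDemo {
        public static void main(String[] args) {
            JobConf jc = new JobConf();
            // Load tez-site.xml from the classpath and layer its properties
            // over whatever the JobConf already holds.
            jc.addResource("tez-site.xml");
            System.out.println(jc.get("tez.am.resource.memory.mb", "<unset>"));
        }
    }
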
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Tue Jan 27 02:27:45 2015
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -177,13 +176,18 @@ public class TezJob implements Runnable
 
         while (true) {
             try {
-                dagStatus = dagClient.getDAGStatus(statusGetOpts);
+                dagStatus = dagClient.getDAGStatus(null);
             } catch (Exception e) {
                 log.info("Cannot retrieve DAG status", e);
                 break;
             }
 
             if (dagStatus.isCompleted()) {
+                try {
+                    dagStatus = dagClient.getDAGStatus(statusGetOpts);
+                } catch (Exception e) {
+                    log.warn("Failed to retrieve DAG counters", e);
+                }
                // For tez_local mode where PigProcessor destroys all UDFContext
                 UDFContext.setUdfContext(udfContext);
 
@@ -219,16 +223,10 @@ public class TezJob implements Runnable
 
     private class DAGStatusReporter extends TimerTask {
 
-        private final String LINE_SEPARATOR = System.getProperty("line.separator");
-
         @Override
         public void run() {
             if (dagStatus == null) return;
-            String msg = "status=" + dagStatus.getState()
-              + ", progress=" + dagStatus.getDAGProgress()
-              + ", diagnostics="
-              + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR);
-            log.info("DAG Status: " + msg);
+            log.info("DAG Status: " + dagStatus.toString());
         }
     }
 
@@ -248,7 +246,7 @@ public class TezJob implements Runnable
     public String getDiagnostics() {
         try {
             if (dagClient != null && dagStatus == null) {
-                dagStatus = dagClient.getDAGStatus(new HashSet<StatusGetOpts>());
+                dagStatus = dagClient.getDAGStatus(null);
             }
             if (dagStatus != null) {
                 return StringUtils.join(dagStatus.getDiagnostics(), "\n");

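The two getDAGStatus() changes in this file implement one idea: pass null options while polling so the AM does not aggregate counters on every call, and request counters exactly once after the DAG completes. A hedged sketch of that pattern against the Tez client API (pollOnce is an illustrative name, not Pig code):

    import java.util.EnumSet;
    import org.apache.tez.dag.api.client.DAGClient;
    import org.apache.tez.dag.api.client.DAGStatus;
    import org.apache.tez.dag.api.client.StatusGetOpts;

    final class DagPolling {
        // Poll cheaply; fetch counters a single time once the DAG is done.
        static DAGStatus pollOnce(DAGClient dagClient) throws Exception {
            DAGStatus status = dagClient.getDAGStatus(null); // no counter aggregation
            if (status.isCompleted()) {
                status = dagClient.getDAGStatus(EnumSet.of(StatusGetOpts.GET_COUNTERS));
            }
            return status;
        }
    }
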
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Tue Jan 27 02:27:45 2015
@@ -75,6 +75,7 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -95,18 +96,24 @@ public class TezLauncher extends Launche
         if (namedThreadFactory == null) {
             namedThreadFactory = new ThreadFactoryBuilder()
                 .setNameFormat("PigTezLauncher-%d")
+                .setDaemon(true)
                .setUncaughtExceptionHandler(new JobControlThreadExceptionHandler())
                 .build();
         }
-        executor = Executors.newSingleThreadExecutor(namedThreadFactory);
     }
 
     @Override
     public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
+        synchronized (this) {
+            if (executor == null) {
+                executor = Executors.newSingleThreadExecutor(namedThreadFactory);
+            }
+        }
         if (pc.getExecType().isLocal()) {
             pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE, "true");
             pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
             pc.getProperties().setProperty("tez.ignore.lib.uris", "true");
+            pc.getProperties().setProperty("tez.am.dag.scheduler.class", DAGSchedulerNaturalOrderControlled.class.getName());
         }
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
         if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {

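The launcher change combines two things: the thread factory now produces daemon threads so a lingering pool cannot keep the JVM alive, and the executor is created lazily under a lock on first launch instead of in the constructor. A standalone sketch of that shape (class and method names are assumptions, not Pig code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    class LazyLauncherPool {
        private ExecutorService executor;

        ExecutorService getExecutor() {
            synchronized (this) {
                if (executor == null) { // created on first use, not at construction
                    executor = Executors.newSingleThreadExecutor(
                            new ThreadFactoryBuilder()
                                    .setNameFormat("PigTezLauncher-%d")
                                    .setDaemon(true) // do not block JVM shutdown
                                    .build());
                }
            }
            return executor;
        }
    }
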
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Tue Jan 27 02:27:45 2015
@@ -297,9 +297,11 @@ public class TezCompiler extends PhyPlan
                         userFunc.getInputs().remove(1);
                     }
 
-                    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp);
-                    //TODO shared edge once support is available in Tez
-                    TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+                    if (tezPlan.getPredecessors(tezOp)==null || !tezPlan.getPredecessors(tezOp).contains(from)) {
+                        TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, from, tezOp);
+                        //TODO shared edge once support is available in Tez
+                        TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+                    }
                 }
             }
         }
@@ -366,7 +368,7 @@ public class TezCompiler extends PhyPlan
                     storeOnlyPhyPlan.addAsLeaf(store);
                     storeOnlyTezOperator.plan = storeOnlyPhyPlan;
                     tezPlan.add(storeOnlyTezOperator);
-                    phyToTezOpMap.put(store, storeOnlyTezOperator);
+                    phyToTezOpMap.put(p, storeOnlyTezOperator);
 
                     // Create new operator as second splittee
                     curTezOp = getTezOp();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Tue Jan 27 02:27:45 2015
@@ -608,6 +608,7 @@ public class TezOperator extends Operato
         private POStore store;
         private OutputDescriptor storeOutDescriptor;
         private VertexGroup vertexGroup;
+        private FileSpec sFile;
 
         public VertexGroupInfo() {
         }
@@ -659,6 +660,13 @@ public class TezOperator extends Operato
             this.vertexGroup = vertexGroup;
         }
 
+        public void setSFile(FileSpec sFile) {
+            this.sFile = sFile;
+        }
+
+        public FileSpec getSFile() {
+            return sFile;
+        }
     }
 }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Tue Jan 27 02:27:45 2015
@@ -18,8 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan;
 
 import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map.Entry;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -42,7 +43,7 @@ public class TezPrinter extends TezOpPla
      * @param plan tez plan to print
      */
     public TezPrinter(PrintStream ps, TezOperPlan plan) {
-        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true));
         mStream = ps;
     }
 
@@ -62,13 +63,16 @@ public class TezPrinter extends TezOpPla
             mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
         }
         if (tezOper.inEdges.size() > 0) {
-            for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
+            List<OperatorKey> inEdges = new ArrayList<OperatorKey>(tezOper.inEdges.keySet());
+            Collections.sort(inEdges);
+            for (OperatorKey inEdge : inEdges) {
                 //TODO: Print other edge properties like custom partitioner
-                if (!inEdge.getValue().combinePlan.isEmpty()) {
-                    mStream.println("# Combine plan on edge <" + inEdge.getKey() + ">");
+                TezEdgeDescriptor edgeDesc = tezOper.inEdges.get(inEdge);
+                if (!edgeDesc.combinePlan.isEmpty()) {
+                    mStream.println("# Combine plan on edge <" + inEdge + ">");
                     PlanPrinter<PhysicalOperator, PhysicalPlan> printer =
                             new PlanPrinter<PhysicalOperator, PhysicalPlan>(
-                                    inEdge.getValue().combinePlan, mStream);
+                                    edgeDesc.combinePlan, mStream);
                     printer.setVerbose(isVerbose);
                     printer.visit();
                     mStream.println();
@@ -93,7 +97,7 @@ public class TezPrinter extends TezOpPla
         StringBuffer buf;
 
         public TezGraphPrinter(TezOperPlan plan) {
-            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true));
             buf = new StringBuffer();
         }
 
@@ -106,6 +110,7 @@ public class TezPrinter extends TezOpPla
             }
             List<TezOperator> succs = mPlan.getSuccessors(tezOper);
             if (succs != null) {
+                Collections.sort(succs);
                 buf.append("\t->\t");
                 for (TezOperator op : succs) {
                     if (op.isVertexGroup()) {

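The sorts added in this file (edge keys, successors, and the new DependencyOrderWalker flag) all address the same problem: HashMap and graph iteration order is unspecified, so explain-plan output could differ from run to run and break golden-file comparisons. A minimal illustration of the fix, using plain strings in place of OperatorKeys:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StableIteration {
        public static void main(String[] args) {
            Map<String, String> edges = new HashMap<String, String>();
            edges.put("scope-12", "SHUFFLE");
            edges.put("scope-3", "BROADCAST");
            // Sort a copy of the key set before iterating; the printed order
            // is now deterministic regardless of hashing.
            List<String> keys = new ArrayList<String>(edges.keySet());
            Collections.sort(keys);
            for (String k : keys) {
                System.out.println(k + " -> " + edges.get(k));
            }
        }
    }
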
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Tue Jan 27 02:27:45 2015
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -160,6 +161,8 @@ public class POShuffledValueInputTez ext
 
     @Override
     public String name() {
-        return "POShuffledValueInputTez - " + mKey.toString() + "\t<-\t " + inputKeys;
+        List<String> inputKeyList = new ArrayList<String>(inputKeys);
+        Collections.sort(inputKeyList);
+        return "POShuffledValueInputTez - " + mKey.toString() + "\t<-\t " + inputKeyList;
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java Tue Jan 27 02:27:45 2015
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -189,7 +190,9 @@ public class POValueOutputTez extends Ph
 
     @Override
     public String name() {
-        return "POValueOutputTez - " + mKey.toString() + "\t->\t " + outputKeys;
+        List<String> outputKeyList = new ArrayList<String>(outputKeys);
+        Collections.sort(outputKeyList);
+        return "POValueOutputTez - " + mKey.toString() + "\t->\t " + outputKeyList;
     }
 
     public static class EmptyWritable implements Writable {

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Tue Jan 27 02:27:45 2015
@@ -18,8 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
@@ -54,7 +56,6 @@ public class MultiQueryOptimizerTez exte
             List<TezOperator> splittees = new ArrayList<TezOperator>();
 
             List<TezOperator> successors = getPlan().getSuccessors(tezOp);
-            List<TezOperator> succ_successors = new ArrayList<TezOperator>();
             for (TezOperator successor : successors) {
 
                 // If has other dependency, don't merge into split,
@@ -65,20 +66,21 @@ public class MultiQueryOptimizerTez exte
                 // Detect diamond shape, we cannot merge it into split, since Tez
                 // does not handle double edge between vertexes
                 // TODO: PIG-3876 to handle this by writing to same edge
-                boolean sharedSucc = false;
-                if (getPlan().getSuccessors(successor)!=null) {
-                    for (TezOperator succ_successor : getPlan().getSuccessors(successor)) {
-                        if (succ_successors.contains(succ_successor)) {
-                            sharedSucc = true;
-                            break;
-                        }
+                Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
+                Set<TezOperator> toMergeSuccessors = new HashSet<TezOperator>();
+                mergedSuccessors.addAll(successors);
+                for (TezOperator splittee : splittees) {
+                    if (getPlan().getSuccessors(splittee) != null) {
+                        mergedSuccessors.addAll(getPlan().getSuccessors(splittee));
                     }
-                    succ_successors.addAll(getPlan().getSuccessors(successor));
                 }
-                if (sharedSucc) {
-                    continue;
+                if (getPlan().getSuccessors(successor) != null) {
+                    toMergeSuccessors.addAll(getPlan().getSuccessors(successor));
+                }
+                mergedSuccessors.retainAll(toMergeSuccessors);
+                if (mergedSuccessors.isEmpty()) { // no shared edge after merge
+                    splittees.add(successor);
                 }
-                splittees.add(successor);
             }
 
             if (splittees.size()==0) {

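The rewritten diamond check is a set-intersection test: merging a successor into the split is only safe if the successors it brings along are disjoint from those already reachable from the merged vertex; a shared successor would require two edges between the same pair of vertices, which Tez does not allow (PIG-3876). A toy sketch of that test with strings standing in for TezOperators:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class DiamondCheck {
        // retainAll() computes the intersection in place, as in the patch.
        static boolean safeToMerge(Set<String> mergedSuccessors, Set<String> candidateSuccessors) {
            Set<String> overlap = new HashSet<String>(mergedSuccessors);
            overlap.retainAll(candidateSuccessors);
            return overlap.isEmpty(); // no shared successor -> no double edge
        }

        public static void main(String[] args) {
            Set<String> merged = new HashSet<String>(Arrays.asList("v3", "v4"));
            System.out.println(safeToMerge(merged, new HashSet<String>(Arrays.asList("v5")))); // true
            System.out.println(safeToMerge(merged, new HashSet<String>(Arrays.asList("v4")))); // false: diamond
        }
    }
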
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Tue Jan 27 02:27:45 2015
@@ -35,9 +35,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.VertexGroupInfo;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.RoundRobinPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
+import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -97,11 +97,28 @@ public class UnionOptimizer extends TezO
         // Union followed by Split followed by Store could have multiple stores
         List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
         TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()];
+        List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
+        // Create a copy as disconnect while iterating modifies the original list
+        List<TezOperator> successors = succs == null ? null : new ArrayList<TezOperator>(succs);
+        
         for (int i=0; i < storeVertexGroupOps.length; i++) {
-            storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
-            storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
-            storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
-            tezPlan.add(storeVertexGroupOps[i]);
+            TezOperator existingVertexGroup = null;
+            if (successors != null) {
+                for (TezOperator succ : successors) {
+                    if (succ.isVertexGroup() && succ.getVertexGroupInfo().getSFile().equals(unionStoreOutputs.get(i).getSFile())) {
+                        existingVertexGroup = succ;
+                    }
+                }
+            }
+            if (existingVertexGroup != null) {
+                storeVertexGroupOps[i] = existingVertexGroup;
+            } else {
+                storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
+                storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
+                storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
+                storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+                tezPlan.add(storeVertexGroupOps[i]);
+                tezPlan.add(storeVertexGroupOps[i]);
+            }
         }
 
         // Case of split, orderby, skewed join, rank, etc will have multiple outputs
@@ -182,7 +199,6 @@ public class UnionOptimizer extends TezO
                 tezPlan.disconnect(pred, unionOp);
             }
 
-            List<TezOperator> successors = tezPlan.getSuccessors(unionOp);
             List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
             for (TezOutput tezOutput : unionOutputs) {
                 if (tezOutput instanceof POValueOutputTez) {
@@ -243,9 +259,6 @@ public class UnionOptimizer extends TezO
             throw new VisitorException(e);
         }
 
-        List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
-        // Create a copy as disconnect while iterating modifies the original list
-        List<TezOperator> successors = succs == null ? null : new ArrayList<TezOperator>(succs);
         if (successors != null) {
             // Successor inputs should now point to the vertex groups.
             for (TezOperator succ : successors) {

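Keying vertex-group reuse on the store's FileSpec turns "always create a group per store" into lookup-or-create, so two unions feeding the same output share one vertex group. The patch scans the union's successors rather than keeping an index, but the shape is the same generic pattern (all names below are illustrative, not Pig code):

    import java.util.HashMap;
    import java.util.Map;

    class VertexGroupReuse {
        private final Map<String, String> groupByOutputFile = new HashMap<String, String>();

        // Return the existing group for this output file, creating one only
        // on first sight, so the same store never gets two vertex groups.
        String groupFor(String outputFile) {
            String group = groupByOutputFile.get(outputFile);
            if (group == null) {
                group = "vertexgroup:" + outputFile;
                groupByOutputFile.put(outputFile, group);
            }
            return group;
        }
    }
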
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ObjectCache.java Tue Jan 27 02:27:45 2015
@@ -21,15 +21,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.tez.runtime.api.ObjectRegistry;
-import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 
[email protected]
[email protected]
 public class ObjectCache {
 
     private static final Log LOG = LogFactory.getLog(ObjectCache.class);
-    private final ObjectRegistry registry = new ObjectRegistryImpl();
     private static ObjectCache cache = new ObjectCache();
 
+    private ObjectRegistry registry;
+
     private ObjectCache() {
     }
 
@@ -37,11 +37,34 @@ public class ObjectCache {
         return cache;
     }
 
+    /**
+     * Returns the tez ObjectRegistry which allows caching of objects at the
+     * Session, DAG and Vertex level on container reuse for better performance
+     * and savings
+     */
+    public ObjectRegistry getObjectRegistry() {
+        return registry;
+    }
+
+    /**
+     * For internal use only. This method is to be called only by PigProcessor
+     */
+    @InterfaceAudience.Private
+    void setObjectRegistry(ObjectRegistry registry) {
+        this.registry = registry;
+    }
+
+    /**
+     * Convenience method to cache objects in ObjectRegistry for a vertex
+     */
     public void cache(String key, Object value) {
       LOG.info("Adding " + key + " to cache");
       registry.cacheForVertex(key, value);
     }
 
+    /**
+     * Convenience method to retrieve objects cached for the vertex from ObjectRegistry
+     */
     public Object retrieve(String key) {
       Object o = registry.get(key);
       if (o != null) {

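With the registry injected by PigProcessor (next hunk) instead of constructed locally, code running inside a Tez task can cache expensive objects across container reuse. A hedged usage sketch, assuming the registry has already been set for the current task; DictionaryHolder and buildDictionary() are hypothetical:

    import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;

    final class DictionaryHolder {
        static Object dictionary() {
            ObjectCache cache = ObjectCache.getInstance();
            Object dict = cache.retrieve("my.dictionary");  // vertex-level lookup
            if (dict == null) {
                dict = buildDictionary();                   // hypothetical expensive init
                cache.cache("my.dictionary", dict);         // survives container reuse
            }
            return dict;
        }

        private static Object buildDictionary() {
            return new java.util.HashMap<String, String>();
        }
    }
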
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Tue Jan 27 02:27:45 2015
@@ -103,6 +103,7 @@ public class PigProcessor extends Abstra
 
     public PigProcessor(ProcessorContext context) {
         super(context);
+        ObjectCache.getInstance().setObjectRegistry(context.getObjectRegistry());
     }
 
     @SuppressWarnings("unchecked")

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Tue Jan 27 02:27:45 2015
@@ -37,7 +37,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.tez.runtime.RoundRobinPartitioner;
+import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Tue Jan 27 02:27:45 2015
@@ -472,28 +472,38 @@ public class MapRedUtil {
                 result.add(combinedSplits);
                 resultLengths.add(split.getLength());
             } else {
-                ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++);
                 String[] locations = split.getLocations();
-                // sort the locations to stabilize the number of maps: PIG-1757
-                Arrays.sort(locations);
-                HashSet<String> locationSeen = new HashSet<String>();
-                for (String location : locations)
-                {
-                    if (!locationSeen.contains(location))
+                if (locations.length == 0) {
+                    // This split is missing blocks, or the split returned bad locations.
+                    // Don't try to combine.
+                    comparableSplitId++;
+                    ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                    combinedSplits.add(split);
+                    result.add(combinedSplits);
+                    resultLengths.add(split.getLength());
+                } else {
+                    ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++);
+                    // sort the locations to stabilize the number of maps: PIG-1757
+                    Arrays.sort(locations);
+                    HashSet<String> locationSeen = new HashSet<String>();
+                    for (String location : locations)
                     {
-                        Node node = nodeMap.get(location);
-                        if (node == null) {
-                            node = new Node();
-                            nodes.add(node);
-                            nodeMap.put(location, node);
+                        if (!locationSeen.contains(location))
+                        {
+                            Node node = nodeMap.get(location);
+                            if (node == null) {
+                                node = new Node();
+                                nodes.add(node);
+                                nodeMap.put(location, node);
+                            }
+                            node.add(csplit);
+                            csplit.add(node);
+                            locationSeen.add(location);
                         }
-                        node.add(csplit);
-                        csplit.add(node);
-                        locationSeen.add(location);
                     }
+                    lastSplit = split;
+                    size++;
                 }
-                lastSplit = split;
-                size++;
             }
         }
         /* verification code: debug purpose

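The MapRedUtil change guards against InputSplit.getLocations() returning an empty array (missing HDFS blocks or a misbehaving InputFormat): such a split now becomes its own map task instead of entering the host-based combine logic, which assumes at least one location per split. The guard in isolation, as a hedged sketch over the mapreduce API (SplitGuard is an illustrative name):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.mapreduce.InputSplit;

    final class SplitGuard {
        // Returns a singleton group for a location-less split, or null to
        // let the caller run the normal host-based combine logic.
        static List<InputSplit> passThroughIfNoLocation(InputSplit split)
                throws IOException, InterruptedException {
            if (split.getLocations().length == 0) {
                List<InputSplit> single = new ArrayList<InputSplit>();
                single.add(split); // emitted uncombined
                return single;
            }
            return null;
        }
    }
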
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Jan 27 02:27:45 2015
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -178,6 +179,8 @@ public class HBaseStorage extends LoadFu
     private final long minTimestamp_;
     private final long maxTimestamp_;
     private final long timestamp_;
+    private boolean includeTimestamp_;
+    private boolean includeTombstone_;
 
     protected transient byte[] gt_;
     protected transient byte[] gte_;
@@ -211,6 +214,8 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value");
         validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less then this value");
         validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
+        validOptions_.addOption("includeTimestamp", false, "Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)");
+        validOptions_.addOption("includeTombstone", false, "Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)");
     }
 
     /**
@@ -254,6 +259,8 @@ public class HBaseStorage extends LoadFu
      * <li>-minTimestamp= Scan's timestamp for min timeRange
      * <li>-maxTimestamp= Scan's timestamp for max timeRange
      * <li>-timestamp= Scan's specified timestamp
+     * <li>-includeTimestamp= Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)
+     * <li>-includeTombstone= Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)
      * <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter) Utf8StorageConverter is the default
      * To be used with extreme caution, since this could result in data loss
      * (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
@@ -268,7 +275,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ );
             throw e;
         }
 
@@ -343,6 +350,22 @@ public class HBaseStorage extends LoadFu
             timestamp_ = 0;
         }
 
+        includeTimestamp_ = false;
+        if (configuredOptions_.hasOption("includeTimestamp")) {
+            String value = configuredOptions_.getOptionValue("includeTimestamp");
+            if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {//the empty string and null check is for backward compat.
+                includeTimestamp_ = true;
+            }
+        }
+
+        includeTombstone_ = false;
+        if (configuredOptions_.hasOption("includeTombstone")) {
+            String value = configuredOptions_.getOptionValue("includeTombstone");
+            if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {
+                includeTombstone_ = true;
+            }
+        }
+
         initScan();
     }
 
@@ -930,7 +953,41 @@ public class HBaseStorage extends LoadFu
     public void putNext(Tuple t) throws IOException {
         ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
         byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType();
-        long ts=System.currentTimeMillis();
+        long ts;
+
+        int startIndex=1;
+        if (includeTimestamp_) {
+            byte timestampType = (fieldSchemas == null) ? DataType.findType(t.get(startIndex)) : fieldSchemas[startIndex].getType();
+            LoadStoreCaster caster = (LoadStoreCaster) caster_;
+
+            switch (timestampType) {
+            case DataType.BYTEARRAY: ts = caster.bytesToLong(((DataByteArray)t.get(startIndex)).get()); break;
+            case DataType.LONG: ts = ((Long)t.get(startIndex)).longValue(); break;
+            case DataType.DATETIME: ts = ((DateTime)t.get(startIndex)).getMillis(); break;
+            default: throw new IOException("Unable to find a converter for timestamp field " + t.get(startIndex));
+            }
+
+            startIndex++;
+        } else {
+            ts = System.currentTimeMillis();
+        }
+
+        // check for deletes
+        if (includeTombstone_) {
+            if (((Boolean)t.get(startIndex)).booleanValue()) {
+                Delete delete = createDelete(t.get(0), type, ts);
+                try {
+                    // this is a delete so there will be
+                    // no put and we are done here
+                    writer.write(null, delete);
+                    return;
+                } catch (InterruptedException e) {
+                    throw new IOException(e);
+                }
+            }
+
+            startIndex++;
+        }
 
         Put put = createPut(t.get(0), type);
 
@@ -941,8 +998,8 @@ public class HBaseStorage extends LoadFu
             }
         }
 
-        for (int i=1;i<t.size();++i){
-            ColumnInfo columnInfo = columnInfo_.get(i-1);
+        for (int i=startIndex;i<t.size();++i){
+            ColumnInfo columnInfo = columnInfo_.get(i-startIndex);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
                         ", cf:column=" + columnInfo);
@@ -979,6 +1036,25 @@ public class HBaseStorage extends LoadFu
     }
 
     /**
+     * Public method to initialize a Delete.
+     *
+     * @param key
+     * @param type
+     * @param timestamp
+     * @return new delete
+     * @throws IOException
+     */
+    public Delete createDelete(Object key, byte type, long timestamp) throws IOException {
+        Delete delete = new Delete(objToBytes(key, type), timestamp);
+
+        if(noWAL_) {
+            delete.setWriteToWAL(false);
+        }
+
+        return delete;
+    }
+
+    /**
      * Public method to initialize a Put. Used to allow assertions of how Puts
      * are initialized by unit tests.
      *

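With -includeTimestamp and -includeTombstone, the tuple layout on store becomes (rowkey, [timestamp,] [tombstone,] columns...), and a true tombstone field makes HBaseStorage emit an HBase Delete at that timestamp instead of a Put. A minimal sketch of the Delete path, assuming an HBase client version with the two-argument Delete constructor the patch itself uses (TombstoneDemo is an illustrative name):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    final class TombstoneDemo {
        static Delete tombstone(String rowKey, long timestamp, boolean noWAL) {
            // Deletes the whole row at the given timestamp.
            Delete delete = new Delete(Bytes.toBytes(rowKey), timestamp);
            if (noWAL) {
                delete.setWriteToWAL(false); // mirrors the -noWAL handling in createDelete()
            }
            return delete;
        }
    }
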
Modified: pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/JsonLoader.java Tue Jan 27 02:27:45 2015
@@ -54,10 +54,8 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
-import org.apache.pig.parser.ParserException;
 
 /**
  * A loader for data stored using {@link JsonStorage}.  This is not a generic
@@ -171,7 +169,7 @@ public class JsonLoader extends LoadFunc
                 return t;
             }
             
-        } catch (JsonParseException jpe) {
+        } catch (Exception jpe) {
             warn("Bad record, returning null for " + val, PigWarning.UDF_WARNING_1);
         } finally {
             p.close();
@@ -180,6 +178,52 @@ public class JsonLoader extends LoadFunc
         return t;
     }
 
+    private Object readPrimitive(JsonParser p, JsonToken tok, ResourceFieldSchema field) throws IOException {
+
+        if (tok == JsonToken.VALUE_NULL) return null;
+
+        switch(field.getType()) {
+            // Read based on our expected type
+            case DataType.BOOLEAN:
+                return p.getBooleanValue();
+    
+            case DataType.INTEGER:
+                return p.getIntValue();
+    
+            case DataType.LONG:
+                return p.getLongValue();
+    
+            case DataType.FLOAT:
+                return p.getFloatValue();
+    
+            case DataType.DOUBLE:
+                return p.getDoubleValue();
+    
+            case DataType.DATETIME:
+                DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser();
+                return formatter.withOffsetParsed().parseDateTime(p.getText());
+    
+            case DataType.BYTEARRAY:
+                byte[] b = p.getText().getBytes();
+                // Use the DBA constructor that copies the bytes so that we own
+                // the memory
+                return new DataByteArray(b, 0, b.length);
+    
+            case DataType.CHARARRAY:
+                return p.getText();
+    
+            case DataType.BIGINTEGER:
+                return p.getBigIntegerValue();
+    
+            case DataType.BIGDECIMAL:
+                return new BigDecimal(p.getText());
+    
+            default:
+                throw new IOException("Unknown type in input schema: " +
+                        field.getType() );
+        }
+    }
+    
     private Object readField(JsonParser p,
                              ResourceFieldSchema field,
                              int fieldnum) throws IOException {
@@ -193,67 +237,14 @@ public class JsonLoader extends LoadFunc
 
         // Check to see if this value was null
         if (tok == JsonToken.VALUE_NULL) return null;
+        
+        tok = p.nextToken();
 
         // Read based on our expected type
         switch (field.getType()) {
-        case DataType.BOOLEAN:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getBooleanValue();
-
-        case DataType.INTEGER:
-            // Read the field name
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getIntValue();
-
-        case DataType.LONG:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getLongValue();
-
-        case DataType.FLOAT:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getFloatValue();
-
-        case DataType.DOUBLE:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getDoubleValue();
-
-        case DataType.DATETIME:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser();
-            return formatter.withOffsetParsed().parseDateTime(p.getText());
-
-        case DataType.BYTEARRAY:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            byte[] b = p.getText().getBytes();
-            // Use the DBA constructor that copies the bytes so that we own
-            // the memory
-            return new DataByteArray(b, 0, b.length);
-
-        case DataType.CHARARRAY:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getText();
-
-        case DataType.BIGINTEGER:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return p.getBigIntegerValue();
-
-        case DataType.BIGDECIMAL:
-            tok = p.nextToken();
-            if (tok == JsonToken.VALUE_NULL) return null;
-            return new BigDecimal(p.getText());
-
         case DataType.MAP:
             // Should be a start of the map object
-            if (p.nextToken() != JsonToken.START_OBJECT) {
+            if (tok != JsonToken.START_OBJECT) {
                 warn("Bad map field, could not find start of object, field "
                     + fieldnum, PigWarning.UDF_WARNING_1);
                 return null;
@@ -267,7 +258,7 @@ public class JsonLoader extends LoadFunc
             return m;
 
         case DataType.TUPLE:
-            if (p.nextToken() != JsonToken.START_OBJECT) {
+            if (tok != JsonToken.START_OBJECT) {
                 warn("Bad tuple field, could not find start of object, "
                     + "field " + fieldnum, PigWarning.UDF_WARNING_1);
                 return null;
@@ -289,7 +280,7 @@ public class JsonLoader extends LoadFunc
             return t;
 
         case DataType.BAG:
-            if (p.nextToken() != JsonToken.START_ARRAY) {
+            if (tok != JsonToken.START_ARRAY) {
                 warn("Bad bag field, could not find start of array, "
                     + "field " + fieldnum, PigWarning.UDF_WARNING_1);
                 return null;
@@ -305,28 +296,29 @@ public class JsonLoader extends LoadFunc
 
             JsonToken innerTok;
             while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
-                if (innerTok != JsonToken.START_OBJECT) {
-                    warn("Bad bag tuple field, could not find start of "
-                        + "object, field " + fieldnum, PigWarning.UDF_WARNING_1);
-                    return null;
-                }
-
                 t = tupleFactory.newTuple(fs.length);
-                for (int j = 0; j < fs.length; j++) {
-                    t.set(j, readField(p, fs[j], j));
+                if (innerTok == JsonToken.START_OBJECT) {
+                    for (int j = 0; j < fs.length; j++) {
+                        t.set(j, readField(p, fs[j], j));
+                    }
+
+                    if (p.nextToken() != JsonToken.END_OBJECT) {
+                        warn("Bad bag tuple field, could not find end of "
+                             + "object, field " + fieldnum, PigWarning.UDF_WARNING_1);
+                        return null;
+                    }
+                    bag.add(t);
+                } else {
+
+                    // handle array of kind [ primitive, primitive ... ]
+                    t.set(0, readPrimitive(p, innerTok, fs[0]));
+                    bag.add(t);
                 }
-
-                if (p.nextToken() != JsonToken.END_OBJECT) {
-                    warn("Bad bag tuple field, could not find end of "
-                        + "object, field " + fieldnum, PigWarning.UDF_WARNING_1);
-                    return null;
-                }
-                bag.add(t);
             }
             return bag;
+
         default:
-            throw new IOException("Unknown type in input schema: " +
-                field.getType());
+            return readPrimitive(p, tok, field);
         }
 
     }

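The bag-handling rewrite means an array of bare primitives such as {"scores": [1, 2, 3]} now loads as a bag of single-field tuples, where previously only arrays of objects like [{"f":1},{"f":2}] parsed. The token-level distinction, shown as a small demo with the Jackson 1.x API that JsonLoader itself uses (class name is illustrative):

    import org.codehaus.jackson.JsonFactory;
    import org.codehaus.jackson.JsonParser;
    import org.codehaus.jackson.JsonToken;

    public class ArrayOfPrimitives {
        public static void main(String[] args) throws Exception {
            JsonParser p = new JsonFactory().createJsonParser("[1, 2, 3]");
            if (p.nextToken() == JsonToken.START_ARRAY) {
                JsonToken tok;
                while ((tok = p.nextToken()) != JsonToken.END_ARRAY) {
                    // tok is a bare value, not START_OBJECT: JsonLoader now
                    // routes this through readPrimitive() into a one-field tuple.
                    System.out.println(p.getIntValue());
                }
            }
            p.close();
        }
    }
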
Modified: pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/RollupDimensions.java Tue Jan 27 02:27:45 2015
@@ -47,6 +47,10 @@ public class RollupDimensions extends Ev
     private static BagFactory bf = BagFactory.getInstance();
     private static TupleFactory tf = TupleFactory.getInstance();
     private final String allMarker;
+    // the pivot position
+    private int pivot = -1;
+    // to check if rollup is optimized or not
+    private boolean rollupHIIoptimizable = false;
 
     public RollupDimensions() {
        this(null);
@@ -57,6 +61,18 @@ public class RollupDimensions extends Ev
        this.allMarker = allMarker;
     }
 
+    public void setRollupHIIOptimizable(boolean check) {
+        this.rollupHIIoptimizable = check;
+    }
+
+    public boolean getRollupHIIOptimizable() {
+        return this.rollupHIIoptimizable;
+    }
+
+    public void setPivot(int pvt) throws IOException {
+        this.pivot = pvt;
+    }
+
     @Override
     public DataBag exec(Tuple tuple) throws IOException {
        List<Tuple> result = Lists.newArrayListWithCapacity(tuple.size() + 1);
@@ -66,12 +82,32 @@ public class RollupDimensions extends Ev
        return bf.newDefaultBag(result);
     }
 
-    private void iterativelyRollup(List<Tuple> result, Tuple input) throws ExecException {
-       Tuple tempTup = tf.newTuple(input.getAll());
-       for (int i = input.size() - 1; i >= 0; i--) {
-           tempTup.set(i, allMarker);
-           result.add(tf.newTuple(tempTup.getAll()));
-       }
+    private void iterativelyRollup(List<Tuple> result, Tuple input)
+            throws IOException {
+
+        Tuple tempTup = tf.newTuple(input.getAll());
+
+        //if (this.rollupHIIoptimizable != null) { // rule is enabled
+            if (this.rollupHIIoptimizable == true) {
+                if (this.pivot == -1) // user did not specify the pivot position
+                                      // --> IRG approach
+                    return;
+                else { // user did specify the pivot position --> IRG + IRG
+                    if (this.pivot == 0) // we use the IRG approach
+                        return;
+                    else { // we use IRG+IRG approach
+                        for (int i = this.pivot - 1; i < input.size(); i++)
+                            tempTup.set(i, allMarker);
+                        result.add(tf.newTuple(tempTup.getAll()));
+                    }
+                }
+            }
+            else { // we can not optimize --> Vanilla approach
+            for (int i = input.size() - 1; i >= 0; i--) {
+                tempTup.set(i, allMarker);
+                result.add(tf.newTuple(tempTup.getAll()));
+            }
+        }
     }
 
     @Override

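For intuition: rollup over (a, b, c) normally yields (a,b,ALL), (a,ALL,ALL), (ALL,ALL,ALL) in addition to the input row. With the hybrid IRG+IRG optimization, an instance with pivot p emits only the single combination whose fields p-1..n-1 carry the ALL marker, leaving the other combinations to the second pass; pivot -1 or 0 means the plain IRG path, where the UDF emits nothing extra. A toy sketch of the pivot marking (names and the use of null for the marker are illustrative):

    import java.util.Arrays;

    public class RollupPivotDemo {
        // Mimics the optimized iterativelyRollup(): null out positions
        // pivot-1 .. n-1 (Pig uses the configured allMarker) and emit only
        // that single combination.
        static Object[] pivotTuple(Object[] in, int pivot) {
            Object[] out = Arrays.copyOf(in, in.length);
            for (int i = pivot - 1; i < out.length; i++) {
                out[i] = null;
            }
            return out;
        }

        public static void main(String[] args) {
            Object[] dims = {"us", "ca", "sf"};
            System.out.println(Arrays.toString(pivotTuple(dims, 2))); // [us, null, null]
        }
    }
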
Modified: pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Utf8StorageConverter.java Tue Jan 27 02:27:45 2015
@@ -196,6 +196,7 @@ public class Utf8StorageConverter implem
 
     private Map<String, Object> consumeMap(PushbackInputStream in, ResourceFieldSchema fieldSchema) throws IOException {
         int buf;
+        boolean emptyMap = true;
 
         while ((buf=in.read())!='[') {
             if (buf==-1) {
@@ -207,9 +208,14 @@ public class Utf8StorageConverter implem
         while (true) {
             // Read key (assume key can not contains special character such as #, (, [, {, }, ], )
             while ((buf=in.read())!='#') {
+                // end of map
+                if (emptyMap && buf==']') {
+                    return m;
+                }
                 if (buf==-1) {
                     throw new IOException("Unexpect end of map");
                 }
+                emptyMap = false;
                 mOut.write(buf);
             }
             String key = bytesToCharArray(mOut.toByteArray());

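The emptyMap flag makes the textual form [] parse as an empty map rather than failing with "Unexpect end of map" when ']' is hit before any '#'. A quick check through the public caster API, assuming the two-argument bytesToMap overload and that a null field schema yields an untyped map (before this patch the call would not return an empty map):

    import java.util.Map;
    import org.apache.pig.builtin.Utf8StorageConverter;

    public class EmptyMapDemo {
        public static void main(String[] args) throws Exception {
            Utf8StorageConverter caster = new Utf8StorageConverter();
            Map<String, Object> m = caster.bytesToMap("[]".getBytes(), null);
            System.out.println(m.size()); // 0 with this patch applied
        }
    }
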
Modified: pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/mock/Storage.java Tue Jan 27 02:27:45 2015
@@ -459,7 +459,7 @@ private MockRecordWriter mockRecordWrite
 
   @Override
   public void putNext(Tuple t) throws IOException {
-      mockRecordWriter.dataBeingWritten.add(t);
+      mockRecordWriter.dataBeingWritten.add(TF.newTuple(t.getAll()));
   }
 
   @Override
@@ -648,6 +648,10 @@ private MockRecordWriter mockRecordWrite
     @Override
     public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext arg0) throws IOException,
     InterruptedException {
+      if (arg0.getConfiguration().get("mapreduce.output.basename")!=null) {
+          return new MockRecordWriter(arg0.getConfiguration().get("mapreduce.output.basename") + "-" +
+                  arg0.getTaskAttemptID().getTaskID().getId());
+      }
       return new MockRecordWriter(getUniqueFile(arg0, "part", ".mock"));
     }
 

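The putNext() fix above is a defensive copy: execution engines may reuse a single Tuple instance across calls, so storing the reference would leave every captured row aliasing the last tuple written. The copy idiom in isolation (TupleSnapshot is an illustrative name):

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    final class TupleSnapshot {
        private static final TupleFactory TF = TupleFactory.getInstance();

        // Snapshot the tuple's current fields instead of keeping the
        // (possibly reused) instance itself.
        static Tuple snapshot(Tuple t) {
            return TF.newTuple(t.getAll());
        }
    }
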
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Tue Jan 27 02:27:45 2015
@@ -19,6 +19,7 @@ package org.apache.pig.impl.plan;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -37,18 +38,31 @@ import org.apache.pig.impl.util.Utils;
 public class DependencyOrderWalker <O extends Operator, P extends OperatorPlan<O>>
     extends PlanWalker<O, P> {
 
+    private final boolean walkLeavesInOrder;
+
     /**
      * @param plan Plan for this walker to traverse.
      */
     public DependencyOrderWalker(P plan) {
+        this(plan, false);
+    }
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     * @param walkLeavesInOrder Sort the leaves before walking
+     */
+    public DependencyOrderWalker(P plan, boolean walkLeavesInOrder) {
         super(plan);
+        this.walkLeavesInOrder = walkLeavesInOrder;
     }
 
+
     /**
      * Begin traversing the graph.
      * @param visitor Visitor this walker is being used by.
      * @throws VisitorException if an error is encountered while walking.
      */
+    @Override
     @SuppressWarnings("unchecked")
     public void walk(PlanVisitor<O, P> visitor) throws VisitorException {
         // This is highly inefficient, but our graphs are small so it should be okay.
@@ -63,6 +77,9 @@ public class DependencyOrderWalker <O ex
         Set<O> seen = new HashSet<O>();
         List<O> leaves = mPlan.getLeaves();
         if (leaves == null) return;
+        if (walkLeavesInOrder) {
+            Collections.sort(leaves);
+        }
         for (O op : leaves) {
             doAllPredecessors(op, seen, fifo);
         }
@@ -71,8 +88,9 @@ public class DependencyOrderWalker <O ex
         }
     }
 
-    public PlanWalker<O, P> spawnChildWalker(P plan) { 
-        return new DependencyOrderWalker<O, P>(plan);
+    @Override
+    public PlanWalker<O, P> spawnChildWalker(P plan) {
+        return new DependencyOrderWalker<O, P>(plan, walkLeavesInOrder);
     }
 
     protected void doAllPredecessors(O node,

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java Tue Jan 27 02:27:45 2015
@@ -219,21 +219,32 @@ public class AvroStorageSchemaConversion
       ResourceSchema mapSchema = new ResourceSchema();
       ResourceSchema.ResourceFieldSchema[] mapSchemaFields =
           new ResourceSchema.ResourceFieldSchema[1];
-      if (mapAvroSchema.getType() == Type.RECORD) {
-        ResourceSchema innerResourceSchema =
-            avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack,
-            alreadyDefinedSchemas, allowRecursiveSchema);
+      switch(mapAvroSchema.getType()) {
+      case RECORD:
+        ResourceSchema innerResourceSchemaRecord =
+          avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack,
+          alreadyDefinedSchemas, allowRecursiveSchema);
         mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
         mapSchemaFields[0].setType(DataType.TUPLE);
         mapSchemaFields[0].setName(mapAvroSchema.getName());
-        mapSchemaFields[0].setSchema(innerResourceSchema);
+        mapSchemaFields[0].setSchema(innerResourceSchemaRecord);
         mapSchemaFields[0].setDescription(fieldSchema.getDoc());
-      } else {
+        mapSchema.setFields(mapSchemaFields);
+        rf.setSchema(mapSchema);
+        break;
+      case MAP:
+      case ARRAY:
+        ResourceSchema innerResourceSchema =
+            avroSchemaToResourceSchema(fieldSchema.getValueType(), schemasInStack,
+            alreadyDefinedSchemas, allowRecursiveSchema);
+        rf.setSchema(innerResourceSchema);
+        break;
+      default:
         mapSchemaFields[0] = new ResourceSchema.ResourceFieldSchema();
         mapSchemaFields[0].setType(getPigType(mapAvroSchema));
+        mapSchema.setFields(mapSchemaFields);
+        rf.setSchema(mapSchema);
       }
-      mapSchema.setFields(mapSchemaFields);
-      rf.setSchema(mapSchema);
     }
       break;
     case DataType.TUPLE:

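The switch above distinguishes three shapes of Avro map values: RECORD values are wrapped in a single TUPLE field, MAP and ARRAY values pass their converted inner schema straight through, and anything else becomes the matching plain Pig type. A stand-alone illustration of those three input shapes using Avro's SchemaBuilder (illustration only; the Pig renderings in the comments are approximate):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class MapValueShapes {
        public static void main(String[] args) {
            // RECORD value -> one TUPLE field, roughly map[(f1:int)]
            Schema recordValued = SchemaBuilder.map().values(
                    SchemaBuilder.record("r").fields().requiredInt("f1").endRecord());

            // ARRAY (or MAP) value -> converted inner schema used directly
            Schema arrayValued = SchemaBuilder.map().values(
                    SchemaBuilder.array().items().intType());

            // Scalar value -> plain Pig type, roughly map[int]
            Schema intValued = SchemaBuilder.map().values().intType();

            System.out.println(recordValued);
            System.out.println(arrayValued);
            System.out.println(intValued);
        }
    }
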
Modified: 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
 Tue Jan 27 02:27:45 2015
@@ -206,7 +206,7 @@ public class DereferenceExpression exten
                    throw new FrontendException("Index "+rawColumn + " out of 
range in schema:" + schema.toString(false), 1127);
                }
                 columns.add( (Integer)rawColumn );
-            } else {
+            } else if (schema!=null) {
                 int pos = schema.getFieldPosition((String)rawColumn);
                 if( pos != -1) {
                     columns.add( pos );

Modified: 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
 Tue Jan 27 02:27:45 2015
@@ -60,6 +60,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.RollupDimensions;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -78,13 +79,13 @@ public class ExpToPhyTranslationVisitor
 
     // This value points to the current LogicalRelationalOperator we are working on
     protected LogicalRelationalOperator currentOp;
-    
-    public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, 
+
+    public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan,
             Map<Operator, PhysicalOperator> map) throws FrontendException {
         this(plan, new DependencyOrderWalker(plan), op, phyPlan, map);
     }
-    
-    public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, 
+
+    public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan,
             Map<Operator, PhysicalOperator> map) throws FrontendException {
         super(plan, walker);
         currentOp = op;
@@ -92,7 +93,7 @@ public class ExpToPhyTranslationVisitor
         currentPlan = phyPlan;
         currentPlans = new LinkedList<PhysicalPlan>();
     }
-    
+
     protected Map<Operator, PhysicalOperator> logToPhyMap;
 
     protected Deque<PhysicalPlan> currentPlans;
@@ -102,7 +103,7 @@ public class ExpToPhyTranslationVisitor
     protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
 
     protected PigContext pc;
-    
+
     public void setPigContext(PigContext pc) {
         this.pc = pc;
     }
@@ -110,13 +111,13 @@ public class ExpToPhyTranslationVisitor
     public PhysicalPlan getPhysicalPlan() {
         return currentPlan;
     }
-    
-    private void attachBinaryComparisonOperator( BinaryExpression op, 
+
+    private void attachBinaryComparisonOperator( BinaryExpression op,
             BinaryComparisonOperator exprOp ) throws FrontendException {
         // We dont have aliases in ExpressionOperators
         // exprOp.setAlias(op.getAlias());
-        
-        
+
+
         exprOp.setOperandType(op.getLhs().getType());
         exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
         exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
@@ -140,13 +141,13 @@ public class ExpToPhyTranslationVisitor
             }
         }
     }
-    
-    private void attachBinaryExpressionOperator( BinaryExpression op, 
+
+    private void attachBinaryExpressionOperator( BinaryExpression op,
             BinaryExpressionOperator exprOp ) throws FrontendException {
         // We dont have aliases in ExpressionOperators
         // exprOp.setAlias(op.getAlias());
-        
-        
+
+
         exprOp.setResultType(op.getLhs().getType());
         exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
         exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
@@ -173,81 +174,81 @@ public class ExpToPhyTranslationVisitor
 
     @Override
     public void visit( AndExpression op ) throws FrontendException {
-        
+
 //        System.err.println("Entering And");
         BinaryComparisonOperator exprOp = new POAnd(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit( OrExpression op ) throws FrontendException {
-        
+
 //        System.err.println("Entering Or");
         BinaryComparisonOperator exprOp = new POOr(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit( EqualExpression op ) throws FrontendException {
-        
+
         BinaryComparisonOperator exprOp = new EqualToExpr(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit( NotEqualExpression op ) throws FrontendException {
-        
+
         BinaryComparisonOperator exprOp = new NotEqualToExpr(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit( GreaterThanExpression op ) throws FrontendException {
-        
+
         BinaryComparisonOperator exprOp = new GreaterThanExpr(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit( GreaterThanEqualExpression op ) throws FrontendException {
-        
+
         BinaryComparisonOperator exprOp = new GTOrEqualToExpr(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit( LessThanExpression op ) throws FrontendException {
-        
+
         BinaryComparisonOperator exprOp = new LessThanExpr(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
-    
+
+
     @Override
     public void visit( LessThanEqualExpression op ) throws FrontendException {
-        
+
         BinaryComparisonOperator exprOp = new LTOrEqualToExpr(new OperatorKey(
                 DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
-        
+
         attachBinaryComparisonOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit(ProjectExpression op) throws FrontendException {
         POProject exprOp;
-       
+
         if(op.getAttachedRelationalOp() instanceof LOGenerate && op.getPlan().getSuccessors(op)==null &&
             !(op.findReferent() instanceof LOInnerLoad)) {
             exprOp = new PORelationToExprProject(new OperatorKey(DEFAULT_SCOPE, nodeGen
@@ -256,7 +257,7 @@ public class ExpToPhyTranslationVisitor
             exprOp = new POProject(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
         }
-        
+
         if (op.getFieldSchema()==null && op.isRangeOrStarProject())
             exprOp.setResultType(DataType.TUPLE);
         else
@@ -278,9 +279,9 @@ public class ExpToPhyTranslationVisitor
         // TODO implement this
 //        exprOp.setOverloaded(op.getOverloaded());
         logToPhyMap.put(op, exprOp);
-        currentPlan.add(exprOp);        
+        currentPlan.add(exprOp);
     }
-    
+
     @Override
     public void visit( MapLookupExpression op ) throws FrontendException {
         ExpressionOperator physOp = new POMapLookUp(new OperatorKey(DEFAULT_SCOPE,
@@ -302,10 +303,10 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visit(org.apache.pig.newplan.logical.expression.ConstantExpression op) throws FrontendException {
-        
+
 //        System.err.println("Entering Constant");
         ConstantExpression ce = new ConstantExpression(new OperatorKey(DEFAULT_SCOPE,
                 nodeGen.getNextNodeId(DEFAULT_SCOPE)));
@@ -318,7 +319,7 @@ public class ExpToPhyTranslationVisitor
         logToPhyMap.put(op, ce);
 //        System.err.println("Exiting Constant");
     }
-    
+
     @Override
     public void visit( CastExpression op ) throws FrontendException {
         POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen
@@ -351,10 +352,10 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visit( NotExpression op ) throws FrontendException {
-        
+
         PONot pNot = new PONot(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
 //        physOp.setAlias(op.getAlias());
@@ -374,7 +375,7 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visit( IsNullExpression op ) throws FrontendException {
         POIsNull pIsNull = new POIsNull(new OperatorKey(DEFAULT_SCOPE, nodeGen
@@ -408,7 +409,7 @@ public class ExpToPhyTranslationVisitor
         ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op
                 .getExpression());
         pNegative.setExpr(from);
-        pNegative.setResultType(op.getType());        
+        pNegative.setResultType(op.getType());
         try {
             currentPlan.connect(from, pNegative);
         } catch (PlanException e) {
@@ -417,60 +418,60 @@ public class ExpToPhyTranslationVisitor
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
-    public void visit( AddExpression op ) throws FrontendException {        
-        BinaryExpressionOperator exprOp = new Add(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));        
-        
+    public void visit( AddExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Add(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-    
+
     @Override
-    public void visit( RegexExpression op ) throws FrontendException {        
-        BinaryExpressionOperator exprOp = new PORegexp(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));        
-        
+    public void visit( RegexExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new PORegexp(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
-        
+
         List<Operator> successors = op.getPlan().getSuccessors(op);
         if (successors.get(1) instanceof org.apache.pig.newplan.logical.expression.ConstantExpression) {
             ((PORegexp)exprOp).setConstExpr(true);
         }
     }
-    
+
     @Override
-    public void visit( SubtractExpression op ) throws FrontendException {        
-        BinaryExpressionOperator exprOp = new Subtract(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));        
-        
+    public void visit( SubtractExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Subtract(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-    
+
     @Override
-    public void visit( MultiplyExpression op ) throws FrontendException {        
-        BinaryExpressionOperator exprOp = new Multiply(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));        
-        
+    public void visit( MultiplyExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Multiply(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-    
+
     @Override
-    public void visit( DivideExpression op ) throws FrontendException {        
-        BinaryExpressionOperator exprOp = new Divide(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));        
-        
+    public void visit( DivideExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Divide(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-    
+
     @Override
-    public void visit( ModExpression op ) throws FrontendException {        
-        BinaryExpressionOperator exprOp = new Mod(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));        
-        
+    public void visit( ModExpression op ) throws FrontendException {
+        BinaryExpressionOperator exprOp = new Mod(new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)));
+
         attachBinaryExpressionOperator(op, exprOp);
     }
-    
+
     @Override
     public void visit( BinCondExpression op ) throws FrontendException {
-        
+
         POBinCond exprOp = new POBinCond( new OperatorKey(DEFAULT_SCOPE,
                 nodeGen.getNextNodeId(DEFAULT_SCOPE)) );
-        
+
         exprOp.setResultType(op.getType());
         exprOp.setCond((ExpressionOperator) logToPhyMap.get(op.getCondition()));
         exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
@@ -495,17 +496,34 @@ public class ExpToPhyTranslationVisitor
             }
         }
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
-    public void visit( UserFuncExpression op ) throws FrontendException {       
+    public void visit( UserFuncExpression op ) throws FrontendException {
         Object f = PigContext.instantiateFuncFromSpec(op.getFuncSpec());
         PhysicalOperator p;
+        String ROLLUP_UDF = RollupDimensions.class.getName();
         if (f instanceof EvalFunc) {
             p = new POUserFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen
                     .getNextNodeId(DEFAULT_SCOPE)), -1,
                     null, op.getFuncSpec(), (EvalFunc) f);
             ((POUserFunc)p).setSignature(op.getSignature());
+            if (op.getFuncSpec().toString().equals(ROLLUP_UDF)) {
+                // Set the pivot value
+                ((POUserFunc)p).setPivot(op.getPivot());
+                if (op.getRollupHIIOptimizable()) {
+                    ((POUserFunc)p).setRollupHIIOptimizable(true);
+                    // Mirror the HII flag and the pivot on the RollupDimensions instance
+                    EvalFunc<?> tmp = ((POUserFunc)p).getFunc();
+                    ((RollupDimensions)tmp).setRollupHIIOptimizable(true);
+                    try {
+                        ((RollupDimensions)tmp).setPivot(op.getPivot());
+                    } catch (IOException e) {
+                        // Surface the failure instead of swallowing it
+                        throw new FrontendException("Unable to set rollup pivot", e);
+                    }
+                }
+            }
             //reinitialize input schema from signature
             if (((POUserFunc)p).getFunc().getInputSchema() == null) {
                 ((POUserFunc)p).setFuncInputSchema(op.getSignature());
@@ -535,7 +553,7 @@ public class ExpToPhyTranslationVisitor
             }
         }
         logToPhyMap.put(op, p);
-        
+
         //We need to track all the scalars
         if( op instanceof ScalarExpression ) {
             Operator refOp = ((ScalarExpression)op).getImplicitReferencedOperator();
@@ -543,20 +561,20 @@ public class ExpToPhyTranslationVisitor
         }
 
     }
-    
+
     @Override
     public void visit( DereferenceExpression op ) throws FrontendException {
         POProject exprOp = new POProject(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
 
         exprOp.setResultType(op.getType());
-        exprOp.setColumns((ArrayList<Integer>)op.getBagColumns());        
+        exprOp.setColumns((ArrayList<Integer>)op.getBagColumns());
         exprOp.setStar(false);
         logToPhyMap.put(op, exprOp);
         currentPlan.add(exprOp);
-        
+
         PhysicalOperator from = logToPhyMap.get( op.getReferredExpression() );
-        
+
         if( from != null ) {
             currentPlan.connect(from, exprOp);
         }

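The substantive hunk in this file is in visit(UserFuncExpression): when the UDF is RollupDimensions and the rollup HII optimization was marked applicable, the translator forwards the pivot and the optimizable flag to both the physical POUserFunc and the instantiated UDF, so the runtime matches the optimizer's decision. A condensed sketch of that hand-off with stand-in types (POUserFunc.setPivot and setRollupHIIOptimizable are assumed to be added elsewhere in this commit; the classes below are illustrative only):

    // Stand-ins for UserFuncExpression, POUserFunc and RollupDimensions.
    public class RollupHandoffDemo {
        static class LogicalCall { int pivot = 2; boolean hiiOptimizable = true; }
        static class Udf { int pivot = -1; boolean hiiOptimizable = false; }
        static class PhysicalCall {
            final Udf func = new Udf();
            int pivot = -1;
            boolean hiiOptimizable = false;
        }

        public static void main(String[] args) {
            LogicalCall op = new LogicalCall();
            PhysicalCall p = new PhysicalCall();
            p.pivot = op.pivot;               // the pivot is always forwarded
            if (op.hiiOptimizable) {
                p.hiiOptimizable = true;      // mark the physical operator...
                p.func.hiiOptimizable = true; // ...and the UDF doing the work
                p.func.pivot = op.pivot;
            }
            System.out.println("udf pivot=" + p.func.pivot);
        }
    }
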
Modified: 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
 Tue Jan 27 02:27:45 2015
@@ -57,6 +57,26 @@ public class UserFuncExpression extends
     private static int sigSeq=0;
     private boolean viaDefine=false; //this represents whether the function was instantiate via a DEFINE statement or not
 
+    private boolean rollupHIIoptimizable = false;
+    //the pivot value
+    private int pivot = -1;
+
+    public void setPivot(int pvt) {
+        this.pivot = pvt;
+    }
+
+    public int getPivot() {
+        return this.pivot;
+    }
+
+    public void setRollupHIIOptimizable(boolean check) {
+        this.rollupHIIoptimizable = check;
+    }
+
+    public boolean getRollupHIIOptimizable() {
+        return this.rollupHIIoptimizable;
+    }
+
     public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) {
         super("UserFunc", plan);
         mFuncSpec = funcSpec;
@@ -66,7 +86,6 @@ public class UserFuncExpression extends
         }
     }
 
-
     public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec, List<LogicalExpression> args) {
         this( plan, funcSpec );
 

Modified: 
pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1654955&r1=1654954&r2=1654955&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
 Tue Jan 27 02:27:45 2015
@@ -44,6 +44,7 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.rules.PredicatePushdownOptimizer;
 import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
+import org.apache.pig.newplan.logical.rules.RollupHIIOptimizer;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
 import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
@@ -56,6 +57,7 @@ public class LogicalPlanOptimizer extend
     private boolean allRulesDisabled = false;
     private SetMultimap<RulesReportKey, String> rulesReport = TreeMultimap.create();
     private PigContext pc = null;
+    private static final String MAPREDUCE_FW = "MAPREDUCE";
 
     public LogicalPlanOptimizer(OperatorPlan p, int iterations, Set<String> turnOffRules) {
         this(p, iterations, turnOffRules, null);
@@ -203,6 +205,20 @@ public class LogicalPlanOptimizer extend
         if (!s.isEmpty())
             ls.add(s);
 
+        // RollupHIIOptimizer set
+        // This set holds the rule for the rollup HII optimization.
+        // If Pig is not running in MapReduce mode, the rule is disabled.
+        if (pc != null)
+            if (pc.getExecType().toString().equals(MAPREDUCE_FW)) {
+                s = new HashSet<Rule>();
+                // Optimize RollupHII
+                r = new RollupHIIOptimizer("RollupHIIOptimizer");
+                checkAndAddRule(s, r);
+                if (!s.isEmpty())
+                    ls.add(s);
+            } else {
+                LOG.info("Not MR mode. RollupHIIOptimizer is disabled");
+            }
         return ls;
     }
 
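Since the new rule set is registered through checkAndAddRule like every other one, it honors the usual opt-out machinery, so it should be possible to disable it by name from the command line like any optimizer rule (hedged example; the rule name matches the string passed to the constructor above):

    pig -optimizer_off RollupHIIOptimizer myscript.pig

Also worth noting: the gate compares ExecType.toString() against the literal "MAPREDUCE", so Tez, Spark and local modes all fall through to the log message rather than enabling the rule.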

