Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
 Fri Mar  4 18:17:39 2016
@@ -25,7 +25,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BinInterSedes;
 import org.apache.pig.data.DataType;
@@ -46,6 +45,7 @@ public class PigTupleDefaultRawComparato
         super(TupleFactory.getInstance().tupleClass());
     }
 
+    @Override
     public void setConf(Configuration conf) {
         try {
             mAsc = (boolean[]) 
ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
@@ -62,6 +62,7 @@ public class PigTupleDefaultRawComparato
         mWholeTuple = (mAsc.length == 1);
     }
 
+    @Override
     public Configuration getConf() {
         return null;
     }
@@ -78,9 +79,9 @@ public class PigTupleDefaultRawComparato
      * IntWritable.compare() is used. If both are null then the indices are
      * compared. Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
-        mHasNullField = false;
 
         Tuple t1;
         Tuple t2;
@@ -99,9 +100,16 @@ public class PigTupleDefaultRawComparato
 
         rc = compareTuple(t1, t2); //TODO think about how SchemaTuple could 
speed this up
 
+        // handle PIG-927. If tuples are equal but any field inside tuple is 
null,
+        // then we do not merge keys if indices are not same
+        if (rc == 0 && mHasNullField) {
+            rc = ((NullableTuple) t1).getIndex() - ((NullableTuple) 
t2).getIndex();
+        }
+
         return rc;
     }
 
+    @Override
     public int compare(Object o1, Object o2) {
         NullableTuple nt1 = (NullableTuple) o1;
         NullableTuple nt2 = (NullableTuple) o2;
@@ -110,10 +118,16 @@ public class PigTupleDefaultRawComparato
         // If either are null, handle differently.
         if (!nt1.isNull() && !nt2.isNull()) {
             rc = compareTuple((Tuple) nt1.getValueAsPigType(), (Tuple) 
nt2.getValueAsPigType());
+            // handle PIG-927. If tuples are equal but any field inside tuple 
is null,
+            // then we do not merge keys if indices are not same
+            if (rc == 0 && mHasNullField) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nt1.isNull() && nt2.isNull())
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (nt1.isNull() && nt2.isNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
             else if (nt1.isNull())
                 rc = -1;
             else
@@ -125,6 +139,7 @@ public class PigTupleDefaultRawComparato
     }
 
     private int compareTuple(Tuple t1, Tuple t2) {
+        mHasNullField = false;
         int sz1 = t1.size();
         int sz2 = t2.size();
         if (sz2 < sz1) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
 Fri Mar  4 18:17:39 2016
@@ -23,9 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -90,18 +88,26 @@ public class PigTupleSortComparator exte
     }
 
     /**
-     * Compare two NullableTuples as raw bytes. Tuples are compared field-wise. If both are null they are defined equal.
+     * Compare two NullableTuples as raw bytes. Tuples are compared field-wise.
+     * If both are null, then the indices are compared.
      * Otherwise the null one is defined to be less.
      */
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int rc = 0;
         if (b1[s1] == 0 && b2[s2] == 0) {
             // skip mNull and mIndex
             rc = mComparator.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2);
+            // handle PIG-927. If tuples are equal but any field inside tuple is null,
+            // then we do not merge keys if indices (trailing mIndex byte) differ
+            if (rc == 0 && mComparator.hasComparedTupleNull()) {
+                rc = b1[s1 + l1 - 1] - b2[s2 + l2 - 1];
+            }
         } else {
-            // for sorting purposes two nulls are equal, null sorts first
-            if (b1[s1] != 0 && b2[s2] != 0)
-                rc = 0;
+            // Two nulls are equal if indices (trailing mIndex byte) are same
+            if (b1[s1] != 0 && b2[s2] != 0) {
+                rc = b1[s1 + l1 - 1] - b2[s2 + l2 - 1];
+            }
             else if (b1[s1] != 0)
                 rc = -1;
             else
@@ -112,6 +118,7 @@ public class PigTupleSortComparator exte
         return rc;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public int compare(Object o1, Object o2) {
         NullableTuple nt1 = (NullableTuple) o1;
@@ -121,10 +128,16 @@ public class PigTupleSortComparator exte
         // If either are null, handle differently.
         if (!nt1.isNull() && !nt2.isNull()) {
             rc = mComparator.compare((Tuple) nt1.getValueAsPigType(), (Tuple) 
nt2.getValueAsPigType());
+            // handle PIG-927. If tuples are equal but any field inside tuple 
is null,
+            // then we do not merge keys if indices are not same
+            if (rc == 0 && mComparator.hasComparedTupleNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
         } else {
-            // For sorting purposes two nulls are equal.
-            if (nt1.isNull() && nt2.isNull())
-                rc = 0;
+            // Two nulls are equal if indices are same
+            if (nt1.isNull() && nt2.isNull()) {
+                rc = nt1.getIndex() - nt2.getIndex();
+            }
             else if (nt1.isNull())
                 rc = -1;
             else

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java
 Fri Mar  4 18:17:39 2016
@@ -17,8 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import java.io.IOException;
-
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
 
@@ -26,7 +24,7 @@ public class ProgressableReporter implem
     TaskAttemptContext rep;
 
     public ProgressableReporter(){
-        
+
     }
 
     public ProgressableReporter(TaskAttemptContext rep) {
@@ -34,14 +32,19 @@ public class ProgressableReporter implem
         this.rep = rep;
     }
 
+    @Override
     public void progress() {
-        if(rep!=null)
+        if (rep != null) {
             rep.progress();
+        }
     }
 
+    @Override
     public void progress(String msg) {
         try {
-            rep.setStatus(msg);
+            if (rep != null) {
+                rep.setStatus(msg);
+            }
         }catch (Exception e) {
             rep.progress();
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java
 Fri Mar  4 18:17:39 2016
@@ -47,7 +47,7 @@ import com.google.common.collect.Maps;
 public class SkewedPartitioner extends Partitioner<PigNullableWritable, 
Writable> implements Configurable {
     protected static final TupleFactory tf = TupleFactory.getInstance();
 
-    protected Map<Tuple, Pair<Integer, Integer>> reducerMap = 
Maps.newHashMap();
+    protected Map<Object, Pair<Integer, Integer>> reducerMap;
     protected Integer totalReducers = -1;
     protected boolean inited = false;
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 Fri Mar  4 18:17:39 2016
@@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
@@ -110,6 +111,18 @@ public class EndOfAllInputSetter extends
             endOfAllInputFlag = true;
         }
 
+        @Override
+        public void visitPOForEach(POForEach foreach) throws VisitorException {
+            try {
+                if (foreach.needEndOfAllInputProcessing()) {
+                    endOfAllInputFlag = true;
+                }
+            } catch (Exception e) {
+                throw new VisitorException(e);
+            }
+        }
+
+
         /**
          * @return if end of all input is present
          */

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 Fri Mar  4 18:17:39 2016
@@ -24,8 +24,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.JVMReuseManager;
-import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -33,6 +31,7 @@ import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -69,6 +68,8 @@ public abstract class PhysicalOperator e
     protected static final long serialVersionUID = 1L;
     protected static final Result RESULT_EMPTY = new 
Result(POStatus.STATUS_NULL, null);
     protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, 
null);
+    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
 
     // The degree of parallelism requested
     protected int requestedParallelism;
@@ -122,10 +123,6 @@ public abstract class PhysicalOperator e
 
     private List<OriginalLocation> originalLocations =  new 
ArrayList<OriginalLocation>();
 
-    static {
-        
JVMReuseManager.getInstance().registerForStaticDataCleanup(PhysicalOperator.class);
-    }
-
     public PhysicalOperator(OperatorKey k) {
         this(k, -1, null);
     }
@@ -295,12 +292,13 @@ public abstract class PhysicalOperator e
         try {
             if (input == null && (inputs == null || inputs.size() == 0)) {
                 // log.warn("No inputs found. Signaling End of Processing.");
-                return new Result(POStatus.STATUS_EOP, null);
+                return RESULT_EOP;
             }
 
             // Should be removed once the model is clear
-            if (getReporter() != null) {
-                getReporter().progress();
+            PigProgressable progRep = getReporter();
+            if (progRep != null) {
+                progRep.progress();
             }
 
             if (!isInputAttached()) {
@@ -409,7 +407,7 @@ public abstract class PhysicalOperator e
 
     public Result getNextDataBag() throws ExecException {
         Result val = new Result();
-        DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
+        DataBag tmpBag = mBagFactory.newDefaultBag();
         for (Result ret = getNextTuple(); ret.returnStatus != 
POStatus.STATUS_EOP; ret = getNextTuple()) {
             if (ret.returnStatus == POStatus.STATUS_ERR) {
                 return ret;
@@ -457,14 +455,17 @@ public abstract class PhysicalOperator e
         PhysicalOperator.reporter.set(reporter);
     }
 
-    @StaticDataCleanup
+    //@StaticDataCleanup
     public static void staticDataCleanup() {
         reporter = new ThreadLocal<PigProgressable>();
     }
 
     /**
-     * Make a deep copy of this operator. This function is blank, however,
+     * Make a copy of this operator. This function is blank, however,
      * we should leave a place holder so that the subclasses can clone
+     * to make deep copy as this one creates a shallow copy of
+     * non-primitive types (objects, arrays and lists)
+     *
      * @throws CloneNotSupportedException
      */
     @Override
@@ -477,6 +478,14 @@ public abstract class PhysicalOperator e
         originalLocations.addAll(op.originalLocations);
     }
 
+    protected static List<PhysicalPlan> clonePlans(List<PhysicalPlan> 
origPlans) throws CloneNotSupportedException {
+        List<PhysicalPlan> clonePlans = new 
ArrayList<PhysicalPlan>(origPlans.size());
+        for (PhysicalPlan plan : origPlans) {
+            clonePlans.add(plan.clone());
+        }
+        return clonePlans;
+    }
+
     /**
      * @param physicalPlan
      */

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
 Fri Mar  4 18:17:39 2016
@@ -1984,7 +1984,19 @@ public class POCast extends ExpressionOp
 
     @Override
     public Result getNextDataByteArray() throws ExecException {
+      PhysicalOperator in = inputs.get(0);
+      if (in.getResultType() != DataType.BYTEARRAY) {
         return error();
+      }
+
+      Result res = in.getNextDataByteArray();
+      // A successful, non-null result must already be a DataByteArray; check the
+      // type explicitly instead of relying on a ClassCastException from a cast.
+      if (res.returnStatus == POStatus.STATUS_OK && res.result != null
+              && !(res.result instanceof DataByteArray)) {
+          return error();
+      }
+      return res;
     }
 
     private void readObject(ObjectInputStream is) throws IOException,

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
 Fri Mar  4 18:17:39 2016
@@ -27,12 +27,10 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -51,10 +49,6 @@ public class POProject extends Expressio
      */
     private static final long serialVersionUID = 1L;
 
-    private static TupleFactory tupleFactory = TupleFactory.getInstance();
-
-    protected static final BagFactory bagFactory = BagFactory.getInstance();
-
     private boolean resultSingleTupleBag = false;
 
     //The column to project
@@ -62,20 +56,20 @@ public class POProject extends Expressio
 
     //True if we are in the middle of streaming tuples
     //in a bag
-    boolean processingBagOfTuples = false;
+    private boolean processingBagOfTuples = false;
 
     //The bag iterator used while straeming tuple
-    transient Iterator<Tuple> bagIterator = null;
+    private transient Iterator<Tuple> bagIterator = null;
 
     //Represents the fact that this instance of POProject
     //is overloaded to stream tuples in the bag rather
     //than passing the entire bag. It is the responsibility
     //of the translator to set this.
-    boolean overloaded = false;
+    protected boolean overloaded = false;
 
 
-    private boolean isProjectToEnd = false;
-    private int startCol;
+    protected boolean isProjectToEnd = false;
+    protected int startCol;
 
     public POProject(OperatorKey k) {
         this(k,-1,0);
@@ -191,7 +185,7 @@ public class POProject extends Expressio
             for(int col : columns) {
                 addColumn(objList, inpValue, col);
             }
-            ret = tupleFactory.newTupleNoCopy(objList);
+            ret = mTupleFactory.newTupleNoCopy(objList);
         }
         res.result = ret;
         illustratorMarkup(inpValue, res.result, -1);
@@ -277,20 +271,20 @@ public class POProject extends Expressio
                     for (int col : columns) {
                         addColumn(objList, tuple, col);
                     }
-                    outBag = new SingleTupleBag( 
tupleFactory.newTupleNoCopy(objList) );
+                    outBag = new SingleTupleBag( 
mTupleFactory.newTupleNoCopy(objList) );
                 }else {
                     Tuple tmpTuple = getRangeTuple(tuple);
                     outBag = new SingleTupleBag(tmpTuple);
                 }
             } else {
-                outBag = bagFactory.newDefaultBag();
+                outBag = mBagFactory.newDefaultBag();
                 for (Tuple tuple : inpBag) {
                     if(!isProjectToEnd){
                         ArrayList<Object> objList = new 
ArrayList<Object>(columns.size());
                         for (int col : columns) {
                             addColumn(objList, tuple, col);
                         }
-                        outBag.add( tupleFactory.newTupleNoCopy(objList) );
+                        outBag.add( mTupleFactory.newTupleNoCopy(objList) );
                     }else{
                         Tuple outTuple = getRangeTuple(tuple);
                         outBag.add(outTuple);
@@ -321,14 +315,14 @@ public class POProject extends Expressio
         Tuple outTuple;
         if(isRangeInvalid(lastColIdx)){
             //invalid range - return empty tuple
-            outTuple = tupleFactory.newTuple();
+            outTuple = mTupleFactory.newTuple();
         }
         else {
             ArrayList<Object> objList = new ArrayList<Object>(lastColIdx - 
startCol + 1);
             for(int i = startCol; i <= lastColIdx ; i++){
                 addColumn(objList, tuple, i);
             }
-            outTuple = tupleFactory.newTupleNoCopy(objList);
+            outTuple = mTupleFactory.newTupleNoCopy(objList);
         }
         return outTuple;
     }
@@ -451,7 +445,7 @@ public class POProject extends Expressio
                         objList.add(null);
                     }
                 }
-                ret = tupleFactory.newTuple(objList);
+                ret = mTupleFactory.newTuple(objList);
                 res.result = (Tuple)ret;
                 return res;
             }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java
 Fri Mar  4 18:17:39 2016
@@ -139,23 +139,24 @@ public class PORelationToExprProject ext
         sendEmptyBagOnEOP = false;
         return(r);
     }
-
+       
     // See PIG-4644
     @Override
     public PORelationToExprProject clone() throws CloneNotSupportedException {
-        ArrayList<Integer> cols = new ArrayList<>(columns.size());
+        ArrayList<Integer> cols = new ArrayList<Integer>(columns.size());
         // Can reuse the same Integer objects, as they are immutable
         for (Integer i : columns) {
             cols.add(i);
         }
         PORelationToExprProject clone = new PORelationToExprProject(new 
OperatorKey(mKey.scope,
-                NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-                requestedParallelism, cols);
+            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
+            requestedParallelism, cols);
         clone.cloneHelper(this);
         clone.overloaded = overloaded;
+        clone.startCol = startCol;
+        clone.isProjectToEnd = isProjectToEnd;
         clone.resultType = resultType;
         clone.sendEmptyBagOnEOP = sendEmptyBagOnEOP;
         return clone;
     }
-    
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
 Fri Mar  4 18:17:39 2016
@@ -45,7 +45,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor;
 import org.apache.pig.builtin.MonitoredUDF;
-import org.apache.pig.builtin.RollupDimensions;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.SchemaTupleFactory;
@@ -87,28 +86,6 @@ public class POUserFunc extends Expressi
     private long timingFrequency = 100L;
     private boolean doTiming = false;
 
-    private static final String ROLLUP_UDF = RollupDimensions.class.getName();
-    //the pivot value
-    private int pivot = -1;
-
-    private boolean rollupHIIoptimizable = false;
-
-    public void setPivot(int pvt) {
-        this.pivot = pvt;
-    }
-
-    public int getPivot() {
-        return this.pivot;
-    }
-
-    public void setRollupHIIOptimizable(boolean check) {
-        this.rollupHIIoptimizable = check;
-    }
-
-    public boolean getRollupHIIOptimizable() {
-        return this.rollupHIIoptimizable;
-    }
-
     public PhysicalOperator getReferencedOperator() {
         return referencedOperator;
     }
@@ -154,17 +131,6 @@ public class POUserFunc extends Expressi
         if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
             executor = new MonitoredUDFExecutor(func);
         }
-
-        if (funcSpec.getClassName().equals(ROLLUP_UDF) && 
this.rollupHIIoptimizable != false) {
-            try {
-                ((RollupDimensions) func).setPivot(this.pivot);
-            } catch (IOException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-            }
-            ((RollupDimensions) 
func).setRollupHIIOptimizable(this.rollupHIIoptimizable);
-        }
-
         //the next couple of initializations do not work as intended for the 
following reasons
         //the reporter and pigLogger are member variables of PhysicalOperator
         //when instanitateFunc is invoked at deserialization time, both
@@ -355,6 +321,9 @@ public class POUserFunc extends Expressi
                         }
                     }
                 } else {
+                    if (parentPlan!=null && parentPlan.endOfAllInput && 
needEndOfAllInputProcessing()) {
+                        func.setEndOfAllInput(true);
+                    }
                     if (executor != null) {
                         result.result = executor.monitorExec((Tuple) 
result.result);
                     } else {
@@ -601,6 +570,8 @@ public class POUserFunc extends Expressi
             requestedParallelism, null, funcSpec.clone());
         clone.setResultType(resultType);
         clone.signature = signature;
+        clone.cacheFiles = cacheFiles;
+        clone.shipFiles = shipFiles;
         return clone;
     }
 
@@ -640,6 +611,10 @@ public class POUserFunc extends Expressi
         return func;
     }
 
+    public String getSignature() {
+        return signature;
+    }
+
     public void setSignature(String signature) {
         this.signature = signature;
         if (this.func!=null) {
@@ -659,4 +634,7 @@ public class POUserFunc extends Expressi
         }
     }
 
+    public boolean needEndOfAllInputProcessing() {
+        return getFunc().needEndOfAllInputProcessing();
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
 Fri Mar  4 18:17:39 2016
@@ -19,13 +19,12 @@ package org.apache.pig.backend.hadoop.ex
 
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * This is a base class for all unary comparison operators. Supports the
  * use of operand type instead of result type as the result type is
  * always boolean.
- * 
+ *
  */
 public abstract class UnaryComparisonOperator extends UnaryExpressionOperator
         implements ComparisonOperator {
@@ -35,7 +34,7 @@ public abstract class UnaryComparisonOpe
     //The result will be comunicated using the Status object.
     //This is a slight abuse of the status object.
     protected byte operandType;
-    
+
     public UnaryComparisonOperator(OperatorKey k) {
         this(k,-1);
     }
@@ -44,14 +43,16 @@ public abstract class UnaryComparisonOpe
         super(k, rp);
     }
 
+    @Override
     public byte getOperandType() {
         return operandType;
     }
 
+    @Override
     public void setOperandType(byte operandType) {
         this.operandType = operandType;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {
@@ -59,4 +60,9 @@ public abstract class UnaryComparisonOpe
         }
         return null;
     }
+
+    protected void cloneHelper(UnaryComparisonOperator op) {
+        super.cloneHelper(op);
+        this.operandType = op.operandType;
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
 Fri Mar  4 18:17:39 2016
@@ -67,7 +67,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -153,10 +152,6 @@ public class PhyPlanVisitor extends Plan
         }
     }
 
-    public void visitPORollupHIIForEach(PORollupHIIForEach nhfe) throws 
VisitorException {
-        visitPOForEach(nhfe);
-    }
-
     public void visitUnion(POUnion un) throws VisitorException{
         //do nothing
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
 Fri Mar  4 18:17:39 2016
@@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
@@ -69,8 +68,6 @@ public class POCounter extends PhysicalO
      **/
     private boolean isRowNumber = false;
 
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
     /**
      * Local counter for tuples on the same task.
      **/
@@ -321,4 +318,14 @@ public class POCounter extends PhysicalO
     public String getOperationID() {
         return operationID;
     }
+
+    @Override
+    public POCounter clone() throws CloneNotSupportedException {
+        POCounter clone = (POCounter)super.clone();
+        clone.localCount = new Long(localCount);
+        clone.taskID = new Integer(taskID);
+        // counterPlans and mAscCols are unused, so they are not cloned
+        return clone;
+    }
+
 }
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
 Fri Mar  4 18:17:39 2016
@@ -25,7 +25,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -37,20 +36,20 @@ import org.apache.pig.pen.util.LineageTr
 
 /**
  * Recover this class for nested cross operation.
- * 
- * 
+ *
+ *
  */
 public class POCross extends PhysicalOperator {
 
     private static final long serialVersionUID = 1L;
 
-    protected DataBag[] inputBags;
+    protected transient DataBag[] inputBags;
 
-    protected Tuple[] data;
+    protected transient Tuple[] data;
 
     protected transient Iterator<Tuple>[] its;
-    
-    protected Tuple tupleOfLastBag;
+
+    protected transient Tuple tupleOfLastBag;
 
     public POCross(OperatorKey k) {
         super(k);
@@ -197,7 +196,7 @@ public class POCross extends PhysicalOpe
         its = new Iterator[length];
         for (int i = 0; i < length; ++i) {
             PhysicalOperator op = inputs.get(i);
-            DataBag bag = BagFactory.getInstance().newDefaultBag();
+            DataBag bag = mBagFactory.newDefaultBag();
             inputBags[count] = bag;
             for (Result res = op.getNextTuple(); res.returnStatus != 
POStatus.STATUS_EOP; res = op
                     .getNextTuple()) {
@@ -226,7 +225,7 @@ public class POCross extends PhysicalOpe
 
         return illustratorMarkup(out, out, 0);
     }
-    
+
     private boolean loadLastBag() throws ExecException {
         Result resOfLastBag = null;
         int index = inputs.size() - 1;
@@ -247,7 +246,7 @@ public class POCross extends PhysicalOpe
                     "Error accumulating data in the local Cross operator");
         }
     }
-    
+
     private void clearMemory() {
         // reset inputBags, its, data and tupleOfLastBag to null so that in the
         // next round of getNext, the new input data will be loaded.

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
 Fri Mar  4 18:17:39 2016
@@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalDistinctBag;
@@ -49,9 +48,8 @@ import org.apache.pig.impl.plan.VisitorE
 public class PODistinct extends PhysicalOperator implements Cloneable {
     private static final Log log = LogFactory.getLog(PODistinct.class);
     private static final long serialVersionUID = 1L;
-    private boolean inputsAccumulated = false;
-    private DataBag distinctBag = null;
-
+    private transient boolean inputsAccumulated;
+    private transient DataBag distinctBag;
     private transient boolean initialized;
     private transient boolean useDefaultBag;
     private transient Iterator<Tuple> it;
@@ -102,7 +100,7 @@ public class PODistinct extends Physical
                      }
                  }
              }
-             distinctBag = useDefaultBag ? 
BagFactory.getInstance().newDistinctBag()
+             distinctBag = useDefaultBag ? mBagFactory.newDistinctBag()
                      : new InternalDistinctBag(3);
 
             Result in = processInput();

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 Fri Mar  4 18:17:39 2016
@@ -46,7 +46,6 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -70,6 +69,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class POFRJoin extends PhysicalOperator {
     private static final Log log = LogFactory.getLog(POFRJoin.class);
     private static final long serialVersionUID = 1L;
+
     // The number in the input list which denotes the fragmented input
     protected int fragment;
     // There can be n inputs each being a List<PhysicalPlan>
@@ -85,18 +85,7 @@ public class POFRJoin extends PhysicalOp
     protected ConstantExpression[] constExps;
     // Used to produce the cross product of various bags
     protected POForEach fe;
-    // The array of Hashtables one per replicated input. replicates[fragment] =
-    // null
-    // fragment is the input which is fragmented and not replicated.
-    protected TupleToMapKey replicates[];
-    // varaible which denotes whether we are returning tuples from the foreach
-    // operator
-    protected boolean processingPlan;
-    // A dummy tuple
-    protected Tuple dumTup = TupleFactory.getInstance().newTuple(1);
-    // An instance of tuple factory
-    protected transient TupleFactory mTupleFactory;
-    protected boolean setUp;
+
     // A Boolean variable which denotes if this is a LeftOuter Join or an Inner
     // Join
     protected boolean isLeftOuterJoin;
@@ -106,6 +95,16 @@ public class POFRJoin extends PhysicalOp
     protected Schema[] inputSchemas;
     protected Schema[] keySchemas;
 
+    // The array of Hashtables, one per replicated input. replicates[fragment] is
+    // null, since fragment is the input which is fragmented and not replicated.
+    protected transient TupleToMapKey replicates[];
+    // variable which denotes whether we are returning tuples from the foreach
+    // operator
+    protected transient boolean processingPlan;
+    // A dummy tuple
+    protected transient Tuple dumTup;
+    protected transient boolean setUp;
+
     public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
             List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
             FileSpec[] replFiles, int fragment, boolean isLeftOuter,
@@ -126,12 +125,10 @@ public class POFRJoin extends PhysicalOp
         this.fragment = fragment;
         this.keyTypes = keyTypes;
         this.replFiles = replFiles;
-        replicates = new TupleToMapKey[ppLists.size()];
+
         LRs = new POLocalRearrange[ppLists.size()];
         constExps = new ConstantExpression[ppLists.size()];
         createJoinPlans(k);
-        processingPlan = false;
-        mTupleFactory = TupleFactory.getInstance();
         List<Tuple> tupList = new ArrayList<Tuple>();
         tupList.add(nullTuple);
         nullBag = new NonSpillableDataBag(tupList);
@@ -159,7 +156,6 @@ public class POFRJoin extends PhysicalOp
         this.fe = copy.fe;
         this.constExps = copy.constExps;
         this.processingPlan = copy.processingPlan;
-        this.mTupleFactory = copy.mTupleFactory;
         this.nullBag = copy.nullBag;
         this.isLeftOuterJoin = copy.isLeftOuterJoin;
         this.inputSchemas = copy.inputSchemas;
@@ -173,7 +169,7 @@ public class POFRJoin extends PhysicalOp
 
     /**
      * Configures the Local Rearrange operators & the foreach operator
-     * 
+     *
      * @param old
      * @throws ExecException
      */
@@ -238,6 +234,8 @@ public class POFRJoin extends PhysicalOp
         Result res = null;
         Result inp = null;
         if (!setUp) {
+            replicates = new TupleToMapKey[phyPlanLists.size()];
+            dumTup = mTupleFactory.newTuple(1);
             setUpHashMap();
             setUp = true;
         }
@@ -254,7 +252,7 @@ public class POFRJoin extends PhysicalOp
                 if (res.returnStatus == POStatus.STATUS_EOP) {
                     // We have completed all cross-products now its time to 
move
                     // to next tuple of left side
-                    processingPlan = false;                    
+                    processingPlan = false;
                     break;
                 }
                 if (res.returnStatus == POStatus.STATUS_ERR) {
@@ -284,7 +282,7 @@ public class POFRJoin extends PhysicalOp
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
-            Tuple key = TupleFactory.getInstance().newTuple(1);
+            Tuple key = mTupleFactory.newTuple(1);
             key.set(0, lrOutTuple.get(1));
             Tuple value = getValueTuple(lr, lrOutTuple);
             lr.detachInput();
@@ -390,9 +388,9 @@ public class POFRJoin extends PhysicalOp
 
             POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
                     replFile);
-            
+
             Properties props = ConfigurationUtil.getLocalFSProperties();
-            PigContext pc = new PigContext(ExecType.LOCAL, props);   
+            PigContext pc = new PigContext(ExecType.LOCAL, props);
             ld.setPc(pc);
             // We use LocalRearrange Operator to seperate Key and Values
             // eg. ( a, b, c ) would generate a, ( a, b, c )
@@ -437,11 +435,10 @@ public class POFRJoin extends PhysicalOp
         }
         return false;
     }
-    
+
     private void readObject(ObjectInputStream is) throws IOException,
             ClassNotFoundException, ExecException {
         is.defaultReadObject();
-        mTupleFactory = TupleFactory.getInstance();
         // setUpHashTable();
     }
 
@@ -535,4 +532,28 @@ public class POFRJoin extends PhysicalOp
         // no op: all handled by the preceding POForEach
         return null;
     }
+
+    @Override
+    public POFRJoin clone() throws CloneNotSupportedException {
+        POFRJoin clone = (POFRJoin) super.clone();
+        // Not doing a deep copy of nullBag, inputSchemas and keySchemas
+        // as they are read only
+        clone.phyPlanLists = new 
ArrayList<List<PhysicalPlan>>(phyPlanLists.size());
+        for (List<PhysicalPlan> ppLst : phyPlanLists) {
+            clone.phyPlanLists.add(clonePlans(ppLst));
+        }
+
+        clone.LRs = new POLocalRearrange[phyPlanLists.size()];
+        clone.constExps = new ConstantExpression[phyPlanLists.size()];
+        try {
+            clone.createJoinPlans(getOperatorKey());
+        } catch (ExecException e) {
+            CloneNotSupportedException cnse = new 
CloneNotSupportedException("Problem with setting plans of " + 
this.getClass().getSimpleName());
+            cnse.initCause(e);
+            throw cnse;
+        }
+        return clone;
+    }
+
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
 Fri Mar  4 18:17:39 2016
@@ -36,30 +36,29 @@ import org.apache.pig.impl.plan.VisitorE
  * avoid many function calls, the filter operator, stores the Comparison
  * Operator that is the root of the Expression Plan and uses its getNext
  * directly.
- * 
+ *
  * Since the filter is supposed to return tuples only, getNext is not supported
  * on any other data type.
- * 
+ *
  */
 public class POFilter extends PhysicalOperator {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
 
     // The expression plan
-    PhysicalPlan plan;
+    private PhysicalPlan plan;
 
     // The root comparison operator of the expression plan
-//    ComparisonOperator comOp;
-    PhysicalOperator comOp;
-    
+    // ComparisonOperator comOp;
+    private PhysicalOperator comOp;
 
     // The operand type for the comparison operator needed
     // to call the comparison operators getNext with the
     // appropriate type
-    byte compOperandType;
+    // private byte compOperandType;
 
     public POFilter(OperatorKey k) {
         this(k, -1, null);
@@ -186,7 +185,7 @@ public class POFilter extends PhysicalOp
     public PhysicalPlan getPlan() {
         return plan;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
       if (illustrator != null) {
@@ -202,4 +201,12 @@ public class POFilter extends PhysicalOp
       }
       return (Tuple) out;
     }
+
+    @Override
+    public PhysicalOperator clone() throws CloneNotSupportedException {
+        POFilter opClone = (POFilter) super.clone();
+        opClone.setPlan(plan.clone());
+        return opClone;
+    }
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 Fri Mar  4 18:17:39 2016
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFEndOfAllInputNeededVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
@@ -41,6 +42,7 @@ import org.apache.pig.data.SchemaTupleFa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.TupleMaker;
+import org.apache.pig.data.UnlimitedNullTuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -55,26 +57,13 @@ public class POForEach extends PhysicalO
     private static final long serialVersionUID = 1L;
 
     protected List<PhysicalPlan> inputPlans;
-    protected List<PhysicalOperator> opsToBeReset;
-    //Since the plan has a generate, this needs to be maintained
-    //as the generate can potentially return multiple tuples for
-    //same call.
-    protected boolean processingPlan = false;
-
-    //its holds the iterators of the databags given by the input expressions 
which need flattening.
-    transient protected Iterator<Tuple> [] its = null;
 
-    //This holds the outputs given out by the input expressions of any datatype
-    protected Object [] bags = null;
+    protected List<PhysicalOperator> opsToBeReset;
 
-    //This is the template whcih contains tuples and is flattened out in 
createTuple() to generate the final output
-    protected Object[] data = null;
+    protected PhysicalOperator[] planLeafOps;
 
     // store result types of the plan leaves
-    protected byte[] resultTypes = null;
-
-    // store whether or not an accumulative UDF has terminated early
-    protected BitSet earlyTermination = null;
+    protected byte[] resultTypes;
 
     // array version of isToBeFlattened - this is purely
     // for optimization - instead of calling isToBeFlattened.get(i)
@@ -83,19 +72,40 @@ public class POForEach extends PhysicalO
     // so we can also save on the Boolean.booleanValue() calls
     protected boolean[] isToBeFlattenedArray;
 
-    ExampleTuple tIn = null;
     protected int noItems;
 
-    protected PhysicalOperator[] planLeafOps = null;
+    //Since the plan has a generate, this needs to be maintained
+    //as the generate can potentially return multiple tuples for
+    //same call.
+    protected transient boolean processingPlan;
+
+    // its: holds the iterators of the databags given by the input expressions 
which need flattening.
+    protected transient Iterator<Tuple> [] its = null;
+
+    //This holds the outputs given out by the input expressions of any datatype
+    protected transient Object[] bags ;
+
+    //This is the template which contains tuples and is flattened out in 
createTuple() to generate the final output
+    protected transient Object[] data;
+
+    // store whether or not an accumulative UDF has terminated early
+    protected transient BitSet earlyTermination;
+
+    protected transient ExampleTuple tIn;
+
 
     protected transient AccumulativeTupleBuffer buffer;
 
-    protected Tuple inpTuple;
+    protected transient Tuple inpTuple;
+
+    protected transient boolean endOfAllInputProcessed;
 
     // Indicate the foreach statement can only in map side
     // Currently only used in MR cross (See PIG-4175)
     protected boolean mapSideOnly = false;
 
+    protected Boolean endOfAllInputProcessing = false;
+
     private Schema schema;
 
     public POForEach(OperatorKey k) {
@@ -244,13 +254,21 @@ public class POForEach extends PhysicalO
             //read
             while (true) {
                 inp = processInput();
-                if (inp.returnStatus == POStatus.STATUS_EOP ||
-                        inp.returnStatus == POStatus.STATUS_ERR) {
+
+                if (inp.returnStatus == POStatus.STATUS_ERR) {
                     return inp;
                 }
                 if (inp.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 }
+                if (inp.returnStatus == POStatus.STATUS_EOP) {
+                    if (parentPlan!=null && parentPlan.endOfAllInput && 
!endOfAllInputProcessed && endOfAllInputProcessing) {
+                        // continue to pull one more output
+                        inp = new Result(POStatus.STATUS_OK, new 
UnlimitedNullTuple());
+                    } else {
+                        return inp;
+                    }
+                }
 
                 attachInputToPlans((Tuple) inp.result);
                 inpTuple = (Tuple)inp.result;
@@ -357,6 +375,9 @@ public class POForEach extends PhysicalO
 
 
         if(its == null) {
+            if (endOfAllInputProcessed) {
+                return RESULT_EOP;
+            }
             //getNext being called for the first time OR starting with a set 
of new data from inputs
             its = new Iterator[noItems];
             bags = new Object[noItems];
@@ -424,6 +445,9 @@ public class POForEach extends PhysicalO
                     its[i] = null;
                 }
             }
+            if (parentPlan!=null && parentPlan.endOfAllInput && 
endOfAllInputProcessing) {
+                endOfAllInputProcessed = true;
+            }
         }
 
         // if accumulating, we haven't got data yet for some fields, just 
return
@@ -658,16 +682,13 @@ public class POForEach extends PhysicalO
             }
         }
 
-        List<PhysicalOperator> ops = new 
ArrayList<PhysicalOperator>(opsToBeReset.size());
-        for (PhysicalOperator op : opsToBeReset) {
-            ops.add(op);
-        }
         POForEach clone = new POForEach(new OperatorKey(mKey.scope,
                 NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
                 requestedParallelism, plans, flattens);
-        clone.setOpsToBeReset(ops);
         clone.setResultType(getResultType());
         clone.addOriginalLocation(alias, getOriginalLocations());
+        clone.endOfAllInputProcessing = endOfAllInputProcessing;
+        clone.mapSideOnly = mapSideOnly;
         return clone;
     }
 
@@ -798,4 +819,21 @@ public class POForEach extends PhysicalO
     public boolean isMapSideOnly() {
         return mapSideOnly;
     }
+
+    public boolean needEndOfAllInputProcessing() throws ExecException {
+        try {
+            for (PhysicalPlan innerPlan : inputPlans) {
+                UDFEndOfAllInputNeededVisitor endOfAllInputNeededVisitor
+                     = new UDFEndOfAllInputNeededVisitor(innerPlan);
+                endOfAllInputNeededVisitor.visit();
+                if (endOfAllInputNeededVisitor.needEndOfAllInputProcessing()) {
+                    endOfAllInputProcessing = true;
+                    return true;
+                }
+            }
+            return false;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
 Fri Mar  4 18:17:39 2016
@@ -27,7 +27,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
@@ -38,14 +37,14 @@ public class POLimit extends PhysicalOpe
      */
     private static final long serialVersionUID = 1L;
 
-    // Counts for outputs processed
-    private long soFar = 0;
-
     // Number of limited outputs
-    long mLimit;
+    private long mLimit;
 
     // The expression plan
-    PhysicalPlan expressionPlan;
+    private PhysicalPlan expressionPlan;
+
+    // Counts for outputs processed
+    private transient long soFar = 0;
 
     public POLimit(OperatorKey k) {
         this(k, -1, null);
@@ -159,15 +158,11 @@ public class POLimit extends PhysicalOpe
 
     @Override
     public POLimit clone() throws CloneNotSupportedException {
-        POLimit newLimit = new POLimit(new OperatorKey(this.mKey.scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
-            this.requestedParallelism, this.inputs);
-        newLimit.mLimit = this.mLimit;
+        POLimit clone = (POLimit) super.clone();
         if (this.expressionPlan != null) {
-            newLimit.expressionPlan = this.expressionPlan.clone();
+            clone.expressionPlan = this.expressionPlan.clone();
         }
-        newLimit.addOriginalLocation(alias, getOriginalLocations());
-        return newLimit;
+        return clone;
     }
 
     @Override

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
 Fri Mar  4 18:17:39 2016
@@ -35,9 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -57,8 +55,6 @@ public class POLocalRearrange extends Ph
      */
     protected static final long serialVersionUID = 1L;
 
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
     private static final Result ERR_RESULT = new Result();
 
     protected List<PhysicalPlan> plans;
@@ -82,8 +78,6 @@ public class POLocalRearrange extends Ph
 
     protected boolean isCross = false;
 
-    protected Result inp;
-
     // map to store mapping of projected columns to
     // the position in the "Key" where these will be projected to.
     // We use this information to strip off these columns
@@ -133,6 +127,8 @@ public class POLocalRearrange extends Ph
     // By default, we strip keys from the value.
     private boolean stripKeyFromValue = true;
 
+    protected transient Result inp;
+
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -709,32 +705,18 @@ public class POLocalRearrange extends Ph
      */
     @Override
     public POLocalRearrange clone() throws CloneNotSupportedException {
-        POLocalRearrange clone = new POLocalRearrange(new OperatorKey(
-            mKey.scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-            requestedParallelism);
-        deepCopyTo(clone);
-        return clone;
-    }
-
-    protected void deepCopyTo(POLocalRearrange clone)
-            throws CloneNotSupportedException {
-
-        clone.setParentPlan(parentPlan);
-        clone.index = index;
-        if (useSecondaryKey) {
-            clone.keyType = mainKeyType;
-        } else {
-            clone.keyType = keyType;
-        }
-        clone.setUseSecondaryKey(useSecondaryKey);
+        POLocalRearrange clone = (POLocalRearrange) super.clone();
+        // Re-create the lists the constructor initializes so the clone does not share them
+        clone.leafOps = new ArrayList<ExpressionOperator>();
+        clone.secondaryLeafOps = new ArrayList<ExpressionOperator>();
         // Needs to be called as setDistinct so that the fake index tuple gets
         // created.
         clone.setDistinct(mIsDistinct);
-        clone.setCross(isCross);
-        clone.addOriginalLocation(alias, getOriginalLocations());
-        clone.setStripKeyFromValue(stripKeyFromValue);
-
+        // Set the keyType to mainKeyType. setSecondaryPlans will recompute
+        // keyType from it and set it to the final value
+        if (useSecondaryKey) {
+            clone.keyType = mainKeyType;
+        }
         try {
             clone.setPlans(clonePlans(plans));
             if (secondaryPlans != null) {
@@ -745,14 +727,7 @@ public class POLocalRearrange extends Ph
             cnse.initCause(pe);
             throw cnse;
         }
-    }
-
-    private List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws 
CloneNotSupportedException {
-        List<PhysicalPlan> clonePlans = new 
ArrayList<PhysicalPlan>(origPlans.size());
-        for (PhysicalPlan plan : origPlans) {
-            clonePlans.add(plan.clone());
-        }
-        return clonePlans;
+        return clone;
     }
 
     public boolean isCross() {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
 Fri Mar  4 18:17:39 2016
@@ -581,7 +581,7 @@ public class POMergeCogroup extends Phys
                 } catch (ExecException e) {
 
                     // Alas, no choice but to throw Runtime exception.
-                    String errMsg = "Exception occured in compare() of heap in 
POMergeCogroup.";
+                    String errMsg = "Exception occurred in compare() of heap 
in POMergeCogroup.";
                     throw new RuntimeException(errMsg,e);
                 }
             } 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 Fri Mar  4 18:17:39 2016
@@ -70,6 +70,12 @@ public class POMergeJoin extends Physica
 
     private static final long serialVersionUID = 1L;
 
+    private static final String keyOrderReminder = "Remember that you should " 
+
+        "not change the order of keys before a merge join in a FOREACH or " +
+        "manipulate join keys in a UDF in a way that would change the sort " +
+        "order. UDFs in a FOREACH are allowed as long as they do not change" +
+        "the join key values in a way that would change the sort order.\n";
+
     // flag to indicate when getNext() is called first.
     private boolean firstTime = true;
 
@@ -123,7 +129,7 @@ public class POMergeJoin extends Physica
 
     private String signature;
 
-    private byte endOfRecordMark;
+    private byte endOfRecordMark = POStatus.STATUS_NULL;
 
     // This serves as the default TupleFactory
     private transient TupleFactory mTupleFactory;
@@ -159,15 +165,6 @@ public class POMergeJoin extends Physica
         this.joinType = joinType;
         this.leftInputSchema = leftInputSchema;
         this.mergedInputSchema = mergedInputSchema;
-        this.endOfRecordMark = POStatus.STATUS_EOP;
-    }
-
-    // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL 
for Tez.
-    // This is because:
-    // For MR, we send EOP at the end of every record
-    // For Tez, we only use a global EOP, so send NULL for end of record
-    public void setEndOfRecordMark(byte endOfRecordMark) {
-        this.endOfRecordMark = endOfRecordMark;
     }
 
     /**
@@ -379,7 +376,9 @@ public class POMergeJoin extends Physica
                     }
                     else{   // At this point right side can't be behind.
                         int errCode = 1102;
-                        String errMsg = "Data is not sorted on right side. 
Last two tuples encountered were: \n"+
+                        String errMsg = "Data is not sorted on right side. \n" 
+
+                            keyOrderReminder +
+                            "Last two tuples encountered were: \n"+
                         curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                         throw new ExecException(errMsg,errCode);
                     }    
@@ -407,7 +406,9 @@ public class POMergeJoin extends Physica
             }
             else{   // Current key < Prev Key
                 int errCode = 1102;
-                String errMsg = "Data is not sorted on left side. Last two 
keys encountered were: \n"+
+                String errMsg = "Data is not sorted on left side. \n" +
+                            keyOrderReminder +
+                            "Last two tuples encountered were: \n" +
                 prevLeftKey+ "\n" + curLeftKey ;
                 throw new ExecException(errMsg,errCode);
             }
@@ -477,7 +478,9 @@ public class POMergeJoin extends Physica
             if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                 // Sanity check.
                 int errCode = 1102;
-                String errMsg = "Data is not sorted on right side. Last two 
keys encountered were: \n"+
+                String errMsg = "Data is not sorted on right side. \n" +
+                            keyOrderReminder +
+                            "Last two tuples encountered were: \n"+
                 prevRightKey+ "\n" + rightKey ;
                 throw new ExecException(errMsg,errCode);
             }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
 Fri Mar  4 18:17:39 2016
@@ -17,22 +17,16 @@
  */
 package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.LinkedList;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * A specialized version of POForeach with the difference
@@ -45,10 +39,10 @@ import org.apache.pig.impl.util.Identity
 public class POOptimizedForEach extends POForEach {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
-    
+
     public POOptimizedForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
@@ -64,7 +58,7 @@ public class POOptimizedForEach extends
     public POOptimizedForEach(OperatorKey k, List inp) {
         this(k,-1,inp,null);
     }
-    
+
     public POOptimizedForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, 
List<Boolean>  isToBeFlattened){
         super(k, rp);
         setUpFlattens(isToBeFlattened);
@@ -82,7 +76,7 @@ public class POOptimizedForEach extends
         String fString = getFlatStr();
         return "Optimized For Each" + "(" + fString + ")" + "[" + 
DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
-    
+
     /**
      * Calls getNext on the generate operator inside the nested
      * physical plan and returns it maintaining an additional state
@@ -120,40 +114,25 @@ public class POOptimizedForEach extends
         //nested plan processing on the input tuple
         //read
         while (true) {
-            
-            // we know that input has been attached 
+
+            // we know that input has been attached
             attachInputToPlans(input);
             detachInput();
             res = processPlan();
-            
+
             processingPlan = true;
-            
+
             return res;
         }
     }
 
-    
+
     /**
-     * Make a deep copy of this operator.  
+     * Make a deep copy of this operator.
      * @throws CloneNotSupportedException
      */
     @Override
     public POOptimizedForEach clone() throws CloneNotSupportedException {
-        List<PhysicalPlan> plans = new
-            ArrayList<PhysicalPlan>(inputPlans.size());
-        for (PhysicalPlan plan : inputPlans) {
-            plans.add(plan.clone());
-        }
-        List<Boolean> flattens = null;
-        if(isToBeFlattenedArray != null ) {
-            flattens = new 
-            ArrayList<Boolean>(isToBeFlattenedArray.length);
-            for (boolean b : isToBeFlattenedArray) {
-                flattens.add(b);
-            }
-        }
-        return new POOptimizedForEach(new OperatorKey(mKey.scope, 
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-            requestedParallelism, plans, flattens);
+        return (POOptimizedForEach) super.clone();
     }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 Fri Mar  4 18:17:39 2016
@@ -30,13 +30,11 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -74,9 +72,6 @@ public class POPackage extends PhysicalO
     //key, no value.
     protected int numInputs;
 
-    protected static final BagFactory mBagFactory = BagFactory.getInstance();
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
     private boolean lastBagReadOnly = true;
 
     protected Packager pkgr;
@@ -87,58 +82,6 @@ public class POPackage extends PhysicalO
     private transient boolean useDefaultBag;
     private transient int accumulativeBatchSize;
 
-    //the pivot value
-    private int pivot = -1;
-    //the index of the first field involves in ROLLUP
-    protected int rollupFieldIndex = 0;
-    //the original index of the first field involves in ROLLUP in case it was 
moved to the end
-    //(if we have the combination of cube and rollup)
-    private int rollupOldFieldIndex = 0;
-    //the size of total fields that involve in CUBE clause
-    private int dimensionSize = 0;
-    //number of algebraic function that used after rollup
-    private int nAlgebraic = 0;
-
-    public void setPivot(int pvt) {
-        this.pivot = pvt;
-    }
-
-    public int getPivot() {
-        return this.pivot;
-    }
-
-    public void setDimensionSize(int ds) {
-        this.dimensionSize = ds;
-    }
-
-    public int getDimensionSize() {
-        return this.dimensionSize;
-    }
-
-    public void setNumberAlgebraic(int na) {
-        this.nAlgebraic = na;
-    }
-
-    public int getNumberAlgebraic() {
-        return this.nAlgebraic;
-    }
-
-    public void setRollupOldFieldIndex(int rofi) {
-        this.rollupOldFieldIndex = rofi;
-    }
-
-    public int getRollupOldFieldIndex() {
-        return this.rollupOldFieldIndex;
-    }
-
-    public void setRollupFieldIndex(int rfi) {
-        this.rollupFieldIndex = rfi;
-    }
-
-    public int getRollupFieldIndex() {
-        return this.rollupFieldIndex;
-    }
-
     public POPackage(OperatorKey k) {
         this(k, -1, null);
     }
@@ -292,8 +235,7 @@ public class POPackage extends PhysicalO
 
                 // create bag to pull all tuples out of iterator
                 for (int i = 0; i < numInputs; i++) {
-                    dbs[i] = useDefaultBag ? BagFactory.getInstance()
-                            .newDefaultBag()
+                    dbs[i] = useDefaultBag ? mBagFactory.newDefaultBag()
                             // In a very rare case if there is a POStream 
after this
                             // POPackage in the pipeline and is also blocking 
the
                             // pipeline;
@@ -309,8 +251,15 @@ public class POPackage extends PhysicalO
                     NullableTuple ntup = tupIter.next();
                     int index = ntup.getIndex();
                     if (index == numInputs - 1) {
-                        dbs[index] = new PeekedBag(pkgr, ntup, tupIter, 
keyWritable);
-                        break;
+                        if (pkgr.getUseSecondaryKey()) {
+                            if (dbs[index] == null) {
+                                dbs[index] = useDefaultBag ? mBagFactory
+                                        .newDefaultBag() : new 
InternalCachedBag(numInputs);
+                            }
+                        } else {
+                            dbs[index] = new PeekedBag(pkgr, ntup, tupIter, 
keyWritable);
+                            break;
+                        }
                     }
                     Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
 


Reply via email to