Author: rohini
Date: Tue Jan 12 23:21:32 2016
New Revision: 1724339

URL: http://svn.apache.org/viewvc?rev=1724339&view=rev
Log:
PIG-4737: Check and fix clone implementation for all classes extending 
PhysicalOperator (rohini)

Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/test/e2e/pig/tests/multiquery.conf
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 12 23:21:32 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4737: Check and fix clone implementation for all classes extending 
PhysicalOperator (rohini)
+
 PIG-4770: OOM with POPartialAgg in some cases (rohini)
 
 PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after 
union does ascending instead (rohini)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 Tue Jan 12 23:21:32 2016
@@ -31,6 +31,7 @@ import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -67,6 +68,8 @@ public abstract class PhysicalOperator e
     protected static final long serialVersionUID = 1L;
     protected static final Result RESULT_EMPTY = new 
Result(POStatus.STATUS_NULL, null);
     protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, 
null);
+    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
 
     // The degree of parallelism requested
     protected int requestedParallelism;
@@ -289,7 +292,7 @@ public abstract class PhysicalOperator e
         try {
             if (input == null && (inputs == null || inputs.size() == 0)) {
                 // log.warn("No inputs found. Signaling End of Processing.");
-                return new Result(POStatus.STATUS_EOP, null);
+                return RESULT_EOP;
             }
 
             // Should be removed once the model is clear
@@ -458,8 +461,11 @@ public abstract class PhysicalOperator e
     }
 
     /**
-     * Make a deep copy of this operator. This function is blank, however,
+     * Make a copy of this operator. This function is blank, however,
      * we should leave a place holder so that the subclasses can clone
+     * to make deep copy as this one creates a shallow copy of
+     * non-primitive types (objects, arrays and lists)
+     *
      * @throws CloneNotSupportedException
      */
     @Override
@@ -472,6 +478,14 @@ public abstract class PhysicalOperator e
         originalLocations.addAll(op.originalLocations);
     }
 
+    protected static List<PhysicalPlan> clonePlans(List<PhysicalPlan> 
origPlans) throws CloneNotSupportedException {
+        List<PhysicalPlan> clonePlans = new 
ArrayList<PhysicalPlan>(origPlans.size());
+        for (PhysicalPlan plan : origPlans) {
+            clonePlans.add(plan.clone());
+        }
+        return clonePlans;
+    }
+
     /**
      * @param physicalPlan
      */

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
 Tue Jan 12 23:21:32 2016
@@ -27,12 +27,10 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -51,10 +49,6 @@ public class POProject extends Expressio
      */
     private static final long serialVersionUID = 1L;
 
-    private static TupleFactory tupleFactory = TupleFactory.getInstance();
-
-    protected static final BagFactory bagFactory = BagFactory.getInstance();
-
     private boolean resultSingleTupleBag = false;
 
     //The column to project
@@ -191,7 +185,7 @@ public class POProject extends Expressio
             for(int col : columns) {
                 addColumn(objList, inpValue, col);
             }
-            ret = tupleFactory.newTupleNoCopy(objList);
+            ret = mTupleFactory.newTupleNoCopy(objList);
         }
         res.result = ret;
         illustratorMarkup(inpValue, res.result, -1);
@@ -277,20 +271,20 @@ public class POProject extends Expressio
                     for (int col : columns) {
                         addColumn(objList, tuple, col);
                     }
-                    outBag = new SingleTupleBag( 
tupleFactory.newTupleNoCopy(objList) );
+                    outBag = new SingleTupleBag( 
mTupleFactory.newTupleNoCopy(objList) );
                 }else {
                     Tuple tmpTuple = getRangeTuple(tuple);
                     outBag = new SingleTupleBag(tmpTuple);
                 }
             } else {
-                outBag = bagFactory.newDefaultBag();
+                outBag = mBagFactory.newDefaultBag();
                 for (Tuple tuple : inpBag) {
                     if(!isProjectToEnd){
                         ArrayList<Object> objList = new 
ArrayList<Object>(columns.size());
                         for (int col : columns) {
                             addColumn(objList, tuple, col);
                         }
-                        outBag.add( tupleFactory.newTupleNoCopy(objList) );
+                        outBag.add( mTupleFactory.newTupleNoCopy(objList) );
                     }else{
                         Tuple outTuple = getRangeTuple(tuple);
                         outBag.add(outTuple);
@@ -321,14 +315,14 @@ public class POProject extends Expressio
         Tuple outTuple;
         if(isRangeInvalid(lastColIdx)){
             //invalid range - return empty tuple
-            outTuple = tupleFactory.newTuple();
+            outTuple = mTupleFactory.newTuple();
         }
         else {
             ArrayList<Object> objList = new ArrayList<Object>(lastColIdx - 
startCol + 1);
             for(int i = startCol; i <= lastColIdx ; i++){
                 addColumn(objList, tuple, i);
             }
-            outTuple = tupleFactory.newTupleNoCopy(objList);
+            outTuple = mTupleFactory.newTupleNoCopy(objList);
         }
         return outTuple;
     }
@@ -451,7 +445,7 @@ public class POProject extends Expressio
                         objList.add(null);
                     }
                 }
-                ret = tupleFactory.newTuple(objList);
+                ret = mTupleFactory.newTuple(objList);
                 res.result = (Tuple)ret;
                 return res;
             }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
 Tue Jan 12 23:21:32 2016
@@ -19,13 +19,12 @@ package org.apache.pig.backend.hadoop.ex
 
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * This is a base class for all unary comparison operators. Supports the
  * use of operand type instead of result type as the result type is
  * always boolean.
- * 
+ *
  */
 public abstract class UnaryComparisonOperator extends UnaryExpressionOperator
         implements ComparisonOperator {
@@ -35,7 +34,7 @@ public abstract class UnaryComparisonOpe
     //The result will be comunicated using the Status object.
     //This is a slight abuse of the status object.
     protected byte operandType;
-    
+
     public UnaryComparisonOperator(OperatorKey k) {
         this(k,-1);
     }
@@ -44,14 +43,16 @@ public abstract class UnaryComparisonOpe
         super(k, rp);
     }
 
+    @Override
     public byte getOperandType() {
         return operandType;
     }
 
+    @Override
     public void setOperandType(byte operandType) {
         this.operandType = operandType;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {
@@ -59,4 +60,9 @@ public abstract class UnaryComparisonOpe
         }
         return null;
     }
+
+    protected void cloneHelper(UnaryComparisonOperator op) {
+        super.cloneHelper(op);
+        this.operandType = op.operandType;
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
 Tue Jan 12 23:21:32 2016
@@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
@@ -69,8 +68,6 @@ public class POCounter extends PhysicalO
      **/
     private boolean isRowNumber = false;
 
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
     /**
      * Local counter for tuples on the same task.
      **/
@@ -321,4 +318,14 @@ public class POCounter extends PhysicalO
     public String getOperationID() {
         return operationID;
     }
+
+    @Override
+    public POCounter clone() throws CloneNotSupportedException {
+        POCounter clone = (POCounter)super.clone();
+        clone.localCount = new Long(localCount);
+        clone.taskID = new Integer(taskID);
+        // counterPlans and mAscCols unused. Not cloning them
+        return clone;
+    }
+
 }
\ No newline at end of file

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
 Tue Jan 12 23:21:32 2016
@@ -25,7 +25,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -37,20 +36,20 @@ import org.apache.pig.pen.util.LineageTr
 
 /**
  * Recover this class for nested cross operation.
- * 
- * 
+ *
+ *
  */
 public class POCross extends PhysicalOperator {
 
     private static final long serialVersionUID = 1L;
 
-    protected DataBag[] inputBags;
+    protected transient DataBag[] inputBags;
 
-    protected Tuple[] data;
+    protected transient Tuple[] data;
 
     protected transient Iterator<Tuple>[] its;
-    
-    protected Tuple tupleOfLastBag;
+
+    protected transient Tuple tupleOfLastBag;
 
     public POCross(OperatorKey k) {
         super(k);
@@ -197,7 +196,7 @@ public class POCross extends PhysicalOpe
         its = new Iterator[length];
         for (int i = 0; i < length; ++i) {
             PhysicalOperator op = inputs.get(i);
-            DataBag bag = BagFactory.getInstance().newDefaultBag();
+            DataBag bag = mBagFactory.newDefaultBag();
             inputBags[count] = bag;
             for (Result res = op.getNextTuple(); res.returnStatus != 
POStatus.STATUS_EOP; res = op
                     .getNextTuple()) {
@@ -226,7 +225,7 @@ public class POCross extends PhysicalOpe
 
         return illustratorMarkup(out, out, 0);
     }
-    
+
     private boolean loadLastBag() throws ExecException {
         Result resOfLastBag = null;
         int index = inputs.size() - 1;
@@ -247,7 +246,7 @@ public class POCross extends PhysicalOpe
                     "Error accumulating data in the local Cross operator");
         }
     }
-    
+
     private void clearMemory() {
         // reset inputBags, its, data and tupleOfLastBag to null so that in the
         // next round of getNext, the new input data will be loaded.

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
 Tue Jan 12 23:21:32 2016
@@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalDistinctBag;
@@ -49,9 +48,8 @@ import org.apache.pig.impl.plan.VisitorE
 public class PODistinct extends PhysicalOperator implements Cloneable {
     private static final Log log = LogFactory.getLog(PODistinct.class);
     private static final long serialVersionUID = 1L;
-    private boolean inputsAccumulated = false;
-    private DataBag distinctBag = null;
-
+    private transient boolean inputsAccumulated;
+    private transient DataBag distinctBag;
     private transient boolean initialized;
     private transient boolean useDefaultBag;
     private transient Iterator<Tuple> it;
@@ -102,7 +100,7 @@ public class PODistinct extends Physical
                      }
                  }
              }
-             distinctBag = useDefaultBag ? 
BagFactory.getInstance().newDistinctBag()
+             distinctBag = useDefaultBag ? mBagFactory.newDistinctBag()
                      : new InternalDistinctBag(3);
 
             Result in = processInput();

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 Tue Jan 12 23:21:32 2016
@@ -46,7 +46,6 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -70,6 +69,7 @@ import org.apache.pig.impl.plan.VisitorE
 public class POFRJoin extends PhysicalOperator {
     private static final Log log = LogFactory.getLog(POFRJoin.class);
     private static final long serialVersionUID = 1L;
+
     // The number in the input list which denotes the fragmented input
     protected int fragment;
     // There can be n inputs each being a List<PhysicalPlan>
@@ -85,18 +85,7 @@ public class POFRJoin extends PhysicalOp
     protected ConstantExpression[] constExps;
     // Used to produce the cross product of various bags
     protected POForEach fe;
-    // The array of Hashtables one per replicated input. replicates[fragment] =
-    // null
-    // fragment is the input which is fragmented and not replicated.
-    protected TupleToMapKey replicates[];
-    // varaible which denotes whether we are returning tuples from the foreach
-    // operator
-    protected boolean processingPlan;
-    // A dummy tuple
-    protected Tuple dumTup = TupleFactory.getInstance().newTuple(1);
-    // An instance of tuple factory
-    protected transient TupleFactory mTupleFactory;
-    protected boolean setUp;
+
     // A Boolean variable which denotes if this is a LeftOuter Join or an Inner
     // Join
     protected boolean isLeftOuterJoin;
@@ -106,6 +95,16 @@ public class POFRJoin extends PhysicalOp
     protected Schema[] inputSchemas;
     protected Schema[] keySchemas;
 
+    // The array of Hashtables one per replicated input. replicates[fragment] =
+    // null fragment is the input which is fragmented and not replicated.
+    protected transient TupleToMapKey replicates[];
+    // varaible which denotes whether we are returning tuples from the foreach
+    // operator
+    protected transient boolean processingPlan;
+    // A dummy tuple
+    protected transient Tuple dumTup;
+    protected transient boolean setUp;
+
     public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
             List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
             FileSpec[] replFiles, int fragment, boolean isLeftOuter,
@@ -126,12 +125,10 @@ public class POFRJoin extends PhysicalOp
         this.fragment = fragment;
         this.keyTypes = keyTypes;
         this.replFiles = replFiles;
-        replicates = new TupleToMapKey[ppLists.size()];
+
         LRs = new POLocalRearrange[ppLists.size()];
         constExps = new ConstantExpression[ppLists.size()];
         createJoinPlans(k);
-        processingPlan = false;
-        mTupleFactory = TupleFactory.getInstance();
         List<Tuple> tupList = new ArrayList<Tuple>();
         tupList.add(nullTuple);
         nullBag = new NonSpillableDataBag(tupList);
@@ -159,7 +156,6 @@ public class POFRJoin extends PhysicalOp
         this.fe = copy.fe;
         this.constExps = copy.constExps;
         this.processingPlan = copy.processingPlan;
-        this.mTupleFactory = copy.mTupleFactory;
         this.nullBag = copy.nullBag;
         this.isLeftOuterJoin = copy.isLeftOuterJoin;
         this.inputSchemas = copy.inputSchemas;
@@ -173,7 +169,7 @@ public class POFRJoin extends PhysicalOp
 
     /**
      * Configures the Local Rearrange operators & the foreach operator
-     * 
+     *
      * @param old
      * @throws ExecException
      */
@@ -238,6 +234,8 @@ public class POFRJoin extends PhysicalOp
         Result res = null;
         Result inp = null;
         if (!setUp) {
+            replicates = new TupleToMapKey[phyPlanLists.size()];
+            dumTup = mTupleFactory.newTuple(1);
             setUpHashMap();
             setUp = true;
         }
@@ -254,7 +252,7 @@ public class POFRJoin extends PhysicalOp
                 if (res.returnStatus == POStatus.STATUS_EOP) {
                     // We have completed all cross-products now its time to 
move
                     // to next tuple of left side
-                    processingPlan = false;                    
+                    processingPlan = false;
                     break;
                 }
                 if (res.returnStatus == POStatus.STATUS_ERR) {
@@ -284,7 +282,7 @@ public class POFRJoin extends PhysicalOp
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
-            Tuple key = TupleFactory.getInstance().newTuple(1);
+            Tuple key = mTupleFactory.newTuple(1);
             key.set(0, lrOutTuple.get(1));
             Tuple value = getValueTuple(lr, lrOutTuple);
             lr.detachInput();
@@ -390,9 +388,9 @@ public class POFRJoin extends PhysicalOp
 
             POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
                     replFile);
-            
+
             Properties props = ConfigurationUtil.getLocalFSProperties();
-            PigContext pc = new PigContext(ExecType.LOCAL, props);   
+            PigContext pc = new PigContext(ExecType.LOCAL, props);
             ld.setPc(pc);
             // We use LocalRearrange Operator to seperate Key and Values
             // eg. ( a, b, c ) would generate a, ( a, b, c )
@@ -437,11 +435,10 @@ public class POFRJoin extends PhysicalOp
         }
         return false;
     }
-    
+
     private void readObject(ObjectInputStream is) throws IOException,
             ClassNotFoundException, ExecException {
         is.defaultReadObject();
-        mTupleFactory = TupleFactory.getInstance();
         // setUpHashTable();
     }
 
@@ -531,4 +528,28 @@ public class POFRJoin extends PhysicalOp
         // no op: all handled by the preceding POForEach
         return null;
     }
+
+    @Override
+    public POFRJoin clone() throws CloneNotSupportedException {
+        POFRJoin clone = (POFRJoin) super.clone();
+        // Not doing deep copy of nullBag, nullBag, inputSchemas, keySchemas
+        // as they are read only
+        clone.phyPlanLists = new 
ArrayList<List<PhysicalPlan>>(phyPlanLists.size());
+        for (List<PhysicalPlan> ppLst : phyPlanLists) {
+            clone.phyPlanLists.add(clonePlans(ppLst));
+        }
+
+        clone.LRs = new POLocalRearrange[phyPlanLists.size()];
+        clone.constExps = new ConstantExpression[phyPlanLists.size()];
+        try {
+            clone.createJoinPlans(getOperatorKey());
+        } catch (ExecException e) {
+            CloneNotSupportedException cnse = new 
CloneNotSupportedException("Problem with setting plans of " + 
this.getClass().getSimpleName());
+            cnse.initCause(e);
+            throw cnse;
+        }
+        return clone;
+    }
+
+
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
 Tue Jan 12 23:21:32 2016
@@ -49,17 +49,16 @@ public class POFilter extends PhysicalOp
     private static final long serialVersionUID = 1L;
 
     // The expression plan
-    PhysicalPlan plan;
+    private PhysicalPlan plan;
 
     // The root comparison operator of the expression plan
-//    ComparisonOperator comOp;
-    PhysicalOperator comOp;
-
+    // ComparisonOperator comOp;
+    private PhysicalOperator comOp;
 
     // The operand type for the comparison operator needed
     // to call the comparison operators getNext with the
     // appropriate type
-    byte compOperandType;
+    // private byte compOperandType;
 
     public POFilter(OperatorKey k) {
         this(k, -1, null);
@@ -205,8 +204,7 @@ public class POFilter extends PhysicalOp
 
     @Override
     public PhysicalOperator clone() throws CloneNotSupportedException {
-        Object o = super.clone();
-        POFilter opClone = (POFilter)o;
+        POFilter opClone = (POFilter) super.clone();
         opClone.setPlan(plan.clone());
         return opClone;
     }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 Tue Jan 12 23:21:32 2016
@@ -57,26 +57,13 @@ public class POForEach extends PhysicalO
     private static final long serialVersionUID = 1L;
 
     protected List<PhysicalPlan> inputPlans;
-    protected List<PhysicalOperator> opsToBeReset;
-    //Since the plan has a generate, this needs to be maintained
-    //as the generate can potentially return multiple tuples for
-    //same call.
-    protected boolean processingPlan = false;
-
-    //its holds the iterators of the databags given by the input expressions 
which need flattening.
-    transient protected Iterator<Tuple> [] its = null;
 
-    //This holds the outputs given out by the input expressions of any datatype
-    protected Object [] bags = null;
+    protected List<PhysicalOperator> opsToBeReset;
 
-    //This is the template whcih contains tuples and is flattened out in 
createTuple() to generate the final output
-    protected Object[] data = null;
+    protected PhysicalOperator[] planLeafOps;
 
     // store result types of the plan leaves
-    protected byte[] resultTypes = null;
-
-    // store whether or not an accumulative UDF has terminated early
-    protected BitSet earlyTermination = null;
+    protected byte[] resultTypes;
 
     // array version of isToBeFlattened - this is purely
     // for optimization - instead of calling isToBeFlattened.get(i)
@@ -85,16 +72,33 @@ public class POForEach extends PhysicalO
     // so we can also save on the Boolean.booleanValue() calls
     protected boolean[] isToBeFlattenedArray;
 
-    ExampleTuple tIn = null;
     protected int noItems;
 
-    protected PhysicalOperator[] planLeafOps = null;
+    //Since the plan has a generate, this needs to be maintained
+    //as the generate can potentially return multiple tuples for
+    //same call.
+    protected transient boolean processingPlan;
+
+    //its holds the iterators of the databags given by the input expressions 
which need flattening.
+    protected transient Iterator<Tuple> [] its = null;
+
+    //This holds the outputs given out by the input expressions of any datatype
+    protected transient Object[] bags ;
+
+    //This is the template whcih contains tuples and is flattened out in 
createTuple() to generate the final output
+    protected transient Object[] data;
+
+    // store whether or not an accumulative UDF has terminated early
+    protected transient BitSet earlyTermination;
+
+    protected transient ExampleTuple tIn;
+
 
     protected transient AccumulativeTupleBuffer buffer;
 
-    protected Tuple inpTuple;
+    protected transient Tuple inpTuple;
 
-    protected boolean endOfAllInputProcessed = false;
+    protected transient boolean endOfAllInputProcessed;
 
     // Indicate the foreach statement can only in map side
     // Currently only used in MR cross (See PIG-4175)
@@ -372,7 +376,7 @@ public class POForEach extends PhysicalO
 
         if(its == null) {
             if (endOfAllInputProcessed) {
-                return new Result(POStatus.STATUS_EOP, null);
+                return RESULT_EOP;
             }
             //getNext being called for the first time OR starting with a set 
of new data from inputs
             its = new Iterator[noItems];
@@ -688,6 +692,8 @@ public class POForEach extends PhysicalO
         clone.setOpsToBeReset(ops);
         clone.setResultType(getResultType());
         clone.addOriginalLocation(alias, getOriginalLocations());
+        clone.endOfAllInputProcessing = endOfAllInputProcessing;
+        clone.mapSideOnly = mapSideOnly;
         return clone;
     }
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
 Tue Jan 12 23:21:32 2016
@@ -27,7 +27,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
@@ -38,14 +37,14 @@ public class POLimit extends PhysicalOpe
      */
     private static final long serialVersionUID = 1L;
 
-    // Counts for outputs processed
-    private long soFar = 0;
-
     // Number of limited outputs
-    long mLimit;
+    private long mLimit;
 
     // The expression plan
-    PhysicalPlan expressionPlan;
+    private PhysicalPlan expressionPlan;
+
+    // Counts for outputs processed
+    private transient long soFar = 0;
 
     public POLimit(OperatorKey k) {
         this(k, -1, null);
@@ -159,15 +158,11 @@ public class POLimit extends PhysicalOpe
 
     @Override
     public POLimit clone() throws CloneNotSupportedException {
-        POLimit newLimit = new POLimit(new OperatorKey(this.mKey.scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
-            this.requestedParallelism, this.inputs);
-        newLimit.mLimit = this.mLimit;
+        POLimit clone = (POLimit) super.clone();
         if (this.expressionPlan != null) {
-            newLimit.expressionPlan = this.expressionPlan.clone();
+            clone.expressionPlan = this.expressionPlan.clone();
         }
-        newLimit.addOriginalLocation(alias, getOriginalLocations());
-        return newLimit;
+        return clone;
     }
 
     @Override

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
 Tue Jan 12 23:21:32 2016
@@ -35,9 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -57,8 +55,6 @@ public class POLocalRearrange extends Ph
      */
     protected static final long serialVersionUID = 1L;
 
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
     private static final Result ERR_RESULT = new Result();
 
     protected List<PhysicalPlan> plans;
@@ -82,8 +78,6 @@ public class POLocalRearrange extends Ph
 
     protected boolean isCross = false;
 
-    protected Result inp;
-
     // map to store mapping of projected columns to
     // the position in the "Key" where these will be projected to.
     // We use this information to strip off these columns
@@ -133,6 +127,8 @@ public class POLocalRearrange extends Ph
     // By default, we strip keys from the value.
     private boolean stripKeyFromValue = true;
 
+    protected transient Result inp;
+
     public POLocalRearrange(OperatorKey k) {
         this(k, -1, null);
     }
@@ -709,32 +705,18 @@ public class POLocalRearrange extends Ph
      */
     @Override
     public POLocalRearrange clone() throws CloneNotSupportedException {
-        POLocalRearrange clone = new POLocalRearrange(new OperatorKey(
-            mKey.scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-            requestedParallelism);
-        deepCopyTo(clone);
-        return clone;
-    }
-
-    protected void deepCopyTo(POLocalRearrange clone)
-            throws CloneNotSupportedException {
-
-        clone.setParentPlan(parentPlan);
-        clone.index = index;
-        if (useSecondaryKey) {
-            clone.keyType = mainKeyType;
-        } else {
-            clone.keyType = keyType;
-        }
-        clone.setUseSecondaryKey(useSecondaryKey);
+        POLocalRearrange clone = (POLocalRearrange) super.clone();
+        // Constructor
+        clone.leafOps = new ArrayList<ExpressionOperator>();
+        clone.secondaryLeafOps = new ArrayList<ExpressionOperator>();
         // Needs to be called as setDistinct so that the fake index tuple gets
         // created.
         clone.setDistinct(mIsDistinct);
-        clone.setCross(isCross);
-        clone.addOriginalLocation(alias, getOriginalLocations());
-        clone.setStripKeyFromValue(stripKeyFromValue);
-
+        // Set the keyType to mainKeyType. setSecondaryPlans will calculate
+        // based on that and set keyType to the final value
+        if (useSecondaryKey) {
+            clone.keyType = mainKeyType;
+        }
         try {
             clone.setPlans(clonePlans(plans));
             if (secondaryPlans != null) {
@@ -745,14 +727,7 @@ public class POLocalRearrange extends Ph
             cnse.initCause(pe);
             throw cnse;
         }
-    }
-
-    private List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws 
CloneNotSupportedException {
-        List<PhysicalPlan> clonePlans = new 
ArrayList<PhysicalPlan>(origPlans.size());
-        for (PhysicalPlan plan : origPlans) {
-            clonePlans.add(plan.clone());
-        }
-        return clonePlans;
+        return clone;
     }
 
     public boolean isCross() {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
 Tue Jan 12 23:21:32 2016
@@ -17,22 +17,16 @@
  */
 package 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.LinkedList;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * A specialized version of POForeach with the difference
@@ -45,10 +39,10 @@ import org.apache.pig.impl.util.Identity
 public class POOptimizedForEach extends POForEach {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 1L;
-    
+
     public POOptimizedForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
@@ -64,7 +58,7 @@ public class POOptimizedForEach extends
     public POOptimizedForEach(OperatorKey k, List inp) {
         this(k,-1,inp,null);
     }
-    
+
     public POOptimizedForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, 
List<Boolean>  isToBeFlattened){
         super(k, rp);
         setUpFlattens(isToBeFlattened);
@@ -82,7 +76,7 @@ public class POOptimizedForEach extends
         String fString = getFlatStr();
         return "Optimized For Each" + "(" + fString + ")" + "[" + 
DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
     }
-    
+
     /**
      * Calls getNext on the generate operator inside the nested
      * physical plan and returns it maintaining an additional state
@@ -120,40 +114,25 @@ public class POOptimizedForEach extends
         //nested plan processing on the input tuple
         //read
         while (true) {
-            
-            // we know that input has been attached 
+
+            // we know that input has been attached
             attachInputToPlans(input);
             detachInput();
             res = processPlan();
-            
+
             processingPlan = true;
-            
+
             return res;
         }
     }
 
-    
+
     /**
-     * Make a deep copy of this operator.  
+     * Make a deep copy of this operator.
      * @throws CloneNotSupportedException
      */
     @Override
     public POOptimizedForEach clone() throws CloneNotSupportedException {
-        List<PhysicalPlan> plans = new
-            ArrayList<PhysicalPlan>(inputPlans.size());
-        for (PhysicalPlan plan : inputPlans) {
-            plans.add(plan.clone());
-        }
-        List<Boolean> flattens = null;
-        if(isToBeFlattenedArray != null ) {
-            flattens = new 
-            ArrayList<Boolean>(isToBeFlattenedArray.length);
-            for (boolean b : isToBeFlattenedArray) {
-                flattens.add(b);
-            }
-        }
-        return new POOptimizedForEach(new OperatorKey(mKey.scope, 
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-            requestedParallelism, plans, flattens);
+        return (POOptimizedForEach) super.clone();
     }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
 Tue Jan 12 23:21:32 2016
@@ -30,13 +30,11 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -74,9 +72,6 @@ public class POPackage extends PhysicalO
     //key, no value.
     protected int numInputs;
 
-    protected static final BagFactory mBagFactory = BagFactory.getInstance();
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
     private boolean lastBagReadOnly = true;
 
     protected Packager pkgr;
@@ -240,8 +235,7 @@ public class POPackage extends PhysicalO
 
                 // create bag to pull all tuples out of iterator
                 for (int i = 0; i < numInputs; i++) {
-                    dbs[i] = useDefaultBag ? BagFactory.getInstance()
-                            .newDefaultBag()
+                    dbs[i] = useDefaultBag ? mBagFactory.newDefaultBag()
                             // In a very rare case if there is a POStream 
after this
                             // POPackage in the pipeline and is also blocking 
the
                             // pipeline;
@@ -259,7 +253,7 @@ public class POPackage extends PhysicalO
                     if (index == numInputs - 1) {
                         if (pkgr.getUseSecondaryKey()) {
                             if (dbs[index] == null) {
-                                dbs[index] = useDefaultBag ? 
BagFactory.getInstance()
+                                dbs[index] = useDefaultBag ? mBagFactory
                                         .newDefaultBag() : new 
InternalCachedBag(numInputs);
                             }
                         } else {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
 Tue Jan 12 23:21:32 2016
@@ -41,7 +41,6 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.SelfSpillBag.MemoryLimits;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.GroupingSpillable;
@@ -92,8 +91,6 @@ public class POPartialAgg extends Physic
 
     private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new 
WeakHashMap<POPartialAgg, Byte>();
 
-    private static final TupleFactory TF = TupleFactory.getInstance();
-
     private PhysicalPlan keyPlan;
     private ExpressionOperator keyLeaf;
     private List<PhysicalPlan> valuePlans;
@@ -496,7 +493,7 @@ public class POPartialAgg extends Physic
     }
 
     private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws 
ExecException {
-        Tuple valueTuple = TF.newTuple(valuePlans.size() + 1);
+        Tuple valueTuple = mTupleFactory.newTuple(valuePlans.size() + 1);
         valueTuple.set(0, key);
 
         for (int i = 0; i < valuePlans.size(); i++) {
@@ -576,7 +573,7 @@ public class POPartialAgg extends Physic
      * @throws ExecException
      */
     private Result getOutput(Object key, Tuple value) throws ExecException {
-        Tuple output = TF.newTuple(valuePlans.size() + 1);
+        Tuple output = mTupleFactory.newTuple(valuePlans.size() + 1);
         output.set(0, key);
 
         for (int i = 0; i < valuePlans.size(); i++) {
@@ -657,4 +654,14 @@ public class POPartialAgg extends Physic
         return avgTupleSize * (numRecsInProcessedMap + numRecsInRawMap);
     }
 
+    @Override
+    public PhysicalOperator clone() throws CloneNotSupportedException {
+        POPartialAgg clone = (POPartialAgg) super.clone();
+        clone.setKeyPlan(keyPlan.clone());
+        clone.setValuePlans(clonePlans(valuePlans));
+        return clone;
+    }
+
+
+
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
 Tue Jan 12 23:21:32 2016
@@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -39,8 +38,6 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 
-import com.google.common.collect.Maps;
-
 /**
  * The partition rearrange operator is a part of the skewed join
  * implementation. It has an embedded physical plan that
@@ -50,12 +47,11 @@ import com.google.common.collect.Maps;
 public class POPartitionRearrange extends POLocalRearrange {
 
     private static final long serialVersionUID = 1L;
-    private static final BagFactory mBagFactory = BagFactory.getInstance();
 
-    private Integer totalReducers = -1;
+    private transient Integer totalReducers;
     // ReducerMap will store the tuple, max reducer index & min reducer index
-    private Map<Object, Pair<Integer, Integer> > reducerMap = 
Maps.newHashMap();
-    private boolean inited;
+    private transient Map<Object, Pair<Integer, Integer> > reducerMap;
+    private transient boolean inited;
 
     private PigContext pigContext;
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
 Tue Jan 12 23:21:32 2016
@@ -23,7 +23,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -32,51 +31,43 @@ public class POPoissonSample extends Phy
 
     private static final long serialVersionUID = 1L;
 
-    private static final TupleFactory tf = TupleFactory.getInstance();
-    private static Result eop = new Result(POStatus.STATUS_EOP, null);
+    // 17 is not a magic number. It can be obtained by using a poisson
+    // cumulative distribution function with the mean set to 10 (empirically,
+    // minimum number of samples) and the confidence set to 95%
+    public static final int DEFAULT_SAMPLE_RATE = 17;
+
+    private int sampleRate = 0;
+
+    private float heapPerc = 0f;
+
+    private Long totalMemory;
+
+    private transient boolean initialized;
 
     // num of rows sampled so far
-    private int numRowsSampled = 0;
+    private transient int numRowsSampled;
 
     // average size of tuple in memory, for tuples sampled
-    private long avgTupleMemSz = 0;
+    private transient long avgTupleMemSz;
 
     // current row number
-    private long rowNum = 0;
+    private transient long rowNum;
 
     // number of tuples to skip after each sample
-    private long skipInterval = -1;
+    private transient long skipInterval;
 
     // bytes in input to skip after every sample.
     // divide this by avgTupleMemSize to get skipInterval
-    private long memToSkipPerSample = 0;
+    private transient long memToSkipPerSample;
 
     // has the special row with row number information been returned
-    private boolean numRowSplTupleReturned = false;
-
-    // 17 is not a magic number. It can be obtained by using a poisson
-    // cumulative distribution function with the mean set to 10 (empirically,
-    // minimum number of samples) and the confidence set to 95%
-    public static final int DEFAULT_SAMPLE_RATE = 17;
-
-    private int sampleRate = 0;
-
-    private float heapPerc = 0f;
-
-    private long totalMemory = Runtime.getRuntime().maxMemory();
+    private transient boolean numRowSplTupleReturned;
 
     // new Sample result
-    private Result newSample = null;
+    private transient Result newSample;
 
     public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) {
         super(k, rp, null);
-        numRowsSampled = 0;
-        avgTupleMemSz = 0;
-        rowNum = 0;
-        skipInterval = -1;
-        memToSkipPerSample = 0;
-        numRowSplTupleReturned = false;
-        newSample = null;
         sampleRate = sr;
         heapPerc = hp;
         if (tm != -1) {
@@ -97,10 +88,22 @@ public class POPoissonSample extends Phy
 
     @Override
     public Result getNextTuple() throws ExecException {
+        if (!initialized) {
+            numRowsSampled = 0;
+            avgTupleMemSz = 0;
+            rowNum = 0;
+            skipInterval = -1;
+            memToSkipPerSample = 0;
+            if (totalMemory == null) {
+                // Initialize in backend to get memory of task
+                totalMemory = Runtime.getRuntime().maxMemory();
+            }
+            initialized = true;
+        }
         if (numRowSplTupleReturned) {
             // row num special row has been returned after all inputs
             // were read, nothing more to read
-            return eop;
+            return RESULT_EOP;
         }
 
         Result res = null;
@@ -216,7 +219,7 @@ public class POPoissonSample extends Phy
      */
     private Result createNumRowTuple(Tuple sample) throws ExecException {
         int sz = (sample == null) ? 0 : sample.size();
-        Tuple t = tf.newTuple(sz + 2);
+        Tuple t = mTupleFactory.newTuple(sz + 2);
 
         if (sample != null) {
             for (int i=0; i<sample.size(); i++){

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
 Tue Jan 12 23:21:32 2016
@@ -29,12 +29,10 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -53,9 +51,6 @@ public class POPreCombinerLocalRearrange
 
     protected static final long serialVersionUID = 1L;
 
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-    protected static BagFactory mBagFactory = BagFactory.getInstance();
-
     private static final Result ERR_RESULT = new Result();
 
     protected List<PhysicalPlan> plans;
@@ -223,4 +218,12 @@ public class POPreCombinerLocalRearrange
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
+
+    @Override
+    public POPreCombinerLocalRearrange clone() throws 
CloneNotSupportedException {
+        POPreCombinerLocalRearrange clone = (POPreCombinerLocalRearrange) 
super.clone();
+        clone.leafOps = new ArrayList<ExpressionOperator>();
+        clone.setPlans(clonePlans(plans));
+        return clone;
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
 Tue Jan 12 23:21:32 2016
@@ -32,7 +32,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
@@ -55,8 +54,6 @@ public class PORank extends PhysicalOper
     private List<Boolean> mAscCols;
     private List<Byte> ExprOutputTypes;
 
-    protected static final TupleFactory mTupleFactory = 
TupleFactory.getInstance();
-
     /**
      * Unique identifier that links POCounter and PORank,
      * through the global counter labeled with it.
@@ -230,4 +227,11 @@ public class PORank extends PhysicalOper
     public String getOperationID() {
         return operationID;
     }
+
+    @Override
+    public PORank clone() throws CloneNotSupportedException {
+        PORank clone = (PORank)super.clone();
+        // rankPlans, mAscCols, ExprOutputTypes are unused. Not cloning them
+        return clone;
+    }
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
 Tue Jan 12 23:21:32 2016
@@ -26,31 +26,28 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POReservoirSample extends PhysicalOperator {
 
-    private static final TupleFactory tf = TupleFactory.getInstance();
-
     private static final long serialVersionUID = 1L;
 
     // number of samples to be sampled
     protected int numSamples;
 
-    private transient int nextSampleIdx= 0;
+    private transient int nextSampleIdx = 0;
 
-    private int rowProcessed = 0;
+    private transient int rowProcessed = 0;
 
-    private boolean sampleCollectionDone = false;
+    private transient boolean sampleCollectionDone = false;
 
     //array to store the result
     private transient Result[] samples = null;
 
     // last sample result
-    private Result lastSample = null;
+    private transient Result lastSample = null;
 
     public POReservoirSample(OperatorKey k) {
         this(k, -1, null);
@@ -170,14 +167,12 @@ public class POReservoirSample extends P
             }
             Result res = samples[nextSampleIdx++];
             if (res == null) { // Input data has lesser rows than numSamples
-                return new Result(POStatus.STATUS_NULL, null);
+                return RESULT_EMPTY;
             }
             return res;
         }
         else{
-            Result res;
-            res = new Result(POStatus.STATUS_EOP, null);
-            return res;
+            return RESULT_EOP;
         }
     }
 
@@ -203,7 +198,7 @@ public class POReservoirSample extends P
      */
     private Result createNumRowTuple(Tuple sample) throws ExecException {
         int sz = (sample == null) ? 0 : sample.size();
-        Tuple t = tf.newTuple(sz + 2);
+        Tuple t = mTupleFactory.newTuple(sz + 2);
 
         if (sample != null) {
             for (int i=0; i<sample.size(); i++){

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
 Tue Jan 12 23:21:32 2016
@@ -36,12 +36,10 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -74,11 +72,11 @@ public class POSort extends PhysicalOper
        private POUserComparisonFunc mSortFunc;
        private Comparator<Tuple> mComparator;
 
-       private boolean inputsAccumulated = false;
        private long limit;
        public boolean isUDFComparatorUsed = false;
-       private DataBag sortedBag;
 
+       private transient boolean inputsAccumulated = false;
+       private transient DataBag sortedBag;
     private transient Iterator<Tuple> it;
     private transient boolean initialized;
     private transient boolean useDefaultBag;
@@ -95,22 +93,22 @@ public class POSort extends PhysicalOper
                this.sortPlans = sortPlans;
                this.mAscCols = mAscCols;
         this.limit = -1;
-               this.mSortFunc = mSortFunc;
-               if (mSortFunc == null) {
+        setSortFunc(mSortFunc);
+       }
+
+       private void setSortFunc(POUserComparisonFunc mSortFunc) {
+           this.mSortFunc = mSortFunc;
+        if (mSortFunc == null) {
             mComparator = new SortComparator();
-                       /*sortedBag = BagFactory.getInstance().newSortedBag(
-                                       new SortComparator());*/
-                       ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
+            ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
 
-                       for(PhysicalPlan plan : sortPlans) {
-                               
ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
-                       }
-               } else {
-                       /*sortedBag = BagFactory.getInstance().newSortedBag(
-                                       new UDFSortComparator());*/
+            for(PhysicalPlan plan : sortPlans) {
+                ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
+            }
+        } else {
             mComparator = new UDFSortComparator();
-                       isUDFComparatorUsed = true;
-               }
+            isUDFComparatorUsed = true;
+        }
        }
 
        public POSort(OperatorKey k, int rp, List inp) {
@@ -271,7 +269,7 @@ public class POSort extends PhysicalOper
             }
                        // by default, we create InternalSortedBag, unless user 
configures
             // explicitly to use old bag
-            sortedBag = useDefaultBag ? 
BagFactory.getInstance().newSortedBag(mComparator)
+            sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
                     : new InternalSortedBag(3, mComparator);
 
             while (res.returnStatus != POStatus.STATUS_EOP) {
@@ -363,23 +361,19 @@ public class POSort extends PhysicalOper
 
     @Override
     public POSort clone() throws CloneNotSupportedException {
-        List<PhysicalPlan> clonePlans = new
-            ArrayList<PhysicalPlan>(sortPlans.size());
-        for (PhysicalPlan plan : sortPlans) {
-            clonePlans.add(plan.clone());
+        POSort clone = (POSort) super.clone();
+        clone.sortPlans = clonePlans(sortPlans);
+        if (mSortFunc == null) {
+            setSortFunc(null);
+        } else {
+            setSortFunc(mSortFunc.clone());
         }
         List<Boolean> cloneAsc = new ArrayList<Boolean>(mAscCols.size());
         for (Boolean b : mAscCols) {
             cloneAsc.add(b);
         }
-        POUserComparisonFunc cloneFunc = null;
-        if (mSortFunc != null) {
-            cloneFunc = mSortFunc.clone();
-        }
-        // Don't set inputs as PhysicalPlan.clone will take care of that
-        return new POSort(new OperatorKey(mKey.scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-            requestedParallelism, null, clonePlans, cloneAsc, cloneFunc);
+        clone.mAscCols = cloneAsc;
+        return clone;
     }
 
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
 Tue Jan 12 23:21:32 2016
@@ -89,9 +89,7 @@ public class POSplit extends PhysicalOpe
 
     private BitSet processedSet = new BitSet();
 
-    private static Result empty = new Result(POStatus.STATUS_NULL, null);
-
-    private boolean inpEOP = false;
+    private transient boolean inpEOP = false;
 
     /**
      * Constructs an operator with the specified key
@@ -243,7 +241,7 @@ public class POSplit extends PhysicalOpe
             }
         }
 
-        return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
+        return (res.returnStatus == POStatus.STATUS_OK) ? res : RESULT_EMPTY;
     }
 
     private Result runPipeline(PhysicalOperator leaf) throws ExecException {
@@ -321,13 +319,9 @@ public class POSplit extends PhysicalOpe
 
     @Override
     public POSplit clone() throws CloneNotSupportedException {
-        Object o = super.clone();
-        POSplit opClone = (POSplit)o;
+        POSplit opClone = (POSplit) super.clone();
         opClone.processedSet = new BitSet();
-        opClone.myPlans = new ArrayList<PhysicalPlan>(myPlans.size());
-        for (PhysicalPlan plan : myPlans) {
-            opClone.myPlans.add(plan.clone());
-        }
+        opClone.myPlans = clonePlans(myPlans);
         return opClone;
     }
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
 Tue Jan 12 23:21:32 2016
@@ -50,7 +50,6 @@ import org.apache.pig.tools.pigstats.Pig
 public class POStore extends PhysicalOperator {
 
     private static final long serialVersionUID = 1L;
-    protected static Result empty = new Result(POStatus.STATUS_NULL, null);
     transient private StoreFuncInterface storer;
     transient private StoreFuncDecorator sDecorator;
     transient private POStoreImpl impl;
@@ -117,7 +116,7 @@ public class POStore extends PhysicalOpe
     public void setUp() throws IOException{
         if (impl != null) {
             try{
-                storer = impl.createStoreFunc(this); 
+                storer = impl.createStoreFunc(this);
                 if (!isTmpStore && !disableCounter && impl instanceof 
MapReducePOStoreImpl) {
                     counterName = PigStatsUtil.getMultiStoreCounterName(this);
                     if (counterName != null) {
@@ -165,7 +164,7 @@ public class POStore extends PhysicalOpe
                     sDecorator.putNext((Tuple) res.result);
                 } else
                     illustratorMarkup(res.result, res.result, 0);
-                res = empty;
+                res = RESULT_EMPTY;
 
                 if (counterName != null) {
                     ((MapReducePOStoreImpl) 
impl).incrRecordCounter(counterName, 1);
@@ -256,19 +255,19 @@ public class POStore extends PhysicalOpe
         }
         return storer;
     }
-    
+
     void setStoreFuncDecorator(StoreFuncDecorator sDecorator) {
         this.sDecorator = sDecorator;
     }
 
     /**
-     * 
+     *
      * @return The {@link StoreFuncDecorator} used to write Tuples
      */
     public StoreFuncDecorator getStoreFuncDecorator() {
         return sDecorator;
     }
-    
+
     /**
      * @param sortInfo the sortInfo to set
      */

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
 Tue Jan 12 23:21:32 2016
@@ -43,22 +43,21 @@ import org.apache.pig.pen.util.ExampleTu
 public class POStream extends PhysicalOperator {
     private static final long serialVersionUID = 2L;
 
-    private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, 
null);
-
     private String executableManagerStr;            // String representing 
ExecutableManager to use
-    transient private ExecutableManager executableManager;    // 
ExecutableManager to use
     private StreamingCommand command;               // Actual command to be run
     private Properties properties;
 
-    private boolean initialized = false;
-
     protected BlockingQueue<Result> binaryOutputQueue = new 
ArrayBlockingQueue<Result>(1);
 
     protected BlockingQueue<Result> binaryInputQueue = new 
ArrayBlockingQueue<Result>(1);
 
-    protected boolean allInputFromPredecessorConsumed = false;
+    private transient ExecutableManager executableManager;    // 
ExecutableManager to use
+
+    private transient boolean initialized = false;
+
+    protected transient boolean allInputFromPredecessorConsumed = false;
 
-    protected boolean allOutputFromBinaryProcessed = false;
+    protected transient boolean allOutputFromBinaryProcessed = false;
 
     /**
      * This flag indicates whether streaming is done through fetching. If set,
@@ -143,7 +142,7 @@ public class POStream extends PhysicalOp
             // The "allOutputFromBinaryProcessed" flag is set when we see
             // an EOS (End of Stream output) from streaming binary
             if(allOutputFromBinaryProcessed) {
-                return EOP_RESULT;
+                return RESULT_EOP;
             }
 
             // if we are here AFTER all map() calls have been completed
@@ -158,7 +157,7 @@ public class POStream extends PhysicalOp
                     // So we can send an EOP to the successor in
                     // the pipeline and also note this condition
                     // for future calls
-                    r = EOP_RESULT;
+                    r = RESULT_EOP;
                     allOutputFromBinaryProcessed = true;
                 } else if (r.returnStatus == POStatus.STATUS_OK) {
                     illustratorMarkup(r.result, r.result, 0);
@@ -191,7 +190,7 @@ public class POStream extends PhysicalOp
                             // So we can send an EOP to the successor in
                             // the pipeline and also note this condition
                             // for future calls
-                            r = EOP_RESULT;
+                            r = RESULT_EOP;
                             allOutputFromBinaryProcessed = true;
                         }
                     }
@@ -202,7 +201,7 @@ public class POStream extends PhysicalOp
                     // So we can send an EOP to the successor in
                     // the pipeline and also note this condition
                     // for future calls
-                    r = EOP_RESULT;
+                    r = RESULT_EOP;
                     allOutputFromBinaryProcessed = true;
                 } else if (r.returnStatus == POStatus.STATUS_OK) {
                   illustratorMarkup(r.result, r.result, 0);
@@ -218,7 +217,7 @@ public class POStream extends PhysicalOp
                     // So we can send an EOP to the successor in
                     // the pipeline and also note this condition
                     // for future calls
-                    r = EOP_RESULT;
+                    r = RESULT_EOP;
                     allOutputFromBinaryProcessed  = true;
                 } else if (r.returnStatus == POStatus.STATUS_OK) {
                     illustratorMarkup(r.result, r.result, 0);
@@ -297,8 +296,9 @@ public class POStream extends PhysicalOp
 
                         // wait for either input to be available
                         // or output to be consumed
-                        while(binaryOutputQueue.isEmpty() && 
!binaryInputQueue.isEmpty())
+                        while(binaryOutputQueue.isEmpty() && 
!binaryInputQueue.isEmpty()) {
                             wait();
+                        }
 
                     }
                 }
@@ -382,4 +382,13 @@ public class POStream extends PhysicalOp
         this.isFetchable = isFetchable;
     }
 
+    @Override
+    public PhysicalOperator clone() throws CloneNotSupportedException {
+        POStream clone = (POStream)super.clone();
+        clone.binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
+        clone.binaryInputQueue = new ArrayBlockingQueue<Result>(1);
+        //Not cloning StreamingCommand as it is read only
+        return clone;
+    }
+
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 Tue Jan 12 23:21:32 2016
@@ -653,6 +653,7 @@ public class TezCompiler extends PhyPlan
     public void visitCounter(POCounter op) throws VisitorException {
         // Refer visitRank(PORank) for more details
         try{
+            curTezOp.markRankCounter();
             POCounterTez counterTez = new POCounterTez(op);
             nonBlocking(counterTez);
             phyToTezOpMap.put(op, curTezOp);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 Tue Jan 12 23:21:32 2016
@@ -179,7 +179,9 @@ public class TezOperator extends Operato
         // Indicate if this job is a distinct job
         DISTINCT,
         // Indicate if this job is a native job
-        NATIVE;
+        NATIVE,
+        // Indicate if this job does rank counter
+        RANK_COUNTER;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -442,6 +444,14 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.NATIVE.ordinal());
     }
 
+    public boolean isRankCounter() {
+        return feature.get(OPER_FEATURE.RANK_COUNTER.ordinal());
+    }
+
+    public void markRankCounter() {
+        feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> 
excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
 Tue Jan 12 23:21:32 2016
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -152,7 +151,7 @@ public class POCounterStatsTez extends P
                 prevTasksCount += counterRecords.get(i);
             }
 
-            Tuple tuple = TupleFactory.getInstance().newTuple(1);
+            Tuple tuple = mTupleFactory.newTuple(1);
             tuple.set(0, counterOffsets);
             writer.write(POValueOutputTez.EMPTY_KEY, tuple);
             finished = true;

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 Tue Jan 12 23:21:32 2016
@@ -215,4 +215,10 @@ public class POFRJoinTez extends POFRJoi
     public List<String> getInputKeys() {
         return inputKeys;
     }
+
+    @Override
+    public POFRJoinTez clone() throws CloneNotSupportedException {
+        return (POFRJoinTez) super.clone();
+    }
+
 }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
 Tue Jan 12 23:21:32 2016
@@ -34,7 +34,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
@@ -49,11 +48,11 @@ public class POLocalRearrangeTez extends
     private static final Log LOG = 
LogFactory.getLog(POLocalRearrangeTez.class);
 
     protected String outputKey;
-    protected transient KeyValueWriter writer;
-
     protected boolean connectedToPackage = true;
     protected boolean isSkewedJoin = false;
 
+    protected transient KeyValueWriter writer;
+
     public POLocalRearrangeTez(OperatorKey k) {
         super(k);
     }
@@ -172,20 +171,9 @@ public class POLocalRearrangeTez extends
         return inp;
     }
 
-    /**
-     * Make a deep copy of this operator.
-     * @throws CloneNotSupportedException
-     */
     @Override
     public POLocalRearrangeTez clone() throws CloneNotSupportedException {
-        POLocalRearrangeTez clone = new POLocalRearrangeTez(new OperatorKey(
-                mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
-                        mKey.scope)), requestedParallelism);
-        deepCopyTo(clone);
-        clone.isSkewedJoin = isSkewedJoin;
-        clone.connectedToPackage = connectedToPackage;
-        clone.setOutputKey(outputKey);
-        return clone;
+        return (POLocalRearrangeTez) super.clone();
     }
 
     @Override

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
 Tue Jan 12 23:21:32 2016
@@ -33,16 +33,13 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.Pair;
 
@@ -58,13 +55,11 @@ public class POPartitionRearrangeTez ext
     private static final long serialVersionUID = 1L;
 
     private static final Log LOG = 
LogFactory.getLog(POPartitionRearrangeTez.class);
-    private static final TupleFactory tf = TupleFactory.getInstance();
-    private static final BagFactory mBagFactory = BagFactory.getInstance();
 
     // ReducerMap will store the tuple, max reducer index & min reducer index
-    private Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
-    private Integer totalReducers = -1;
-    private boolean inited = false;
+    private transient Map<Object, Pair<Integer, Integer>> reducerMap;
+    private transient Integer totalReducers;
+    private transient boolean inited;
 
     public POPartitionRearrangeTez(OperatorKey k) {
         this(k, -1);
@@ -202,6 +197,8 @@ public class POPartitionRearrangeTez ext
         }
 
         Map<String, Object> distMap = null;
+        totalReducers = -1;
+        reducerMap = Maps.newHashMap();
         if (PigProcessor.sampleMap != null) {
             // We've already collected sampleMap in PigProcessor
             distMap = PigProcessor.sampleMap;
@@ -233,7 +230,7 @@ public class POPartitionRearrangeTez ext
                 if (idxTuple.size() > 3) {
                 // remove the last 2 fields of the tuple, i.e: minIndex
                 // and maxIndex and store it in the reducer map
-                Tuple keyTuple = tf.newTuple();
+                Tuple keyTuple = mTupleFactory.newTuple();
                 for (int i=0; i < idxTuple.size() - 2; i++) {
                     keyTuple.append(idxTuple.get(i));
                 }
@@ -259,13 +256,6 @@ public class POPartitionRearrangeTez ext
 
     @Override
     public POPartitionRearrangeTez clone() throws CloneNotSupportedException {
-        POPartitionRearrangeTez clone = new POPartitionRearrangeTez(new 
OperatorKey(
-                mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
-                        mKey.scope)), requestedParallelism);
-        deepCopyTo(clone);
-        clone.isSkewedJoin = isSkewedJoin;
-        clone.connectedToPackage = connectedToPackage;
-        clone.setOutputKey(outputKey);
-        return clone;
+        return (POPartitionRearrangeTez) super.clone();
     }
 }



Reply via email to