Author: rohini
Date: Tue Jan 12 23:21:32 2016
New Revision: 1724339
URL: http://svn.apache.org/viewvc?rev=1724339&view=rev
Log:
PIG-4737: Check and fix clone implementation for all classes extending
PhysicalOperator (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/trunk/test/e2e/pig/tests/multiquery.conf
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-13.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2-OPTOFF.gld
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 12 23:21:32 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4737: Check and fix clone implementation for all classes extending
PhysicalOperator (rohini)
+
PIG-4770: OOM with POPartialAgg in some cases (rohini)
PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after
union does ascending instead (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
Tue Jan 12 23:21:32 2016
@@ -31,6 +31,7 @@ import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -67,6 +68,8 @@ public abstract class PhysicalOperator e
protected static final long serialVersionUID = 1L;
protected static final Result RESULT_EMPTY = new
Result(POStatus.STATUS_NULL, null);
protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP,
null);
+ protected static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
+ protected static final BagFactory mBagFactory = BagFactory.getInstance();
// The degree of parallelism requested
protected int requestedParallelism;
@@ -289,7 +292,7 @@ public abstract class PhysicalOperator e
try {
if (input == null && (inputs == null || inputs.size() == 0)) {
// log.warn("No inputs found. Signaling End of Processing.");
- return new Result(POStatus.STATUS_EOP, null);
+ return RESULT_EOP;
}
// Should be removed once the model is clear
@@ -458,8 +461,11 @@ public abstract class PhysicalOperator e
}
/**
- * Make a deep copy of this operator. This function is blank, however,
+ * Make a copy of this operator. This function is blank, however,
* we should leave a place holder so that the subclasses can clone
+ * to make a deep copy, as this one creates only a shallow copy of
+ * non-primitive types (objects, arrays and lists)
+ *
* @throws CloneNotSupportedException
*/
@Override
@@ -472,6 +478,14 @@ public abstract class PhysicalOperator e
originalLocations.addAll(op.originalLocations);
}
+ protected static List<PhysicalPlan> clonePlans(List<PhysicalPlan>
origPlans) throws CloneNotSupportedException {
+ List<PhysicalPlan> clonePlans = new
ArrayList<PhysicalPlan>(origPlans.size());
+ for (PhysicalPlan plan : origPlans) {
+ clonePlans.add(plan.clone());
+ }
+ return clonePlans;
+ }
+
/**
* @param physicalPlan
*/
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
Tue Jan 12 23:21:32 2016
@@ -27,12 +27,10 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -51,10 +49,6 @@ public class POProject extends Expressio
*/
private static final long serialVersionUID = 1L;
- private static TupleFactory tupleFactory = TupleFactory.getInstance();
-
- protected static final BagFactory bagFactory = BagFactory.getInstance();
-
private boolean resultSingleTupleBag = false;
//The column to project
@@ -191,7 +185,7 @@ public class POProject extends Expressio
for(int col : columns) {
addColumn(objList, inpValue, col);
}
- ret = tupleFactory.newTupleNoCopy(objList);
+ ret = mTupleFactory.newTupleNoCopy(objList);
}
res.result = ret;
illustratorMarkup(inpValue, res.result, -1);
@@ -277,20 +271,20 @@ public class POProject extends Expressio
for (int col : columns) {
addColumn(objList, tuple, col);
}
- outBag = new SingleTupleBag(
tupleFactory.newTupleNoCopy(objList) );
+ outBag = new SingleTupleBag(
mTupleFactory.newTupleNoCopy(objList) );
}else {
Tuple tmpTuple = getRangeTuple(tuple);
outBag = new SingleTupleBag(tmpTuple);
}
} else {
- outBag = bagFactory.newDefaultBag();
+ outBag = mBagFactory.newDefaultBag();
for (Tuple tuple : inpBag) {
if(!isProjectToEnd){
ArrayList<Object> objList = new
ArrayList<Object>(columns.size());
for (int col : columns) {
addColumn(objList, tuple, col);
}
- outBag.add( tupleFactory.newTupleNoCopy(objList) );
+ outBag.add( mTupleFactory.newTupleNoCopy(objList) );
}else{
Tuple outTuple = getRangeTuple(tuple);
outBag.add(outTuple);
@@ -321,14 +315,14 @@ public class POProject extends Expressio
Tuple outTuple;
if(isRangeInvalid(lastColIdx)){
//invalid range - return empty tuple
- outTuple = tupleFactory.newTuple();
+ outTuple = mTupleFactory.newTuple();
}
else {
ArrayList<Object> objList = new ArrayList<Object>(lastColIdx -
startCol + 1);
for(int i = startCol; i <= lastColIdx ; i++){
addColumn(objList, tuple, i);
}
- outTuple = tupleFactory.newTupleNoCopy(objList);
+ outTuple = mTupleFactory.newTupleNoCopy(objList);
}
return outTuple;
}
@@ -451,7 +445,7 @@ public class POProject extends Expressio
objList.add(null);
}
}
- ret = tupleFactory.newTuple(objList);
+ ret = mTupleFactory.newTuple(objList);
res.result = (Tuple)ret;
return res;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
Tue Jan 12 23:21:32 2016
@@ -19,13 +19,12 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.util.IdentityHashSet;
/**
* This is a base class for all unary comparison operators. Supports the
* use of operand type instead of result type as the result type is
* always boolean.
- *
+ *
*/
public abstract class UnaryComparisonOperator extends UnaryExpressionOperator
implements ComparisonOperator {
@@ -35,7 +34,7 @@ public abstract class UnaryComparisonOpe
//The result will be comunicated using the Status object.
//This is a slight abuse of the status object.
protected byte operandType;
-
+
public UnaryComparisonOperator(OperatorKey k) {
this(k,-1);
}
@@ -44,14 +43,16 @@ public abstract class UnaryComparisonOpe
super(k, rp);
}
+ @Override
public byte getOperandType() {
return operandType;
}
+ @Override
public void setOperandType(byte operandType) {
this.operandType = operandType;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
if(illustrator != null) {
@@ -59,4 +60,9 @@ public abstract class UnaryComparisonOpe
}
return null;
}
+
+ protected void cloneHelper(UnaryComparisonOperator op) {
+ super.cloneHelper(op);
+ this.operandType = op.operandType;
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java
Tue Jan 12 23:21:32 2016
@@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
@@ -69,8 +68,6 @@ public class POCounter extends PhysicalO
**/
private boolean isRowNumber = false;
- protected static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
-
/**
* Local counter for tuples on the same task.
**/
@@ -321,4 +318,14 @@ public class POCounter extends PhysicalO
public String getOperationID() {
return operationID;
}
+
+ @Override
+ public POCounter clone() throws CloneNotSupportedException {
+ POCounter clone = (POCounter)super.clone();
+ clone.localCount = new Long(localCount);
+ clone.taskID = new Integer(taskID);
+ // counterPlans and mAscCols unused. Not cloning them
+ return clone;
+ }
+
}
\ No newline at end of file
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
Tue Jan 12 23:21:32 2016
@@ -25,7 +25,6 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -37,20 +36,20 @@ import org.apache.pig.pen.util.LineageTr
/**
* Recover this class for nested cross operation.
- *
- *
+ *
+ *
*/
public class POCross extends PhysicalOperator {
private static final long serialVersionUID = 1L;
- protected DataBag[] inputBags;
+ protected transient DataBag[] inputBags;
- protected Tuple[] data;
+ protected transient Tuple[] data;
protected transient Iterator<Tuple>[] its;
-
- protected Tuple tupleOfLastBag;
+
+ protected transient Tuple tupleOfLastBag;
public POCross(OperatorKey k) {
super(k);
@@ -197,7 +196,7 @@ public class POCross extends PhysicalOpe
its = new Iterator[length];
for (int i = 0; i < length; ++i) {
PhysicalOperator op = inputs.get(i);
- DataBag bag = BagFactory.getInstance().newDefaultBag();
+ DataBag bag = mBagFactory.newDefaultBag();
inputBags[count] = bag;
for (Result res = op.getNextTuple(); res.returnStatus !=
POStatus.STATUS_EOP; res = op
.getNextTuple()) {
@@ -226,7 +225,7 @@ public class POCross extends PhysicalOpe
return illustratorMarkup(out, out, 0);
}
-
+
private boolean loadLastBag() throws ExecException {
Result resOfLastBag = null;
int index = inputs.size() - 1;
@@ -247,7 +246,7 @@ public class POCross extends PhysicalOpe
"Error accumulating data in the local Cross operator");
}
}
-
+
private void clearMemory() {
// reset inputBags, its, data and tupleOfLastBag to null so that in the
// next round of getNext, the new input data will be loaded.
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
Tue Jan 12 23:21:32 2016
@@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalDistinctBag;
@@ -49,9 +48,8 @@ import org.apache.pig.impl.plan.VisitorE
public class PODistinct extends PhysicalOperator implements Cloneable {
private static final Log log = LogFactory.getLog(PODistinct.class);
private static final long serialVersionUID = 1L;
- private boolean inputsAccumulated = false;
- private DataBag distinctBag = null;
-
+ private transient boolean inputsAccumulated;
+ private transient DataBag distinctBag;
private transient boolean initialized;
private transient boolean useDefaultBag;
private transient Iterator<Tuple> it;
@@ -102,7 +100,7 @@ public class PODistinct extends Physical
}
}
}
- distinctBag = useDefaultBag ?
BagFactory.getInstance().newDistinctBag()
+ distinctBag = useDefaultBag ? mBagFactory.newDistinctBag()
: new InternalDistinctBag(3);
Result in = processInput();
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
Tue Jan 12 23:21:32 2016
@@ -46,7 +46,6 @@ import org.apache.pig.data.SchemaTupleBa
import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
import org.apache.pig.data.SchemaTupleFactory;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -70,6 +69,7 @@ import org.apache.pig.impl.plan.VisitorE
public class POFRJoin extends PhysicalOperator {
private static final Log log = LogFactory.getLog(POFRJoin.class);
private static final long serialVersionUID = 1L;
+
// The number in the input list which denotes the fragmented input
protected int fragment;
// There can be n inputs each being a List<PhysicalPlan>
@@ -85,18 +85,7 @@ public class POFRJoin extends PhysicalOp
protected ConstantExpression[] constExps;
// Used to produce the cross product of various bags
protected POForEach fe;
- // The array of Hashtables one per replicated input. replicates[fragment] =
- // null
- // fragment is the input which is fragmented and not replicated.
- protected TupleToMapKey replicates[];
- // varaible which denotes whether we are returning tuples from the foreach
- // operator
- protected boolean processingPlan;
- // A dummy tuple
- protected Tuple dumTup = TupleFactory.getInstance().newTuple(1);
- // An instance of tuple factory
- protected transient TupleFactory mTupleFactory;
- protected boolean setUp;
+
// A Boolean variable which denotes if this is a LeftOuter Join or an Inner
// Join
protected boolean isLeftOuterJoin;
@@ -106,6 +95,16 @@ public class POFRJoin extends PhysicalOp
protected Schema[] inputSchemas;
protected Schema[] keySchemas;
+ // The array of Hashtables, one per replicated input. replicates[fragment] =
+ // null, since fragment is the input which is fragmented and not replicated.
+ protected transient TupleToMapKey replicates[];
+ // variable which denotes whether we are returning tuples from the foreach
+ // operator
+ protected transient boolean processingPlan;
+ // A dummy tuple
+ protected transient Tuple dumTup;
+ protected transient boolean setUp;
+
public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
FileSpec[] replFiles, int fragment, boolean isLeftOuter,
@@ -126,12 +125,10 @@ public class POFRJoin extends PhysicalOp
this.fragment = fragment;
this.keyTypes = keyTypes;
this.replFiles = replFiles;
- replicates = new TupleToMapKey[ppLists.size()];
+
LRs = new POLocalRearrange[ppLists.size()];
constExps = new ConstantExpression[ppLists.size()];
createJoinPlans(k);
- processingPlan = false;
- mTupleFactory = TupleFactory.getInstance();
List<Tuple> tupList = new ArrayList<Tuple>();
tupList.add(nullTuple);
nullBag = new NonSpillableDataBag(tupList);
@@ -159,7 +156,6 @@ public class POFRJoin extends PhysicalOp
this.fe = copy.fe;
this.constExps = copy.constExps;
this.processingPlan = copy.processingPlan;
- this.mTupleFactory = copy.mTupleFactory;
this.nullBag = copy.nullBag;
this.isLeftOuterJoin = copy.isLeftOuterJoin;
this.inputSchemas = copy.inputSchemas;
@@ -173,7 +169,7 @@ public class POFRJoin extends PhysicalOp
/**
* Configures the Local Rearrange operators & the foreach operator
- *
+ *
* @param old
* @throws ExecException
*/
@@ -238,6 +234,8 @@ public class POFRJoin extends PhysicalOp
Result res = null;
Result inp = null;
if (!setUp) {
+ replicates = new TupleToMapKey[phyPlanLists.size()];
+ dumTup = mTupleFactory.newTuple(1);
setUpHashMap();
setUp = true;
}
@@ -254,7 +252,7 @@ public class POFRJoin extends PhysicalOp
if (res.returnStatus == POStatus.STATUS_EOP) {
// We have completed all cross-products now its time to
move
// to next tuple of left side
- processingPlan = false;
+ processingPlan = false;
break;
}
if (res.returnStatus == POStatus.STATUS_ERR) {
@@ -284,7 +282,7 @@ public class POFRJoin extends PhysicalOp
return new Result();
}
Tuple lrOutTuple = (Tuple) lrOut.result;
- Tuple key = TupleFactory.getInstance().newTuple(1);
+ Tuple key = mTupleFactory.newTuple(1);
key.set(0, lrOutTuple.get(1));
Tuple value = getValueTuple(lr, lrOutTuple);
lr.detachInput();
@@ -390,9 +388,9 @@ public class POFRJoin extends PhysicalOp
POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L),
replFile);
-
+
Properties props = ConfigurationUtil.getLocalFSProperties();
- PigContext pc = new PigContext(ExecType.LOCAL, props);
+ PigContext pc = new PigContext(ExecType.LOCAL, props);
ld.setPc(pc);
// We use LocalRearrange Operator to seperate Key and Values
// eg. ( a, b, c ) would generate a, ( a, b, c )
@@ -437,11 +435,10 @@ public class POFRJoin extends PhysicalOp
}
return false;
}
-
+
private void readObject(ObjectInputStream is) throws IOException,
ClassNotFoundException, ExecException {
is.defaultReadObject();
- mTupleFactory = TupleFactory.getInstance();
// setUpHashTable();
}
@@ -531,4 +528,28 @@ public class POFRJoin extends PhysicalOp
// no op: all handled by the preceding POForEach
return null;
}
+
+ @Override
+ public POFRJoin clone() throws CloneNotSupportedException {
+ POFRJoin clone = (POFRJoin) super.clone();
+ // Not doing deep copy of nullTuple, nullBag, inputSchemas, keySchemas
+ // as they are read only
+ clone.phyPlanLists = new
ArrayList<List<PhysicalPlan>>(phyPlanLists.size());
+ for (List<PhysicalPlan> ppLst : phyPlanLists) {
+ clone.phyPlanLists.add(clonePlans(ppLst));
+ }
+
+ clone.LRs = new POLocalRearrange[phyPlanLists.size()];
+ clone.constExps = new ConstantExpression[phyPlanLists.size()];
+ try {
+ clone.createJoinPlans(getOperatorKey());
+ } catch (ExecException e) {
+ CloneNotSupportedException cnse = new
CloneNotSupportedException("Problem with setting plans of " +
this.getClass().getSimpleName());
+ cnse.initCause(e);
+ throw cnse;
+ }
+ return clone;
+ }
+
+
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
Tue Jan 12 23:21:32 2016
@@ -49,17 +49,16 @@ public class POFilter extends PhysicalOp
private static final long serialVersionUID = 1L;
// The expression plan
- PhysicalPlan plan;
+ private PhysicalPlan plan;
// The root comparison operator of the expression plan
-// ComparisonOperator comOp;
- PhysicalOperator comOp;
-
+ // ComparisonOperator comOp;
+ private PhysicalOperator comOp;
// The operand type for the comparison operator needed
// to call the comparison operators getNext with the
// appropriate type
- byte compOperandType;
+ // private byte compOperandType;
public POFilter(OperatorKey k) {
this(k, -1, null);
@@ -205,8 +204,7 @@ public class POFilter extends PhysicalOp
@Override
public PhysicalOperator clone() throws CloneNotSupportedException {
- Object o = super.clone();
- POFilter opClone = (POFilter)o;
+ POFilter opClone = (POFilter) super.clone();
opClone.setPlan(plan.clone());
return opClone;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
Tue Jan 12 23:21:32 2016
@@ -57,26 +57,13 @@ public class POForEach extends PhysicalO
private static final long serialVersionUID = 1L;
protected List<PhysicalPlan> inputPlans;
- protected List<PhysicalOperator> opsToBeReset;
- //Since the plan has a generate, this needs to be maintained
- //as the generate can potentially return multiple tuples for
- //same call.
- protected boolean processingPlan = false;
-
- //its holds the iterators of the databags given by the input expressions
which need flattening.
- transient protected Iterator<Tuple> [] its = null;
- //This holds the outputs given out by the input expressions of any datatype
- protected Object [] bags = null;
+ protected List<PhysicalOperator> opsToBeReset;
- //This is the template whcih contains tuples and is flattened out in
createTuple() to generate the final output
- protected Object[] data = null;
+ protected PhysicalOperator[] planLeafOps;
// store result types of the plan leaves
- protected byte[] resultTypes = null;
-
- // store whether or not an accumulative UDF has terminated early
- protected BitSet earlyTermination = null;
+ protected byte[] resultTypes;
// array version of isToBeFlattened - this is purely
// for optimization - instead of calling isToBeFlattened.get(i)
@@ -85,16 +72,33 @@ public class POForEach extends PhysicalO
// so we can also save on the Boolean.booleanValue() calls
protected boolean[] isToBeFlattenedArray;
- ExampleTuple tIn = null;
protected int noItems;
- protected PhysicalOperator[] planLeafOps = null;
+ //Since the plan has a generate, this needs to be maintained
+ //as the generate can potentially return multiple tuples for
+ //same call.
+ protected transient boolean processingPlan;
+
+ //its holds the iterators of the databags given by the input expressions
which need flattening.
+ protected transient Iterator<Tuple> [] its = null;
+
+ //This holds the outputs given out by the input expressions of any datatype
+ protected transient Object[] bags ;
+
+ //This is the template which contains tuples and is flattened out in
createTuple() to generate the final output
+ protected transient Object[] data;
+
+ // store whether or not an accumulative UDF has terminated early
+ protected transient BitSet earlyTermination;
+
+ protected transient ExampleTuple tIn;
+
protected transient AccumulativeTupleBuffer buffer;
- protected Tuple inpTuple;
+ protected transient Tuple inpTuple;
- protected boolean endOfAllInputProcessed = false;
+ protected transient boolean endOfAllInputProcessed;
// Indicate the foreach statement can only in map side
// Currently only used in MR cross (See PIG-4175)
@@ -372,7 +376,7 @@ public class POForEach extends PhysicalO
if(its == null) {
if (endOfAllInputProcessed) {
- return new Result(POStatus.STATUS_EOP, null);
+ return RESULT_EOP;
}
//getNext being called for the first time OR starting with a set
of new data from inputs
its = new Iterator[noItems];
@@ -688,6 +692,8 @@ public class POForEach extends PhysicalO
clone.setOpsToBeReset(ops);
clone.setResultType(getResultType());
clone.addOriginalLocation(alias, getOriginalLocations());
+ clone.endOfAllInputProcessing = endOfAllInputProcessing;
+ clone.mapSideOnly = mapSideOnly;
return clone;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
Tue Jan 12 23:21:32 2016
@@ -27,7 +27,6 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
@@ -38,14 +37,14 @@ public class POLimit extends PhysicalOpe
*/
private static final long serialVersionUID = 1L;
- // Counts for outputs processed
- private long soFar = 0;
-
// Number of limited outputs
- long mLimit;
+ private long mLimit;
// The expression plan
- PhysicalPlan expressionPlan;
+ private PhysicalPlan expressionPlan;
+
+ // Counts for outputs processed
+ private transient long soFar = 0;
public POLimit(OperatorKey k) {
this(k, -1, null);
@@ -159,15 +158,11 @@ public class POLimit extends PhysicalOpe
@Override
public POLimit clone() throws CloneNotSupportedException {
- POLimit newLimit = new POLimit(new OperatorKey(this.mKey.scope,
- NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
- this.requestedParallelism, this.inputs);
- newLimit.mLimit = this.mLimit;
+ POLimit clone = (POLimit) super.clone();
if (this.expressionPlan != null) {
- newLimit.expressionPlan = this.expressionPlan.clone();
+ clone.expressionPlan = this.expressionPlan.clone();
}
- newLimit.addOriginalLocation(alias, getOriginalLocations());
- return newLimit;
+ return clone;
}
@Override
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
Tue Jan 12 23:21:32 2016
@@ -35,9 +35,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
@@ -57,8 +55,6 @@ public class POLocalRearrange extends Ph
*/
protected static final long serialVersionUID = 1L;
- protected static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
-
private static final Result ERR_RESULT = new Result();
protected List<PhysicalPlan> plans;
@@ -82,8 +78,6 @@ public class POLocalRearrange extends Ph
protected boolean isCross = false;
- protected Result inp;
-
// map to store mapping of projected columns to
// the position in the "Key" where these will be projected to.
// We use this information to strip off these columns
@@ -133,6 +127,8 @@ public class POLocalRearrange extends Ph
// By default, we strip keys from the value.
private boolean stripKeyFromValue = true;
+ protected transient Result inp;
+
public POLocalRearrange(OperatorKey k) {
this(k, -1, null);
}
@@ -709,32 +705,18 @@ public class POLocalRearrange extends Ph
*/
@Override
public POLocalRearrange clone() throws CloneNotSupportedException {
- POLocalRearrange clone = new POLocalRearrange(new OperatorKey(
- mKey.scope,
- NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
- requestedParallelism);
- deepCopyTo(clone);
- return clone;
- }
-
- protected void deepCopyTo(POLocalRearrange clone)
- throws CloneNotSupportedException {
-
- clone.setParentPlan(parentPlan);
- clone.index = index;
- if (useSecondaryKey) {
- clone.keyType = mainKeyType;
- } else {
- clone.keyType = keyType;
- }
- clone.setUseSecondaryKey(useSecondaryKey);
+ POLocalRearrange clone = (POLocalRearrange) super.clone();
+ // Constructor
+ clone.leafOps = new ArrayList<ExpressionOperator>();
+ clone.secondaryLeafOps = new ArrayList<ExpressionOperator>();
// Needs to be called as setDistinct so that the fake index tuple gets
// created.
clone.setDistinct(mIsDistinct);
- clone.setCross(isCross);
- clone.addOriginalLocation(alias, getOriginalLocations());
- clone.setStripKeyFromValue(stripKeyFromValue);
-
+ // Set the keyType to mainKeyType. setSecondaryPlans will calculate
+ // based on that and set keyType to the final value
+ if (useSecondaryKey) {
+ clone.keyType = mainKeyType;
+ }
try {
clone.setPlans(clonePlans(plans));
if (secondaryPlans != null) {
@@ -745,14 +727,7 @@ public class POLocalRearrange extends Ph
cnse.initCause(pe);
throw cnse;
}
- }
-
- private List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws
CloneNotSupportedException {
- List<PhysicalPlan> clonePlans = new
ArrayList<PhysicalPlan>(origPlans.size());
- for (PhysicalPlan plan : origPlans) {
- clonePlans.add(plan.clone());
- }
- return clonePlans;
+ return clone;
}
public boolean isCross() {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
Tue Jan 12 23:21:32 2016
@@ -17,22 +17,16 @@
*/
package
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
-import java.util.ArrayList;
import java.util.List;
-import java.util.LinkedList;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.impl.util.IdentityHashSet;
/**
* A specialized version of POForeach with the difference
@@ -45,10 +39,10 @@ import org.apache.pig.impl.util.Identity
public class POOptimizedForEach extends POForEach {
/**
- *
+ *
*/
private static final long serialVersionUID = 1L;
-
+
public POOptimizedForEach(OperatorKey k) {
this(k,-1,null,null);
}
@@ -64,7 +58,7 @@ public class POOptimizedForEach extends
public POOptimizedForEach(OperatorKey k, List inp) {
this(k,-1,inp,null);
}
-
+
public POOptimizedForEach(OperatorKey k, int rp, List<PhysicalPlan> inp,
List<Boolean> isToBeFlattened){
super(k, rp);
setUpFlattens(isToBeFlattened);
@@ -82,7 +76,7 @@ public class POOptimizedForEach extends
String fString = getFlatStr();
return "Optimized For Each" + "(" + fString + ")" + "[" +
DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
}
-
+
/**
* Calls getNext on the generate operator inside the nested
* physical plan and returns it maintaining an additional state
@@ -120,40 +114,25 @@ public class POOptimizedForEach extends
//nested plan processing on the input tuple
//read
while (true) {
-
- // we know that input has been attached
+
+ // we know that input has been attached
attachInputToPlans(input);
detachInput();
res = processPlan();
-
+
processingPlan = true;
-
+
return res;
}
}
-
+
/**
- * Make a deep copy of this operator.
+ * Make a deep copy of this operator.
* @throws CloneNotSupportedException
*/
@Override
public POOptimizedForEach clone() throws CloneNotSupportedException {
- List<PhysicalPlan> plans = new
- ArrayList<PhysicalPlan>(inputPlans.size());
- for (PhysicalPlan plan : inputPlans) {
- plans.add(plan.clone());
- }
- List<Boolean> flattens = null;
- if(isToBeFlattenedArray != null ) {
- flattens = new
- ArrayList<Boolean>(isToBeFlattenedArray.length);
- for (boolean b : isToBeFlattenedArray) {
- flattens.add(b);
- }
- }
- return new POOptimizedForEach(new OperatorKey(mKey.scope,
- NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
- requestedParallelism, plans, flattens);
+ return (POOptimizedForEach) super.clone();
}
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
Tue Jan 12 23:21:32 2016
@@ -30,13 +30,11 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
import org.apache.pig.data.AccumulativeBag;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.ReadOnceBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -74,9 +72,6 @@ public class POPackage extends PhysicalO
//key, no value.
protected int numInputs;
- protected static final BagFactory mBagFactory = BagFactory.getInstance();
- protected static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
-
private boolean lastBagReadOnly = true;
protected Packager pkgr;
@@ -240,8 +235,7 @@ public class POPackage extends PhysicalO
// create bag to pull all tuples out of iterator
for (int i = 0; i < numInputs; i++) {
- dbs[i] = useDefaultBag ? BagFactory.getInstance()
- .newDefaultBag()
+ dbs[i] = useDefaultBag ? mBagFactory.newDefaultBag()
// In a very rare case if there is a POStream
after this
// POPackage in the pipeline and is also blocking
the
// pipeline;
@@ -259,7 +253,7 @@ public class POPackage extends PhysicalO
if (index == numInputs - 1) {
if (pkgr.getUseSecondaryKey()) {
if (dbs[index] == null) {
- dbs[index] = useDefaultBag ?
BagFactory.getInstance()
+ dbs[index] = useDefaultBag ? mBagFactory
.newDefaultBag() : new
InternalCachedBag(numInputs);
}
} else {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
Tue Jan 12 23:21:32 2016
@@ -41,7 +41,6 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalCachedBag;
import org.apache.pig.data.SelfSpillBag.MemoryLimits;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.GroupingSpillable;
@@ -92,8 +91,6 @@ public class POPartialAgg extends Physic
private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new
WeakHashMap<POPartialAgg, Byte>();
- private static final TupleFactory TF = TupleFactory.getInstance();
-
private PhysicalPlan keyPlan;
private ExpressionOperator keyLeaf;
private List<PhysicalPlan> valuePlans;
@@ -496,7 +493,7 @@ public class POPartialAgg extends Physic
}
private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws
ExecException {
- Tuple valueTuple = TF.newTuple(valuePlans.size() + 1);
+ Tuple valueTuple = mTupleFactory.newTuple(valuePlans.size() + 1);
valueTuple.set(0, key);
for (int i = 0; i < valuePlans.size(); i++) {
@@ -576,7 +573,7 @@ public class POPartialAgg extends Physic
* @throws ExecException
*/
private Result getOutput(Object key, Tuple value) throws ExecException {
- Tuple output = TF.newTuple(valuePlans.size() + 1);
+ Tuple output = mTupleFactory.newTuple(valuePlans.size() + 1);
output.set(0, key);
for (int i = 0; i < valuePlans.size(); i++) {
@@ -657,4 +654,14 @@ public class POPartialAgg extends Physic
return avgTupleSize * (numRecsInProcessedMap + numRecsInRawMap);
}
+ @Override
+ public PhysicalOperator clone() throws CloneNotSupportedException {
+ POPartialAgg clone = (POPartialAgg) super.clone();
+ clone.setKeyPlan(keyPlan.clone());
+ clone.setValuePlans(clonePlans(valuePlans));
+ return clone;
+ }
+
+
+
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
Tue Jan 12 23:21:32 2016
@@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -39,8 +38,6 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
-import com.google.common.collect.Maps;
-
/**
* The partition rearrange operator is a part of the skewed join
* implementation. It has an embedded physical plan that
@@ -50,12 +47,11 @@ import com.google.common.collect.Maps;
public class POPartitionRearrange extends POLocalRearrange {
private static final long serialVersionUID = 1L;
- private static final BagFactory mBagFactory = BagFactory.getInstance();
- private Integer totalReducers = -1;
+ private transient Integer totalReducers;
// ReducerMap will store the tuple, max reducer index & min reducer index
- private Map<Object, Pair<Integer, Integer> > reducerMap =
Maps.newHashMap();
- private boolean inited;
+ private transient Map<Object, Pair<Integer, Integer> > reducerMap;
+ private transient boolean inited;
private PigContext pigContext;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
Tue Jan 12 23:21:32 2016
@@ -23,7 +23,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -32,51 +31,43 @@ public class POPoissonSample extends Phy
private static final long serialVersionUID = 1L;
- private static final TupleFactory tf = TupleFactory.getInstance();
- private static Result eop = new Result(POStatus.STATUS_EOP, null);
+ // 17 is not a magic number. It can be obtained by using a poisson
+ // cumulative distribution function with the mean set to 10 (empirically,
+ // minimum number of samples) and the confidence set to 95%
+ public static final int DEFAULT_SAMPLE_RATE = 17;
+
+ private int sampleRate = 0;
+
+ private float heapPerc = 0f;
+
+ private Long totalMemory;
+
+ private transient boolean initialized;
// num of rows sampled so far
- private int numRowsSampled = 0;
+ private transient int numRowsSampled;
// average size of tuple in memory, for tuples sampled
- private long avgTupleMemSz = 0;
+ private transient long avgTupleMemSz;
// current row number
- private long rowNum = 0;
+ private transient long rowNum;
// number of tuples to skip after each sample
- private long skipInterval = -1;
+ private transient long skipInterval;
// bytes in input to skip after every sample.
// divide this by avgTupleMemSize to get skipInterval
- private long memToSkipPerSample = 0;
+ private transient long memToSkipPerSample;
// has the special row with row number information been returned
- private boolean numRowSplTupleReturned = false;
-
- // 17 is not a magic number. It can be obtained by using a poisson
- // cumulative distribution function with the mean set to 10 (empirically,
- // minimum number of samples) and the confidence set to 95%
- public static final int DEFAULT_SAMPLE_RATE = 17;
-
- private int sampleRate = 0;
-
- private float heapPerc = 0f;
-
- private long totalMemory = Runtime.getRuntime().maxMemory();
+ private transient boolean numRowSplTupleReturned;
// new Sample result
- private Result newSample = null;
+ private transient Result newSample;
public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) {
super(k, rp, null);
- numRowsSampled = 0;
- avgTupleMemSz = 0;
- rowNum = 0;
- skipInterval = -1;
- memToSkipPerSample = 0;
- numRowSplTupleReturned = false;
- newSample = null;
sampleRate = sr;
heapPerc = hp;
if (tm != -1) {
@@ -97,10 +88,22 @@ public class POPoissonSample extends Phy
@Override
public Result getNextTuple() throws ExecException {
+ if (!initialized) {
+ numRowsSampled = 0;
+ avgTupleMemSz = 0;
+ rowNum = 0;
+ skipInterval = -1;
+ memToSkipPerSample = 0;
+ if (totalMemory == null) {
+ // Initialize in backend to get memory of task
+ totalMemory = Runtime.getRuntime().maxMemory();
+ }
+ initialized = true;
+ }
if (numRowSplTupleReturned) {
// row num special row has been returned after all inputs
// were read, nothing more to read
- return eop;
+ return RESULT_EOP;
}
Result res = null;
@@ -216,7 +219,7 @@ public class POPoissonSample extends Phy
*/
private Result createNumRowTuple(Tuple sample) throws ExecException {
int sz = (sample == null) ? 0 : sample.size();
- Tuple t = tf.newTuple(sz + 2);
+ Tuple t = mTupleFactory.newTuple(sz + 2);
if (sample != null) {
for (int i=0; i<sample.size(); i++){
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
Tue Jan 12 23:21:32 2016
@@ -29,12 +29,10 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.SingleTupleBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -53,9 +51,6 @@ public class POPreCombinerLocalRearrange
protected static final long serialVersionUID = 1L;
- protected static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
- protected static BagFactory mBagFactory = BagFactory.getInstance();
-
private static final Result ERR_RESULT = new Result();
protected List<PhysicalPlan> plans;
@@ -223,4 +218,12 @@ public class POPreCombinerLocalRearrange
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
return null;
}
+
+ @Override
+ public POPreCombinerLocalRearrange clone() throws
CloneNotSupportedException {
+ POPreCombinerLocalRearrange clone = (POPreCombinerLocalRearrange)
super.clone();
+ clone.leafOps = new ArrayList<ExpressionOperator>();
+ clone.setPlans(clonePlans(plans));
+ return clone;
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
Tue Jan 12 23:21:32 2016
@@ -32,7 +32,6 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
@@ -55,8 +54,6 @@ public class PORank extends PhysicalOper
private List<Boolean> mAscCols;
private List<Byte> ExprOutputTypes;
- protected static final TupleFactory mTupleFactory =
TupleFactory.getInstance();
-
/**
* Unique identifier that links POCounter and PORank,
* through the global counter labeled with it.
@@ -230,4 +227,11 @@ public class PORank extends PhysicalOper
public String getOperationID() {
return operationID;
}
+
+ @Override
+ public PORank clone() throws CloneNotSupportedException {
+ PORank clone = (PORank)super.clone();
+ // rankPlans, mAscCols, ExprOutputTypes are unused. Not cloning them
+ return clone;
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
Tue Jan 12 23:21:32 2016
@@ -26,31 +26,28 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
public class POReservoirSample extends PhysicalOperator {
- private static final TupleFactory tf = TupleFactory.getInstance();
-
private static final long serialVersionUID = 1L;
// number of samples to be sampled
protected int numSamples;
- private transient int nextSampleIdx= 0;
+ private transient int nextSampleIdx = 0;
- private int rowProcessed = 0;
+ private transient int rowProcessed = 0;
- private boolean sampleCollectionDone = false;
+ private transient boolean sampleCollectionDone = false;
//array to store the result
private transient Result[] samples = null;
// last sample result
- private Result lastSample = null;
+ private transient Result lastSample = null;
public POReservoirSample(OperatorKey k) {
this(k, -1, null);
@@ -170,14 +167,12 @@ public class POReservoirSample extends P
}
Result res = samples[nextSampleIdx++];
if (res == null) { // Input data has lesser rows than numSamples
- return new Result(POStatus.STATUS_NULL, null);
+ return RESULT_EMPTY;
}
return res;
}
else{
- Result res;
- res = new Result(POStatus.STATUS_EOP, null);
- return res;
+ return RESULT_EOP;
}
}
@@ -203,7 +198,7 @@ public class POReservoirSample extends P
*/
private Result createNumRowTuple(Tuple sample) throws ExecException {
int sz = (sample == null) ? 0 : sample.size();
- Tuple t = tf.newTuple(sz + 2);
+ Tuple t = mTupleFactory.newTuple(sz + 2);
if (sample != null) {
for (int i=0; i<sample.size(); i++){
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
Tue Jan 12 23:21:32 2016
@@ -36,12 +36,10 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -74,11 +72,11 @@ public class POSort extends PhysicalOper
private POUserComparisonFunc mSortFunc;
private Comparator<Tuple> mComparator;
- private boolean inputsAccumulated = false;
private long limit;
public boolean isUDFComparatorUsed = false;
- private DataBag sortedBag;
+ private transient boolean inputsAccumulated = false;
+ private transient DataBag sortedBag;
private transient Iterator<Tuple> it;
private transient boolean initialized;
private transient boolean useDefaultBag;
@@ -95,22 +93,22 @@ public class POSort extends PhysicalOper
this.sortPlans = sortPlans;
this.mAscCols = mAscCols;
this.limit = -1;
- this.mSortFunc = mSortFunc;
- if (mSortFunc == null) {
+ setSortFunc(mSortFunc);
+ }
+
+ private void setSortFunc(POUserComparisonFunc mSortFunc) {
+ this.mSortFunc = mSortFunc;
+ if (mSortFunc == null) {
mComparator = new SortComparator();
- /*sortedBag = BagFactory.getInstance().newSortedBag(
- new SortComparator());*/
- ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
+ ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
- for(PhysicalPlan plan : sortPlans) {
-
ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
- }
- } else {
- /*sortedBag = BagFactory.getInstance().newSortedBag(
- new UDFSortComparator());*/
+ for(PhysicalPlan plan : sortPlans) {
+ ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
+ }
+ } else {
mComparator = new UDFSortComparator();
- isUDFComparatorUsed = true;
- }
+ isUDFComparatorUsed = true;
+ }
}
public POSort(OperatorKey k, int rp, List inp) {
@@ -271,7 +269,7 @@ public class POSort extends PhysicalOper
}
// by default, we create InternalSortedBag, unless user
configures
// explicitly to use old bag
- sortedBag = useDefaultBag ?
BagFactory.getInstance().newSortedBag(mComparator)
+ sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
: new InternalSortedBag(3, mComparator);
while (res.returnStatus != POStatus.STATUS_EOP) {
@@ -363,23 +361,19 @@ public class POSort extends PhysicalOper
@Override
public POSort clone() throws CloneNotSupportedException {
- List<PhysicalPlan> clonePlans = new
- ArrayList<PhysicalPlan>(sortPlans.size());
- for (PhysicalPlan plan : sortPlans) {
- clonePlans.add(plan.clone());
+ POSort clone = (POSort) super.clone();
+ clone.sortPlans = clonePlans(sortPlans);
+ if (mSortFunc == null) {
+ clone.setSortFunc(null); // rebuild comparator on the clone so it is bound to the clone's sortPlans, not this
+ } else {
+ clone.setSortFunc(mSortFunc.clone()); // must target clone: calling on this would swap this operator's UDF
}
List<Boolean> cloneAsc = new ArrayList<Boolean>(mAscCols.size());
for (Boolean b : mAscCols) {
cloneAsc.add(b);
}
- POUserComparisonFunc cloneFunc = null;
- if (mSortFunc != null) {
- cloneFunc = mSortFunc.clone();
- }
- // Don't set inputs as PhysicalPlan.clone will take care of that
- return new POSort(new OperatorKey(mKey.scope,
- NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
- requestedParallelism, null, clonePlans, cloneAsc, cloneFunc);
+ clone.mAscCols = cloneAsc;
+ return clone;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
Tue Jan 12 23:21:32 2016
@@ -89,9 +89,7 @@ public class POSplit extends PhysicalOpe
private BitSet processedSet = new BitSet();
- private static Result empty = new Result(POStatus.STATUS_NULL, null);
-
- private boolean inpEOP = false;
+ private transient boolean inpEOP = false;
/**
* Constructs an operator with the specified key
@@ -243,7 +241,7 @@ public class POSplit extends PhysicalOpe
}
}
- return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
+ return (res.returnStatus == POStatus.STATUS_OK) ? res : RESULT_EMPTY;
}
private Result runPipeline(PhysicalOperator leaf) throws ExecException {
@@ -321,13 +319,9 @@ public class POSplit extends PhysicalOpe
@Override
public POSplit clone() throws CloneNotSupportedException {
- Object o = super.clone();
- POSplit opClone = (POSplit)o;
+ POSplit opClone = (POSplit) super.clone();
opClone.processedSet = new BitSet();
- opClone.myPlans = new ArrayList<PhysicalPlan>(myPlans.size());
- for (PhysicalPlan plan : myPlans) {
- opClone.myPlans.add(plan.clone());
- }
+ opClone.myPlans = clonePlans(myPlans);
return opClone;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
Tue Jan 12 23:21:32 2016
@@ -50,7 +50,6 @@ import org.apache.pig.tools.pigstats.Pig
public class POStore extends PhysicalOperator {
private static final long serialVersionUID = 1L;
- protected static Result empty = new Result(POStatus.STATUS_NULL, null);
transient private StoreFuncInterface storer;
transient private StoreFuncDecorator sDecorator;
transient private POStoreImpl impl;
@@ -117,7 +116,7 @@ public class POStore extends PhysicalOpe
public void setUp() throws IOException{
if (impl != null) {
try{
- storer = impl.createStoreFunc(this);
+ storer = impl.createStoreFunc(this);
if (!isTmpStore && !disableCounter && impl instanceof
MapReducePOStoreImpl) {
counterName = PigStatsUtil.getMultiStoreCounterName(this);
if (counterName != null) {
@@ -165,7 +164,7 @@ public class POStore extends PhysicalOpe
sDecorator.putNext((Tuple) res.result);
} else
illustratorMarkup(res.result, res.result, 0);
- res = empty;
+ res = RESULT_EMPTY;
if (counterName != null) {
((MapReducePOStoreImpl)
impl).incrRecordCounter(counterName, 1);
@@ -256,19 +255,19 @@ public class POStore extends PhysicalOpe
}
return storer;
}
-
+
void setStoreFuncDecorator(StoreFuncDecorator sDecorator) {
this.sDecorator = sDecorator;
}
/**
- *
+ *
* @return The {@link StoreFuncDecorator} used to write Tuples
*/
public StoreFuncDecorator getStoreFuncDecorator() {
return sDecorator;
}
-
+
/**
* @param sortInfo the sortInfo to set
*/
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
Tue Jan 12 23:21:32 2016
@@ -43,22 +43,21 @@ import org.apache.pig.pen.util.ExampleTu
public class POStream extends PhysicalOperator {
private static final long serialVersionUID = 2L;
- private static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP,
null);
-
private String executableManagerStr; // String representing
ExecutableManager to use
- transient private ExecutableManager executableManager; //
ExecutableManager to use
private StreamingCommand command; // Actual command to be run
private Properties properties;
- private boolean initialized = false;
-
protected BlockingQueue<Result> binaryOutputQueue = new
ArrayBlockingQueue<Result>(1);
protected BlockingQueue<Result> binaryInputQueue = new
ArrayBlockingQueue<Result>(1);
- protected boolean allInputFromPredecessorConsumed = false;
+ private transient ExecutableManager executableManager; //
ExecutableManager to use
+
+ private transient boolean initialized = false;
+
+ protected transient boolean allInputFromPredecessorConsumed = false;
- protected boolean allOutputFromBinaryProcessed = false;
+ protected transient boolean allOutputFromBinaryProcessed = false;
/**
* This flag indicates whether streaming is done through fetching. If set,
@@ -143,7 +142,7 @@ public class POStream extends PhysicalOp
// The "allOutputFromBinaryProcessed" flag is set when we see
// an EOS (End of Stream output) from streaming binary
if(allOutputFromBinaryProcessed) {
- return EOP_RESULT;
+ return RESULT_EOP;
}
// if we are here AFTER all map() calls have been completed
@@ -158,7 +157,7 @@ public class POStream extends PhysicalOp
// So we can send an EOP to the successor in
// the pipeline and also note this condition
// for future calls
- r = EOP_RESULT;
+ r = RESULT_EOP;
allOutputFromBinaryProcessed = true;
} else if (r.returnStatus == POStatus.STATUS_OK) {
illustratorMarkup(r.result, r.result, 0);
@@ -191,7 +190,7 @@ public class POStream extends PhysicalOp
// So we can send an EOP to the successor in
// the pipeline and also note this condition
// for future calls
- r = EOP_RESULT;
+ r = RESULT_EOP;
allOutputFromBinaryProcessed = true;
}
}
@@ -202,7 +201,7 @@ public class POStream extends PhysicalOp
// So we can send an EOP to the successor in
// the pipeline and also note this condition
// for future calls
- r = EOP_RESULT;
+ r = RESULT_EOP;
allOutputFromBinaryProcessed = true;
} else if (r.returnStatus == POStatus.STATUS_OK) {
illustratorMarkup(r.result, r.result, 0);
@@ -218,7 +217,7 @@ public class POStream extends PhysicalOp
// So we can send an EOP to the successor in
// the pipeline and also note this condition
// for future calls
- r = EOP_RESULT;
+ r = RESULT_EOP;
allOutputFromBinaryProcessed = true;
} else if (r.returnStatus == POStatus.STATUS_OK) {
illustratorMarkup(r.result, r.result, 0);
@@ -297,8 +296,9 @@ public class POStream extends PhysicalOp
// wait for either input to be available
// or output to be consumed
- while(binaryOutputQueue.isEmpty() &&
!binaryInputQueue.isEmpty())
+ while(binaryOutputQueue.isEmpty() &&
!binaryInputQueue.isEmpty()) {
wait();
+ }
}
}
@@ -382,4 +382,13 @@ public class POStream extends PhysicalOp
this.isFetchable = isFetchable;
}
+ @Override
+ public PhysicalOperator clone() throws CloneNotSupportedException {
+ POStream clone = (POStream)super.clone();
+ clone.binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
+ clone.binaryInputQueue = new ArrayBlockingQueue<Result>(1);
+ //Not cloning StreamingCommand as it is read only
+ return clone;
+ }
+
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
Tue Jan 12 23:21:32 2016
@@ -653,6 +653,7 @@ public class TezCompiler extends PhyPlan
public void visitCounter(POCounter op) throws VisitorException {
// Refer visitRank(PORank) for more details
try{
+ curTezOp.markRankCounter();
POCounterTez counterTez = new POCounterTez(op);
nonBlocking(counterTez);
phyToTezOpMap.put(op, curTezOp);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
Tue Jan 12 23:21:32 2016
@@ -179,7 +179,9 @@ public class TezOperator extends Operato
// Indicate if this job is a distinct job
DISTINCT,
// Indicate if this job is a native job
- NATIVE;
+ NATIVE,
+ // Indicate if this job does rank counter
+ RANK_COUNTER;
};
// Features in the job/vertex. Mostly will be only one feature.
@@ -442,6 +444,14 @@ public class TezOperator extends Operato
feature.set(OPER_FEATURE.NATIVE.ordinal());
}
+ public boolean isRankCounter() {
+ return feature.get(OPER_FEATURE.RANK_COUNTER.ordinal());
+ }
+
+ public void markRankCounter() {
+ feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
+ }
+
public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE>
excludeFeatures) {
for (OPER_FEATURE opf : OPER_FEATURE.values()) {
if (excludeFeatures != null && excludeFeatures.contains(opf)) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
Tue Jan 12 23:21:32 2016
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.runtime.api.LogicalInput;
@@ -152,7 +151,7 @@ public class POCounterStatsTez extends P
prevTasksCount += counterRecords.get(i);
}
- Tuple tuple = TupleFactory.getInstance().newTuple(1);
+ Tuple tuple = mTupleFactory.newTuple(1);
tuple.set(0, counterOffsets);
writer.write(POValueOutputTez.EMPTY_KEY, tuple);
finished = true;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
Tue Jan 12 23:21:32 2016
@@ -215,4 +215,10 @@ public class POFRJoinTez extends POFRJoi
public List<String> getInputKeys() {
return inputKeys;
}
+
+ @Override
+ public POFRJoinTez clone() throws CloneNotSupportedException {
+ return (POFRJoinTez) super.clone();
+ }
+
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
Tue Jan 12 23:21:32 2016
@@ -34,7 +34,6 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueWriter;
@@ -49,11 +48,11 @@ public class POLocalRearrangeTez extends
private static final Log LOG =
LogFactory.getLog(POLocalRearrangeTez.class);
protected String outputKey;
- protected transient KeyValueWriter writer;
-
protected boolean connectedToPackage = true;
protected boolean isSkewedJoin = false;
+ protected transient KeyValueWriter writer;
+
public POLocalRearrangeTez(OperatorKey k) {
super(k);
}
@@ -172,20 +171,9 @@ public class POLocalRearrangeTez extends
return inp;
}
- /**
- * Make a deep copy of this operator.
- * @throws CloneNotSupportedException
- */
@Override
public POLocalRearrangeTez clone() throws CloneNotSupportedException {
- POLocalRearrangeTez clone = new POLocalRearrangeTez(new OperatorKey(
- mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
- mKey.scope)), requestedParallelism);
- deepCopyTo(clone);
- clone.isSkewedJoin = isSkewedJoin;
- clone.connectedToPackage = connectedToPackage;
- clone.setOutputKey(outputKey);
- return clone;
+ return (POLocalRearrangeTez) super.clone();
}
@Override
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java?rev=1724339&r1=1724338&r2=1724339&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java
Tue Jan 12 23:21:32 2016
@@ -33,16 +33,13 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
-import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.Pair;
@@ -58,13 +55,11 @@ public class POPartitionRearrangeTez ext
private static final long serialVersionUID = 1L;
private static final Log LOG =
LogFactory.getLog(POPartitionRearrangeTez.class);
- private static final TupleFactory tf = TupleFactory.getInstance();
- private static final BagFactory mBagFactory = BagFactory.getInstance();
// ReducerMap will store the tuple, max reducer index & min reducer index
- private Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap();
- private Integer totalReducers = -1;
- private boolean inited = false;
+ private transient Map<Object, Pair<Integer, Integer>> reducerMap;
+ private transient Integer totalReducers;
+ private transient boolean inited;
public POPartitionRearrangeTez(OperatorKey k) {
this(k, -1);
@@ -202,6 +197,8 @@ public class POPartitionRearrangeTez ext
}
Map<String, Object> distMap = null;
+ totalReducers = -1;
+ reducerMap = Maps.newHashMap();
if (PigProcessor.sampleMap != null) {
// We've already collected sampleMap in PigProcessor
distMap = PigProcessor.sampleMap;
@@ -233,7 +230,7 @@ public class POPartitionRearrangeTez ext
if (idxTuple.size() > 3) {
// remove the last 2 fields of the tuple, i.e: minIndex
// and maxIndex and store it in the reducer map
- Tuple keyTuple = tf.newTuple();
+ Tuple keyTuple = mTupleFactory.newTuple();
for (int i=0; i < idxTuple.size() - 2; i++) {
keyTuple.append(idxTuple.get(i));
}
@@ -259,13 +256,6 @@ public class POPartitionRearrangeTez ext
@Override
public POPartitionRearrangeTez clone() throws CloneNotSupportedException {
- POPartitionRearrangeTez clone = new POPartitionRearrangeTez(new
OperatorKey(
- mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
- mKey.scope)), requestedParallelism);
- deepCopyTo(clone);
- clone.isSkewedJoin = isSkewedJoin;
- clone.connectedToPackage = connectedToPackage;
- clone.setOutputKey(outputKey);
- return clone;
+ return (POPartitionRearrangeTez) super.clone();
}
}