Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Fri Mar 4 18:17:39 2016 @@ -25,7 +25,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.BinInterSedes; import org.apache.pig.data.DataType; @@ -46,6 +45,7 @@ public class PigTupleDefaultRawComparato super(TupleFactory.getInstance().tupleClass()); } + @Override public void setConf(Configuration conf) { try { mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder")); @@ -62,6 +62,7 @@ public class PigTupleDefaultRawComparato mWholeTuple = (mAsc.length == 1); } + @Override public Configuration getConf() { return null; } @@ -78,9 +79,9 @@ public class PigTupleDefaultRawComparato * IntWritable.compare() is used. If both are null then the indices are * compared. Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; - mHasNullField = false; Tuple t1; Tuple t2; @@ -99,9 +100,16 @@ public class PigTupleDefaultRawComparato rc = compareTuple(t1, t2); //TODO think about how SchemaTuple could speed this up + // handle PIG-927. If tuples are equal but any field inside tuple is null, + // then we do not merge keys if indices are not same + if (rc == 0 && mHasNullField) { + rc = ((NullableTuple) t1).getIndex() - ((NullableTuple) t2).getIndex(); + } + return rc; } + @Override public int compare(Object o1, Object o2) { NullableTuple nt1 = (NullableTuple) o1; NullableTuple nt2 = (NullableTuple) o2; @@ -110,10 +118,16 @@ public class PigTupleDefaultRawComparato // If either are null, handle differently. if (!nt1.isNull() && !nt2.isNull()) { rc = compareTuple((Tuple) nt1.getValueAsPigType(), (Tuple) nt2.getValueAsPigType()); + // handle PIG-927. If tuples are equal but any field inside tuple is null, + // then we do not merge keys if indices are not same + if (rc == 0 && mHasNullField) { + rc = nt1.getIndex() - nt2.getIndex(); + } } else { - // For sorting purposes two nulls are equal. - if (nt1.isNull() && nt2.isNull()) - rc = 0; + // Two nulls are equal if indices are same + if (nt1.isNull() && nt2.isNull()) { + rc = nt1.getIndex() - nt2.getIndex(); + } else if (nt1.isNull()) rc = -1; else @@ -125,6 +139,7 @@ public class PigTupleDefaultRawComparato } private int compareTuple(Tuple t1, Tuple t2) { + mHasNullField = false; int sz1 = t1.size(); int sz2 = t2.size(); if (sz2 < sz1) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java Fri Mar 4 18:17:39 2016 @@ -23,9 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.mapred.JobConf; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; @@ -90,18 +88,26 @@ public class PigTupleSortComparator exte } /** - * Compare two NullableTuples as raw bytes. Tuples are compared field-wise. If both are null they are defined equal. + * Compare two NullableTuples as raw bytes. Tuples are compared field-wise. + * If both are null, then the indices are compared. * Otherwise the null one is defined to be less. */ + @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { int rc = 0; if (b1[s1] == 0 && b2[s2] == 0) { // skip mNull and mIndex rc = mComparator.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); + // handle PIG-927. If tuples are equal but any field inside tuple is null, + // then we do not merge keys if indices are not same + if (rc == 0 && mComparator.hasComparedTupleNull()) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } } else { - // for sorting purposes two nulls are equal, null sorts first - if (b1[s1] != 0 && b2[s2] != 0) - rc = 0; + // Two nulls are equal if indices are same + if (b1[s1] != 0 && b2[s2] != 0) { + rc = b1[s1 + 1] - b2[s2 + 1]; + } else if (b1[s1] != 0) rc = -1; else @@ -112,6 +118,7 @@ public class PigTupleSortComparator exte return rc; } + @Override @SuppressWarnings("unchecked") public int compare(Object o1, Object o2) { NullableTuple nt1 = (NullableTuple) o1; @@ -121,10 +128,16 @@ public class PigTupleSortComparator exte // If either are null, handle differently. if (!nt1.isNull() && !nt2.isNull()) { rc = mComparator.compare((Tuple) nt1.getValueAsPigType(), (Tuple) nt2.getValueAsPigType()); + // handle PIG-927. If tuples are equal but any field inside tuple is null, + // then we do not merge keys if indices are not same + if (rc == 0 && mComparator.hasComparedTupleNull()) { + rc = nt1.getIndex() - nt2.getIndex(); + } } else { - // For sorting purposes two nulls are equal. - if (nt1.isNull() && nt2.isNull()) - rc = 0; + // Two nulls are equal if indices are same + if (nt1.isNull() && nt2.isNull()) { + rc = nt1.getIndex() - nt2.getIndex(); + } else if (nt1.isNull()) rc = -1; else Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ProgressableReporter.java Fri Mar 4 18:17:39 2016 @@ -17,8 +17,6 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; -import java.io.IOException; - import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable; @@ -26,7 +24,7 @@ public class ProgressableReporter implem TaskAttemptContext rep; public ProgressableReporter(){ - + } public ProgressableReporter(TaskAttemptContext rep) { @@ -34,14 +32,19 @@ public class ProgressableReporter implem this.rep = rep; } + @Override public void progress() { - if(rep!=null) + if (rep != null) { rep.progress(); + } } + @Override public void progress(String msg) { try { - rep.setStatus(msg); + if (rep != null) { + rep.setStatus(msg); + } }catch (Exception e) { rep.progress(); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/SkewedPartitioner.java Fri Mar 4 18:17:39 2016 @@ -47,7 +47,7 @@ import com.google.common.collect.Maps; public class SkewedPartitioner extends Partitioner<PigNullableWritable, Writable> implements Configurable { protected static final TupleFactory tf = TupleFactory.getInstance(); - protected Map<Tuple, Pair<Integer, Integer>> reducerMap = Maps.newHashMap(); + protected Map<Object, Pair<Integer, Integer>> reducerMap; protected Integer totalReducers = -1; protected boolean inited = false; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri Mar 4 18:17:39 2016 @@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; @@ -110,6 +111,18 @@ public class EndOfAllInputSetter extends endOfAllInputFlag = true; } + @Override + public void visitPOForEach(POForEach foreach) throws VisitorException { + try { + if (foreach.needEndOfAllInputProcessing()) { + endOfAllInputFlag = true; + } + } catch (Exception e) { + throw new VisitorException(e); + } + } + + /** * @return if end of all input is present */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Mar 4 18:17:39 2016 @@ -24,8 +24,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.JVMReuseManager; -import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -33,6 +31,7 @@ import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.Operator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; @@ -69,6 +68,8 @@ public abstract class PhysicalOperator e protected static final long serialVersionUID = 1L; protected static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null); protected static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null); + protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); + protected static final BagFactory mBagFactory = BagFactory.getInstance(); // The degree of parallelism requested protected int requestedParallelism; @@ -122,10 +123,6 @@ public abstract class PhysicalOperator e private List<OriginalLocation> originalLocations = new ArrayList<OriginalLocation>(); - static { - JVMReuseManager.getInstance().registerForStaticDataCleanup(PhysicalOperator.class); - } - public PhysicalOperator(OperatorKey k) { this(k, -1, null); } @@ -295,12 +292,13 @@ public abstract class PhysicalOperator e try { if (input == null && (inputs == null || inputs.size() == 0)) { // log.warn("No inputs found. Signaling End of Processing."); - return new Result(POStatus.STATUS_EOP, null); + return RESULT_EOP; } // Should be removed once the model is clear - if (getReporter() != null) { - getReporter().progress(); + PigProgressable progRep = getReporter(); + if (progRep != null) { + progRep.progress(); } if (!isInputAttached()) { @@ -409,7 +407,7 @@ public abstract class PhysicalOperator e public Result getNextDataBag() throws ExecException { Result val = new Result(); - DataBag tmpBag = BagFactory.getInstance().newDefaultBag(); + DataBag tmpBag = mBagFactory.newDefaultBag(); for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) { if (ret.returnStatus == POStatus.STATUS_ERR) { return ret; @@ -457,14 +455,17 @@ public abstract class PhysicalOperator e PhysicalOperator.reporter.set(reporter); } - @StaticDataCleanup + //@StaticDataCleanup public static void staticDataCleanup() { reporter = new ThreadLocal<PigProgressable>(); } /** - * Make a deep copy of this operator. This function is blank, however, + * Make a copy of this operator. This function is blank, however, * we should leave a place holder so that the subclasses can clone + * to make deep copy as this one creates a shallow copy of + * non-primitive types (objects, arrays and lists) + * * @throws CloneNotSupportedException */ @Override @@ -477,6 +478,14 @@ public abstract class PhysicalOperator e originalLocations.addAll(op.originalLocations); } + protected static List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws CloneNotSupportedException { + List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(origPlans.size()); + for (PhysicalPlan plan : origPlans) { + clonePlans.add(plan.clone()); + } + return clonePlans; + } + /** * @param physicalPlan */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Fri Mar 4 18:17:39 2016 @@ -1984,7 +1984,22 @@ public class POCast extends ExpressionOp @Override public Result getNextDataByteArray() throws ExecException { + PhysicalOperator in = inputs.get(0); + Byte resultType = in.getResultType(); + if (resultType != DataType.BYTEARRAY) return error(); + + DataByteArray dba = null; + Result res = in.getNextDataByteArray(); + if (res.returnStatus == POStatus.STATUS_OK && res.result != null) { + try { + dba = (DataByteArray) res.result; + } catch (ClassCastException e) { + return error(); + } + if (dba != null) return res; + } + return res; } private void readObject(ObjectInputStream is) throws IOException, Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Fri Mar 4 18:17:39 2016 @@ -27,12 +27,10 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.SingleTupleBag; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; @@ -51,10 +49,6 @@ public class POProject extends Expressio */ private static final long serialVersionUID = 1L; - private static TupleFactory tupleFactory = TupleFactory.getInstance(); - - protected static final BagFactory bagFactory = BagFactory.getInstance(); - private boolean resultSingleTupleBag = false; //The column to project @@ -62,20 +56,20 @@ public class POProject extends Expressio //True if we are in the middle of streaming tuples //in a bag - boolean processingBagOfTuples = false; + private boolean processingBagOfTuples = false; //The bag iterator used while straeming tuple - transient Iterator<Tuple> bagIterator = null; + private transient Iterator<Tuple> bagIterator = null; //Represents the fact that this instance of POProject //is overloaded to stream tuples in the bag rather //than passing the entire bag. It is the responsibility //of the translator to set this. - boolean overloaded = false; + protected boolean overloaded = false; - private boolean isProjectToEnd = false; - private int startCol; + protected boolean isProjectToEnd = false; + protected int startCol; public POProject(OperatorKey k) { this(k,-1,0); @@ -191,7 +185,7 @@ public class POProject extends Expressio for(int col : columns) { addColumn(objList, inpValue, col); } - ret = tupleFactory.newTupleNoCopy(objList); + ret = mTupleFactory.newTupleNoCopy(objList); } res.result = ret; illustratorMarkup(inpValue, res.result, -1); @@ -277,20 +271,20 @@ public class POProject extends Expressio for (int col : columns) { addColumn(objList, tuple, col); } - outBag = new SingleTupleBag( tupleFactory.newTupleNoCopy(objList) ); + outBag = new SingleTupleBag( mTupleFactory.newTupleNoCopy(objList) ); }else { Tuple tmpTuple = getRangeTuple(tuple); outBag = new SingleTupleBag(tmpTuple); } } else { - outBag = bagFactory.newDefaultBag(); + outBag = mBagFactory.newDefaultBag(); for (Tuple tuple : inpBag) { if(!isProjectToEnd){ ArrayList<Object> objList = new ArrayList<Object>(columns.size()); for (int col : columns) { addColumn(objList, tuple, col); } - outBag.add( tupleFactory.newTupleNoCopy(objList) ); + outBag.add( mTupleFactory.newTupleNoCopy(objList) ); }else{ Tuple outTuple = getRangeTuple(tuple); outBag.add(outTuple); @@ -321,14 +315,14 @@ public class POProject extends Expressio Tuple outTuple; if(isRangeInvalid(lastColIdx)){ //invalid range - return empty tuple - outTuple = tupleFactory.newTuple(); + outTuple = mTupleFactory.newTuple(); } else { ArrayList<Object> objList = new ArrayList<Object>(lastColIdx - startCol + 1); for(int i = startCol; i <= lastColIdx ; i++){ addColumn(objList, tuple, i); } - outTuple = tupleFactory.newTupleNoCopy(objList); + outTuple = mTupleFactory.newTupleNoCopy(objList); } return outTuple; } @@ -451,7 +445,7 @@ public class POProject extends Expressio objList.add(null); } } - ret = tupleFactory.newTuple(objList); + ret = mTupleFactory.newTuple(objList); res.result = (Tuple)ret; return res; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORelationToExprProject.java Fri Mar 4 18:17:39 2016 @@ -139,23 +139,24 @@ public class PORelationToExprProject ext sendEmptyBagOnEOP = false; return(r); } - + // See PIG-4644 @Override public PORelationToExprProject clone() throws CloneNotSupportedException { - ArrayList<Integer> cols = new ArrayList<>(columns.size()); + ArrayList<Integer> cols = new ArrayList<Integer>(columns.size()); // Can reuse the same Integer objects, as they are immutable for (Integer i : columns) { cols.add(i); } PORelationToExprProject clone = new PORelationToExprProject(new OperatorKey(mKey.scope, - NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), - requestedParallelism, cols); + NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), + requestedParallelism, cols); clone.cloneHelper(this); clone.overloaded = overloaded; + clone.startCol = startCol; + clone.isProjectToEnd = isProjectToEnd; clone.resultType = resultType; clone.sendEmptyBagOnEOP = sendEmptyBagOnEOP; return clone; } - } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Fri Mar 4 18:17:39 2016 @@ -45,7 +45,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.MonitoredUDFExecutor; import org.apache.pig.builtin.MonitoredUDF; -import org.apache.pig.builtin.RollupDimensions; import org.apache.pig.data.DataType; import org.apache.pig.data.SchemaTupleClassGenerator.GenContext; import org.apache.pig.data.SchemaTupleFactory; @@ -87,28 +86,6 @@ public class POUserFunc extends Expressi private long timingFrequency = 100L; private boolean doTiming = false; - private static final String ROLLUP_UDF = RollupDimensions.class.getName(); - //the pivot value - private int pivot = -1; - - private boolean rollupHIIoptimizable = false; - - public void setPivot(int pvt) { - this.pivot = pvt; - } - - public int getPivot() { - return this.pivot; - } - - public void setRollupHIIOptimizable(boolean check) { - this.rollupHIIoptimizable = check; - } - - public boolean getRollupHIIOptimizable() { - return this.rollupHIIoptimizable; - } - public PhysicalOperator getReferencedOperator() { return referencedOperator; } @@ -154,17 +131,6 @@ public class POUserFunc extends Expressi if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) { executor = new MonitoredUDFExecutor(func); } - - if (funcSpec.getClassName().equals(ROLLUP_UDF) && this.rollupHIIoptimizable != false) { - try { - ((RollupDimensions) func).setPivot(this.pivot); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - ((RollupDimensions) func).setRollupHIIOptimizable(this.rollupHIIoptimizable); - } - //the next couple of initializations do not work as intended for the following reasons //the reporter and pigLogger are member variables of PhysicalOperator //when instanitateFunc is invoked at deserialization time, both @@ -355,6 +321,9 @@ public class POUserFunc extends Expressi } } } else { + if (parentPlan!=null && parentPlan.endOfAllInput && needEndOfAllInputProcessing()) { + func.setEndOfAllInput(true); + } if (executor != null) { result.result = executor.monitorExec((Tuple) result.result); } else { @@ -601,6 +570,8 @@ public class POUserFunc extends Expressi requestedParallelism, null, funcSpec.clone()); clone.setResultType(resultType); clone.signature = signature; + clone.cacheFiles = cacheFiles; + clone.shipFiles = shipFiles; return clone; } @@ -640,6 +611,10 @@ public class POUserFunc extends Expressi return func; } + public String getSignature() { + return signature; + } + public void setSignature(String signature) { this.signature = signature; if (this.func!=null) { @@ -659,4 +634,7 @@ public class POUserFunc extends Expressi } } + public boolean needEndOfAllInputProcessing() { + return getFunc().needEndOfAllInputProcessing(); + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java Fri Mar 4 18:17:39 2016 @@ -19,13 +19,12 @@ package org.apache.pig.backend.hadoop.ex import org.apache.pig.data.Tuple; import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.util.IdentityHashSet; /** * This is a base class for all unary comparison operators. Supports the * use of operand type instead of result type as the result type is * always boolean. - * + * */ public abstract class UnaryComparisonOperator extends UnaryExpressionOperator implements ComparisonOperator { @@ -35,7 +34,7 @@ public abstract class UnaryComparisonOpe //The result will be comunicated using the Status object. //This is a slight abuse of the status object. protected byte operandType; - + public UnaryComparisonOperator(OperatorKey k) { this(k,-1); } @@ -44,14 +43,16 @@ public abstract class UnaryComparisonOpe super(k, rp); } + @Override public byte getOperandType() { return operandType; } + @Override public void setOperandType(byte operandType) { this.operandType = operandType; } - + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { if(illustrator != null) { @@ -59,4 +60,9 @@ public abstract class UnaryComparisonOpe } return null; } + + protected void cloneHelper(UnaryComparisonOperator op) { + super.cloneHelper(op); + this.operandType = op.operandType; + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri Mar 4 18:17:39 2016 @@ -67,7 +67,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; @@ -153,10 +152,6 @@ public class PhyPlanVisitor extends Plan } } - public void visitPORollupHIIForEach(PORollupHIIForEach nhfe) throws VisitorException { - visitPOForEach(nhfe); - } - public void visitUnion(POUnion un) throws VisitorException{ //do nothing } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCounter.java Fri Mar 4 18:17:39 2016 @@ -28,7 +28,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.pen.util.ExampleTuple; @@ -69,8 +68,6 @@ public class POCounter extends PhysicalO **/ private boolean isRowNumber = false; - protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); - /** * Local counter for tuples on the same task. **/ @@ -321,4 +318,14 @@ public class POCounter extends PhysicalO public String getOperationID() { return operationID; } + + @Override + public POCounter clone() throws CloneNotSupportedException { + POCounter clone = (POCounter)super.clone(); + clone.localCount = new Long(localCount); + clone.taskID = new Integer(taskID); + // counterPlans and mAscCols unused. Not cloning them + return clone; + } + } \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Fri Mar 4 18:17:39 2016 @@ -25,7 +25,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; @@ -37,20 +36,20 @@ import org.apache.pig.pen.util.LineageTr /** * Recover this class for nested cross operation. - * - * + * + * */ public class POCross extends PhysicalOperator { private static final long serialVersionUID = 1L; - protected DataBag[] inputBags; + protected transient DataBag[] inputBags; - protected Tuple[] data; + protected transient Tuple[] data; protected transient Iterator<Tuple>[] its; - - protected Tuple tupleOfLastBag; + + protected transient Tuple tupleOfLastBag; public POCross(OperatorKey k) { super(k); @@ -197,7 +196,7 @@ public class POCross extends PhysicalOpe its = new Iterator[length]; for (int i = 0; i < length; ++i) { PhysicalOperator op = inputs.get(i); - DataBag bag = BagFactory.getInstance().newDefaultBag(); + DataBag bag = mBagFactory.newDefaultBag(); inputBags[count] = bag; for (Result res = op.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = op .getNextTuple()) { @@ -226,7 +225,7 @@ public class POCross extends PhysicalOpe return illustratorMarkup(out, out, 0); } - + private boolean loadLastBag() throws ExecException { Result resOfLastBag = null; int index = inputs.size() - 1; @@ -247,7 +246,7 @@ public class POCross extends PhysicalOpe "Error accumulating data in the local Cross operator"); } } - + private void clearMemory() { // reset inputBags, its, data and tupleOfLastBag to null so that in the // next round of getNext, the new input data will be loaded. Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Fri Mar 4 18:17:39 2016 @@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.InternalDistinctBag; @@ -49,9 +48,8 @@ import org.apache.pig.impl.plan.VisitorE public class PODistinct extends PhysicalOperator implements Cloneable { private static final Log log = LogFactory.getLog(PODistinct.class); private static final long serialVersionUID = 1L; - private boolean inputsAccumulated = false; - private DataBag distinctBag = null; - + private transient boolean inputsAccumulated; + private transient DataBag distinctBag; private transient boolean initialized; private transient boolean useDefaultBag; private transient Iterator<Tuple> it; @@ -102,7 +100,7 @@ public class PODistinct extends Physical } } } - distinctBag = useDefaultBag ? BagFactory.getInstance().newDistinctBag() + distinctBag = useDefaultBag ? mBagFactory.newDistinctBag() : new InternalDistinctBag(3); Result in = processInput(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Mar 4 18:17:39 2016 @@ -46,7 +46,6 @@ import org.apache.pig.data.SchemaTupleBa import org.apache.pig.data.SchemaTupleClassGenerator.GenContext; import org.apache.pig.data.SchemaTupleFactory; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -70,6 +69,7 @@ import org.apache.pig.impl.plan.VisitorE public class POFRJoin extends PhysicalOperator { private static final Log log = LogFactory.getLog(POFRJoin.class); private static final long serialVersionUID = 1L; + // The number in the input list which denotes the fragmented input protected int fragment; // There can be n inputs each being a List<PhysicalPlan> @@ -85,18 +85,7 @@ public class POFRJoin extends PhysicalOp protected ConstantExpression[] constExps; // Used to produce the cross product of various bags protected POForEach fe; - // The array of Hashtables one per replicated input. replicates[fragment] = - // null - // fragment is the input which is fragmented and not replicated. - protected TupleToMapKey replicates[]; - // varaible which denotes whether we are returning tuples from the foreach - // operator - protected boolean processingPlan; - // A dummy tuple - protected Tuple dumTup = TupleFactory.getInstance().newTuple(1); - // An instance of tuple factory - protected transient TupleFactory mTupleFactory; - protected boolean setUp; + // A Boolean variable which denotes if this is a LeftOuter Join or an Inner // Join protected boolean isLeftOuterJoin; @@ -106,6 +95,16 @@ public class POFRJoin extends PhysicalOp protected Schema[] inputSchemas; protected Schema[] keySchemas; + // The array of Hashtables one per replicated input. replicates[fragment] = + // null fragment is the input which is fragmented and not replicated. + protected transient TupleToMapKey replicates[]; + // varaible which denotes whether we are returning tuples from the foreach + // operator + protected transient boolean processingPlan; + // A dummy tuple + protected transient Tuple dumTup; + protected transient boolean setUp; + public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes, FileSpec[] replFiles, int fragment, boolean isLeftOuter, @@ -126,12 +125,10 @@ public class POFRJoin extends PhysicalOp this.fragment = fragment; this.keyTypes = keyTypes; this.replFiles = replFiles; - replicates = new TupleToMapKey[ppLists.size()]; + LRs = new POLocalRearrange[ppLists.size()]; constExps = new ConstantExpression[ppLists.size()]; createJoinPlans(k); - processingPlan = false; - mTupleFactory = TupleFactory.getInstance(); List<Tuple> tupList = new ArrayList<Tuple>(); tupList.add(nullTuple); nullBag = new NonSpillableDataBag(tupList); @@ -159,7 +156,6 @@ public class POFRJoin extends PhysicalOp this.fe = copy.fe; this.constExps = copy.constExps; this.processingPlan = copy.processingPlan; - this.mTupleFactory = copy.mTupleFactory; this.nullBag = copy.nullBag; this.isLeftOuterJoin = copy.isLeftOuterJoin; this.inputSchemas = copy.inputSchemas; @@ -173,7 +169,7 @@ public class POFRJoin extends PhysicalOp /** * Configures the Local Rearrange operators & the foreach operator - * + * * @param old * @throws ExecException */ @@ -238,6 +234,8 @@ public class POFRJoin extends PhysicalOp Result res = null; Result inp = null; if (!setUp) { + replicates = new TupleToMapKey[phyPlanLists.size()]; + dumTup = mTupleFactory.newTuple(1); setUpHashMap(); setUp = true; } @@ -254,7 +252,7 @@ public class POFRJoin extends PhysicalOp if (res.returnStatus == POStatus.STATUS_EOP) { // We have completed all cross-products now its time to move // to next tuple of left side - processingPlan = false; + processingPlan = false; break; } if (res.returnStatus == POStatus.STATUS_ERR) { @@ -284,7 +282,7 @@ public class POFRJoin extends PhysicalOp return new Result(); } Tuple lrOutTuple = (Tuple) lrOut.result; - Tuple key = TupleFactory.getInstance().newTuple(1); + Tuple key = mTupleFactory.newTuple(1); key.set(0, lrOutTuple.get(1)); Tuple value = getValueTuple(lr, lrOutTuple); lr.detachInput(); @@ -390,9 +388,9 @@ public class POFRJoin extends PhysicalOp POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFile); - + Properties props = ConfigurationUtil.getLocalFSProperties(); - PigContext pc = new PigContext(ExecType.LOCAL, props); + PigContext pc = new PigContext(ExecType.LOCAL, props); ld.setPc(pc); // We use LocalRearrange Operator to seperate Key and Values // eg. ( a, b, c ) would generate a, ( a, b, c ) @@ -437,11 +435,10 @@ public class POFRJoin extends PhysicalOp } return false; } - + private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException { is.defaultReadObject(); - mTupleFactory = TupleFactory.getInstance(); // setUpHashTable(); } @@ -535,4 +532,28 @@ public class POFRJoin extends PhysicalOp // no op: all handled by the preceding POForEach return null; } + + @Override + public POFRJoin clone() throws CloneNotSupportedException { + POFRJoin clone = (POFRJoin) super.clone(); + // Not doing deep copy of nullBag, nullBag, inputSchemas, keySchemas + // as they are read only + clone.phyPlanLists = new ArrayList<List<PhysicalPlan>>(phyPlanLists.size()); + for (List<PhysicalPlan> ppLst : phyPlanLists) { + clone.phyPlanLists.add(clonePlans(ppLst)); + } + + clone.LRs = new POLocalRearrange[phyPlanLists.size()]; + clone.constExps = new ConstantExpression[phyPlanLists.size()]; + try { + clone.createJoinPlans(getOperatorKey()); + } catch (ExecException e) { + CloneNotSupportedException cnse = new CloneNotSupportedException("Problem with setting plans of " + this.getClass().getSimpleName()); + cnse.initCause(e); + throw cnse; + } + return clone; + } + + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Fri Mar 4 18:17:39 2016 @@ -36,30 +36,29 @@ import org.apache.pig.impl.plan.VisitorE * avoid many function calls, the filter operator, stores the Comparison * Operator that is the root of the Expression Plan and uses its getNext * directly. - * + * * Since the filter is supposed to return tuples only, getNext is not supported * on any other data type. - * + * */ public class POFilter extends PhysicalOperator { /** - * + * */ private static final long serialVersionUID = 1L; // The expression plan - PhysicalPlan plan; + private PhysicalPlan plan; // The root comparison operator of the expression plan -// ComparisonOperator comOp; - PhysicalOperator comOp; - + // ComparisonOperator comOp; + private PhysicalOperator comOp; // The operand type for the comparison operator needed // to call the comparison operators getNext with the // appropriate type - byte compOperandType; + // private byte compOperandType; public POFilter(OperatorKey k) { this(k, -1, null); @@ -186,7 +185,7 @@ public class POFilter extends PhysicalOp public PhysicalPlan getPlan() { return plan; } - + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { if (illustrator != null) { @@ -202,4 +201,12 @@ public class POFilter extends PhysicalOp } return (Tuple) out; } + + @Override + public PhysicalOperator clone() throws CloneNotSupportedException { + POFilter opClone = (POFilter) super.clone(); + opClone.setPlan(plan.clone()); + return opClone; + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Mar 4 18:17:39 2016 @@ -25,6 +25,7 @@ import java.util.List; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFEndOfAllInputNeededVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; @@ -41,6 +42,7 @@ import org.apache.pig.data.SchemaTupleFa import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.data.TupleMaker; +import org.apache.pig.data.UnlimitedNullTuple; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -55,26 +57,13 @@ public class POForEach extends PhysicalO private static final long serialVersionUID = 1L; protected List<PhysicalPlan> inputPlans; - protected List<PhysicalOperator> opsToBeReset; - //Since the plan has a generate, this needs to be maintained - //as the generate can potentially return multiple tuples for - //same call. - protected boolean processingPlan = false; - - //its holds the iterators of the databags given by the input expressions which need flattening. - transient protected Iterator<Tuple> [] its = null; - //This holds the outputs given out by the input expressions of any datatype - protected Object [] bags = null; + protected List<PhysicalOperator> opsToBeReset; - //This is the template whcih contains tuples and is flattened out in createTuple() to generate the final output - protected Object[] data = null; + protected PhysicalOperator[] planLeafOps; // store result types of the plan leaves - protected byte[] resultTypes = null; - - // store whether or not an accumulative UDF has terminated early - protected BitSet earlyTermination = null; + protected byte[] resultTypes; // array version of isToBeFlattened - this is purely // for optimization - instead of calling isToBeFlattened.get(i) @@ -83,19 +72,40 @@ public class POForEach extends PhysicalO // so we can also save on the Boolean.booleanValue() calls protected boolean[] isToBeFlattenedArray; - ExampleTuple tIn = null; protected int noItems; - protected PhysicalOperator[] planLeafOps = null; + //Since the plan has a generate, this needs to be maintained + //as the generate can potentially return multiple tuples for + //same call. + protected transient boolean processingPlan; + + //its holds the iterators of the databags given by the input expressions which need flattening. + protected transient Iterator<Tuple> [] its = null; + + //This holds the outputs given out by the input expressions of any datatype + protected transient Object[] bags ; + + //This is the template whcih contains tuples and is flattened out in createTuple() to generate the final output + protected transient Object[] data; + + // store whether or not an accumulative UDF has terminated early + protected transient BitSet earlyTermination; + + protected transient ExampleTuple tIn; + protected transient AccumulativeTupleBuffer buffer; - protected Tuple inpTuple; + protected transient Tuple inpTuple; + + protected transient boolean endOfAllInputProcessed; // Indicate the foreach statement can only in map side // Currently only used in MR cross (See PIG-4175) protected boolean mapSideOnly = false; + protected Boolean endOfAllInputProcessing = false; + private Schema schema; public POForEach(OperatorKey k) { @@ -244,13 +254,21 @@ public class POForEach extends PhysicalO //read while (true) { inp = processInput(); - if (inp.returnStatus == POStatus.STATUS_EOP || - inp.returnStatus == POStatus.STATUS_ERR) { + + if (inp.returnStatus == POStatus.STATUS_ERR) { return inp; } if (inp.returnStatus == POStatus.STATUS_NULL) { continue; } + if (inp.returnStatus == POStatus.STATUS_EOP) { + if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) { + // continue pull one more output + inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); + } else { + return inp; + } + } attachInputToPlans((Tuple) inp.result); inpTuple = (Tuple)inp.result; @@ -357,6 +375,9 @@ public class POForEach extends PhysicalO if(its == null) { + if (endOfAllInputProcessed) { + return RESULT_EOP; + } //getNext being called for the first time OR starting with a set of new data from inputs its = new Iterator[noItems]; bags = new Object[noItems]; @@ -424,6 +445,9 @@ public class POForEach extends PhysicalO its[i] = null; } } + if (parentPlan!=null && parentPlan.endOfAllInput && endOfAllInputProcessing) { + endOfAllInputProcessed = true; + } } // if accumulating, we haven't got data yet for some fields, just return @@ -658,16 +682,13 @@ public class POForEach extends PhysicalO } } - List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(opsToBeReset.size()); - for (PhysicalOperator op : opsToBeReset) { - ops.add(op); - } POForEach clone = new POForEach(new OperatorKey(mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), requestedParallelism, plans, flattens); - clone.setOpsToBeReset(ops); clone.setResultType(getResultType()); clone.addOriginalLocation(alias, getOriginalLocations()); + clone.endOfAllInputProcessing = endOfAllInputProcessing; + clone.mapSideOnly = mapSideOnly; return clone; } @@ -798,4 +819,21 @@ public class POForEach extends PhysicalO public boolean isMapSideOnly() { return mapSideOnly; } + + public boolean needEndOfAllInputProcessing() throws ExecException { + try { + for (PhysicalPlan innerPlan : inputPlans) { + UDFEndOfAllInputNeededVisitor endOfAllInputNeededVisitor + = new UDFEndOfAllInputNeededVisitor(innerPlan); + endOfAllInputNeededVisitor.visit(); + if (endOfAllInputNeededVisitor.needEndOfAllInputProcessing()) { + endOfAllInputProcessing = true; + return true; + } + } + return false; + } catch (Exception e) { + throw new ExecException(e); + } + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Fri Mar 4 18:17:39 2016 @@ -27,7 +27,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.pen.util.ExampleTuple; @@ -38,14 +37,14 @@ public class POLimit extends PhysicalOpe */ private static final long serialVersionUID = 1L; - // Counts for outputs processed - private long soFar = 0; - // Number of limited outputs - long mLimit; + private long mLimit; // The expression plan - PhysicalPlan expressionPlan; + private PhysicalPlan expressionPlan; + + // Counts for outputs processed + private transient long soFar = 0; public POLimit(OperatorKey k) { this(k, -1, null); @@ -159,15 +158,11 @@ public class POLimit extends PhysicalOpe @Override public POLimit clone() throws CloneNotSupportedException { - POLimit newLimit = new POLimit(new OperatorKey(this.mKey.scope, - NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)), - this.requestedParallelism, this.inputs); - newLimit.mLimit = this.mLimit; + POLimit clone = (POLimit) super.clone(); if (this.expressionPlan != null) { - newLimit.expressionPlan = this.expressionPlan.clone(); + clone.expressionPlan = this.expressionPlan.clone(); } - newLimit.addOriginalLocation(alias, getOriginalLocations()); - return newLimit; + return clone; } @Override Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Fri Mar 4 18:17:39 2016 @@ -35,9 +35,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.io.PigNullableWritable; -import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; @@ -57,8 +55,6 @@ public class POLocalRearrange extends Ph */ protected static final long serialVersionUID = 1L; - protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); - private static final Result ERR_RESULT = new Result(); protected List<PhysicalPlan> plans; @@ -82,8 +78,6 @@ public class POLocalRearrange extends Ph protected boolean isCross = false; - protected Result inp; - // map to store mapping of projected columns to // the position in the "Key" where these will be projected to. // We use this information to strip off these columns @@ -133,6 +127,8 @@ public class POLocalRearrange extends Ph // By default, we strip keys from the value. private boolean stripKeyFromValue = true; + protected transient Result inp; + public POLocalRearrange(OperatorKey k) { this(k, -1, null); } @@ -709,32 +705,18 @@ public class POLocalRearrange extends Ph */ @Override public POLocalRearrange clone() throws CloneNotSupportedException { - POLocalRearrange clone = new POLocalRearrange(new OperatorKey( - mKey.scope, - NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), - requestedParallelism); - deepCopyTo(clone); - return clone; - } - - protected void deepCopyTo(POLocalRearrange clone) - throws CloneNotSupportedException { - - clone.setParentPlan(parentPlan); - clone.index = index; - if (useSecondaryKey) { - clone.keyType = mainKeyType; - } else { - clone.keyType = keyType; - } - clone.setUseSecondaryKey(useSecondaryKey); + POLocalRearrange clone = (POLocalRearrange) super.clone(); + // Constructor + clone.leafOps = new ArrayList<ExpressionOperator>(); + clone.secondaryLeafOps = new ArrayList<ExpressionOperator>(); // Needs to be called as setDistinct so that the fake index tuple gets // created. clone.setDistinct(mIsDistinct); - clone.setCross(isCross); - clone.addOriginalLocation(alias, getOriginalLocations()); - clone.setStripKeyFromValue(stripKeyFromValue); - + // Set the keyType to mainKeyType. setSecondaryPlans will calculate + // based on that and set keyType to the final value + if (useSecondaryKey) { + clone.keyType = mainKeyType; + } try { clone.setPlans(clonePlans(plans)); if (secondaryPlans != null) { @@ -745,14 +727,7 @@ public class POLocalRearrange extends Ph cnse.initCause(pe); throw cnse; } - } - - private List<PhysicalPlan> clonePlans(List<PhysicalPlan> origPlans) throws CloneNotSupportedException { - List<PhysicalPlan> clonePlans = new ArrayList<PhysicalPlan>(origPlans.size()); - for (PhysicalPlan plan : origPlans) { - clonePlans.add(plan.clone()); - } - return clonePlans; + return clone; } public boolean isCross() { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Mar 4 18:17:39 2016 @@ -581,7 +581,7 @@ public class POMergeCogroup extends Phys } catch (ExecException e) { // Alas, no choice but to throw Runtime exception. - String errMsg = "Exception occured in compare() of heap in POMergeCogroup."; + String errMsg = "Exception occurred in compare() of heap in POMergeCogroup."; throw new RuntimeException(errMsg,e); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Mar 4 18:17:39 2016 @@ -70,6 +70,12 @@ public class POMergeJoin extends Physica private static final long serialVersionUID = 1L; + private static final String keyOrderReminder = "Remember that you should " + + "not change the order of keys before a merge join in a FOREACH or " + + "manipulate join keys in a UDF in a way that would change the sort " + + "order. UDFs in a FOREACH are allowed as long as they do not change" + + "the join key values in a way that would change the sort order.\n"; + // flag to indicate when getNext() is called first. private boolean firstTime = true; @@ -123,7 +129,7 @@ public class POMergeJoin extends Physica private String signature; - private byte endOfRecordMark; + private byte endOfRecordMark = POStatus.STATUS_NULL; // This serves as the default TupleFactory private transient TupleFactory mTupleFactory; @@ -159,15 +165,6 @@ public class POMergeJoin extends Physica this.joinType = joinType; this.leftInputSchema = leftInputSchema; this.mergedInputSchema = mergedInputSchema; - this.endOfRecordMark = POStatus.STATUS_EOP; - } - - // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez. - // This is because: - // For MR, we send EOP at the end of every record - // For Tez, we only use a global EOP, so send NULL for end of record - public void setEndOfRecordMark(byte endOfRecordMark) { - this.endOfRecordMark = endOfRecordMark; } /** @@ -379,7 +376,9 @@ public class POMergeJoin extends Physica } else{ // At this point right side can't be behind. int errCode = 1102; - String errMsg = "Data is not sorted on right side. Last two tuples encountered were: \n"+ + String errMsg = "Data is not sorted on right side. \n" + + keyOrderReminder + + "Last two tuples encountered were: \n"+ curJoiningRightTup+ "\n" + (Tuple)rightInp.result ; throw new ExecException(errMsg,errCode); } @@ -407,7 +406,9 @@ public class POMergeJoin extends Physica } else{ // Current key < Prev Key int errCode = 1102; - String errMsg = "Data is not sorted on left side. Last two keys encountered were: \n"+ + String errMsg = "Data is not sorted on left side. \n" + + keyOrderReminder + + "Last two tuples encountered were: \n" + prevLeftKey+ "\n" + curLeftKey ; throw new ExecException(errMsg,errCode); } @@ -477,7 +478,9 @@ public class POMergeJoin extends Physica if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){ // Sanity check. int errCode = 1102; - String errMsg = "Data is not sorted on right side. Last two keys encountered were: \n"+ + String errMsg = "Data is not sorted on right side. \n" + + keyOrderReminder + + "Last two tuples encountered were: \n"+ prevRightKey+ "\n" + rightKey ; throw new ExecException(errMsg,errCode); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Fri Mar 4 18:17:39 2016 @@ -17,22 +17,16 @@ */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; -import java.util.ArrayList; import java.util.List; -import java.util.LinkedList; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.DataType; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.data.DataType; import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.pen.util.ExampleTuple; -import org.apache.pig.pen.util.LineageTracer; -import org.apache.pig.impl.util.IdentityHashSet; /** * A specialized version of POForeach with the difference @@ -45,10 +39,10 @@ import org.apache.pig.impl.util.Identity public class POOptimizedForEach extends POForEach { /** - * + * */ private static final long serialVersionUID = 1L; - + public POOptimizedForEach(OperatorKey k) { this(k,-1,null,null); } @@ -64,7 +58,7 @@ public class POOptimizedForEach extends public POOptimizedForEach(OperatorKey k, List inp) { this(k,-1,inp,null); } - + public POOptimizedForEach(OperatorKey k, int rp, List<PhysicalPlan> inp, List<Boolean> isToBeFlattened){ super(k, rp); setUpFlattens(isToBeFlattened); @@ -82,7 +76,7 @@ public class POOptimizedForEach extends String fString = getFlatStr(); return "Optimized For Each" + "(" + fString + ")" + "[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString(); } - + /** * Calls getNext on the generate operator inside the nested * physical plan and returns it maintaining an additional state @@ -120,40 +114,25 @@ public class POOptimizedForEach extends //nested plan processing on the input tuple //read while (true) { - - // we know that input has been attached + + // we know that input has been attached attachInputToPlans(input); detachInput(); res = processPlan(); - + processingPlan = true; - + return res; } } - + /** - * Make a deep copy of this operator. + * Make a deep copy of this operator. * @throws CloneNotSupportedException */ @Override public POOptimizedForEach clone() throws CloneNotSupportedException { - List<PhysicalPlan> plans = new - ArrayList<PhysicalPlan>(inputPlans.size()); - for (PhysicalPlan plan : inputPlans) { - plans.add(plan.clone()); - } - List<Boolean> flattens = null; - if(isToBeFlattenedArray != null ) { - flattens = new - ArrayList<Boolean>(isToBeFlattenedArray.length); - for (boolean b : isToBeFlattenedArray) { - flattens.add(b); - } - } - return new POOptimizedForEach(new OperatorKey(mKey.scope, - NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), - requestedParallelism, plans, flattens); + return (POOptimizedForEach) super.clone(); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Fri Mar 4 18:17:39 2016 @@ -30,13 +30,11 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil; import org.apache.pig.data.AccumulativeBag; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.InternalCachedBag; import org.apache.pig.data.ReadOnceBag; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -74,9 +72,6 @@ public class POPackage extends PhysicalO //key, no value. protected int numInputs; - protected static final BagFactory mBagFactory = BagFactory.getInstance(); - protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); - private boolean lastBagReadOnly = true; protected Packager pkgr; @@ -87,58 +82,6 @@ public class POPackage extends PhysicalO private transient boolean useDefaultBag; private transient int accumulativeBatchSize; - //the pivot value - private int pivot = -1; - //the index of the first field involves in ROLLUP - protected int rollupFieldIndex = 0; - //the original index of the first field involves in ROLLUP in case it was moved to the end - //(if we have the combination of cube and rollup) - private int rollupOldFieldIndex = 0; - //the size of total fields that involve in CUBE clause - private int dimensionSize = 0; - //number of algebraic function that used after rollup - private int nAlgebraic = 0; - - public void setPivot(int pvt) { - this.pivot = pvt; - } - - public int getPivot() { - return this.pivot; - } - - public void setDimensionSize(int ds) { - this.dimensionSize = ds; - } - - public int getDimensionSize() { - return this.dimensionSize; - } - - public void setNumberAlgebraic(int na) { - this.nAlgebraic = na; - } - - public int getNumberAlgebraic() { - return this.nAlgebraic; - } - - public void setRollupOldFieldIndex(int rofi) { - this.rollupOldFieldIndex = rofi; - } - - public int getRollupOldFieldIndex() { - return this.rollupOldFieldIndex; - } - - public void setRollupFieldIndex(int rfi) { - this.rollupFieldIndex = rfi; - } - - public int getRollupFieldIndex() { - return this.rollupFieldIndex; - } - public POPackage(OperatorKey k) { this(k, -1, null); } @@ -292,8 +235,7 @@ public class POPackage extends PhysicalO // create bag to pull all tuples out of iterator for (int i = 0; i < numInputs; i++) { - dbs[i] = useDefaultBag ? BagFactory.getInstance() - .newDefaultBag() + dbs[i] = useDefaultBag ? mBagFactory.newDefaultBag() // In a very rare case if there is a POStream after this // POPackage in the pipeline and is also blocking the // pipeline; @@ -309,8 +251,15 @@ public class POPackage extends PhysicalO NullableTuple ntup = tupIter.next(); int index = ntup.getIndex(); if (index == numInputs - 1) { - dbs[index] = new PeekedBag(pkgr, ntup, tupIter, keyWritable); - break; + if (pkgr.getUseSecondaryKey()) { + if (dbs[index] == null) { + dbs[index] = useDefaultBag ? mBagFactory + .newDefaultBag() : new InternalCachedBag(numInputs); + } + } else { + dbs[index] = new PeekedBag(pkgr, ntup, tupIter, keyWritable); + break; + } } Tuple copy = pkgr.getValueTuple(keyWritable, ntup, index);
