Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Fri Mar 4 18:17:39 2016 @@ -41,7 +41,6 @@ import org.apache.pig.data.DataType; import org.apache.pig.data.InternalCachedBag; import org.apache.pig.data.SelfSpillBag.MemoryLimits; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.GroupingSpillable; @@ -68,14 +67,23 @@ public class POPartialAgg extends Physic // entry in hash map and average seen reduction private static final int NUM_RECS_TO_SAMPLE = 10000; + // We want to allow bigger list sizes for group all. + // But still have a cap on it to avoid JVM finding it hard to allocate space + // TODO: How high can we go without performance degradation?? + private static final int MAX_LIST_SIZE = 25000; + // We want to avoid massive ArrayList copies as they get big. // Array Lists grow by prevSize + prevSize/2. Given default initial size of 10, // 9369 is the size of the array after 18 such resizings. This seems like a sufficiently // large value to trigger spilling/aggregation instead of paying for yet another data // copy. - private static final int MAX_LIST_SIZE = 9368; + // For group all cases, we will set this to a higher value + private int listSizeThreshold = 9367; - private static final int DEFAULT_MIN_REDUCTION = 10; + // Using default min reduction 7 instead of 10 as processedInputMap size + // will be 4096 (hashmap size is power of 2) for both 20000/10 and 20000/7. + // So using the lower number 7 as even 7x reduction is worth using map side aggregation + private static final int DEFAULT_MIN_REDUCTION = 7; // TODO: these are temporary. The real thing should be using memory usage estimation. private static final int FIRST_TIER_THRESHOLD = 20000; @@ -83,12 +91,11 @@ public class POPartialAgg extends Physic private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>(); - private static final TupleFactory TF = TupleFactory.getInstance(); - private PhysicalPlan keyPlan; private ExpressionOperator keyLeaf; private List<PhysicalPlan> valuePlans; private List<ExpressionOperator> valueLeaves; + private boolean isGroupAll; private transient int numRecsInRawMap; private transient int numRecsInProcessedMap; @@ -112,6 +119,7 @@ public class POPartialAgg extends Physic // rather than just spilling records to disk. private transient volatile boolean doSpill; private transient volatile boolean doContingentSpill; + private transient volatile boolean startedContingentSpill; private transient volatile Object spillLock; private transient int minOutputReduction; @@ -123,17 +131,19 @@ public class POPartialAgg extends Physic private transient int avgTupleSize; private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator; - public POPartialAgg(OperatorKey k) { + this(k, false); + } + + public POPartialAgg(OperatorKey k, boolean isGroupAll) { super(k); + this.isGroupAll = isGroupAll; } private void init() throws ExecException { ALL_POPARTS.put(this, null); numRecsInRawMap = 0; numRecsInProcessedMap = 0; - rawInputMap = Maps.newHashMap(); - processedInputMap = Maps.newHashMap(); minOutputReduction = DEFAULT_MIN_REDUCTION; numRecordsToSample = NUM_RECS_TO_SAMPLE; firstTierThreshold = FIRST_TIER_THRESHOLD; @@ -158,11 +168,24 @@ public class POPartialAgg extends Physic } if (percentUsage <= 0) { LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation."); - disableMapAgg(); + disableMapAgg = true; // Set them to true instead of adding another check for !disableMapAgg sizeReductionChecked = true; estimatedMemThresholds = true; } + // Avoid hashmap resizing. TODO: Investigate loadfactor of 0.90 or 1.0 + // newHashMapWithExpectedSize does new HashMap(expectedSize + expectedSize/3) + // to factor in default load factor of 0.75. + // For Hashmap, internally its size is always in power of 2. + // So for NUM_RECS_TO_SAMPLE=10000, hashmap size will be 16384 + // With secondTierThreshold of 2857 (minReduction 7), hashmap size will be 4096 + if (!disableMapAgg) { + rawInputMap = Maps.newHashMapWithExpectedSize(NUM_RECS_TO_SAMPLE); + processedInputMap = Maps.newHashMapWithExpectedSize(SECOND_TIER_THRESHOLD); + } + if (isGroupAll) { + listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE); + } initialized = true; SpillableMemoryManager.getInstance().registerSpillable(this); } @@ -196,6 +219,7 @@ public class POPartialAgg extends Physic estimateMemThresholds(); } if (doContingentSpill) { + startedContingentSpill = true; // Don't aggregate if spilling. Avoid concurrent update of spilling iterator. if (doSpill == false) { // SpillableMemoryManager requested a spill to reduce memory @@ -216,14 +240,17 @@ public class POPartialAgg extends Physic doSpill = false; doContingentSpill = false; } - if (result.returnStatus != POStatus.STATUS_EOP - || inputsExhausted) { + if (result.returnStatus != POStatus.STATUS_EOP) { + return result; + } else if (inputsExhausted) { + freeMemory(); return result; } } if (mapAggDisabled()) { // disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps. // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on. + freeMemory(); return processInput(); } else { Result inp = processInput(); @@ -265,6 +292,15 @@ public class POPartialAgg extends Physic } } + private void freeMemory() throws ExecException { + if (rawInputMap != null && !rawInputMap.isEmpty()) { + throw new ExecException("Illegal state. Trying to free up partial aggregation maps when they are not empty"); + } + // Free up the maps for garbage collection + rawInputMap = null; + processedInputMap = null; + } + private void estimateMemThresholds() { if (!mapAggDisabled()) { LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() @@ -294,6 +330,9 @@ public class POPartialAgg extends Physic secondTierThreshold += 1; firstTierThreshold -= 1; } + if (isGroupAll) { + listSizeThreshold = Math.min(firstTierThreshold, MAX_LIST_SIZE); + } } estimatedMemThresholds = true; } @@ -344,17 +383,26 @@ public class POPartialAgg extends Physic Object key, Tuple inpTuple) throws ExecException { List<Tuple> value = map.get(key); if (value == null) { - value = new ArrayList<Tuple>(); + if (isGroupAll) { + // Set exact array initial size to avoid array copies + // listSizeThreshold = numRecordsToSample before estimating memory + // thresholds and firstTierThreshold after memory estimation + int listSize = (map == rawInputMap) ? listSizeThreshold : Math.min(secondTierThreshold, MAX_LIST_SIZE); + value = new ArrayList<Tuple>(listSize); + } else { + value = new ArrayList<Tuple>(); + } map.put(key, value); } value.add(inpTuple); - if (value.size() >= MAX_LIST_SIZE) { + if (value.size() > listSizeThreshold) { boolean isFirst = (map == rawInputMap); if (LOG.isDebugEnabled()){ LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level.")); } if (isFirst) { - aggregateRawRow(key); + // Aggregate and remove just this key to keep size in check + aggregateRawRow(key, value); } else { aggregateSecondLevel(); } @@ -367,7 +415,7 @@ public class POPartialAgg extends Physic LOG.info("Starting spill."); if (aggregate) { - aggregateBothLevels(false, true); + aggregateBothLevels(false, false); } doSpill = true; spillingIterator = processedInputMap.entrySet().iterator(); @@ -389,8 +437,8 @@ public class POPartialAgg extends Physic } } - private void aggregateRawRow(Object key) throws ExecException { - List<Tuple> value = rawInputMap.get(key); + private void aggregateRawRow(Object key, List<Tuple> value) throws ExecException { + numRecsInRawMap -= value.size(); Tuple valueTuple = createValueTuple(key, value); Result res = getOutput(key, valueTuple); rawInputMap.remove(key); @@ -454,7 +502,7 @@ public class POPartialAgg extends Physic } private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException { - Tuple valueTuple = TF.newTuple(valuePlans.size() + 1); + Tuple valueTuple = mTupleFactory.newTuple(valuePlans.size() + 1); valueTuple.set(0, key); for (int i = 0; i < valuePlans.size(); i++) { @@ -534,7 +582,7 @@ public class POPartialAgg extends Physic * @throws ExecException */ private Result getOutput(Object key, Tuple value) throws ExecException { - Tuple output = TF.newTuple(valuePlans.size() + 1); + Tuple output = mTupleFactory.newTuple(valuePlans.size() + 1); output.set(0, key); for (int i = 0; i < valuePlans.size(); i++) { @@ -591,21 +639,49 @@ public class POPartialAgg extends Physic if (mapAggDisabled()) { return 0; } else { + if (doContingentSpill && !startedContingentSpill) { + LOG.info("Spill triggered by SpillableMemoryManager, but previous spill call is still not processed. Skipping"); + return 0; + } LOG.info("Spill triggered by SpillableMemoryManager"); - doContingentSpill = true; synchronized(spillLock) { - if (!sizeReductionChecked) { + if (rawInputMap != null) { + LOG.info("Memory usage: " + getMemorySize() + + ". Raw map: num keys = " + rawInputMap.size() + + ", num tuples = "+ numRecsInRawMap + + ", Processed map: num keys = " + processedInputMap.size() + + ", num tuples = "+ numRecsInProcessedMap ); + } + startedContingentSpill = false; + doContingentSpill = true; + if (!sizeReductionChecked || !estimatedMemThresholds) { numRecordsToSample = numRecsInRawMap; } try { + // Block till spilling is finished. If main thread execution has not come to POPartialAgg + // and is still processing lower pipeline for more than 5 seconds it means + // jvm is stuck doing GC and will soon fail with java.lang.OutOfMemoryError: GC overhead limit exceeded + // So exit out of here so that SpillableMemoryManger can at least spill + // other Spillable bags and free up some memory for user code to be able to run + // and reach POPartialAgg for the aggregation/spilling of the hashmaps to happen. + long startTime = System.currentTimeMillis(); while (doContingentSpill == true) { - Thread.sleep(50); //Keeping it on the lower side for now. Tune later + Thread.sleep(25); + if (!startedContingentSpill && (System.currentTimeMillis() - startTime) >= 5000) { + break; + } + } + if (doContingentSpill) { + LOG.info("Not blocking for spill and letting SpillableMemoryManager" + + " process other spillable objects as main thread has not reached here for 5 secs"); + } else { + LOG.info("Finished spill for SpillableMemoryManager call"); + return 1; } } catch (InterruptedException e) { LOG.warn("Interrupted exception while waiting for spill to finish", e); } - LOG.info("Finished spill for SpillableMemoryManager call"); - return 1; + return 0; } } } @@ -615,4 +691,14 @@ public class POPartialAgg extends Physic return avgTupleSize * (numRecsInProcessedMap + numRecsInRawMap); } + @Override + public PhysicalOperator clone() throws CloneNotSupportedException { + POPartialAgg clone = (POPartialAgg) super.clone(); + clone.setKeyPlan(keyPlan.clone()); + clone.setValuePlans(clonePlans(valuePlans)); + return clone; + } + + + }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Fri Mar 4 18:17:39 2016 @@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; @@ -39,8 +38,6 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.Utils; -import com.google.common.collect.Maps; - /** * The partition rearrange operator is a part of the skewed join * implementation. It has an embedded physical plan that @@ -50,12 +47,11 @@ import com.google.common.collect.Maps; public class POPartitionRearrange extends POLocalRearrange { private static final long serialVersionUID = 1L; - private static final BagFactory mBagFactory = BagFactory.getInstance(); - private Integer totalReducers = -1; + private transient Integer totalReducers; // ReducerMap will store the tuple, max reducer index & min reducer index - private Map<Object, Pair<Integer, Integer> > reducerMap = Maps.newHashMap(); - private boolean inited; + private transient Map<Object, Pair<Integer, Integer> > reducerMap; + private transient boolean inited; private PigContext pigContext; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Fri Mar 4 18:17:39 2016 @@ -23,7 +23,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.builtin.PoissonSampleLoader; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; @@ -32,51 +31,48 @@ public class POPoissonSample extends Phy private static final long serialVersionUID = 1L; - private static final TupleFactory tf = TupleFactory.getInstance(); - private static Result eop = new Result(POStatus.STATUS_EOP, null); + // 17 is not a magic number. It can be obtained by using a poisson + // cumulative distribution function with the mean set to 10 (empirically, + // minimum number of samples) and the confidence set to 95% + public static final int DEFAULT_SAMPLE_RATE = 17; + + private int sampleRate = 0; + + private float heapPerc = 0f; + + private Long totalMemory; + + private transient boolean initialized; // num of rows sampled so far - private int numRowsSampled = 0; + private transient int numRowsSampled; // average size of tuple in memory, for tuples sampled - private long avgTupleMemSz = 0; + private transient long avgTupleMemSz; // current row number - private long rowNum = 0; + private transient long rowNum; // number of tuples to skip after each sample - private long skipInterval = -1; + private transient long skipInterval; // bytes in input to skip after every sample. // divide this by avgTupleMemSize to get skipInterval - private long memToSkipPerSample = 0; + private transient long memToSkipPerSample; // has the special row with row number information been returned - private boolean numRowSplTupleReturned = false; - - // 17 is not a magic number. It can be obtained by using a poisson - // cumulative distribution function with the mean set to 10 (empirically, - // minimum number of samples) and the confidence set to 95% - public static final int DEFAULT_SAMPLE_RATE = 17; - - private int sampleRate = 0; - - private float heapPerc = 0f; + private transient boolean numRowSplTupleReturned; // new Sample result - private Result newSample = null; + private transient Result newSample; - public POPoissonSample(OperatorKey k, int rp, int sr, float hp) { + public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) { super(k, rp, null); - numRowsSampled = 0; - avgTupleMemSz = 0; - rowNum = 0; - skipInterval = -1; - memToSkipPerSample = 0; - numRowSplTupleReturned = false; - newSample = null; sampleRate = sr; heapPerc = hp; + if (tm != -1) { + totalMemory = tm; + } } @Override @@ -92,10 +88,22 @@ public class POPoissonSample extends Phy @Override public Result getNextTuple() throws ExecException { + if (!initialized) { + numRowsSampled = 0; + avgTupleMemSz = 0; + rowNum = 0; + skipInterval = -1; + memToSkipPerSample = 0; + if (totalMemory == null) { + // Initialize in backend to get memory of task + totalMemory = Runtime.getRuntime().maxMemory(); + } + initialized = true; + } if (numRowSplTupleReturned) { // row num special row has been returned after all inputs // were read, nothing more to read - return eop; + return RESULT_EOP; } Result res = null; @@ -107,11 +115,7 @@ public class POPoissonSample extends Phy if (res.returnStatus == POStatus.STATUS_NULL) { continue; } else if (res.returnStatus == POStatus.STATUS_EOP) { - if (this.parentPlan.endOfAllInput) { - return eop; - } else { - continue; - } + return res; } else if (res.returnStatus == POStatus.STATUS_ERR) { return res; } @@ -119,7 +123,7 @@ public class POPoissonSample extends Phy if (res.result == null) { continue; } - long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc); + long availRedMem = (long) (totalMemory * heapPerc); memToSkipPerSample = availRedMem/sampleRate; updateSkipInterval((Tuple)res.result); @@ -215,7 +219,7 @@ public class POPoissonSample extends Phy */ private Result createNumRowTuple(Tuple sample) throws ExecException { int sz = (sample == null) ? 0 : sample.size(); - Tuple t = tf.newTuple(sz + 2); + Tuple t = mTupleFactory.newTuple(sz + 2); if (sample != null) { for (int i=0; i<sample.size(); i++){ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Fri Mar 4 18:17:39 2016 @@ -29,12 +29,10 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.SingleTupleBag; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; @@ -53,9 +51,6 @@ public class POPreCombinerLocalRearrange protected static final long serialVersionUID = 1L; - protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); - protected static BagFactory mBagFactory = BagFactory.getInstance(); - private static final Result ERR_RESULT = new Result(); protected List<PhysicalPlan> plans; @@ -223,4 +218,12 @@ public class POPreCombinerLocalRearrange public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { return null; } + + @Override + public POPreCombinerLocalRearrange clone() throws CloneNotSupportedException { + POPreCombinerLocalRearrange clone = (POPreCombinerLocalRearrange) super.clone(); + clone.leafOps = new ArrayList<ExpressionOperator>(); + clone.setPlans(clonePlans(plans)); + return clone; + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java Fri Mar 4 18:17:39 2016 @@ -32,7 +32,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.pen.util.ExampleTuple; @@ -55,8 +54,6 @@ public class PORank extends PhysicalOper private List<Boolean> mAscCols; private List<Byte> ExprOutputTypes; - protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); - /** * Unique identifier that links POCounter and PORank, * through the global counter labeled with it. @@ -230,4 +227,11 @@ public class PORank extends PhysicalOper public String getOperationID() { return operationID; } + + @Override + public PORank clone() throws CloneNotSupportedException { + PORank clone = (PORank)super.clone(); + // rankPlans, mAscCols, ExprOutputTypes are unused. Not cloning them + return clone; + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Fri Mar 4 18:17:39 2016 @@ -26,31 +26,28 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.builtin.PoissonSampleLoader; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; public class POReservoirSample extends PhysicalOperator { - private static final TupleFactory tf = TupleFactory.getInstance(); - private static final long serialVersionUID = 1L; // number of samples to be sampled protected int numSamples; - private transient int nextSampleIdx= 0; + private transient int nextSampleIdx = 0; - private int rowProcessed = 0; + private transient int rowProcessed = 0; - private boolean sampleCollectionDone = false; + private transient boolean sampleCollectionDone = false; //array to store the result private transient Result[] samples = null; // last sample result - private Result lastSample = null; + private transient Result lastSample = null; public POReservoirSample(OperatorKey k) { this(k, -1, null); @@ -103,12 +100,20 @@ public class POReservoirSample extends P rowProcessed++; } else if (res.returnStatus == POStatus.STATUS_NULL) { continue; + } else if (res.returnStatus == POStatus.STATUS_EOP) { + if (this.parentPlan.endOfAllInput) { + break; + } else { + // In case of Split can get EOP in between. + // Return here instead of setting lastSample to EOP in getSample + return res; + } } else { break; } } - if (res.returnStatus != POStatus.STATUS_EOP) { + if (res == null || res.returnStatus != POStatus.STATUS_EOP) { Random randGen = new Random(); while (true) { // pick this as sample @@ -142,7 +147,7 @@ public class POReservoirSample extends P if (lastSample.returnStatus==POStatus.STATUS_EOP) { return lastSample; } - + Result currentSample = retrieveSample(); // If this is the last sample, tag with number of rows if (currentSample.returnStatus == POStatus.STATUS_EOP) { @@ -156,20 +161,18 @@ public class POReservoirSample extends P } private Result retrieveSample() throws ExecException { - if(nextSampleIdx < samples.length){ + if(nextSampleIdx < Math.min(rowProcessed, samples.length)){ if (illustrator != null) { illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0); } Result res = samples[nextSampleIdx++]; if (res == null) { // Input data has lesser rows than numSamples - return new Result(POStatus.STATUS_NULL, null); + return RESULT_EMPTY; } return res; } else{ - Result res; - res = new Result(POStatus.STATUS_EOP, null); - return res; + return RESULT_EOP; } } @@ -195,7 +198,7 @@ public class POReservoirSample extends P */ private Result createNumRowTuple(Tuple sample) throws ExecException { int sz = (sample == null) ? 0 : sample.size(); - Tuple t = tf.newTuple(sz + 2); + Tuple t = mTupleFactory.newTuple(sz + 2); if (sample != null) { for (int i=0; i<sample.size(); i++){ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Mar 4 18:17:39 2016 @@ -36,12 +36,10 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.InternalSortedBag; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; @@ -74,11 +72,11 @@ public class POSort extends PhysicalOper private POUserComparisonFunc mSortFunc; private Comparator<Tuple> mComparator; - private boolean inputsAccumulated = false; private long limit; public boolean isUDFComparatorUsed = false; - private DataBag sortedBag; + private transient boolean inputsAccumulated = false; + private transient DataBag sortedBag; private transient Iterator<Tuple> it; private transient boolean initialized; private transient boolean useDefaultBag; @@ -95,22 +93,22 @@ public class POSort extends PhysicalOper this.sortPlans = sortPlans; this.mAscCols = mAscCols; this.limit = -1; - this.mSortFunc = mSortFunc; - if (mSortFunc == null) { + setSortFunc(mSortFunc); + } + + private void setSortFunc(POUserComparisonFunc mSortFunc) { + this.mSortFunc = mSortFunc; + if (mSortFunc == null) { mComparator = new SortComparator(); - /*sortedBag = BagFactory.getInstance().newSortedBag( - new SortComparator());*/ - ExprOutputTypes = new ArrayList<Byte>(sortPlans.size()); + ExprOutputTypes = new ArrayList<Byte>(sortPlans.size()); - for(PhysicalPlan plan : sortPlans) { - ExprOutputTypes.add(plan.getLeaves().get(0).getResultType()); - } - } else { - /*sortedBag = BagFactory.getInstance().newSortedBag( - new UDFSortComparator());*/ + for(PhysicalPlan plan : sortPlans) { + ExprOutputTypes.add(plan.getLeaves().get(0).getResultType()); + } + } else { mComparator = new UDFSortComparator(); - isUDFComparatorUsed = true; - } + isUDFComparatorUsed = true; + } } public POSort(OperatorKey k, int rp, List inp) { @@ -256,10 +254,10 @@ public class POSort extends PhysicalOper @Override public Result getNextTuple() throws ExecException { - Result res = new Result(); + Result inp; if (!inputsAccumulated) { - res = processInput(); + inp = processInput(); if (!initialized) { initialized = true; if (PigMapReduce.sJobConfInternal.get() != null) { @@ -271,26 +269,28 @@ public class POSort extends PhysicalOper } // by default, we create InternalSortedBag, unless user configures // explicitly to use old bag - sortedBag = useDefaultBag ? BagFactory.getInstance().newSortedBag(mComparator) + sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator) : new InternalSortedBag(3, mComparator); - while (res.returnStatus != POStatus.STATUS_EOP) { - if (res.returnStatus == POStatus.STATUS_ERR) { + while (inp.returnStatus != POStatus.STATUS_EOP) { + if (inp.returnStatus == POStatus.STATUS_ERR) { log.error("Error in reading from the inputs"); - return res; - } else if (res.returnStatus == POStatus.STATUS_NULL) { + return inp; + } else if (inp.returnStatus == POStatus.STATUS_NULL) { // Ignore and read the next tuple. - res = processInput(); + inp = processInput(); continue; } - sortedBag.add((Tuple) res.result); - res = processInput(); + sortedBag.add((Tuple) inp.result); + inp = processInput(); } inputsAccumulated = true; } - if (it == null) { + + Result res = new Result(); + if (it == null) { it = sortedBag.iterator(); } if (it.hasNext()) { @@ -301,7 +301,7 @@ public class POSort extends PhysicalOper res.returnStatus = POStatus.STATUS_EOP; reset(); } - return res; + return res; } @Override @@ -367,23 +367,19 @@ public class POSort extends PhysicalOper @Override public POSort clone() throws CloneNotSupportedException { - List<PhysicalPlan> clonePlans = new - ArrayList<PhysicalPlan>(sortPlans.size()); - for (PhysicalPlan plan : sortPlans) { - clonePlans.add(plan.clone()); + POSort clone = (POSort) super.clone(); + clone.sortPlans = clonePlans(sortPlans); + if (mSortFunc == null) { + setSortFunc(null); + } else { + setSortFunc(mSortFunc.clone()); } List<Boolean> cloneAsc = new ArrayList<Boolean>(mAscCols.size()); for (Boolean b : mAscCols) { cloneAsc.add(b); } - POUserComparisonFunc cloneFunc = null; - if (mSortFunc != null) { - cloneFunc = mSortFunc.clone(); - } - // Don't set inputs as PhysicalPlan.clone will take care of that - return new POSort(new OperatorKey(mKey.scope, - NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)), - requestedParallelism, null, clonePlans, cloneAsc, cloneFunc); + clone.mAscCols = cloneAsc; + return clone; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Fri Mar 4 18:17:39 2016 @@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex import java.util.ArrayList; import java.util.BitSet; -import java.util.LinkedList; import java.util.List; import org.apache.pig.backend.executionengine.ExecException; @@ -32,9 +31,6 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.impl.util.IdentityHashSet; -import org.apache.pig.pen.util.ExampleTuple; -import org.apache.pig.pen.util.LineageTracer; /** * The MapReduce Split operator. @@ -49,7 +45,7 @@ import org.apache.pig.pen.util.LineageTr * as outputs of this operator using the conditions * specified in the LOSplit. So LOSplit will be converted * into: - * + * * | | | * Filter1 Filter2 ... Filter3 * | | ... | @@ -63,13 +59,13 @@ import org.apache.pig.pen.util.LineageTr * approach if not better in many cases because * of the availability of attachinInputs. An optimization * that can ensue is if there are multiple loads that - * load the same file, they can be merged into one and - * then the operators that take input from the load + * load the same file, they can be merged into one and + * then the operators that take input from the load * can be stored. This can be used when * the mapPlan executes to read the file only once and - * attach the resulting tuple as inputs to all the + * attach the resulting tuple as inputs to all the * operators that take input from this load. - * + * * In some cases where the conditions are exclusive and * some outputs are ignored, this approach can be worse. * But this leads to easier management of the Split and @@ -79,24 +75,22 @@ import org.apache.pig.pen.util.LineageTr public class POSplit extends PhysicalOperator { private static final long serialVersionUID = 1L; - + /* * The filespec that is used to store and load the output of the split job * which is the job containing the split */ private FileSpec splitStore; - + /* * The list of sub-plans the inner plan is composed of */ private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>(); - + private BitSet processedSet = new BitSet(); - - private static Result empty = new Result(POStatus.STATUS_NULL, null); - - private boolean inpEOP = false; - + + private transient boolean inpEOP = false; + /** * Constructs an operator with the specified key * @param k the operator key @@ -107,7 +101,7 @@ public class POSplit extends PhysicalOpe /** * Constructs an operator with the specified key - * and degree of parallelism + * and degree of parallelism * @param k the operator key * @param rp the degree of parallelism requested */ @@ -116,7 +110,7 @@ public class POSplit extends PhysicalOpe } /** - * Constructs an operator with the specified key and inputs + * Constructs an operator with the specified key and inputs * @param k the operator key * @param inp the inputs that this operator will read data from */ @@ -128,7 +122,7 @@ public class POSplit extends PhysicalOpe * Constructs an operator with the specified key, * degree of parallelism and inputs * @param k the operator key - * @param rp the degree of parallelism requested + * @param rp the degree of parallelism requested * @param inp the inputs that this operator will read data from */ public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) { @@ -172,20 +166,20 @@ public class POSplit extends PhysicalOpe } /** - * Returns the list of nested plans. + * Returns the list of nested plans. * @return the list of the nested plans * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter */ public List<PhysicalPlan> getPlans() { return myPlans; } - + /** - * Appends the specified plan to the end of + * Appends the specified plan to the end of * the nested input plan list * @param inPlan plan to be appended to the list */ - public void addPlan(PhysicalPlan inPlan) { + public void addPlan(PhysicalPlan inPlan) { myPlans.add(inPlan); processedSet.set(myPlans.size()-1); } @@ -199,18 +193,18 @@ public class POSplit extends PhysicalOpe myPlans.remove(plan); processedSet.clear(myPlans.size()); } - + @Override public Result getNextTuple() throws ExecException { if (this.parentPlan.endOfAllInput) { - - return getStreamCloseResult(); - - } - + + return getStreamCloseResult(); + + } + if (processedSet.cardinality() == myPlans.size()) { - + Result inp = processInput(); if (inp.returnStatus == POStatus.STATUS_EOP && this.parentPlan.endOfAllInput) { @@ -221,44 +215,44 @@ public class POSplit extends PhysicalOpe || inp.returnStatus == POStatus.STATUS_ERR ) { return inp; } - + Tuple tuple = (Tuple)inp.result; for (PhysicalPlan pl : myPlans) { pl.attachInput(tuple); } - + processedSet.clear(); } - - return processPlan(); + + return processPlan(); } private Result processPlan() throws ExecException { - + int idx = processedSet.nextClearBit(0); PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0); - + Result res = runPipeline(leaf); - + if (res.returnStatus == POStatus.STATUS_EOP) { - processedSet.set(idx++); + processedSet.set(idx++); if (idx < myPlans.size()) { res = processPlan(); } } - - return (res.returnStatus == POStatus.STATUS_OK) ? res : empty; + + return (res.returnStatus == POStatus.STATUS_OK) ? res : RESULT_EMPTY; } - + private Result runPipeline(PhysicalOperator leaf) throws ExecException { - + Result res = null; - + while (true) { - + res = leaf.getNextTuple(); - - if (res.returnStatus == POStatus.STATUS_OK) { + + if (res.returnStatus == POStatus.STATUS_OK) { break; } else if (res.returnStatus == POStatus.STATUS_NULL) { continue; @@ -267,19 +261,19 @@ public class POSplit extends PhysicalOpe } else if (res.returnStatus == POStatus.STATUS_ERR) { break; } - } - + } + return res; } - + private Result getStreamCloseResult() throws ExecException { Result res = null; - + while (true) { - + if (processedSet.cardinality() == myPlans.size()) { Result inp = processInput(); - if (inp.returnStatus == POStatus.STATUS_OK) { + if (inp.returnStatus == POStatus.STATUS_OK) { Tuple tuple = (Tuple)inp.result; for (PhysicalPlan pl : myPlans) { pl.attachInput(tuple); @@ -293,40 +287,48 @@ public class POSplit extends PhysicalOpe return inp; } processedSet.clear(); - } - + } + int idx = processedSet.nextClearBit(0); if (inpEOP ) { myPlans.get(idx).endOfAllInput = true; } PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0); - + res = leaf.getNextTuple(); - + if (res.returnStatus == POStatus.STATUS_EOP) { - processedSet.set(idx++); + processedSet.set(idx++); if (idx < myPlans.size()) { continue; } } else { break; } - - if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) { + + if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) { continue; } else { break; } } - + return res; - + } - + + @Override + public POSplit clone() throws CloneNotSupportedException { + POSplit opClone = (POSplit) super.clone(); + opClone.processedSet = new BitSet(); + opClone.myPlans = clonePlans(myPlans); + return opClone; + } + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { - // no op + // no op return null; } - + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Fri Mar 4 18:17:39 2016 @@ -50,8 +50,8 @@ import org.apache.pig.tools.pigstats.Pig public class POStore extends PhysicalOperator { private static final long serialVersionUID = 1L; - protected static Result empty = new Result(POStatus.STATUS_NULL, null); transient private StoreFuncInterface storer; + transient private StoreFuncDecorator sDecorator; transient private POStoreImpl impl; transient private String counterName = null; private FileSpec sFile; @@ -161,10 +161,10 @@ public class POStore extends PhysicalOpe switch (res.returnStatus) { case POStatus.STATUS_OK: if (illustrator == null) { - storer.putNext((Tuple)res.result); + sDecorator.putNext((Tuple) res.result); } else illustratorMarkup(res.result, res.result, 0); - res = empty; + res = RESULT_EMPTY; if (counterName != null) { ((MapReducePOStoreImpl) impl).incrRecordCounter(counterName, 1); @@ -250,10 +250,24 @@ public class POStore extends PhysicalOpe if(storer == null){ storer = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec()); storer.setStoreFuncUDFContextSignature(signature); + // Init the Decorator we use for writing Tuples + setStoreFuncDecorator(new StoreFuncDecorator(storer, signature)); } return storer; } + void setStoreFuncDecorator(StoreFuncDecorator sDecorator) { + this.sDecorator = sDecorator; + } + + /** + * + * @return The {@link StoreFuncDecorator} used to write Tuples + */ + public StoreFuncDecorator getStoreFuncDecorator() { + return sDecorator; + } + /** * @param sortInfo the sortInfo to set */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Fri Mar 4 18:17:39 2016 @@ -26,39 +26,38 @@ import java.util.concurrent.ArrayBlockin import java.util.concurrent.BlockingQueue; import org.apache.pig.PigException; -import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.plan.OperatorKey; -import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.impl.streaming.ExecutableManager; -import org.apache.pig.impl.streaming.StreamingCommand; -import org.apache.pig.pen.util.ExampleTuple; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.streaming.ExecutableManager; +import org.apache.pig.impl.streaming.StreamingCommand; +import org.apache.pig.pen.util.ExampleTuple; public class POStream extends PhysicalOperator { - private static final long serialVersionUID = 2L; - - protected static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null); - protected String executableManagerStr; // String representing ExecutableManager to use - transient private ExecutableManager executableManager; // ExecutableManager to use - protected StreamingCommand command; // Actual command to be run + private String executableManagerStr; // String representing ExecutableManager to use + private StreamingCommand command; // Actual command to be run private Properties properties; - private boolean initialized = false; - protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1); protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1); - protected boolean allInputFromPredecessorConsumed = false; + private transient ExecutableManager executableManager; // ExecutableManager to use + + private transient boolean initialized = false; + + protected transient boolean allInputFromPredecessorConsumed = false; - protected boolean allOutputFromBinaryProcessed = false; + protected transient boolean allOutputFromBinaryProcessed = false; /** * This flag indicates whether streaming is done through fetching. If set, @@ -68,7 +67,7 @@ public class POStream extends PhysicalOp */ private boolean isFetchable; - public POStream(OperatorKey k, ExecutableManager executableManager, + public POStream(OperatorKey k, ExecutableManager executableManager, StreamingCommand command, Properties properties) { super(k); this.executableManagerStr = executableManager.getClass().getName(); @@ -77,21 +76,21 @@ public class POStream extends PhysicalOp // Setup streaming-specific properties if (command.getShipFiles()) { - parseShipCacheSpecs(command.getShipSpecs(), + parseShipCacheSpecs(command.getShipSpecs(), properties, "pig.streaming.ship.files"); } - parseShipCacheSpecs(command.getCacheSpecs(), + parseShipCacheSpecs(command.getCacheSpecs(), properties, "pig.streaming.cache.files"); } - - private static void parseShipCacheSpecs(List<String> specs, + + private static void parseShipCacheSpecs(List<String> specs, Properties properties, String property) { - + String existingValue = properties.getProperty(property, ""); if (specs == null || specs.size() == 0) { return; } - + // Setup streaming-specific properties StringBuffer sb = new StringBuffer(); Iterator<String> i = specs.iterator(); @@ -108,13 +107,13 @@ public class POStream extends PhysicalOp sb.append(", "); } } - properties.setProperty(property, sb.toString()); + properties.setProperty(property, sb.toString()); } public Properties getShipCacheProperties() { return properties; } - + /** * Get the {@link StreamingCommand} for this <code>StreamSpec</code>. * @return the {@link StreamingCommand} for this <code>StreamSpec</code> @@ -122,17 +121,13 @@ public class POStream extends PhysicalOp public StreamingCommand getCommand() { return command; } - - - /* (non-Javadoc) - * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple) - */ + @Override public Result getNextTuple() throws ExecException { // The POStream Operator works with ExecutableManager to // send input to the streaming binary and to get output // from it. To achieve a tuple oriented behavior, two queues - // are used - one for output from the binary and one for + // are used - one for output from the binary and one for // input to the binary. In each getNext() call: // 1) If there is no more output expected from the binary, an EOP is // sent to successor @@ -142,14 +137,14 @@ public class POStream extends PhysicalOp // send input to the binary, then the next tuple from the // predecessor is got and passed to the binary try { - // if we are being called AFTER all output from the streaming + // if we are being called AFTER all output from the streaming // binary has already been sent to us then just return EOP // The "allOutputFromBinaryProcessed" flag is set when we see // an EOS (End of Stream output) from streaming binary if(allOutputFromBinaryProcessed) { - return new Result(POStatus.STATUS_EOP, null); + return RESULT_EOP; } - + // if we are here AFTER all map() calls have been completed // AND AFTER we process all possible input to be sent to the // streaming binary, then all we want to do is read output from @@ -160,19 +155,16 @@ public class POStream extends PhysicalOp // If we received EOS, it means all output // from the streaming binary has been sent to us // So we can send an EOP to the successor in - // the pipeline. Also since we are being called - // after all input from predecessor has been processed - // it means we got here from a call from close() in - // map or reduce. So once we send this EOP down, - // getNext() in POStream should never be called. So - // we don't need to set any flag noting we saw all output - // from binary - r = EOP_RESULT; - } else if (r.returnStatus == POStatus.STATUS_OK) + // the pipeline and also note this condition + // for future calls + r = RESULT_EOP; + allOutputFromBinaryProcessed = true; + } else if (r.returnStatus == POStatus.STATUS_OK) { illustratorMarkup(r.result, r.result, 0); + } return(r); } - + // if we are here, we haven't consumed all input to be sent // to the streaming binary - check if we are being called // from close() on the map or reduce @@ -185,7 +177,7 @@ public class POStream extends PhysicalOp // then "initialized" will be true. If not, just // send EOP down. if(getInitialized()) { - // signal End of ALL input to the Executable Manager's + // signal End of ALL input to the Executable Manager's // Input handler thread binaryInputQueue.put(r); // note this state for future calls @@ -196,30 +188,24 @@ public class POStream extends PhysicalOp // If we received EOS, it means all output // from the streaming binary has been sent to us // So we can send an EOP to the successor in - // the pipeline. Also since we are being called - // after all input from predecessor has been processed - // it means we got here from a call from close() in - // map or reduce. So once we send this EOP down, - // getNext() in POStream should never be called. So - // we don't need to set any flag noting we saw all output - // from binary - r = EOP_RESULT; + // the pipeline and also note this condition + // for future calls + r = RESULT_EOP; + allOutputFromBinaryProcessed = true; } } - + } else if(r.returnStatus == POStatus.STATUS_EOS) { // If we received EOS, it means all output // from the streaming binary has been sent to us // So we can send an EOP to the successor in - // the pipeline. Also we are being called - // from close() in map or reduce (this is so because - // only then this.parentPlan.endOfAllInput is true). - // So once we send this EOP down, getNext() in POStream - // should never be called. So we don't need to set any - // flag noting we saw all output from binary - r = EOP_RESULT; - } else if (r.returnStatus == POStatus.STATUS_OK) + // the pipeline and also note this condition + // for future calls + r = RESULT_EOP; + allOutputFromBinaryProcessed = true; + } else if (r.returnStatus == POStatus.STATUS_OK) { illustratorMarkup(r.result, r.result, 0); + } return r; } else { // we are not being called from close() - so @@ -231,20 +217,21 @@ public class POStream extends PhysicalOp // So we can send an EOP to the successor in // the pipeline and also note this condition // for future calls - r = EOP_RESULT; + r = RESULT_EOP; allOutputFromBinaryProcessed = true; - } else if (r.returnStatus == POStatus.STATUS_OK) + } else if (r.returnStatus == POStatus.STATUS_OK) { illustratorMarkup(r.result, r.result, 0); + } return r; } - + } catch(Exception e) { int errCode = 2083; String msg = "Error while trying to get next result in POStream."; throw new ExecException(msg, errCode, PigException.BUG, e); } - - + + } public synchronized boolean getInitialized() { @@ -265,13 +252,13 @@ public class POStream extends PhysicalOp Result res = binaryOutputQueue.take(); return res; } - - // check if we can write tuples to + + // check if we can write tuples to // input of the process if(binaryInputQueue.remainingCapacity() > 0) { - + Result input = processInput(); - if(input.returnStatus == POStatus.STATUS_EOP || + if(input.returnStatus == POStatus.STATUS_EOP || input.returnStatus == POStatus.STATUS_ERR) { return input; } else { @@ -279,16 +266,16 @@ public class POStream extends PhysicalOp // Only when we see the first tuple which can // be sent as input to the binary we want // to initialize the ExecutableManager and set - // up the streaming binary - this is required in + // up the streaming binary - this is required in // Unions due to a JOIN where there may never be // any input to send to the binary in one of the map // tasks - so we initialize only if we have to. // initialize the ExecutableManager once if(!initialized) { // set up the executableManager - executableManager = + executableManager = (ExecutableManager)PigContext.instantiateFuncFromSpec(executableManagerStr); - + try { executableManager.configure(this); executableManager.run(); @@ -296,22 +283,23 @@ public class POStream extends PhysicalOp int errCode = 2084; String msg = "Error while running streaming binary."; throw new ExecException(msg, errCode, PigException.BUG, ioe); - } + } initialized = true; } - + // send this input to the streaming // process binaryInputQueue.put(input); } - + } else { - + // wait for either input to be available // or output to be consumed - while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty()) + while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty()) { wait(); - + } + } } } @@ -321,21 +309,22 @@ public class POStream extends PhysicalOp throw new ExecException(msg, errCode, PigException.BUG, e); } } - + + @Override public String toString() { return getAliasString() + "POStream" + "[" + command.toString() + "]" + " - " + mKey.toString(); } - + @Override public void visit(PhyPlanVisitor v) throws VisitorException { v.visitStream(this); - + } @Override public String name() { - return toString(); + return toString(); } @Override @@ -349,7 +338,7 @@ public class POStream extends PhysicalOp } /** - * + * */ public void finish() throws IOException { executableManager.close(); @@ -368,7 +357,7 @@ public class POStream extends PhysicalOp public BlockingQueue<Result> getBinaryOutputQueue() { return binaryOutputQueue; } - + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { if(illustrator != null) { @@ -393,11 +382,13 @@ public class POStream extends PhysicalOp this.isFetchable = isFetchable; } - public POStream(POStream copy){ - super(copy); + @Override + public PhysicalOperator clone() throws CloneNotSupportedException { + POStream clone = (POStream)super.clone(); + clone.binaryOutputQueue = new ArrayBlockingQueue<Result>(1); + clone.binaryInputQueue = new ArrayBlockingQueue<Result>(1); + //Not cloning StreamingCommand as it is read only + return clone; } - public String getExecutableManagerStr() { - return executableManagerStr; - } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Fri Mar 4 18:17:39 2016 @@ -353,6 +353,9 @@ public class Packager implements Illustr public void setUseSecondaryKey(boolean useSecondaryKey) { this.useSecondaryKey = useSecondaryKey; } + public boolean getUseSecondaryKey() { + return useSecondaryKey; + } public void setPackageType(PackageType type) { this.pkgType = type; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Fri Mar 4 18:17:39 2016 @@ -72,7 +72,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; @@ -237,12 +236,6 @@ public class PlanHelper { } @Override - public void visitPORollupHIIForEach(PORollupHIIForEach hfe) throws VisitorException { - super.visitPORollupHIIForEach(hfe); - visit(hfe); - } - - @Override public void visitUnion(POUnion un) throws VisitorException { super.visitUnion(un); visit(un);
