Author: rohini
Date: Sun Dec 31 01:30:30 2017
New Revision: 1819711

URL: http://svn.apache.org/viewvc?rev=1819711&view=rev
Log:
PIG-5311: POReservoirSample fails for more than Integer.MAX_VALUE records (rohini)

Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1819711&r1=1819710&r2=1819711&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Sun Dec 31 01:30:30 2017 @@ -62,6 +62,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5311: POReservoirSample fails for more than Integer.MAX_VALUE records (rohini) + PIG-3864: ToDate(userstring, format, timezone) computes DateTime with strange handling of Daylight Saving Time with location based timezones (daijy via rohini) PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1819711&r1=1819710&r2=1819711&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Sun Dec 31 01:30:30 2017 @@ -18,8 +18,9 @@ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; import java.util.List; -import java.util.Random; +import org.apache.commons.math3.random.RandomDataGenerator; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -29,17 
+30,18 @@ import org.apache.pig.data.Tuple; import org.apache.pig.impl.builtin.PoissonSampleLoader; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.UDFContext; public class POReservoirSample extends PhysicalOperator { private static final long serialVersionUID = 1L; // number of samples to be sampled - protected int numSamples; + protected long numSamples; private transient int nextSampleIdx = 0; - private transient int rowProcessed = 0; + private transient long rowProcessed = 0; private transient boolean sampleCollectionDone = false; @@ -49,6 +51,8 @@ public class POReservoirSample extends P // last sample result private transient Result lastSample = null; + private transient RandomDataGenerator randGen; + public POReservoirSample(OperatorKey k) { this(k, -1, null); } @@ -65,7 +69,7 @@ public class POReservoirSample extends P super(k, rp, inp); } - public POReservoirSample(OperatorKey k, int rp, List<PhysicalOperator> inp, int numSamples) { + public POReservoirSample(OperatorKey k, int rp, List<PhysicalOperator> inp, long numSamples) { super(k, rp, inp); this.numSamples = numSamples; } @@ -88,7 +92,11 @@ public class POReservoirSample extends P } //else collect samples if (samples == null) { - samples = new Result[numSamples]; + samples = new Result[(int)numSamples]; + long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode(); + long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); + randGen = new RandomDataGenerator(); + randGen.reSeed(randomSeed); } // populate the samples array with first numSamples tuples @@ -96,7 +104,7 @@ public class POReservoirSample extends P while (rowProcessed < numSamples) { res = processInput(); if (res.returnStatus == POStatus.STATUS_OK) { - samples[rowProcessed] = res; + samples[(int)rowProcessed] = res; rowProcessed++; } else if (res.returnStatus == POStatus.STATUS_NULL) { continue; @@ 
-114,7 +122,6 @@ public class POReservoirSample extends P } if (res == null || res.returnStatus != POStatus.STATUS_EOP) { - Random randGen = new Random(); while (true) { // pick this as sample res = processInput(); @@ -125,9 +132,9 @@ public class POReservoirSample extends P } // collect samples until input is exhausted - int rand = randGen.nextInt(rowProcessed + 1); + long rand = randGen.nextLong(0, rowProcessed + 1); if (rand < numSamples) { - samples[rand] = res; + samples[(int)rand] = res; } rowProcessed++; } @@ -170,13 +177,15 @@ public class POReservoirSample extends P if (illustrator != null) { illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0); } - Result res = samples[nextSampleIdx++]; + Result res = samples[nextSampleIdx]; + samples[nextSampleIdx++] = null; //Free memory if (res == null) { // Input data has lesser rows than numSamples return RESULT_EMPTY; } return res; } else{ + samples = null; // Free memory return RESULT_EOP; } } @@ -212,7 +221,8 @@ public class POReservoirSample extends P } t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER); - t.set(sz + 1, (long)rowProcessed); + t.set(sz + 1, rowProcessed); return new Result(POStatus.STATUS_OK, t); } + }