Author: rohini Date: Mon Oct 17 15:17:17 2016 New Revision: 1765306 URL: http://svn.apache.org/viewvc?rev=1765306&view=rev Log: PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java pig/trunk/test/org/apache/pig/test/TestFindQuantiles.java pig/trunk/test/org/apache/pig/test/TestGFCross.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon Oct 17 15:17:17 2016 @@ -48,6 +48,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5040: Order by and CROSS partitioning is not deterministic due to usage of Random (rohini) + PIG-5038: Pig Limit_2 e2e test failed with sort check (Konstantin_Harasov via rohini) PIG-5039: TestTypeCheckingValidatorNewLP.TestTypeCheckingValidatorNewLP is failing (nkollar via knoguchi) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Mon Oct 17 15:17:17 2016 @@ -26,21 +26,21 @@ public class 
DiscreteProbabilitySampleGe Random rGen; float[] probVec; float epsilon = 0.0001f; - + private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class); - - public DiscreteProbabilitySampleGenerator(float[] probVec) { - rGen = new Random(); + + public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) { + rGen = new Random(seed); float sum = 0.0f; for (float f : probVec) { sum += f; } this.probVec = probVec; - if (1-epsilon > sum || sum > 1+epsilon) { + if (1-epsilon > sum || sum > 1+epsilon) { LOG.info("Sum of probabilities should be near one: " + sum); } } - + public int getNext(){ double toss = rGen.nextDouble(); // if the uniformly random number that I generated @@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe toss -= probVec[i]; if(toss<=0.0) return i; - } + } return lastIdx; } - + public static void main(String[] args) { float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f }; - DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec); + DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec); CountingMap<Integer> cm = new CountingMap<Integer>(); for(int i=0;i<100;i++){ cm.put(gen.getNext(), 1); @@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe public String toString() { return Arrays.toString(probVec); } - - + + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Mon Oct 17 15:17:17 2016 @@ -30,6 +30,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.DataBag; @@ -129,11 +130,13 @@ public class WeightedRangePartitioner ex DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST); InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); convertToArray(quantilesList); + long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode(); + long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple)ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple)ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(probVec)); + new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); } } // else - the quantiles file is empty - unless we have a bug, the Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Mon Oct 17 15:17:17 2016 @@ -23,6 +23,7 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner; import org.apache.pig.data.DataBag; @@ -30,6 +31,7 @@ import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.UDFContext; import org.apache.tez.runtime.library.common.ConfigUtils; public class WeightedRangePartitionerTez extends WeightedRangePartitioner { @@ -64,11 +66,13 @@ public class WeightedRangePartitionerTez InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM); convertToArray(quantilesList); + long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode(); + long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple) ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple) ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(probVec)); + new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); } } catch (Exception e) { throw new RuntimeException(e); Modified: pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java (original) +++ pig/trunk/src/org/apache/pig/impl/builtin/GFCross.java Mon Oct 17 15:17:17 2016 @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; @@ -42,7 +43,7 @@ public class GFCross extends EvalFunc<Da private BagFactory mBagFactory = BagFactory.getInstance(); private TupleFactory mTupleFactory = TupleFactory.getInstance(); private int parallelism = 0; - private Random r = new Random(); + private Random r; private String crossKey; static private final int DEFAULT_PARALLELISM = 96; @@ -69,6 +70,14 @@ public class GFCross extends EvalFunc<Da if (parallelism < 0) { throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey + " was " + parallelism); } + long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode(); + long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); + r = new Random(seed); + } else { + // Don't see a case where cfg can be null. 
+ // But there is an existing testcase TestGFCross.testDefault + // Using constant generated from task_14738102975522_0001_r_000000 hashcode + r = new Random(-4235927512599300514L); } numInputs = (Integer)input.get(0); Modified: pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original) +++ pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java Mon Oct 17 15:17:17 2016 @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.TaskID; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.hadoop.mapreduce.Mapper; @@ -35,6 +36,7 @@ import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase; @@ -75,9 +77,9 @@ import org.apache.pig.pen.util.LineageTr * */ public class LocalMapReduceSimulator { - + private MapReduceLauncher launcher = new MapReduceLauncher(); - + private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();; @SuppressWarnings("unchecked") @@ -88,12 +90,12 @@ public class 
LocalMapReduceSimulator { PigContext pc) throws PigException, IOException, InterruptedException { phyToMRMap.clear(); MROperPlan mrp = launcher.compile(php, pc); - + ConfigurationValidator.validatePigProperties(pc.getProperties()); Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties()); - + JobControlCompiler jcc = new JobControlCompiler(pc, conf); - + JobControl jc; int numMRJobsCompl = 0; DataBag input; @@ -106,6 +108,8 @@ public class LocalMapReduceSimulator { boolean needFileInput; final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>(); pc.getProperties().setProperty("pig.illustrating", "true"); + String jtIdentifier = "" + System.currentTimeMillis(); + int jobId = 0; while(mrp.size() != 0) { jc = jcc.compile(mrp, "Illustrator"); if(jc == null) { @@ -113,6 +117,7 @@ public class LocalMapReduceSimulator { } List<Job> jobs = jc.getWaitingJobs(); for (Job job : jobs) { + jobId++; jobConf = job.getJobConf(); FileLocalizer.setInitialized(false); ArrayList<ArrayList<OperatorKey>> inpTargets = @@ -123,14 +128,14 @@ public class LocalMapReduceSimulator { PigSplit split = null; List<POStore> stores = null; PhysicalOperator pack = null; - // revisit as there are new physical operators from MR compilation + // revisit as there are new physical operators from MR compilation if (!mro.mapPlan.isEmpty()) attacher.revisit(mro.mapPlan); if (!mro.reducePlan.isEmpty()) { attacher.revisit(mro.reducePlan); pack = mro.reducePlan.getRoots().get(0); } - + List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class); if (!mro.mapPlan.isEmpty()) { stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class); @@ -145,10 +150,10 @@ public class LocalMapReduceSimulator { for (POStore store : stores) { output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store)); } - + OutputAttacher oa = new OutputAttacher(mro.mapPlan, output); oa.visit(); - + if (!mro.reducePlan.isEmpty()) { oa = new 
OutputAttacher(mro.reducePlan, output); oa.visit(); @@ -168,6 +173,7 @@ public class LocalMapReduceSimulator { if (input != null) mro.mapPlan.remove(ld); } + int mapTaskId = 0; for (POLoad ld : lds) { // check newly generated data first input = output.get(ld.getLFile().getFileName()); @@ -180,7 +186,7 @@ public class LocalMapReduceSimulator { break; } } - } + } } needFileInput = (input == null); split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0); @@ -199,6 +205,7 @@ public class LocalMapReduceSimulator { context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split); } ((PigMapBase) map).setMapPlan(mro.mapPlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } else { if ("true".equals(jobConf.get("pig.usercomparator"))) @@ -210,10 +217,11 @@ public class LocalMapReduceSimulator { Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map) .getIllustratorContext(jobConf, input, intermediateData, split); ((PigMapBase) map).setMapPlan(mro.mapPlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString()); map.run(context); } } - + if (!mro.reducePlan.isEmpty()) { if (pack instanceof POPackage) @@ -233,19 +241,20 @@ public class LocalMapReduceSimulator { } ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan); + context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString()); reduce.run(context); } for (PhysicalOperator key : mro.phyToMRMap.keySet()) for (PhysicalOperator value : mro.phyToMRMap.get(key)) phyToMRMap.put(key, value); } - - + + int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>()); - + numMRJobsCompl += removedMROp; } - + jcc.reset(); } @@ -256,7 +265,7 @@ public class LocalMapReduceSimulator { plan)); this.outputBuffer = 
output; } - + @Override public void visitUserFunc(POUserFunc userFunc) throws VisitorException { if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) { Modified: pig/trunk/test/org/apache/pig/test/TestFindQuantiles.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFindQuantiles.java?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestFindQuantiles.java (original) +++ pig/trunk/test/org/apache/pig/test/TestFindQuantiles.java Mon Oct 17 15:17:17 2016 @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalMap; @@ -38,10 +37,10 @@ import org.apache.pig.impl.builtin.FindQ import org.junit.Test; public class TestFindQuantiles { - + private static TupleFactory tFact = TupleFactory.getInstance(); private static final float epsilon = 0.0001f; - + @Test public void testFindQuantiles() throws Exception { final int numSamples = 97778; @@ -50,7 +49,7 @@ public class TestFindQuantiles { System.out.println("sum: " + sum); assertTrue(sum > (1-epsilon) && sum < (1+epsilon)); } - + @Test public void testFindQuantiles2() throws Exception { final int numSamples = 30000; @@ -86,7 +85,7 @@ public class TestFindQuantiles { } private float[] getProbVec(Tuple values) throws Exception { - float[] probVec = new float[values.size()]; + float[] probVec = new float[values.size()]; for(int i = 0; i < values.size(); i++) { probVec[i] = (Float)values.get(i); } @@ -95,7 +94,7 @@ public class TestFindQuantiles { private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception { Random rand = new Random(1000); - List<Tuple> samples = new ArrayList<Tuple>(); + 
List<Tuple> samples = new ArrayList<Tuple>(); for (int i=0; i<numSamples; i++) { Tuple t = tFact.newTuple(1); t.set(0, rand.nextInt(max)); @@ -106,7 +105,7 @@ public class TestFindQuantiles { } private DataBag generateUniqueSamples(int numSamples) throws Exception { - DataBag samples = BagFactory.getInstance().newDefaultBag(); + DataBag samples = BagFactory.getInstance().newDefaultBag(); for (int i=0; i<numSamples; i++) { Tuple t = tFact.newTuple(1); t.set(0, new Integer(23)); @@ -121,9 +120,9 @@ public class TestFindQuantiles { in.set(0, new Integer(numReduceres)); in.set(1, samples); - + FindQuantiles fq = new FindQuantiles(); - + Map<String, Object> res = fq.exec(in); return res; } @@ -135,12 +134,11 @@ public class TestFindQuantiles { InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS); Iterator<Object> it = weightedPartsData.values().iterator(); float[] probVec = getProbVec((Tuple)it.next()); - new DiscreteProbabilitySampleGenerator(probVec); float sum = 0.0f; for (float f : probVec) { sum += f; } return sum; } - + } Modified: pig/trunk/test/org/apache/pig/test/TestGFCross.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGFCross.java?rev=1765306&r1=1765305&r2=1765306&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestGFCross.java (original) +++ pig/trunk/test/org/apache/pig/test/TestGFCross.java Mon Oct 17 15:17:17 2016 @@ -20,6 +20,7 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; @@ -50,6 +51,7 @@ public class TestGFCross { public void testSerial() throws Exception { Configuration cfg = new Configuration(); cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM 
+ ".1", "1"); + cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2); @@ -66,6 +68,7 @@ public class TestGFCross { public void testParallelSet() throws Exception { Configuration cfg = new Configuration(); cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10"); + cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2);