Author: knoguchi
Date: Thu Mar 3 20:56:58 2016
New Revision: 1733522
URL: http://svn.apache.org/viewvc?rev=1733522&view=rev
Log:
Fixing bug from PIG-4819: RANDOM() udf can lead to missing or redundant records
(knoguchi)
Modified:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
pig/trunk/src/org/apache/pig/builtin/RANDOM.java
pig/trunk/test/org/apache/pig/test/TestBuiltin.java
Modified:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java?rev=1733522&r1=1733521&r2=1733522&view=diff
==============================================================================
---
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
(original)
+++
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
Thu Mar 3 20:56:58 2016
@@ -18,32 +18,5 @@
package org.apache.pig.piggybank.evaluation.math;
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConstants;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-
-public class RANDOM extends EvalFunc<Double>{
- private Random r = null;
-
- @Override
- public Double exec(Tuple input) throws IOException {
- if( r == null ) {
- int jobidhash =
PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
- int taskIndex =
Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
- r = new Random(((long) jobidhash) << 32 | (taskIndex &
0xffffffffL));
- }
- return r.nextDouble();
- }
-
- @Override
- public Schema outputSchema(Schema input) {
- return new Schema(new
Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(),
input), DataType.DOUBLE));
- }
+public class RANDOM extends org.apache.pig.builtin.RANDOM {
}
Modified: pig/trunk/src/org/apache/pig/builtin/RANDOM.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RANDOM.java?rev=1733522&r1=1733521&r2=1733522&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/RANDOM.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/RANDOM.java Thu Mar 3 20:56:58 2016
@@ -23,6 +23,7 @@ import java.util.Random;
import org.apache.pig.EvalFunc;
import org.apache.pig.PigConstants;
+import org.apache.pig.StaticDataCleanup;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
@@ -49,7 +50,32 @@ public class RANDOM extends EvalFunc<Dou
if( r == null ) {
int jobidhash =
PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
int taskIndex =
Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
- r = new Random(((long) jobidhash) << 32 | (taskIndex &
0xffffffffL));
+
+ // XOR-ing 3 separate values
+ // |<-----32 bits---->|<----32 bits----->|
+ // |-jobidhash(int)---|-jobidhash(int)---|
+ // |         |---taskIndex(int)--|       |
+ // |----------seedUniquifier (long)------|
+ // |         |                           |
+ // |         |<-- Only 48 bits used ---->|
+ // |         |    by java.util.Random    |
+ // |         |                           |
+ //
+ // The reason for repeating jobidhash and shifting taskIndex is that seeds
+ // too close to each other would produce very similar values.
+ //
+ // The goal of this method is to produce pseudo-random values that
+ // would
+ // (1) Produce the same sequence of pseudo-random values for
attempts with the same jobid/vertexid and taskid
+ // (2) When taskid, jobid, or vertexid(tez) differ, they should
produce a different random sequence
+ // (3) When Random is called more than once inside the script,
they should also produce different random values
+ // e.g. B = FOREACH A generate RANDOM(), RANDOM();
+ //
+ r = new Random( (((long) jobidhash) << 32 | (jobidhash &
0xffffffffL)) ^ ((long) taskIndex << 16) ^ seedUniquifier);
+
+ // L'Ecuyer, "Tables of Linear Congruential Generators of
+ // Different Sizes and Good Lattice Structure", 1999
+ seedUniquifier *= 4292484099903637661L;
}
return r.nextDouble();
}
@@ -58,4 +84,12 @@ public class RANDOM extends EvalFunc<Dou
public Schema outputSchema(Schema input) {
return new Schema(new
Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(),
input), DataType.DOUBLE));
}
+
+ // Taking the initial seed value from java.util.Random
+ private static long seedUniquifier = 8682522807148012L;
+
+ @StaticDataCleanup
+ public static void resetSeedUniquifier() {
+ seedUniquifier = 8682522807148012L;
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1733522&r1=1733521&r2=1733522&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Thu Mar 3 20:56:58 2016
@@ -3265,10 +3265,12 @@ public class TestBuiltin {
@Test
public void testRANDOM() throws Exception {
- PigMapReduce.sJobConfInternal.set(new Configuration());
+ Configuration conf = new Configuration();
+ PigMapReduce.sJobConfInternal.set(conf);
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
- PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX,
"999");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
+ org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
org.apache.pig.builtin.RANDOM r = new org.apache.pig.builtin.RANDOM();
double [] tmpresult = new double [5];
@@ -3282,16 +3284,18 @@ public class TestBuiltin {
}
// with different task id, random should return different number
+ org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
- PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX,
"888");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "1");
r = new org.apache.pig.builtin.RANDOM();
for( int i = 0; i < 5 ; i++ ) {
assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
}
// with different jobid, random should return completely different
number
-
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_222");
- PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX,
"999");
+ org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
+
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_112");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
r = new org.apache.pig.builtin.RANDOM();
for( int i = 0; i < 5 ; i++ ) {
assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
@@ -3299,12 +3303,23 @@ public class TestBuiltin {
// with same jobid and taskid, random should return exact same sequence
// of pseudo-random number
+ org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
- PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX,
"999");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
r = new org.apache.pig.builtin.RANDOM();
for( int i = 0; i < 5 ; i++ ) {
assertEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
}
+
+ // When initialized again, it should return different random values
+ // even when jobid and taskid match.
+ // To cover the case when RANDOM is called more than once in the user's
+ // script.
+ // B = FOREACH A generate RANDOM(), RANDOM();
+ r = new org.apache.pig.builtin.RANDOM();
+ for( int i = 0; i < 5 ; i++ ) {
+ assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
+ }
}
@Test