Author: knoguchi
Date: Thu Mar  3 20:56:58 2016
New Revision: 1733522

URL: http://svn.apache.org/viewvc?rev=1733522&view=rev
Log:
Fixing bug from PIG-4819: RANDOM() udf can lead to missing or redundant records 
(knoguchi)

Modified:
    
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
    pig/trunk/src/org/apache/pig/builtin/RANDOM.java
    pig/trunk/test/org/apache/pig/test/TestBuiltin.java

Modified: 
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java?rev=1733522&r1=1733521&r2=1733522&view=diff
==============================================================================
--- 
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
 (original)
+++ 
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
 Thu Mar  3 20:56:58 2016
@@ -18,32 +18,5 @@
 
 package org.apache.pig.piggybank.evaluation.math;
 
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConstants;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.data.DataType;
-
-public class RANDOM extends EvalFunc<Double>{
-    private Random r = null;
-
-    @Override
-    public Double exec(Tuple input) throws IOException {
-        if( r == null ) {
-            int jobidhash = 
PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
-            int taskIndex = 
Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
-            r = new Random(((long) jobidhash) << 32 | (taskIndex & 
0xffffffffL));
-        }
-        return r.nextDouble();
-    }
-
-    @Override
-    public Schema outputSchema(Schema input) {
-        return new Schema(new 
Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), 
input), DataType.DOUBLE));
-    }
+public class RANDOM extends org.apache.pig.builtin.RANDOM {
 }

Modified: pig/trunk/src/org/apache/pig/builtin/RANDOM.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RANDOM.java?rev=1733522&r1=1733521&r2=1733522&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/RANDOM.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/RANDOM.java Thu Mar  3 20:56:58 2016
@@ -23,6 +23,7 @@ import java.util.Random;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigConstants;
+import org.apache.pig.StaticDataCleanup;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.Tuple;
@@ -49,7 +50,32 @@ public class RANDOM extends EvalFunc<Dou
         if( r == null ) {
             int jobidhash = 
PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
             int taskIndex = 
Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
-            r = new Random(((long) jobidhash) << 32 | (taskIndex & 
0xffffffffL));
+
+            // XOR-ing 3 separate values
+            // |<-----32 bits---->|<----32 bits----->|
+            // |-jobidhash(int)---|-jobidhash(int)---|
+            // |        |---taskIndex(int)--|
+            // |----------seedUniquifier (long)------|
+            //          |                            |
+            //          |<-- Only 48 bits used ----->|
+            //          |    by java.util.Random     |
+            //          |                            |
+            //
+            // The reason for repeating jobidhash and shifting taskIndex is that
+            // seeds too close to each other would produce very similar values.
+            //
+            // The goal of this method is to produce pseudo-random values that
+            // would
+            // (1) Produce the same sequence of pseudo-random values for 
attempts from the same jobid/vertexid and taskid
+            // (2) When taskid, jobid, or vertexid(tez) differ, they should 
produce a different random sequence
+            // (3) When Random is called more than once inside the script, 
they should also produce different random values
+            //     e.g. B = FOREACH A generate RANDOM(), RANDOM();
+            //
+            r = new Random( (((long) jobidhash) << 32 | (jobidhash &  
0xffffffffL))  ^ ((long) taskIndex << 16) ^ seedUniquifier);
+
+            // L'Ecuyer, "Tables of Linear Congruential Generators of
+            // Different Sizes and Good Lattice Structure", 1999
+            seedUniquifier *= 4292484099903637661L;
         }
         return r.nextDouble();
     }
@@ -58,4 +84,12 @@ public class RANDOM extends EvalFunc<Dou
     public Schema outputSchema(Schema input) {
         return new Schema(new 
Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), 
input), DataType.DOUBLE));
     }
+
+    // Taking the initial seed value from java.util.Random
+    private static long seedUniquifier = 8682522807148012L;
+
+    @StaticDataCleanup
+    public static void resetSeedUniquifier() {
+        seedUniquifier = 8682522807148012L;
+    }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1733522&r1=1733521&r2=1733522&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Thu Mar  3 20:56:58 2016
@@ -3265,10 +3265,12 @@ public class TestBuiltin {
 
     @Test
     public void testRANDOM() throws Exception {
-        PigMapReduce.sJobConfInternal.set(new Configuration());
+        Configuration conf = new Configuration();
+        PigMapReduce.sJobConfInternal.set(conf);
         
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
-        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, 
"999");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
 
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
         org.apache.pig.builtin.RANDOM r = new org.apache.pig.builtin.RANDOM();
         double [] tmpresult = new double [5];
 
@@ -3282,16 +3284,18 @@ public class TestBuiltin {
         }
 
         // with different task id, random should return different number
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
         
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
-        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, 
"888");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "1");
         r = new org.apache.pig.builtin.RANDOM();
         for( int i = 0; i < 5 ; i++ ) {
             assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
         }
 
         // with different jobid, random should return completely different 
number
-        
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_222");
-        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, 
"999");
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
+        
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_112");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
         r = new org.apache.pig.builtin.RANDOM();
         for( int i = 0; i < 5 ; i++ ) {
             assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
@@ -3299,12 +3303,23 @@ public class TestBuiltin {
 
         // with same jobid and taskid, random should return exact same sequence
         // of pseudo-random number
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
         
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
-        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, 
"999");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
         r = new org.apache.pig.builtin.RANDOM();
         for( int i = 0; i < 5 ; i++ ) {
             assertEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
         }
+
+        // When initialized again, it should return different random values
+        // even when jobid and taskid match.
+        // To cover the case when RANDOM is called more than once in the user's
+        // script.
+        // B = FOREACH A generate RANDOM(), RANDOM();
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
+        }
     }
 
     @Test


Reply via email to