Author: knoguchi
Date: Wed Mar 2 19:37:03 2016
New Revision: 1733355
URL: http://svn.apache.org/viewvc?rev=1733355&view=rev
Log:
PIG-4819: RANDOM() udf can lead to missing or redundant records (knoguchi)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
pig/trunk/src/org/apache/pig/builtin/RANDOM.java
pig/trunk/test/org/apache/pig/test/TestBuiltin.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 2 19:37:03 2016
@@ -97,6 +97,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4819: RANDOM() udf can lead to missing or redundant records (knoguchi)
+
PIG-4816: Read a null scalar causing a Tez failure (daijy)
PIG-4818: Single quote inside comment in GENERATE is not being ignored
(knoguchi)
Modified:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/RANDOM.java Wed Mar 2 19:37:03 2016
@@ -19,17 +19,28 @@
package org.apache.pig.piggybank.evaluation.math;
import java.io.IOException;
+import java.util.Random;
import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
public class RANDOM extends EvalFunc<Double>{
+ private Random r = null;
- public Double exec(Tuple input) throws IOException {
- return Math.random();
- }
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ if( r == null ) {
+ int jobidhash = PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
+ int taskIndex = Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
+ r = new Random(((long) jobidhash) << 32 | (taskIndex & 0xffffffffL));
+ }
+ return r.nextDouble();
+ }
@Override
public Schema outputSchema(Schema input) {
Modified: pig/trunk/src/org/apache/pig/builtin/RANDOM.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/RANDOM.java?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/RANDOM.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/RANDOM.java Wed Mar 2 19:37:03 2016
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.util.Random;
import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
@@ -32,20 +35,24 @@ import org.apache.pig.data.DataType;
*/
@Nondeterministic
public class RANDOM extends EvalFunc<Double>{
- private Random r;
+ private Random r = null;
public RANDOM() {
- r = new Random();
}
public RANDOM(String seed) {
r = new Random(Long.parseLong(seed));
}
- @Override
- public Double exec(Tuple input) throws IOException {
- return r.nextDouble();
- }
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ if( r == null ) {
+ int jobidhash = PigMapReduce.sJobConfInternal.get().get(MRConfiguration.JOB_ID).hashCode();
+ int taskIndex = Integer.valueOf(PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX));
+ r = new Random(((long) jobidhash) << 32 | (taskIndex & 0xffffffffL));
+ }
+ return r.nextDouble();
+ }
@Override
public Schema outputSchema(Schema input) {
Modified: pig/trunk/test/org/apache/pig/test/TestBuiltin.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBuiltin.java?rev=1733355&r1=1733354&r2=1733355&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBuiltin.java Wed Mar 2 19:37:03 2016
@@ -18,6 +18,7 @@
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -40,13 +41,20 @@ import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConstants;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.builtin.ARITY;
import org.apache.pig.builtin.AddDuration;
import org.apache.pig.builtin.BagSize;
@@ -136,6 +144,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class TestBuiltin {
+ private static final Log LOG = LogFactory.getLog(TestBuiltin.class);
private static PigServer pigServer;
private static Properties properties;
private static MiniGenericCluster cluster;
@@ -3206,17 +3215,96 @@ public class TestBuiltin {
pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
pigServer.registerQuery("B = foreach A generate name, UniqueID();");
Iterator<Tuple> iter = pigServer.openIterator("B");
- iter.next().get(1).equals("0-0");
- iter.next().get(1).equals("0-1");
- iter.next().get(1).equals("0-2");
- iter.next().get(1).equals("0-3");
- iter.next().get(1).equals("0-4");
- iter.next().get(1).equals("1-0");
- iter.next().get(1).equals("1-1");
- iter.next().get(1).equals("1-1");
- iter.next().get(1).equals("1-2");
- iter.next().get(1).equals("1-3");
- iter.next().get(1).equals("1-4");
+ assertEquals(iter.next().get(1),"0-0");
+ assertEquals(iter.next().get(1),"0-1");
+ assertEquals(iter.next().get(1),"0-2");
+ assertEquals(iter.next().get(1),"0-3");
+ assertEquals(iter.next().get(1),"0-4");
+ assertEquals(iter.next().get(1),"1-0");
+ assertEquals(iter.next().get(1),"1-1");
+ assertEquals(iter.next().get(1),"1-2");
+ assertEquals(iter.next().get(1),"1-3");
+ assertEquals(iter.next().get(1),"1-4");
+ }
+
+ @Test
+ public void testRANDOMWithJob() throws Exception {
+ Util.resetStateForExecModeSwitch();
+ String inputFileName = "testRANDOM.txt";
+ Util.createInputFile(cluster, inputFileName, new String[]
+ {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ // running with two mappers
+ pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+ pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
+ pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+ pigServer.registerQuery("B = foreach A generate name, RANDOM();");
+ Iterator<Tuple> iter = pigServer.openIterator("B");
+ double [] mapper1 = new double[5];
+ double [] mapper2 = new double[5];
+ for( int i = 0; i < 5; i++ ){
+ mapper1[i] = (Double) iter.next().get(1);
+ if( i != 0 ) {
+ // making sure it's not creating same value
+ assertNotEquals(mapper1[i-1], mapper1[i], 0.0001);
+ }
+ }
+ for( int i = 0; i < 5; i++ ){
+ mapper2[i] = (Double) iter.next().get(1);
+ if( i != 0 ) {
+ // making sure it's not creating same value
+ assertNotEquals(mapper2[i-1], mapper2[i], 0.0001);
+ }
+ }
+ // making sure different mappers are creating different random values
+ for( int i = 0; i < 5; i++ ){
+ assertNotEquals(mapper1[i], mapper2[i], 0.0001);
+ }
+ }
+
+
+ @Test
+ public void testRANDOM() throws Exception {
+ PigMapReduce.sJobConfInternal.set(new Configuration());
+ PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "999");
+
+ org.apache.pig.builtin.RANDOM r = new org.apache.pig.builtin.RANDOM();
+ double [] tmpresult = new double [5];
+
+ for( int i = 0; i < 5 ; i++ ) {
+ tmpresult[i] = r.exec(null).doubleValue();
+ LOG.info("Return value of RANDOM(): " + tmpresult[i]);
+ if( i != 0 ) {
+ //making sure RANDOM isn't returning some fixed number
+ assertNotEquals(tmpresult[i-1], tmpresult[i], 0.0001);
+ }
+ }
+
+ // with different task id, random should return different number
+ PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "888");
+ r = new org.apache.pig.builtin.RANDOM();
+ for( int i = 0; i < 5 ; i++ ) {
+ assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
+ }
+
+ // with different jobid, random should return completely different number
+ PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_222");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "999");
+ r = new org.apache.pig.builtin.RANDOM();
+ for( int i = 0; i < 5 ; i++ ) {
+ assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
+ }
+
+ // with same jobid and taskid, random should return exact same sequence
+ // of pseudo-random number
+ PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+ PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "999");
+ r = new org.apache.pig.builtin.RANDOM();
+ for( int i = 0; i < 5 ; i++ ) {
+ assertEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
+ }
}
@Test