Author: xuefu
Date: Wed Mar 16 13:23:30 2016
New Revision: 1735229
URL: http://svn.apache.org/viewvc?rev=1735229&view=rev
Log:
PIG-4838: Fix test TestBuiltin (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java?rev=1735229&r1=1735228&r2=1735229&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java Wed Mar 16 13:23:30 2016
@@ -21,9 +21,12 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
+import java.util.UUID;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -74,18 +77,20 @@ public class ForEachConverter implements
}
void initializeJobConf() {
- if (this.jobConf == null) {
- this.jobConf =
KryoSerializer.deserializeJobConf(this.confBytes);
- PigMapReduce.sJobConfInternal.set(jobConf);
- try {
- MapRedUtil.setupUDFContext(jobConf);
- PigContext pc = (PigContext)
ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
- SchemaTupleBackend.initialize(jobConf, pc);
-
- } catch (IOException ioe) {
- String msg = "Problem while configuring UDFContext from
ForEachConverter.";
- throw new RuntimeException(msg, ioe);
- }
+ if (this.jobConf != null) {
+ return;
+ }
+ this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+ PigMapReduce.sJobConfInternal.set(jobConf);
+ try {
+ MapRedUtil.setupUDFContext(jobConf);
+ PigContext pc = (PigContext)
ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+ SchemaTupleBackend.initialize(jobConf, pc);
+ jobConf.set(MRConfiguration.JOB_ID,
UUID.randomUUID().toString());
+ jobConf.set(PigConstants.TASK_INDEX, "0");
+ } catch (IOException ioe) {
+ String msg = "Problem while configuring UDFContext from
ForEachConverter.";
+ throw new RuntimeException(msg, ioe);
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1735229&r1=1735228&r2=1735229&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Wed Mar 16 13:23:30 2016
@@ -3215,16 +3215,46 @@ public class TestBuiltin {
pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
pigServer.registerQuery("B = foreach A generate name, UniqueID();");
Iterator<Tuple> iter = pigServer.openIterator("B");
- assertEquals(iter.next().get(1),"0-0");
- assertEquals(iter.next().get(1),"0-1");
- assertEquals(iter.next().get(1),"0-2");
- assertEquals(iter.next().get(1),"0-3");
- assertEquals(iter.next().get(1),"0-4");
- assertEquals(iter.next().get(1),"1-0");
- assertEquals(iter.next().get(1),"1-1");
- assertEquals(iter.next().get(1),"1-2");
- assertEquals(iter.next().get(1),"1-3");
- assertEquals(iter.next().get(1),"1-4");
+ if (!Util.isSparkExecType(cluster.getExecType())) {
+ assertEquals(iter.next().get(1), "0-0");
+ assertEquals(iter.next().get(1), "0-1");
+ assertEquals(iter.next().get(1), "0-2");
+ assertEquals(iter.next().get(1), "0-3");
+ assertEquals(iter.next().get(1), "0-4");
+ assertEquals(iter.next().get(1), "1-0");
+ assertEquals(iter.next().get(1), "1-1");
+ assertEquals(iter.next().get(1), "1-2");
+ assertEquals(iter.next().get(1), "1-3");
+ assertEquals(iter.next().get(1), "1-4");
+ } else{
+ //because we set PigConstants.TASK_INDEX as 0 in
ForEachConverter#ForEachFunction#initializeJobConf
+ //UniqueID.exec() will output like 0-*
+ //there will be 2 InputSplits when mapred.max.split.size is
10(byte) for the testUniqueID.txt(20 bytes)
+ //Split0:
+ // 1\n
+ // 2\n
+ // 3\n
+ // 4\n
+ // 5\n
+ // 1\n
+ //Split1:
+ // 2\n
+ // 3\n
+ // 4\n
+ // 5\n
+ //The size of Split0 is 12 not 10 because
LineRecordReader#nextKeyValue will read one more line
+ //More detail see PIG-4383
+ assertEquals(iter.next().get(1), "0-0");
+ assertEquals(iter.next().get(1), "0-1");
+ assertEquals(iter.next().get(1), "0-2");
+ assertEquals(iter.next().get(1), "0-3");
+ assertEquals(iter.next().get(1), "0-4");
+ assertEquals(iter.next().get(1), "0-5");
+ assertEquals(iter.next().get(1), "0-0");
+ assertEquals(iter.next().get(1), "0-1");
+ assertEquals(iter.next().get(1), "0-2");
+ assertEquals(iter.next().get(1), "0-3");
+ }
}
@Test