Author: xuefu
Date: Mon Feb 15 02:44:39 2016
New Revision: 1730442
URL: http://svn.apache.org/viewvc?rev=1730442&view=rev
Log:
PIG-4777: Enable TestEvalPipelineLocal for spark (Prateek via Xuefu)
Modified:
pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1730442&r1=1730441&r2=1730442&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Mon
Feb 15 02:44:39 2016
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;
+import java.util.Collections;
import junit.framework.Assert;
@@ -864,7 +865,7 @@ public class TestEvalPipelineLocal {
int LOOP_COUNT = 10;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
- for(int i = 0; i < LOOP_COUNT; i++) {
+ for(int i=0; i<LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
ps.println(i+"\t"+j);
@@ -888,18 +889,29 @@ public class TestEvalPipelineLocal {
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
int numIdentity = 0;
+ // When running with spark, output can be in a different order than that
+ // when running in mr mode.
+ List<Tuple> resList = new ArrayList<Tuple>();
while(iter.hasNext()){
- Tuple t = iter.next();
- Assert.assertEquals((Integer)((numIdentity + 1) * 10),
(Integer)t.get(0));
+ resList.add(iter.next());
+ }
+
+ numIdentity = resList.size();
+ Collections.sort(resList);
+ Assert.assertEquals(LOOP_COUNT, numIdentity);
+ // Since delta differences are allowed in some cases, the utility function
+ // for comparing tuple lists cannot be used here.
+ // This loop checks the sorted results against the expected values.
+ for (int i=0; i<numIdentity; i++) {
+ Tuple t = resList.get(i);
+ Assert.assertEquals((Integer)((i + 1) * 10), (Integer)t.get(0));
Assert.assertEquals((Long)10L, (Long)t.get(1));
Assert.assertEquals((Long)5L, (Long)t.get(2));
Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
Assert.assertEquals(7, t.size());
- ++numIdentity;
}
- Assert.assertEquals(LOOP_COUNT, numIdentity);
}
@Test
@@ -907,12 +919,25 @@ public class TestEvalPipelineLocal {
int LOOP_COUNT = 10;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+
+ List<Tuple> expectedList = new ArrayList<Tuple>();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
ps.println(i+"\t"+j);
+ // Generating expected data
+ Tuple t = mTf.newTuple();
+ t.append(new Double(j - i));
+ t.append((Integer)(j%2));
+ if(j == 0) {
+ t.append(0.0);
+ } else {
+ t.append((Double)((double)i/j));
+ }
+ expectedList.add(t);
}
}
+ Collections.sort(expectedList);
ps.close();
pigServer.registerQuery("A = LOAD '"
@@ -929,25 +954,30 @@ public class TestEvalPipelineLocal {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
+ // When running with spark, output can be in a different order than when
+ // running in mr mode.
+ List<Tuple> resList = new ArrayList<Tuple>();
+ while(iter.hasNext()){
+ resList.add(iter.next());
+ }
- int numRows = 0;
+ Collections.sort(resList);
+ Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, resList.size());
+
+ // Since a delta difference is allowed in some cases, the utility function
+ // for comparing tuple lists cannot be used here.
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j = 0; j < LOOP_COUNT; j+=2){
- Tuple t = null;
- if(iter.hasNext()) t = iter.next();
- Assert.assertEquals(3, t.size());
- Assert.assertEquals(new Double(j - i), (Double)t.get(0), 0.01);
- Assert.assertEquals((Integer)(j%2), (Integer)t.get(1));
- if(j == 0) {
- Assert.assertEquals(0.0, (Double)t.get(2), 0.01);
- } else {
- Assert.assertEquals((Double)((double)i/j),
(Double)t.get(2), 0.01);
- }
- ++numRows;
+ int k = i*LOOP_COUNT/2 + j/2;
+ Tuple res_t = resList.get(k);
+ Tuple expec_t = expectedList.get(k);
+
+ Assert.assertEquals(expec_t.size(), res_t.size());
+ Assert.assertEquals((Double)expec_t.get(0),
(Double)res_t.get(0), 0.01);
+ Assert.assertEquals((Integer)expec_t.get(1),
(Integer)res_t.get(1));
+ Assert.assertEquals((Double)expec_t.get(2),
(Double)res_t.get(2), 0.01);
}
}
-
- Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
}
@Test
@@ -955,10 +985,16 @@ public class TestEvalPipelineLocal {
int LOOP_COUNT = 10;
File tmpFile = File.createTempFile("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ List<Tuple> expectedList = new ArrayList<Tuple>();
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j=0;j<LOOP_COUNT;j+=2){
ps.println(i+"\t"+j);
ps.println(i+"\t"+j);
+ // Generating expected data.
+ Tuple t = mTf.newTuple();
+ t.append(new Double(i+j));
+ t.append(new Double(i + j + i));
+ expectedList.add(t);
}
}
ps.close();
@@ -976,20 +1012,9 @@ public class TestEvalPipelineLocal {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
-
- int numRows = 0;
- for(int i = 0; i < LOOP_COUNT; i++) {
- for(int j = 0; j < LOOP_COUNT; j+=2){
- Tuple t = null;
- if(iter.hasNext()) t = iter.next();
- Assert.assertEquals(2, t.size());
- Assert.assertEquals(new Double(i + j), (Double)t.get(0), 0.01);
- Assert.assertEquals(new Double(i + j + i), (Double)t.get(1));
- ++numRows;
- }
- }
-
- Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+ // When running with spark, output can be in a different order than that
+ // when running in mr mode.
+ Util.checkQueryOutputsAfterSort(iter, expectedList);
}
@Test
@@ -1108,9 +1133,15 @@ public class TestEvalPipelineLocal {
pigServer.registerQuery("b = foreach a generate TOTUPLE(x, y) as t,
z;");
pigServer.registerQuery("c = group b by t;");
Iterator<Tuple> iter = pigServer.openIterator("c");
-
Assert.assertTrue(iter.next().toString().equals("((1,2),{((1,2),3)})"));
-
Assert.assertTrue(iter.next().toString().equals("((4,5),{((4,5),6)})"));
- Assert.assertFalse(iter.hasNext());
+ // When running with spark, output can be in a different order than that
+ // when running in mr mode.
+ List<Tuple> expectedRes =
+ Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "((1,2),{((1,2),3)})",
+ "((4,5),{((4,5),6)})"
+ });
+ Util.checkQueryOutputsAfterSort(iter, expectedRes);
}
@Test