Author: xuefu
Date: Thu May 7 20:37:18 2015
New Revision: 1678260
URL: http://svn.apache.org/r1678260
Log:
PIG-4276: Fix ordering related failures in TestEvalPipeline for Spark (Mohit via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
pig/branches/spark/test/org/apache/pig/test/Util.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Thu May 7 20:37:18 2015
@@ -88,13 +88,12 @@ public class SparkUtil {
public static int getParallelism(List<RDD<Tuple>> predecessors,
PhysicalOperator physicalOperator) {
- int parallelism = physicalOperator.getRequestedParallelism();
- if (parallelism <= 0) {
- // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
- // is reasonable.
- parallelism = predecessors.get(0).context().defaultParallelism();
- }
- return parallelism;
+ int parallelism = physicalOperator.getRequestedParallelism();
+ if (parallelism <= 0) {
+ // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
+ // is reasonable.
+ parallelism = predecessors.get(0).context().defaultParallelism();
+ }
+ return parallelism;
}
-
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Thu May 7 20:37:18 2015
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -417,9 +418,20 @@ public class TestEvalPipeline {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
- int numIdentity = 0;
+ List<Tuple> actualResList = new ArrayList<Tuple>();
while(iter.hasNext()){
- Tuple t = iter.next();
+ actualResList.add(iter.next());
+ }
+
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ for (Tuple t : actualResList) {
+ Util.convertBagToSortedBag(t);
+ }
+ Collections.sort(actualResList);
+ }
+
+ int numIdentity = 0;
+ for (Tuple t : actualResList) {
Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
Assert.assertEquals((Long)5L, (Long)t.get(2));
Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
@@ -462,9 +474,20 @@ public class TestEvalPipeline {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
- int numIdentity = 0;
+ List<Tuple> actualResList = new ArrayList<Tuple>();
while(iter.hasNext()){
- Tuple t = iter.next();
+ actualResList.add(iter.next());
+ }
+
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ for (Tuple t : actualResList) {
+ Util.convertBagToSortedBag(t);
+ }
+ Collections.sort(actualResList);
+ }
+
+ int numIdentity = 0;
+ for (Tuple t : actualResList) {
Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
Assert.assertEquals((Long)5L, (Long)t.get(2));
Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
@@ -838,9 +861,20 @@ public class TestEvalPipeline {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
- int numIdentity = 0;
+ List<Tuple> actualResList = new ArrayList<Tuple>();
while(iter.hasNext()){
- Tuple t = iter.next();
+ actualResList.add(iter.next());
+ }
+
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ for (Tuple t : actualResList) {
+ Util.convertBagToSortedBag(t);
+ }
+ Collections.sort(actualResList);
+ }
+
+ int numIdentity = 0;
+ for (Tuple t : actualResList) {
Assert.assertEquals((Integer)((numIdentity + 1) * 10), (Integer)t.get(0));
Assert.assertEquals((Long)10L, (Long)t.get(1));
Assert.assertEquals((Long)5L, (Long)t.get(2));
@@ -869,6 +903,10 @@ public class TestEvalPipeline {
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A;");
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ pigServer.registerQuery("B = order B by *;");
+ }
+
String query = "C = foreach B {"
+ "C1 = $1 - $0;"
+ "C2 = $1%2;"
@@ -879,7 +917,6 @@ public class TestEvalPipeline {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
-
int numRows = 0;
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j = 0; j < LOOP_COUNT; j+=2){
@@ -916,6 +953,10 @@ public class TestEvalPipeline {
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = distinct A;");
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ pigServer.registerQuery("B = order B by *;");
+ }
+
String query = "C = foreach B {"
+ "C1 = $0 + $1;"
+ "C2 = C1 + $0;"
@@ -925,7 +966,6 @@ public class TestEvalPipeline {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
-
int numRows = 0;
for(int i = 0; i < LOOP_COUNT; i++) {
for(int j = 0; j < LOOP_COUNT; j+=2){
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Thu May 7 20:37:18 2015
@@ -137,9 +137,20 @@ public class TestEvalPipeline2 {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("C");
if(!iter.hasNext()) Assert.fail("No output found");
- int numIdentity = 0;
+ List<Tuple> actualResList = new ArrayList<Tuple>();
while(iter.hasNext()){
- Tuple tuple = iter.next();
+ actualResList.add(iter.next());
+ }
+
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ for (Tuple t : actualResList) {
+ Util.convertBagToSortedBag(t);
+ }
+ Collections.sort(actualResList);
+ }
+
+ int numIdentity = 0;
+ for (Tuple tuple : actualResList) {
Tuple t = (Tuple)tuple.get(0);
Assert.assertEquals(DataByteArray.class, t.get(0).getClass());
int group = Integer.parseInt(new String(((DataByteArray)t.get(0)).get()));
@@ -469,17 +480,24 @@ public class TestEvalPipeline2 {
pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
- Assert.assertTrue(iter.hasNext());
- Tuple t = iter.next();
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ String[] expectedResults =
+ new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" };
+ Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults,
+ org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+ } else {
+ Assert.assertTrue(iter.hasNext());
+ Tuple t = iter.next();
- Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
+ Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
- Assert.assertTrue(iter.hasNext());
- t = iter.next();
+ Assert.assertTrue(iter.hasNext());
+ t = iter.next();
- Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
+ Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
- Assert.assertFalse(iter.hasNext());
+ Assert.assertFalse(iter.hasNext());
+ }
}
// See PIG-1195
@@ -732,16 +750,18 @@ public class TestEvalPipeline2 {
pigServer.registerQuery(query);
Iterator<Tuple> iter = pigServer.openIterator("EventsPerMinute");
- Tuple t = iter.next();
- Assert.assertTrue( (Long)t.get(0) == 60000 && (Long)t.get(1) == 2 && (Long)t.get(2) == 3 );
-
- t = iter.next();
- Assert.assertTrue( (Long)t.get(0) == 120000 && (Long)t.get(1) == 2 && (Long)t.get(2) == 2 );
-
- t = iter.next();
- Assert.assertTrue( (Long)t.get(0) == 240000 && (Long)t.get(1) == 1 && (Long)t.get(2) == 1 );
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", "(240000L,1L,1L)"});
- Assert.assertFalse(iter.hasNext());
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ Util.checkQueryOutputsAfterSort(iter, expectedResults);
+ } else {
+ // Even though GROUP BY does not return results sorted by key, that
+ // is the current behavior for MR, which some users rely on. Let's
+ // not sort the results for MR and Tez so that we can catch it when
+ // current MR/Tez behavior changes.
+ Util.checkQueryOutputs(iter, expectedResults);
+ }
}
// See PIG-1729
@@ -1572,6 +1592,9 @@ public class TestEvalPipeline2 {
pigServer.registerQuery("data = load 'table_testLimitFlatten' as (k,v);");
pigServer.registerQuery("grouped = GROUP data BY k;");
+ if (Util.isSparkExecType(cluster.getExecType())) {
+ pigServer.registerQuery("grouped = ORDER grouped BY group;");
+ }
pigServer.registerQuery("selected = LIMIT grouped 2;");
pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN (data);");
@@ -1579,7 +1602,8 @@ public class TestEvalPipeline2 {
String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
+ Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+ org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
}
// See PIG-2237
Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Thu May 7 20:37:18 2015
@@ -554,7 +554,7 @@ public class Util {
Assert.assertEquals("Comparing actual and expected results. ",
expectedResList, actualResList);
-
+
}
/**
@@ -1151,7 +1151,7 @@ public class Util {
}
- static private void convertBagToSortedBag(Tuple t) {
+ static public void convertBagToSortedBag(Tuple t) {
for (int i=0;i<t.size();i++) {
Object obj = null;
try {
@@ -1327,6 +1327,14 @@ public class Util {
return execType == ExecType.MAPREDUCE;
}
+ public static boolean isSparkExecType(ExecType execType) {
+ if (execType.name().toLowerCase().startsWith("spark")) {
+ return true;
+ }
+
+ return false;
+ }
+
public static String findPigJarName() {
final String suffix = System.getProperty("hadoopversion").equals("20") ? "1" : "2";
File baseDir = new File(".");