Author: xuefu
Date: Thu May  7 20:37:18 2015
New Revision: 1678260

URL: http://svn.apache.org/r1678260
Log:
PIG-4276: Fix ordering related failures in TestEvalPipeline for Spark (Mohit via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/branches/spark/test/org/apache/pig/test/Util.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Thu May  7 20:37:18 2015
@@ -88,13 +88,12 @@ public class SparkUtil {
 
     public static int getParallelism(List<RDD<Tuple>> predecessors,
             PhysicalOperator physicalOperator) {
-        int parallelism = physicalOperator.getRequestedParallelism();
-        if (parallelism <= 0) {
-            // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
-            // is reasonable.
-            parallelism = predecessors.get(0).context().defaultParallelism();
-        }
-        return parallelism;
+      int parallelism = physicalOperator.getRequestedParallelism();
+      if (parallelism <= 0) {
+        // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
+        // is reasonable.
+        parallelism = predecessors.get(0).context().defaultParallelism();
+      }
+      return parallelism;
     }
-
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Thu May  7 20:37:18 2015
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -417,9 +418,20 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            for (Tuple t : actualResList) {
+                Util.convertBagToSortedBag(t);
+            }
+            Collections.sort(actualResList);
+        }
+
+        int numIdentity = 0;
+        for (Tuple t : actualResList) {
             Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
             Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
@@ -462,9 +474,20 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            for (Tuple t : actualResList) {
+                Util.convertBagToSortedBag(t);
+            }
+            Collections.sort(actualResList);
+        }
+
+        int numIdentity = 0;
+        for (Tuple t : actualResList) {
             Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
             Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
@@ -838,9 +861,20 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            for (Tuple t : actualResList) {
+                Util.convertBagToSortedBag(t);
+            }
+            Collections.sort(actualResList);
+        }
+
+        int numIdentity = 0;
+        for (Tuple t : actualResList) {
             Assert.assertEquals((Integer)((numIdentity + 1) * 10), (Integer)t.get(0));
             Assert.assertEquals((Long)10L, (Long)t.get(1));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
@@ -869,6 +903,10 @@ public class TestEvalPipeline {
         pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            pigServer.registerQuery("B = order B by *;");
+        }
+
         String query = "C = foreach B {"
         + "C1 = $1 - $0;"
         + "C2 = $1%2;"
@@ -879,7 +917,6 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-
         int numRows = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){
@@ -916,6 +953,10 @@ public class TestEvalPipeline {
         pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = distinct A;");
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            pigServer.registerQuery("B = order B by *;");
+        }
+
         String query = "C = foreach B {"
         + "C1 = $0 + $1;"
         + "C2 = C1 + $0;"
@@ -925,7 +966,6 @@ public class TestEvalPipeline {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-
         int numRows = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Thu May  7 20:37:18 2015
@@ -137,9 +137,20 @@ public class TestEvalPipeline2 {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-        int numIdentity = 0;
+        List<Tuple> actualResList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple tuple = iter.next();
+            actualResList.add(iter.next());
+        }
+
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            for (Tuple t : actualResList) {
+                Util.convertBagToSortedBag(t);
+            }
+            Collections.sort(actualResList);
+        }
+
+        int numIdentity = 0;
+        for (Tuple tuple : actualResList) {
             Tuple t = (Tuple)tuple.get(0);
             Assert.assertEquals(DataByteArray.class, t.get(0).getClass());
             int group = Integer.parseInt(new String(((DataByteArray)t.get(0)).get()));
@@ -469,17 +480,24 @@ public class TestEvalPipeline2 {
         pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
 
-        Assert.assertTrue(iter.hasNext());
-        Tuple t = iter.next();
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            String[] expectedResults =
+                new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" };
+            Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults,
+                org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+        } else {
+            Assert.assertTrue(iter.hasNext());
+            Tuple t = iter.next();
 
-        Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
+            Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})"));
 
-        Assert.assertTrue(iter.hasNext());
-        t = iter.next();
+            Assert.assertTrue(iter.hasNext());
+            t = iter.next();
 
-        Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
+            Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})"));
 
-        Assert.assertFalse(iter.hasNext());
+            Assert.assertFalse(iter.hasNext());
+        }
     }
 
     // See PIG-1195
@@ -732,16 +750,18 @@ public class TestEvalPipeline2 {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("EventsPerMinute");
 
-        Tuple t = iter.next();
-        Assert.assertTrue( (Long)t.get(0) == 60000 && (Long)t.get(1) == 2 && (Long)t.get(2) == 3 );
-
-        t = iter.next();
-        Assert.assertTrue( (Long)t.get(0) == 120000 && (Long)t.get(1) == 2 && (Long)t.get(2) == 2 );
-
-        t = iter.next();
-        Assert.assertTrue( (Long)t.get(0) == 240000 && (Long)t.get(1) == 1 && (Long)t.get(2) == 1 );
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+            new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", "(240000L,1L,1L)"});
 
-        Assert.assertFalse(iter.hasNext());
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            Util.checkQueryOutputsAfterSort(iter, expectedResults);
+        } else {
+            // Even though GROUP BY does not return results sorted by key, that
+            // is the current behavior for MR, which some users rely on. Let's
+            // not sort the results for MR and Tez so that we can catch it when
+            // current MR/Tez behavior changes.
+            Util.checkQueryOutputs(iter, expectedResults);
+        }
     }
 
     // See PIG-1729
@@ -1572,6 +1592,9 @@ public class TestEvalPipeline2 {
 
         pigServer.registerQuery("data = load 'table_testLimitFlatten' as (k,v);");
         pigServer.registerQuery("grouped = GROUP data BY k;");
+        if (Util.isSparkExecType(cluster.getExecType())) {
+            pigServer.registerQuery("grouped = ORDER grouped BY group;");
+        }
         pigServer.registerQuery("selected = LIMIT grouped 2;");
         pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN (data);");
 
@@ -1579,7 +1602,8 @@ public class TestEvalPipeline2 {
 
         String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"};
 
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
+        Util.checkQueryOutputsAfterSortRecursive(iter, expected,
+            org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
     }
 
     // See PIG-2237

Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1678260&r1=1678259&r2=1678260&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Thu May  7 20:37:18 2015
@@ -554,7 +554,7 @@ public class Util {
 
          Assert.assertEquals("Comparing actual and expected results. ",
                  expectedResList, actualResList);
-
+         
     }
 
     /**
@@ -1151,7 +1151,7 @@ public class Util {
     }
 
 
-    static private void convertBagToSortedBag(Tuple t) {
+    static public void convertBagToSortedBag(Tuple t) {
         for (int i=0;i<t.size();i++) {
            Object obj = null;
            try {
@@ -1327,6 +1327,14 @@ public class Util {
         return execType == ExecType.MAPREDUCE;
     }
 
+    public static boolean isSparkExecType(ExecType execType) {
+        if (execType.name().toLowerCase().startsWith("spark")) {
+            return true;
+        }
+
+        return false;
+    }
+
     public static String findPigJarName() {
         final String suffix = System.getProperty("hadoopversion").equals("20") ? "1" : "2";
         File baseDir = new File(".");


Reply via email to