Author: xuefu
Date: Mon Feb 15 02:44:39 2016
New Revision: 1730442

URL: http://svn.apache.org/viewvc?rev=1730442&view=rev
Log:
PIG-4777: Enable TestEvalPipelineLocal for spark (Prateek via Xuefu)

Modified:
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1730442&r1=1730441&r2=1730442&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Mon 
Feb 15 02:44:39 2016
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.StringTokenizer;
+import java.util.Collections;
 
 import junit.framework.Assert;
 
@@ -864,7 +865,7 @@ public class TestEvalPipelineLocal {
         int LOOP_COUNT = 10;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
-        for(int i = 0; i < LOOP_COUNT; i++) {
+        for(int i=0; i<LOOP_COUNT; i++) {
             for(int j=0;j<LOOP_COUNT;j+=2){
                 ps.println(i+"\t"+j);
                 ps.println(i+"\t"+j);
@@ -888,18 +889,29 @@ public class TestEvalPipelineLocal {
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
         int numIdentity = 0;
+        // When running with spark, output can be in a different order than 
that 
+        // when running in mr mode.
+        List<Tuple> resList = new ArrayList<Tuple>();
         while(iter.hasNext()){
-            Tuple t = iter.next();
-            Assert.assertEquals((Integer)((numIdentity + 1) * 10), 
(Integer)t.get(0));
+            resList.add(iter.next());
+        }
+
+        numIdentity = resList.size();
+        Collections.sort(resList);
+        Assert.assertEquals(LOOP_COUNT, numIdentity);
+        // Since delta differences in some cases are allowed, utility function 
+        // to compare tuple-lists cannot be used here.
+        // This loop checks each sorted result tuple against its expected values
+        for (int i=0; i<numIdentity; i++) {
+            Tuple t = resList.get(i);
+            Assert.assertEquals((Integer)((i + 1) * 10), (Integer)t.get(0));
             Assert.assertEquals((Long)10L, (Long)t.get(1));
             Assert.assertEquals((Long)5L, (Long)t.get(2));
             Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
             Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
             Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
             Assert.assertEquals(7, t.size());
-            ++numIdentity;
         }
-        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
 
     @Test
@@ -907,12 +919,25 @@ public class TestEvalPipelineLocal {
         int LOOP_COUNT = 10;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+
+        List<Tuple> expectedList = new ArrayList<Tuple>();
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j=0;j<LOOP_COUNT;j+=2){
                 ps.println(i+"\t"+j);
                 ps.println(i+"\t"+j);
+                //  Generating expected data
+                Tuple t = mTf.newTuple();
+                t.append(new Double(j - i));
+                t.append((Integer)(j%2));
+                if(j == 0) {
+                    t.append(0.0);
+                } else {
+                    t.append((Double)((double)i/j));
+                }
+                expectedList.add(t);
             }
         }
+        Collections.sort(expectedList);  
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
@@ -929,25 +954,30 @@ public class TestEvalPipelineLocal {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
+        // When running with spark, output can be in a different order than 
when
+        // running in mr mode.
+        List<Tuple> resList = new ArrayList<Tuple>();
+        while(iter.hasNext()){
+            resList.add(iter.next());
+        }
 
-        int numRows = 0;
+        Collections.sort(resList);
+        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, resList.size());
+
+        // Since delta difference in some cases is allowed, utility function 
+        // to compare tuple-lists cannot be used here.
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){
-                Tuple t = null;
-                if(iter.hasNext()) t = iter.next();
-                Assert.assertEquals(3, t.size());
-                Assert.assertEquals(new Double(j - i), (Double)t.get(0), 0.01);
-                Assert.assertEquals((Integer)(j%2), (Integer)t.get(1));
-                if(j == 0) {
-                    Assert.assertEquals(0.0, (Double)t.get(2), 0.01);
-                } else {
-                    Assert.assertEquals((Double)((double)i/j), 
(Double)t.get(2), 0.01);
-                }
-                ++numRows;
+                int k = i*LOOP_COUNT/2 + j/2;
+                Tuple res_t = resList.get(k);
+                Tuple expec_t = expectedList.get(k);
+
+                Assert.assertEquals(expec_t.size(), res_t.size());
+                Assert.assertEquals((Double)expec_t.get(0), 
(Double)res_t.get(0), 0.01);
+                Assert.assertEquals((Integer)expec_t.get(1), 
(Integer)res_t.get(1));
+                Assert.assertEquals((Double)expec_t.get(2), 
(Double)res_t.get(2), 0.01);
             }
         }
-
-        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
 
     @Test
@@ -955,10 +985,16 @@ public class TestEvalPipelineLocal {
         int LOOP_COUNT = 10;
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        List<Tuple> expectedList = new ArrayList<Tuple>();
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j=0;j<LOOP_COUNT;j+=2){
                 ps.println(i+"\t"+j);
                 ps.println(i+"\t"+j);
+                // Generating expected data.
+                Tuple t = mTf.newTuple();
+                t.append(new Double(i+j));
+                t.append(new Double(i + j + i));
+                expectedList.add(t);
             }
         }
         ps.close();
@@ -976,20 +1012,9 @@ public class TestEvalPipelineLocal {
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) Assert.fail("No output found");
-
-        int numRows = 0;
-        for(int i = 0; i < LOOP_COUNT; i++) {
-            for(int j = 0; j < LOOP_COUNT; j+=2){
-                Tuple t = null;
-                if(iter.hasNext()) t = iter.next();
-                Assert.assertEquals(2, t.size());
-                Assert.assertEquals(new Double(i + j), (Double)t.get(0), 0.01);
-                Assert.assertEquals(new Double(i + j + i), (Double)t.get(1));
-                ++numRows;
-            }
-        }
-
-        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+        // When running with spark, output can be in a different order than 
that
+        // when running in mr mode.
+        Util.checkQueryOutputsAfterSort(iter, expectedList);
     }
 
     @Test
@@ -1108,9 +1133,15 @@ public class TestEvalPipelineLocal {
         pigServer.registerQuery("b = foreach a generate TOTUPLE(x, y) as t, 
z;");
         pigServer.registerQuery("c = group b by t;");
         Iterator<Tuple> iter = pigServer.openIterator("c");
-        
Assert.assertTrue(iter.next().toString().equals("((1,2),{((1,2),3)})"));
-        
Assert.assertTrue(iter.next().toString().equals("((4,5),{((4,5),6)})"));
-        Assert.assertFalse(iter.hasNext());
+        // When running with spark, output can be in a different order than 
that 
+        // when running in mr mode.
+        List<Tuple> expectedRes =
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {
+                                "((1,2),{((1,2),3)})",
+                                "((4,5),{((4,5),6)})"
+                        });
+        Util.checkQueryOutputsAfterSort(iter, expectedRes);
     }
     
     @Test


Reply via email to