Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDataBag.java Fri Feb 24 08:19:42 2017
@@ -17,17 +17,36 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-import java.util.*;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-
-
-import org.apache.pig.data.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DistinctDataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SingleTupleBag;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Spillable;
 import org.junit.After;
 import org.junit.Test;
@@ -36,7 +55,7 @@ import org.junit.Test;
 /**
  * This class will exercise the basic Pig data model and members. It tests for proper behavior in
  * assignment and comparison, as well as function application.
- * 
+ *
  * @author dnm
  */
 public class TestDataBag  {
@@ -590,7 +609,7 @@ public class TestDataBag  {
             }
             mgr.forceSpill();
         }
-        
+
        assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
 
         // Read tuples back, hopefully they come out in the same order.
@@ -719,14 +738,14 @@ public class TestDataBag  {
     @Test
     public void testDefaultBagFactory() throws Exception {
         BagFactory f = BagFactory.getInstance();
-       
+
         DataBag bag = f.newDefaultBag();
         DataBag sorted = f.newSortedBag(null);
         DataBag distinct = f.newDistinctBag();
 
         assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
         assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
-        assertTrue("Expected a distinct bag", (distinct instanceof 
DistinctDataBag));         
+        assertTrue("Expected a distinct bag", (distinct instanceof 
DistinctDataBag));
     }
 
     @Test
@@ -756,7 +775,7 @@ public class TestDataBag  {
         try {
             BagFactory f = BagFactory.getInstance();
         } catch (RuntimeException re) {
-            assertEquals("Expected does not extend BagFactory message", 
+            assertEquals("Expected does not extend BagFactory message",
                 "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
                 re.getMessage());
             caughtIt = true;
@@ -775,7 +794,7 @@ public class TestDataBag  {
 
         BagFactory.resetSelf();
     }
-    
+
     @Test
     public void testNonSpillableDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -789,7 +808,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testNonSpillableDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -804,7 +823,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testDefaultDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -820,7 +839,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testDefaultDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -837,35 +856,35 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
-    public void testInternalCachedBag() throws Exception {    
+
+    public void testInternalCachedBag() throws Exception {
        // check adding empty tuple
        DataBag bg0 = new InternalCachedBag();
        bg0.add(TupleFactory.getInstance().newTuple());
        bg0.add(TupleFactory.getInstance().newTuple());
        assertEquals(bg0.size(), 2);
-       
+
        // check equal of bags
        DataBag bg1 = new InternalCachedBag(1, 0.5f);
        assertEquals(bg1.size(), 0);
-       
+
       String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
        for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-       
+
        // check size, and isSorted(), isDistinct()
        assertEquals(bg1.size(), 3);
        assertFalse(bg1.isSorted());
        assertFalse(bg1.isDistinct());
-       
+
        tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
        DataBag bg2 = new InternalCachedBag(1, 0.5f);
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalCachedBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -873,7 +892,7 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-        
+
         // check iterator
         Iterator<Tuple> iter = bg3.iterator();
         DataBag bg4 = new InternalCachedBag(1, 0.0f);
@@ -881,7 +900,7 @@ public class TestDataBag  {
                bg4.add(iter.next());
         }
         assertEquals(bg3, bg4);
-        
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
@@ -894,46 +913,46 @@ public class TestDataBag  {
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
         assertEquals(bg3, bg5);
-        
-        
+
+
         bg4.clear();
-        assertEquals(bg4.size(), 0);        
+        assertEquals(bg4.size(), 0);
     }
-    
-    public void testInternalSortedBag() throws Exception {    
-       
+
+    public void testInternalSortedBag() throws Exception {
+
        // check adding empty tuple
        DataBag bg0 = new InternalSortedBag();
        bg0.add(TupleFactory.getInstance().newTuple());
        bg0.add(TupleFactory.getInstance().newTuple());
        assertEquals(bg0.size(), 2);
-       
+
        // check equal of bags
        DataBag bg1 = new InternalSortedBag();
        assertEquals(bg1.size(), 0);
-       
+
       String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
        for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-       
+
        // check size, and isSorted(), isDistinct()
        assertEquals(bg1.size(), 3);
        assertTrue(bg1.isSorted());
        assertFalse(bg1.isDistinct());
-       
+
        tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
        DataBag bg2 = new InternalSortedBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -941,17 +960,17 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-        
+
         iter = bg3.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
-        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-        
+
         DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -959,21 +978,21 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);        
-        
+        assertEquals(bg3, bg4);
+
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-        
+
         // test with all data spill out
-        DataBag bg5 = new InternalSortedBag();        
+        DataBag bg5 = new InternalSortedBag();
         for(int j=0; j<3; j++) {
                for (int i = 0; i < tupleContents.length; i++) {
                        bg5.add(Util.createTuple(tupleContents[i]));
-               }     
+               }
                bg5.spill();
         }
-        
+
         assertEquals(bg5.size(), 9);
         iter = bg5.iterator();
         for(int i=0; i<3; i++) {
@@ -983,21 +1002,21 @@ public class TestDataBag  {
                iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<3; i++) {
-               iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+               iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         }
-        
+
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalSortedBag();        
+        DataBag bg6 = new InternalSortedBag();
         for(int j=0; j<104; j++) {
                for (int i = 0; i < tupleContents.length; i++) {
                        bg6.add(Util.createTuple(tupleContents[i]));
-               }               
+               }
                if (j != 103) {
                        bg6.spill();
                }
         }
-        
+
         assertEquals(bg6.size(), 104*3);
         iter = bg6.iterator();
         for(int i=0; i<104; i++) {
@@ -1007,55 +1026,55 @@ public class TestDataBag  {
                iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<104; i++) {
-               iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+               iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         }
-        
+
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new SortedDataBag(null);        
+        DataBag bg7 = new SortedDataBag(null);
         for(int j=0; j<104; j++) {
                for (int i = 0; i < tupleContents.length; i++) {
                        bg7.add(Util.createTuple(tupleContents[i]));
-               }               
+               }
                if (j != 103) {
                        bg7.spill();
                }
         }
         assertEquals(bg6, bg7);
     }
-    
-    public void testInternalDistinctBag() throws Exception {    
+
+    public void testInternalDistinctBag() throws Exception {
        // check adding empty tuple
        DataBag bg0 = new InternalDistinctBag();
        bg0.add(TupleFactory.getInstance().newTuple());
        bg0.add(TupleFactory.getInstance().newTuple());
        assertEquals(bg0.size(), 1);
-       
+
        // check equal of bags
        DataBag bg1 = new InternalDistinctBag();
        assertEquals(bg1.size(), 0);
-       
+
       String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
        for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-       
+
        // check size, and isSorted(), isDistinct()
        assertEquals(bg1.size(), 3);
        assertFalse(bg1.isSorted());
        assertTrue(bg1.isDistinct());
-       
+
       tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
        DataBag bg2 = new InternalDistinctBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalDistinctBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
@@ -1064,13 +1083,13 @@ public class TestDataBag  {
         }
         assertEquals(bg2, bg3);
         assertEquals(bg3.size(), 3);
-              
-        
+
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-        
+
         DataBag bg4 = new InternalDistinctBag(1, 0.0f);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -1078,73 +1097,73 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);        
-        
+        assertEquals(bg3, bg4);
+
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-        
+
         // test with all data spill out
-        DataBag bg5 = new InternalDistinctBag();        
+        DataBag bg5 = new InternalDistinctBag();
         for(int j=0; j<3; j++) {
                for (int i = 0; i < tupleContents.length; i++) {
                        bg5.add(Util.createTuple(tupleContents[i]));
-               }        
+               }
                bg5.spill();
         }
-        
+
         assertEquals(bg5.size(), 3);
-    
-        
+
+
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalDistinctBag();        
+        DataBag bg6 = new InternalDistinctBag();
         for(int j=0; j<104; j++) {
                for (int i = 0; i < tupleContents.length; i++) {
                        bg6.add(Util.createTuple(tupleContents[i]));
-               }               
+               }
                if (j != 103) {
                        bg6.spill();
                }
         }
-        
-        assertEquals(bg6.size(), 3);       
-        
+
+        assertEquals(bg6.size(), 3);
+
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new DistinctDataBag();        
+        DataBag bg7 = new DistinctDataBag();
         for(int j=0; j<104; j++) {
                for (int i = 0; i < tupleContents.length; i++) {
                        bg7.add(Util.createTuple(tupleContents[i]));
-               }               
+               }
                if (j != 103) {
                        bg7.spill();
                }
         }
         assertEquals(bg6, bg7);
     }
-    
+
     // See PIG-1231
     @Test
     public void testDataBagIterIdempotent() throws Exception {
         DataBag bg0 = new DefaultDataBag();
         processDataBag(bg0, true);
-        
+
         DataBag bg1 = new DistinctDataBag();
         processDataBag(bg1, true);
-        
+
         DataBag bg2 = new InternalDistinctBag();
         processDataBag(bg2, true);
-        
+
         DataBag bg3 = new InternalSortedBag();
         processDataBag(bg3, true);
-        
+
         DataBag bg4 = new SortedDataBag(null);
         processDataBag(bg4, true);
-        
+
         DataBag bg5 = new InternalCachedBag(0, 0);
         processDataBag(bg5, false);
     }
-    
+
     // See PIG-1285
     @Test
     public void testSerializeSingleTupleBag() throws Exception {
@@ -1159,7 +1178,7 @@ public class TestDataBag  {
         dfBag.readFields(dis);
         assertTrue(dfBag.equals(stBag));
     }
-    
+
     // See PIG-2550
     static class MyCustomTuple extends DefaultTuple {
         private static final long serialVersionUID = 8156382697467819543L;
@@ -1184,7 +1203,23 @@ public class TestDataBag  {
         Tuple t2 = iter.next();
         assertTrue(t2.equals(t));
     }
-    
+
+    // See PIG-4260
+    @Test
+    public void testSpillArrayBackedList() throws Exception {
+        Tuple[] tuples = new Tuple[2];
+        tuples[0] = TupleFactory.getInstance().newTuple(1);
+        tuples[0].set(0, "first");
+        tuples[1] = TupleFactory.getInstance().newTuple(1);
+        tuples[1].set(0, "second");
+        DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples));
+        bag.spill();
+        Iterator<Tuple> iter = bag.iterator();
+        assertEquals(tuples[0], iter.next());
+        assertEquals(tuples[1], iter.next());
+        assertFalse(iter.hasNext());
+    }
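
The new testSpillArrayBackedList (PIG-4260) covers bags constructed over
Arrays.asList(...), whose fixed-size backing list throws
UnsupportedOperationException on add/remove/clear, so the spill path must not
mutate the list it was handed. A minimal standalone sketch of the scenario;
the main() harness is illustrative, the Pig classes are the real ones:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.pig.data.DefaultDataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class SpillArrayBackedListSketch {
        public static void main(String[] args) throws Exception {
            Tuple t = TupleFactory.getInstance().newTuple(1);
            t.set(0, "first");
            // Arrays.asList returns a fixed-size view: structural mutation
            // (add/remove/clear) throws UnsupportedOperationException
            List<Tuple> backing = Arrays.asList(t);
            DefaultDataBag bag = new DefaultDataBag(backing);
            bag.spill();                                // force contents to disk
            System.out.println(bag.iterator().next());  // tuple still readable
        }
    }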
+
     void processDataBag(DataBag bg, boolean doSpill) {
         Tuple t = TupleFactory.getInstance().newTuple(new Integer(0));
         bg.add(t);
@@ -1194,7 +1229,7 @@ public class TestDataBag  {
         assertTrue(iter.hasNext());
         iter.next();
         assertFalse(iter.hasNext());
-        assertFalse("hasNext should be idempotent", iter.hasNext());        
+        assertFalse("hasNext should be idempotent", iter.hasNext());
     }
 }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestDivide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDivide.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDivide.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDivide.java Fri Feb 24 08:19:42 2017
@@ -20,6 +20,9 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
 import java.util.Map;
 import java.util.Random;
 
@@ -53,7 +56,7 @@ public class TestDivide {
     public void testOperator() throws ExecException {
         // int TRIALS = 10;
         byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY,
-                        DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG,
+                        DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.BIGDECIMAL,
                         DataType.DATETIME, DataType.MAP, DataType.TUPLE };
         // Map<Byte,String> map = GenRandomData.genTypeToNameMap();
         System.out.println("Testing DIVIDE operator");
@@ -250,6 +253,33 @@ public class TestDivide {
                 assertEquals(null, (Long)resl.result);
                 break;
             }
+            case DataType.BIGDECIMAL: {
+                MathContext mc = new MathContext(Divide.BIGDECIMAL_MINIMAL_SCALE, RoundingMode.HALF_UP);
+                BigDecimal inpf1 = new BigDecimal(r.nextDouble(),mc);
+                BigDecimal inpf2 = new BigDecimal(r.nextDouble(),mc);
+                lt.setValue(inpf1);
+                rt.setValue(inpf2);
+                Result resf = op.getNextBigDecimal();
+                BigDecimal expected = inpf1.divide(inpf2, 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1, RoundingMode.HALF_UP);
+                assertEquals(expected, (BigDecimal)resf.result);
+
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpf2);
+                resf = op.getNextBigDecimal();
+                assertEquals(null, (BigDecimal)resf.result);
+                // test with null in rhs
+                lt.setValue(inpf1);
+                rt.setValue(null);
+                resf = op.getNextBigDecimal();
+                assertEquals(null, (BigDecimal)resf.result);
+                // test divide by 0
+                lt.setValue(inpf1);
+                rt.setValue(new BigDecimal(0.0f,mc));
+                resf = op.getNextBigDecimal();
+                assertEquals(null, (BigDecimal)resf.result);
+                break;
+            }
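
The BIGDECIMAL case pins down Pig's division contract: quotients use a fixed
scale of 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1 with HALF_UP rounding, and a
null operand or zero divisor yields null rather than an ArithmeticException.
A sketch of that contract, with a stand-in SCALE constant since the value of
Divide.BIGDECIMAL_MINIMAL_SCALE is not shown in this diff:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class BigDecimalDivideSketch {
        private static final int SCALE = 6; // stand-in for Divide.BIGDECIMAL_MINIMAL_SCALE

        static BigDecimal divideOrNull(BigDecimal lhs, BigDecimal rhs) {
            if (lhs == null || rhs == null || rhs.signum() == 0) {
                return null; // matches the null and divide-by-zero expectations above
            }
            // a fixed scale plus rounding mode makes non-terminating
            // quotients such as 1/3 well defined
            return lhs.divide(rhs, 2 * SCALE + 1, RoundingMode.HALF_UP);
        }

        public static void main(String[] args) {
            System.out.println(divideOrNull(BigDecimal.ONE, new BigDecimal(3))); // 0.3333333333333
            System.out.println(divideOrNull(BigDecimal.ONE, BigDecimal.ZERO));   // null
        }
    }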
             case DataType.DATETIME:
                 DateTime inpdt1 = new DateTime(r.nextLong());
                 DateTime inpdt2 = new DateTime(r.nextLong());

Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Fri Feb 24 08:19:42 2017
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.PrintWriter;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.PigRunner;
-import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
@@ -38,16 +38,15 @@ import org.junit.Test;
 
 public class TestEmptyInputDir {
 
-    private static MiniCluster cluster; 
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     private static final String EMPTY_DIR = "emptydir";
     private static final String INPUT_FILE = "input";
     private static final String OUTPUT_FILE = "output";
     private static final String PIG_FILE = "test.pig";
 
-    
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
-        cluster = MiniCluster.buildCluster();
         FileSystem fs = cluster.getFileSystem();
         if (!fs.mkdirs(new Path(EMPTY_DIR))) {
             throw new Exception("failed to create empty dir");
@@ -64,7 +63,35 @@ public class TestEmptyInputDir {
     public static void tearDownAfterClass() throws Exception {
         cluster.shutDown();
     }
-    
+
+    @Test
+    public void testGroupBy() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + EMPTY_DIR + "';");
+        w.println("B = group A by $0;");
+        w.println("store B into '" + OUTPUT_FILE + "';");
+        w.close();
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+
+            // This assert fails on 205 due to MAPREDUCE-3606
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+
     @Test
     public void testSkewedJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -73,31 +100,28 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0, A by $0 using 'skewed';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
+
             assertTrue(stats.isSuccessful());
-            // the sampler job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                // the sampler job has zero maps
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testMergeJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -106,32 +130,28 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'merge';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());    
-            // the indexer job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
+            assertTrue(stats.isSuccessful());
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                // the indexer job has zero maps
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testFRJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -140,55 +160,44 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'repl';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());    
-            // the indexer job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
+            assertTrue(stats.isSuccessful());
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testRegularJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("A = load '" + INPUT_FILE + "';");
         w.println("B = load '" + EMPTY_DIR + "';");
-        w.println("C = join B by $0, A by $0;");
+        w.println("C = join B by $0, A by $0 PARALLEL 0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());   
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
-            
+
+            assertTrue(stats.isSuccessful());
+
+            assertEmptyOutputFile();
+
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -203,19 +212,19 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 right outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());               
-            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testLeftOuterJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -224,16 +233,88 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 left outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+
+    @Test
+    public void testBloomJoin() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+        w.println("C = join B by $0, A by $0 using 'bloom';");
+        w.println("D = join A by $0, B by $0 using 'bloom';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into 'output1';");
+        w.close();
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(0, stats.getNumberRecords("output1"));
+            assertEmptyOutputFile();
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "output1");
+        }
+    }
+
+    @Test
+    public void testBloomJoinOuter() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+        w.println("C = join B by $0 left outer, A by $0 using 'bloom';");
+        w.println("D = join A by $0 left outer, B by $0 using 'bloom';");
+        w.println("E = join B by $0 right outer, A by $0 using 'bloom';");
+        w.println("F = join A by $0 right outer, B by $0 using 'bloom';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into 'output1';");
+        w.println("store E into 'output2';");
+        w.println("store F into 'output3';");
+        w.close();
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());               
-            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(2, stats.getNumberRecords("output1"));
+            assertEquals(2, stats.getNumberRecords("output2"));
+            assertEquals(0, stats.getNumberRecords("output3"));
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "output1");
+            Util.deleteFile(cluster, "output2");
+            Util.deleteFile(cluster, "output3");
         }
     }
+
+    private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+        assertTrue(status.isDir());
+        assertEquals(0, status.getLen());
+        // output directory isn't empty. Has one empty file
+        FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter());
+        assertEquals(1, files.length);
+        assertEquals(0, files[0].getLen());
+        assertTrue(files[0].getPath().getName().startsWith("part-"));
+    }
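
The helper leans on Util.getSuccessMarkerPathFilter() to skip the _SUCCESS
marker while listing the output directory; its implementation is not part of
this diff. A hypothetical equivalent on Hadoop's PathFilter interface, for
illustration only:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class SuccessMarkerFilterSketch {
        // keep part-* files, drop the _SUCCESS marker a successful job writes
        public static PathFilter successMarkerFilter() {
            return new PathFilter() {
                @Override
                public boolean accept(Path path) {
                    return !path.getName().equals("_SUCCESS");
                }
            };
        }
    }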
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Fri Feb 24 08:19:42 2017
@@ -200,11 +200,11 @@ public class TestErrorHandlingStoreFunc
     private void updatePigProperties(boolean allowErrors, long minErrors,
             double errorThreshold) {
         Properties properties = pigServer.getPigContext().getProperties();
-        properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS,
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
                 Boolean.toString(allowErrors));
-        properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
                 Long.toString(minErrors));
-        properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT,
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
                 Double.toString(errorThreshold));
     }
 }
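
The renamed keys above (PIG_ALLOW_STORE_ERRORS to PIG_ERROR_HANDLING_ENABLED,
and so on) are plain Properties entries. A small sketch of wiring them up the
same way updatePigProperties() does, against a standalone Properties object
and with illustrative threshold values:

    import java.util.Properties;

    import org.apache.pig.PigConfiguration;

    public class ErrorHandlingPropsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // illustrative values; the semantics follow the helper above
            props.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, Boolean.toString(true));
            props.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, Long.toString(100L));
            props.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, Double.toString(0.05));
            System.out.println(props);
        }
    }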

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Feb 24 08:19:42 2017
@@ -291,7 +291,7 @@ public class TestEvalPipeline {
             myMap.put("long", new Long(1));
             myMap.put("float", new Float(1.0));
             myMap.put("double", new Double(1.0));
-            myMap.put("dba", new DataByteArray(new String("bytes").getBytes()));
+            myMap.put("dba", new DataByteArray(new String("1234").getBytes()));
             myMap.put("map", mapInMap);
             myMap.put("tuple", tuple);
             myMap.put("bag", bag);
@@ -794,32 +794,31 @@ public class TestEvalPipeline {
     }
 
     @Test
-    public void testMapUDFfail() throws Exception{
+    public void testMapUDFWithImplicitTypeCast() throws Exception{
         int LOOP_COUNT = 2;
         File tmpFile = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
-            for(int j=0;j<LOOP_COUNT;j+=2){
-                ps.println(i+"\t"+j);
-                ps.println(i+"\t"+j);
-            }
+            ps.println(i);
         }
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
-        String query = "C = foreach B {"
-        + "generate mymap#'dba' * 10;"
-        + "};";
+        String query = "C = foreach B generate mymap#'dba' * 10; ";
 
         pigServer.registerQuery(query);
-        try {
-            pigServer.openIterator("C");
-            Assert.fail("Error expected.");
-        } catch (Exception e) {
-            e.getMessage().contains("Cannot determine");
+
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        if(!iter.hasNext()) Assert.fail("No output found");
+        int numIdentity = 0;
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            Assert.assertEquals(new Integer(12340), (Integer)t.get(0));
+            ++numIdentity;
         }
+        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
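
The rewritten test expects success rather than failure because mymap#'dba'
now holds the DataByteArray "1234", which Pig implicitly casts to int before
the multiply, giving 12340. A minimal sketch of the numeric equivalent of
that cast (parsing the string form here is an illustration, not Pig's exact
cast code path):

    import org.apache.pig.data.DataByteArray;

    public class ByteArrayCastSketch {
        public static void main(String[] args) {
            DataByteArray dba = new DataByteArray("1234".getBytes());
            // bytearray -> int, then the multiply from the query above
            int result = Integer.parseInt(dba.toString()) * 10;
            System.out.println(result); // 12340
        }
    }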
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Fri Feb 24 08:19:42 2017
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
@@ -38,10 +37,10 @@ import org.apache.pig.impl.builtin.FindQ
 import org.junit.Test;
 
 public class TestFindQuantiles {
-    
+
     private static TupleFactory tFact = TupleFactory.getInstance();
     private static final float epsilon = 0.0001f;
-    
+
     @Test
     public void testFindQuantiles() throws Exception {
        final int numSamples = 97778;
@@ -50,7 +49,7 @@ public class TestFindQuantiles {
        System.out.println("sum: " + sum);
        assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
     }
-    
+
     @Test
     public void testFindQuantiles2() throws Exception {
        final int numSamples = 30000;
@@ -86,7 +85,7 @@ public class TestFindQuantiles {
     }
 
     private float[] getProbVec(Tuple values) throws Exception {
-        float[] probVec = new float[values.size()];        
+        float[] probVec = new float[values.size()];
         for(int i = 0; i < values.size(); i++) {
             probVec[i] = (Float)values.get(i);
         }
@@ -95,7 +94,7 @@ public class TestFindQuantiles {
 
     private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
         Random rand = new Random(1000);
-        List<Tuple> samples = new ArrayList<Tuple>(); 
+        List<Tuple> samples = new ArrayList<Tuple>();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, rand.nextInt(max));
@@ -106,7 +105,7 @@ public class TestFindQuantiles {
     }
 
     private DataBag generateUniqueSamples(int numSamples) throws Exception {
-        DataBag samples = BagFactory.getInstance().newDefaultBag(); 
+        DataBag samples = BagFactory.getInstance().newDefaultBag();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, new Integer(23));
@@ -121,9 +120,9 @@ public class TestFindQuantiles {
 
         in.set(0, new Integer(numReduceres));
         in.set(1, samples);
-        
+
         FindQuantiles fq = new FindQuantiles();
-        
+
         Map<String, Object> res = fq.exec(in);
         return res;
     }
@@ -135,12 +134,11 @@ public class TestFindQuantiles {
         InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
         Iterator<Object> it = weightedPartsData.values().iterator();
         float[] probVec = getProbVec((Tuple)it.next());
-        new DiscreteProbabilitySampleGenerator(probVec);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         return sum;
     }
-    
+
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.Test;
@@ -105,6 +106,31 @@ public class TestForEachNestedPlanLocal
     }
 
     @Test
+    public void testNestedCrossTwoRelationsLimit() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+                Storage.tuple(Storage.bag(Storage.tuple(1, 1), Storage.tuple(1, 2)), Storage.bag(Storage.tuple(1, 3), Storage.tuple(1, 4))),
+                Storage.tuple(Storage.bag(Storage.tuple(2, 1), Storage.tuple(2, 2)), Storage.bag(Storage.tuple(2, 3))),
+                Storage.tuple(Storage.bag(Storage.tuple(3, 1)), Storage.bag(Storage.tuple(3, 2))));
+
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag{tup1:tuple(f1:int, f2:int)}, bag2:bag{tup2:tuple(f3:int, f4:int)});");
+        pig.registerQuery("B = foreach A {"
+                + "crossed = cross bag1, bag2;"
+                + "filtered = filter crossed by f1 == f3;"
+                + "lmt = limit filtered 1;"
+                + "generate FLATTEN(lmt);" + "}");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+
+        pig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
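
The nested plan above crosses bag1 with bag2 inside each input tuple, keeps
pairs whose first fields match (f1 == f3), and emits at most one pair per
row. A plain-Java analogue of that cross/filter/limit pipeline, as a sketch
over int arrays (all names here are illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class NestedCrossSketch {
        // cross -> filter (f1 == f3) -> limit 1, for one row's two bags
        static List<int[]> crossFilterLimit(List<int[]> bag1, List<int[]> bag2) {
            List<int[]> out = new ArrayList<int[]>();
            for (int[] l : bag1) {
                for (int[] r : bag2) {
                    if (l[0] == r[0]) {
                        out.add(new int[] {l[0], l[1], r[0], r[1]});
                        return out; // limit 1
                    }
                }
            }
            return out;
        }

        public static void main(String[] args) {
            List<int[]> bag1 = Arrays.asList(new int[]{1, 1}, new int[]{1, 2});
            List<int[]> bag2 = Arrays.asList(new int[]{1, 3}, new int[]{1, 4});
            // prints [1, 1, 1, 3], matching the first expected tuple above
            System.out.println(Arrays.toString(crossFilterLimit(bag1, bag2).get(0)));
        }
    }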
+
+    @Test
     public void testNestedCrossTwoRelationsComplex() throws Exception {
         File[] tmpFiles = generateDataSetFilesForNestedCross();
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {

Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Fri Feb 24 08:19:42 2017
@@ -20,6 +20,7 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -50,6 +51,7 @@ public class TestGFCross {
     public void testSerial() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
+        cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 
@@ -66,6 +68,7 @@ public class TestGFCross {
     public void testParallelSet() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
+        cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Feb 24 08:19:42 2017
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
+import java.io.FilenameFilter;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
@@ -970,7 +971,6 @@ public class TestGrunt {
 
     @Test
     public void testStopOnFailure() throws Throwable {
-        Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()));
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         context.getProperties().setProperty("stop.on.failure", ""+true);
@@ -1569,4 +1569,20 @@ public class TestGrunt {
         }
         assertTrue(found);
     }
+
+    @Test
+    public void testGruntUtf8() throws Throwable {
+        String command = "mkdir 测试\n" +
+                "quit\n";
+        System.setProperty("jline.WindowsTerminal.directConsole", "false");
+        System.setIn(new ByteArrayInputStream(command.getBytes()));
+        org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
+        File[] partFiles = new File(".").listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.equals("测试");
+            }
+        });
+        assertEquals(1, partFiles.length);
+        new File("测试").delete();
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Fri Feb 24 08:19:42 2017
@@ -71,12 +71,16 @@ public class TestHBaseStorage {
     private static final String TESTTABLE_1 = "pigtable_1";
     private static final String TESTTABLE_2 = "pigtable_2";
     private static final byte[] COLUMNFAMILY = Bytes.toBytes("pig");
+    private static final byte[] COLUMNFAMILY2 = Bytes.toBytes("pig2");
     private static final String TESTCOLUMN_A = "pig:col_a";
     private static final String TESTCOLUMN_B = "pig:col_b";
     private static final String TESTCOLUMN_C = "pig:col_c";
 
     private static final int TEST_ROW_COUNT = 100;
 
+    private enum TableType {ONE_CF, TWO_CF};
+    private TableType lastTableType;
+
     @BeforeClass
     public static void setUp() throws Exception {
         // This is needed by Pig
@@ -313,13 +317,13 @@ public class TestHBaseStorage {
      */
     @Test
     public void testLoadWithMap_3_col_prefix() throws IOException {
-        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText, TableType.TWO_CF);
 
         pig.registerQuery("a = load 'hbase://"
                 + TESTTABLE_1
                 + "' using "
                 + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
-                + "pig:col_* pig:prefixed_col_*"
+                + "pig2:* pig:prefixed_col_*"
+                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
         Iterator<Tuple> it = pig.openIterator("a");
         int count = 0;
@@ -328,24 +332,18 @@ public class TestHBaseStorage {
             Tuple t = it.next();
             LOG.info("LoadFromHBase " + t);
             String rowKey = t.get(0).toString();
-            Map pig_cf_map = (Map) t.get(1);
+            Map pig_secondary_cf_map = (Map) t.get(1);
             Map pig_prefix_cf_map = (Map) t.get(2);
             Assert.assertEquals(3, t.size());
 
             Assert.assertEquals("00".substring((count + "").length()) + count,
                     rowKey);
+            Assert.assertEquals(count,
+                    Integer.parseInt(pig_secondary_cf_map.get("col_x").toString()));
             Assert.assertEquals("PrefixedText_" + count,
                     ((DataByteArray) pig_prefix_cf_map.get("prefixed_col_d")).toString());
             Assert.assertEquals(1, pig_prefix_cf_map.size());
 
-            Assert.assertEquals(count,
-                    Integer.parseInt(pig_cf_map.get("col_a").toString()));
-            Assert.assertEquals(count + 0.0,
-                    Double.parseDouble(pig_cf_map.get("col_b").toString()), 1e-6);
-            Assert.assertEquals("Text_" + count,
-                    ((DataByteArray) pig_cf_map.get("col_c")).toString());
-
             count++;
         }
         Assert.assertEquals(TEST_ROW_COUNT, count);
@@ -434,6 +432,39 @@ public class TestHBaseStorage {
         LOG.info("LoadFromHBase done");
     }
 
+    @Test
+    public void testLoadWithFixedAndPrefixedCols3() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.UTF8PlainText);
+
+        pig.registerQuery("a = load 'hbase://"
+                + TESTTABLE_1
+                + "' using "
+                + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + "pig:* pig:prefixed_col_*"
+                + "','-loadKey') as (rowKey:chararray, pig_cf_map:map[], pig_prefix_cf_map:map[]);");
+        Iterator<Tuple> it = pig.openIterator("a");
+        int count = 0;
+        LOG.info("LoadFromHBase Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            LOG.info("LoadFromHBase " + t);
+            String rowKey = (String) t.get(0);
+            Map pig_cf_map = (Map) t.get(1);
+            Map pig_prefix_cf_map = (Map) t.get(2);
+            Assert.assertEquals(3, t.size());
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals("PrefixedText_" + count,
+                    ((DataByteArray) pig_cf_map.get("prefixed_col_d")).toString());
+            Assert.assertEquals(1, pig_cf_map.size());
+            Assert.assertEquals(1, pig_prefix_cf_map.size());
+
+            count++;
+        }
+        Assert.assertEquals(TEST_ROW_COUNT, count);
+        LOG.info("LoadFromHBase done");
+    }
+
     /**
      *     * Test Load from hbase with map parameters and with a
      *     static column in different order
@@ -1486,22 +1517,36 @@ public class TestHBaseStorage {
                 + "') as (rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
     }
 
+    private HTable prepareTable(String tableName, boolean initData,
+            DataFormat format) throws IOException {
+        return prepareTable(tableName, initData, format, TableType.ONE_CF);
+    }
     /**
      * Prepare a table in hbase for testing.
      *
      */
     private HTable prepareTable(String tableName, boolean initData,
-            DataFormat format) throws IOException {
+            DataFormat format, TableType type) throws IOException {
         // define the table schema
         HTable table = null;
         try {
-            deleteAllRows(tableName);
+            if (lastTableType == type) {
+                deleteAllRows(tableName);
+            } else {
+                util.deleteTable(tableName);
+            }
         } catch (Exception e) {
             // It's ok, table might not exist.
         }
         try {
-        table = util.createTable(Bytes.toBytesBinary(tableName),
-                COLUMNFAMILY);
+            if (type == TableType.TWO_CF) {
+                table = util.createTable(Bytes.toBytesBinary(tableName),
+                        new byte[][]{COLUMNFAMILY, COLUMNFAMILY2});
+            } else {
+                table = util.createTable(Bytes.toBytesBinary(tableName),
+                        COLUMNFAMILY);
+            }
+            lastTableType = type;
         } catch (Exception e) {
             table = new HTable(conf, Bytes.toBytesBinary(tableName));
         }
@@ -1528,6 +1573,11 @@ public class TestHBaseStorage {
                     // prefixed_col_d: string type
                     put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                             Bytes.toBytes("PrefixedText_" + i));
+                    // another cf
+                    if (type == TableType.TWO_CF) {
+                        put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+                                Bytes.toBytes(i));
+                    }
                     table.put(put);
                 } else {
                     // row key: string type
@@ -1548,6 +1598,11 @@ public class TestHBaseStorage {
                     // prefixed_col_d: string type
                     put.add(COLUMNFAMILY, Bytes.toBytes("prefixed_col_d"),
                             ("PrefixedText_" + i).getBytes());
+                    // another cf
+                    if (type == TableType.TWO_CF) {
+                        put.add(COLUMNFAMILY2, Bytes.toBytes("col_x"),
+                                (i + "").getBytes());
+                    }
                     table.put(put);
                 }
             }
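
Assuming TableType is an enum with values ONE_CF and TWO_CF and lastTableType is a field remembering the previous call (both implied by, but not shown in, the hunks above), the new overload leaves existing callers untouched while letting tests ask for a second column family:

    // Hypothetical callers; DataFormat.UTF8PlainText is the enum value the existing tests use.
    HTable oneCf = prepareTable("pig_table", true, DataFormat.UTF8PlainText);          // defaults to ONE_CF
    HTable twoCf = prepareTable("pig_table", true, DataFormat.UTF8PlainText, TableType.TWO_CF);

Tracking lastTableType is what makes the fast path safe: deleteAllRows suffices while the column-family layout is unchanged, and the table is dropped and recreated only when the layout differs.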

Modified: pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java Fri Feb 24 08:19:42 2017
@@ -63,7 +63,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -131,7 +130,7 @@ public class TestJobControlCompiler {
     // verifying the jar gets on distributed cache
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
     // guava jar is not shipped with Hadoop 2.x
-    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 
HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
+    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, 
fileClassPaths.length);
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, 
distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hotname in front
@@ -235,22 +234,12 @@ public class TestJobControlCompiler {
           // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
           System.out.println("cache.files= " + Arrays.toString(cacheURIs));
           System.out.println("classpath.files= " + 
Arrays.toString(fileClassPaths));
-          if (HadoopShims.isHadoopYARN()) {
-              // Default jars - 5 (pig, antlr, joda-time, automaton)
-              // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
-              Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
-                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
-              Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
-                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
-          } else {
-              // Default jars - 5. Has guava in addition
-              // There will be same entries duplicated for udf.jar and udf2.jar
-              Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
-                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
-              Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
-                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
-          }
-
+          // Default jars - 5 (pig, antlr, joda-time, automaton)
+          // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar)
+          Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+                  Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+          Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+                  Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
           // Count occurrences of the resources
           Map<String, Integer> occurrences = new HashMap<String, Integer>();
 
@@ -259,22 +248,12 @@ public class TestJobControlCompiler {
               val = (val == null) ? 1 : ++val;
               occurrences.put(cacheURI.toString(), val);
           }
-          if (HadoopShims.isHadoopYARN()) {
-              Assert.assertEquals(9, occurrences.size());
-          } else {
-              Assert.assertEquals(10, occurrences.size()); //guava jar in addition
-          }
+          Assert.assertEquals(9, occurrences.size());
 
           for (String file : occurrences.keySet()) {
-              if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
-                  // Same path added twice which is ok. It should not be a shipped to hdfs temp path.
-                  // We assert path is same by checking count
-                  Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
-              } else {
-                  // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
-                  // and second time through pig register jar when there is symlink
-                  Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
-              }
+              // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
+              // and second time through pig register jar when there is symlink
+              Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
           }
       }
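
With the Hadoop 1.x branch gone, the test unconditionally expects the YARN numbers: five default jars, nine distinct cache entries, and exactly one occurrence per resource. The occurrence-counting loop the patch keeps could equivalently be written with Map.merge; a sketch only, assuming cacheURIs is the java.net.URI[] used by the surrounding test:

    Map<String, Integer> occurrences = new HashMap<String, Integer>();
    for (URI cacheURI : cacheURIs) {
        // first sighting stores 1, later sightings increment
        occurrences.merge(cacheURI.toString(), 1, Integer::sum);
    }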
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java Fri Feb 24 08:19:42 2017
@@ -20,16 +20,34 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import junit.framework.Assert;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
+import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.visitor.LineageFindRelVisitor;
@@ -42,6 +60,13 @@ import org.junit.Test;
 
 public class TestLineageFindRelVisitor {
 
+    private PigServer pig ;
+
+    @Before
+    public void setUp() throws Exception{
+        pig = new PigServer(Util.getLocalTestMode()) ;
+    }
+
     public static class SillyLoadCasterWithExtraConstructor extends Utf8StorageConverter {
         public SillyLoadCasterWithExtraConstructor(String ignored) {
             super();
@@ -69,6 +94,13 @@ public class TestLineageFindRelVisitor {
         }
     }
 
+    public static class ToTupleWithCustomLoadCaster extends org.apache.pig.builtin.TOTUPLE {
+        @Override
+        public LoadCaster getLoadCaster() throws IOException {
+            return new SillyLoadCasterWithExtraConstructor("ignored");
+        }
+    }
+
     @Test
     public void testhaveIdenticalCasters() throws Exception {
         LogicalPlan lp = new LogicalPlan();
@@ -123,6 +155,169 @@ public class TestLineageFindRelVisitor {
                            (Boolean) testMethod.invoke(lineageFindRelVisitor,
                                      casterWithExtraConstuctorSpec, casterWithExtraConstuctorSpec) );
 
-        Assert.assertEquals("Loader should be instantiated at most once.", 
SillyLoaderWithLoadCasterWithExtraConstructor.counter, 1);
+        Assert.assertEquals("Loader should be instantiated at most once.", 1, 
SillyLoaderWithLoadCasterWithExtraConstructor.counter);
+    }
+
+    @Test
+    public void testIdenticalColumnUDFForwardingLoadCaster() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+                Storage.tuple(Storage.map(
+                                 "key1",new DataByteArray("aaa"),
+                                 "key2",new DataByteArray("bbb"),
+                                 "key3",new DataByteArray("ccc"))),
+                Storage.tuple(Storage.map(
+                                 "key1",new DataByteArray("zzz"),
+                                 "key2",new DataByteArray("yyy"),
+                                 "key3",new DataByteArray("xxx"))));
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as 
(m:[bytearray]);");
+        pig.registerQuery("B = foreach A GENERATE m#'key1' as key1, m#'key2' 
as key2; "
+                // this equal comparison creates implicit typecast to chararray
+                // which requires loadcaster
+                + "C = FILTER B by key1 == 'aaa' and key2 == 'bbb';");
+        pig.registerQuery("store C into 'output' using mock.Storage();");
+
+        pig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"('aaa', 'bbb')"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testUDFForwardingLoadCaster() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+                Storage.tuple(new DataByteArray("aaa")),
+                Storage.tuple(new DataByteArray("bbb")));
+        pig.setBatchOn();
+        String query = "A = load 'input' using mock.Storage() as 
(a1:bytearray);"
+            + "B = foreach A GENERATE TOTUPLE(a1) as tupleA;"
+            + "C = foreach B GENERATE (chararray) tupleA.a1;"  //using 
loadcaster
+            + "store C into 'output' using mock.Storage();";
+
+        LogicalPlan lp = Util.parse(query, pig.getPigContext());
+        Util.optimizeNewLP(lp);
+
+        CastFinder cf = new CastFinder(lp);
+        cf.visit();
+        Assert.assertEquals("There should be only one typecast expression.", 
1, cf.casts.size());
+        Assert.assertEquals("Loadcaster should be coming from the Load", 
"mock.Storage", cf.casts.get(0).getFuncSpec().getClassName());
+
+        pig.registerQuery(query);
+        pig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"('aaa')", "('bbb')"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testUDFgetLoadCaster() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+                Storage.tuple(new DataByteArray("aaa")),
+                Storage.tuple(new DataByteArray("bbb")));
+        pig.setBatchOn();
+        String query = "A = load 'input' using mock.Storage() as 
(a1:bytearray);"
+            + "B = foreach A GENERATE 
org.apache.pig.test.TestLineageFindRelVisitor$ToTupleWithCustomLoadCaster(a1) 
as tupleA;"
+            + "C = foreach B GENERATE (chararray) tupleA.a1;" //using 
loadcaster
+            + "store C into 'output' using mock.Storage();";
+
+        pig.registerQuery(query);
+        pig.executeBatch();
+
+        LogicalPlan lp = Util.parse(query, pig.getPigContext());
+        Util.optimizeNewLP(lp);
+
+        CastFinder cf = new CastFinder(lp);
+        cf.visit();
+        Assert.assertEquals("There should be only one typecast expression.", 
1, cf.casts.size());
+        Assert.assertEquals("Loadcaster should be coming from the UDF", 
"org.apache.pig.test.TestLineageFindRelVisitor$ToTupleWithCustomLoadCaster", 
cf.casts.get(0).getFuncSpec().getClassName());
+
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"('aaa')", "('bbb')"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testUDFForwardingLoadCasterWithMultipleParams() throws Exception {
+        File inputfile = Util.createFile(new String[]{"123","456","789"});
+
+        pig.registerQuery("A = load '"
+                + inputfile.toString()
+                + "' using PigStorage() as (a1:bytearray);\n");
+        pig.registerQuery("B = load '"
+                + inputfile.toString()
+                + "' using PigStorage() as (b1:bytearray);\n");
+        pig.registerQuery("C = join A by a1, B by b1;\n");
+        pig.registerQuery("D = FOREACH C GENERATE TOTUPLE(a1,b1) as 
tupleD;\n");
+        pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
+        Iterator<Tuple> iter  = pig.openIterator("E");
+
+        Assert.assertEquals("123", iter.next().get(0));
+        Assert.assertEquals("456", iter.next().get(0));
+        Assert.assertEquals("789", iter.next().get(0));
+    }
+
+    @Test
+    public void testNegativeUDFForwardingLoadCasterWithMultipleParams() throws Exception {
+        File inputfile = Util.createFile(new String[]{"123","456","789"});
+
+        pig.registerQuery("A = load '"
+                + inputfile.toString()
+                + "' using PigStorage() as (a1:bytearray);\n");
+        pig.registerQuery("B = load '"
+                + inputfile.toString()
+                + "' using 
org.apache.pig.test.TestLineageFindRelVisitor$SillyLoaderWithLoadCasterWithExtraConstructor2()
 as (b1:bytearray);\n");
+        pig.registerQuery("C = join A by a1, B by b1;\n");
+        pig.registerQuery("D = FOREACH C GENERATE TOTUPLE(a1,b1) as 
tupleD;\n");
+        pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
+        try {
+            Iterator<Tuple> iter  = pig.openIterator("E");
+
+            // this should fail since above typecast cannot determine which
+            // loadcaster to use (one from PigStorage and another from
+            // SillyLoaderWithLoadCasterWithExtraConstructor2)
+            fail("Above typecast should fail since it cannot determine which loadcaster to use.");
+        } catch (IOException e) {
+            Assert.assertTrue(e.getMessage().contains("Unable to open iterator for alias E"));
+        }
+
+    }
+
+    /**
+     * Find all casts in the plan (Copied from TestTypeCheckingValidatorNewLP.java)
+     */
+    class CastFinder extends AllExpressionVisitor {
+        List<CastExpression> casts = new ArrayList<CastExpression>();
+
+        public CastFinder(OperatorPlan plan)
+                throws FrontendException {
+            super(plan, new DependencyOrderWalker(plan));
+        }
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(
+                LogicalExpressionPlan exprPlan) throws FrontendException {
+            return new CastExpFinder(exprPlan, new ReverseDependencyOrderWalker(exprPlan));
+        }
+
+        class CastExpFinder extends LogicalExpressionVisitor{
+            protected CastExpFinder(OperatorPlan p, PlanWalker walker)
+            throws FrontendException {
+                super(p, walker);
+            }
+
+            @Override
+            public void visit(CastExpression cExp){
+                casts.add(cExp);
+            }
+        }
     }
 }
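
The negative test above documents a failure mode rather than a fix: once two loaders with different load casters feed the same TOTUPLE, lineage cannot pick a caster for the later typecast. A hypothetical workaround, sketched by analogy with the positive tests and not part of the patch, is to cast each field while its lineage is still unambiguous so that no caster lookup is needed downstream:

    // Untested sketch: a1 and b1 each still have a single-loader lineage here.
    pig.registerQuery("D = FOREACH C GENERATE TOTUPLE((chararray) a1, (chararray) b1) as tupleD;\n");
    pig.registerQuery("E = FOREACH D GENERATE tupleD.a1;\n");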

Modified: pig/branches/spark/test/org/apache/pig/test/TestLoad.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoad.java Fri Feb 24 08:19:42 2017
@@ -67,6 +67,8 @@ public class TestLoad {
 
     static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
+    private static final String WORKING_DIR = "/tmp/test" + java.util.UUID.randomUUID();
+
     @Before
     public void setUp() throws Exception {
         FileLocalizer.deleteTempFiles();
@@ -118,7 +120,7 @@ public class TestLoad {
     public void testLoadRemoteRel() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("test","/tmp/test");
+            checkLoadPath("test", WORKING_DIR + "/test");
         }
     }
 
@@ -127,7 +129,7 @@ public class TestLoad {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
             boolean noConversionExpected = true;
-            checkLoadPath("/tmp/test","/tmp/test", noConversionExpected);
+            checkLoadPath(WORKING_DIR + "/test", WORKING_DIR + "/test", 
noConversionExpected);
         }
     }
 
@@ -135,7 +137,7 @@ public class TestLoad {
     public void testLoadRemoteRelScheme() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("test","/tmp/test");
+            checkLoadPath("test", WORKING_DIR + "/test");
         }
     }
 
@@ -143,11 +145,11 @@ public class TestLoad {
     public void testLoadRemoteAbsScheme() throws Exception {
         pc = servers[0].getPigContext();
         boolean noConversionExpected = true;
-        checkLoadPath("hdfs:/tmp/test","hdfs:/tmp/test", noConversionExpected);
+        checkLoadPath("hdfs:" + WORKING_DIR + "/test","hdfs:" + WORKING_DIR + 
"/test", noConversionExpected);
 
        // check if a location 'hdfs:<abs path>' can actually be read using PigStorage
         String[] inputFileNames = new String[] {
-                "/tmp/TestLoad-testLoadRemoteAbsSchema-input.txt"};
+                WORKING_DIR + "/TestLoad-testLoadRemoteAbsSchema-input.txt"};
         testLoadingMultipleFiles(inputFileNames, "hdfs:" + inputFileNames[0]);
     }
 
@@ -162,7 +164,7 @@ public class TestLoad {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
             boolean noConversionExpected = true;
-            checkLoadPath("/tmp/foo/../././","/tmp/foo/.././.", 
noConversionExpected);
+            checkLoadPath(WORKING_DIR + "/foo/../././", WORKING_DIR + 
"/foo/.././.", noConversionExpected);
         }
     }
 
@@ -170,7 +172,7 @@ public class TestLoad {
     public void testGlobChars() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("t?s*","/tmp/t?s*");
+            checkLoadPath("t?s*", WORKING_DIR + "/t?s*");
         }
     }
 
@@ -178,7 +180,7 @@ public class TestLoad {
     public void testCommaSeparatedString() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
+            checkLoadPath("usr/pig/a,b", WORKING_DIR + "/usr/pig/a," + WORKING_DIR + "/b");
         }
     }
 
@@ -186,7 +188,7 @@ public class TestLoad {
     public void testCommaSeparatedString2() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
+            checkLoadPath("t?s*,test", WORKING_DIR + "/t?s*,"+ WORKING_DIR + 
"/test");
         }
     }
 
@@ -196,14 +198,14 @@ public class TestLoad {
         PigServer pig = servers[0];
         pc = pig.getPigContext();
         boolean noConversionExpected = true;
-        checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3",
-                "hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3", 
noConversionExpected );
+        checkLoadPath("hdfs:"+ WORKING_DIR + "/test,hdfs:" + WORKING_DIR + 
"/test2,hdfs:" + WORKING_DIR + "/test3",
+                "hdfs:" + WORKING_DIR + "/test,hdfs:" + WORKING_DIR + 
"/test2,hdfs:" + WORKING_DIR + "/test3", noConversionExpected );
 
        // check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be
        // read using PigStorage
         String[] inputFileNames = new String[] {
-                "/tmp/TestLoad-testCommaSeparatedString3-input1.txt",
-                "/tmp/TestLoad-testCommaSeparatedString3-input2.txt"};
+                WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input1.txt",
+                WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input2.txt"};
         String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" +
         inputFileNames[1];
         testLoadingMultipleFiles(inputFileNames, inputString);
@@ -214,7 +216,7 @@ public class TestLoad {
     public void testCommaSeparatedString4() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
+            checkLoadPath("usr/pig/{a,c},usr/pig/b", WORKING_DIR + "/usr/pig/{a,c}," + WORKING_DIR + "/usr/pig/b");
         }
     }
 
@@ -222,18 +224,18 @@ public class TestLoad {
     public void testCommaSeparatedString5() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
+            checkLoadPath("/usr/pig/{a,c},b", "/usr/pig/{a,c}," + WORKING_DIR + "/b");
         }
 
         // check if a location '<abs path>,<relative path>' can actually be
         // read using PigStorage
-        String loadLocationString = "/tmp/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
-        "TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to /tmp in checkLoadPath()
+        String loadLocationString = WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
+        "TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to WORKING_DIR in checkLoadPath()
 
         String[] inputFileNames = new String[] {
-                "/tmp/TestLoad-testCommaSeparatedStringMixed-input1.txt",
-                "/tmp/TestLoad-testCommaSeparatedStringMixed-input2.txt",
-                "/tmp/TestLoad-testCommaSeparatedStringMixed-input3.txt",};
+                WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input1.txt",
+                WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input2.txt",
+                WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input3.txt",};
         pc = servers[0].getPigContext(); // test in map reduce mode
         testLoadingMultipleFiles(inputFileNames, loadLocationString);
     }
@@ -242,7 +244,7 @@ public class TestLoad {
     public void testCommaSeparatedString6() throws Exception {
         for (PigServer pig : servers) {
             pc = pig.getPigContext();
-            checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
+            checkLoadPath("usr/pig/{a,c},/usr/pig/b", WORKING_DIR + "/usr/pig/{a,c},/usr/pig/b");
         }
     }
 
@@ -250,7 +252,7 @@ public class TestLoad {
     public void testNonDfsLocation() throws Exception {
         String nonDfsUrl = "har:///user/foo/f.har";
         String query = "a = load '" + nonDfsUrl + "' using 
PigStorage('\t','-noschema');" +
-                       "store a into 'output';";
+                       "store a into 'pigoutput';";
         LogicalPlan lp = Util.buildLp(servers[1], query);
         LOLoad load = (LOLoad) lp.getSources().get(0);
         nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
@@ -308,7 +310,7 @@ public class TestLoad {
             pc.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + b);
 
             DataStorage dfs = pc.getDfs();
-            dfs.setActiveContainer(dfs.asContainer("/tmp"));
+            dfs.setActiveContainer(dfs.asContainer(WORKING_DIR));
             Map<String, String> fileNameMap = new HashMap<String, String>();
 
             QueryParserDriver builder = new QueryParserDriver(pc, "Test-Load", fileNameMap);
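
The net effect of WORKING_DIR is that each run of TestLoad resolves relative load paths under a unique /tmp/test<uuid> directory instead of a shared /tmp, so concurrent runs cannot clobber one another. Condensed from the hunks above:

    // Relative paths are rebased against the active container:
    dfs.setActiveContainer(dfs.asContainer(WORKING_DIR));
    // "test"            resolves to WORKING_DIR + "/test"
    // "t?s*,test"       resolves to WORKING_DIR + "/t?s*," + WORKING_DIR + "/test"
    // "/usr/pig/{a,c}"  stays as-is: absolute paths are never rebased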

Modified: pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Fri Feb 24 08:19:42 2017
@@ -45,12 +45,8 @@ public abstract class TestLoaderStorerSh
                 "store a into 'ooo';";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
-                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                "hive-shims-0.23", "hive-shims-common", "kryo"};
 
         checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }
@@ -61,12 +57,8 @@ public abstract class TestLoaderStorerSh
                 "store a into 'ooo' using OrcStorage;";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
-                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                "hive-shims-0.23", "hive-shims-common", "kryo"};
 
         checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }
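
Pinning hive-shims-0.23 assumes the build no longer targets Hadoop 1.x (the old "20S" shims). Both tests in this file now share the same six-jar expectation:

    String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
            "hive-shims-0.23", "hive-shims-common", "kryo"};
    checkPlan(pp, expectedJars, 6, pigServer.getPigContext());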

Modified: pig/branches/spark/test/org/apache/pig/test/TestLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLocal.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLocal.java Fri Feb 24 08:19:42 2017
@@ -39,6 +39,7 @@ import org.apache.pig.builtin.PigStorage
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -166,7 +167,8 @@ public class TestLocal {
         public Tuple getNext() throws IOException {
             if (count < COUNT) {
 
-                   Tuple t = TupleFactory.getInstance().newTuple(Integer.toString(count++));
+                   Tuple t = new DefaultTuple();
+                   t.append(Integer.toString(count++));
                    return t;
 
             }
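
Both tuple constructions yield an equivalent single-field tuple; building a DefaultTuple directly merely stops the loader stub from depending on whichever TupleFactory implementation is configured (a presumed motivation, not stated in the diff):

    // TupleFactory.newTuple(Object) wraps its argument as the tuple's only field,
    // so either form gives size() == 1 with get(0) equal to "0".
    Tuple viaFactory = TupleFactory.getInstance().newTuple(Integer.toString(0));
    Tuple direct = new DefaultTuple();
    direct.append(Integer.toString(0));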

