Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Thu Nov 
27 12:49:54 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,10 +32,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -44,6 +50,7 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.LogUtils;
@@ -347,7 +354,7 @@ public class TestEvalPipeline2 {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "' AS (num:int);");
         pigServer.registerQuery("B = order A by num parallel 2;");
         pigServer.registerQuery("C = limit B 10;");
@@ -376,7 +383,7 @@ public class TestEvalPipeline2 {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "' AS (num:int);");
         pigServer.registerQuery("B = order A by num parallel 2;");
         pigServer.registerQuery("C = limit B 10;");
@@ -409,7 +416,7 @@ public class TestEvalPipeline2 {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "' AS (num:int);");
         pigServer.registerQuery("B = order A by num desc parallel 2;");
         pigServer.registerQuery("C = limit B 10;");
@@ -456,8 +463,8 @@ public class TestEvalPipeline2 {
         ps2.println("2\t2");
         ps2.close();
 
-        pigServer.registerQuery("A = LOAD '" + 
Util.generateURI(Util.encodeEscape(tmpFile1.toString()), 
pigServer.getPigContext()) + "' AS (a0, a1, a2);");
-        pigServer.registerQuery("B = LOAD '" + 
Util.generateURI(Util.encodeEscape(tmpFile2.toString()), 
pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("A = LOAD '" + 
Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, 
a1, a2);");
+        pigServer.registerQuery("B = LOAD '" + 
Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, 
b1);");
         pigServer.registerQuery("C = LIMIT B 100;");
         pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
         Iterator<Tuple> iter = pigServer.openIterator("D");
@@ -1424,7 +1431,7 @@ public class TestEvalPipeline2 {
     public void testNonStandardDataWithoutFetch() throws Exception{
         Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", 
Util.isMapredExecType(cluster.getExecType()));
         Properties props = pigServer.getPigContext().getProperties();
-        props.setProperty(PigConfiguration.OPT_FETCH, "false");
+        props.setProperty(PigConfiguration.PIG_OPT_FETCH, "false");
         String[] input1 = {
                 "0",
         };
@@ -1441,7 +1448,7 @@ public class TestEvalPipeline2 {
             }
         }
         finally {
-            props.setProperty(PigConfiguration.OPT_FETCH, "true");
+            props.setProperty(PigConfiguration.PIG_OPT_FETCH, "true");
         }
     }
 
@@ -1604,4 +1611,53 @@ public class TestEvalPipeline2 {
 
         Assert.assertFalse(iter.hasNext());
     }
+
+    /**
+     * Checks the part-file count and the rows produced when a relation is crossed with a GROUP ALL count.
+     */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCrossAfterGroupAll() throws Exception{
+        String[] input = {
+                "1\tA",
+                "2\tB",
+                "3\tC",
+                "4\tD",
+        };
+
+        Util.createInputFile(cluster, "table_testCrossAfterGroupAll", input);
+        try {
+            pigServer.getPigContext().getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "40");
+            pigServer.registerQuery("A = load 'table_testCrossAfterGroupAll' as (a0:int, a1:chararray);");
+            pigServer.registerQuery("B = group A all;");
+            pigServer.registerQuery("C = foreach B generate COUNT(A);");
+            pigServer.registerQuery("D = cross A, C;");
+            Path output = FileLocalizer.getTemporaryPath(pigServer.getPigContext());
+            ExecJob job = pigServer.store("D", output.toString());
+            FileSystem fs = output.getFileSystem(cluster.getConfiguration());
+            FileStatus[] partFiles = fs.listStatus(output, new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    return path.getName().startsWith("part");
+                }
+            });
+            // auto-parallelism is 2 in MR, 20 in Tez, so check >=2
+            Assert.assertTrue("Expected at least 2 part files, found " + partFiles.length,
+                    partFiles.length >= 2);
+            // Check the output; the rows are sorted first so the comparison is order-independent
+            Iterator<Tuple> iter = job.getResults();
+            List<Tuple> results = new ArrayList<Tuple>();
+            while (iter.hasNext()) {
+                results.add(iter.next());
+            }
+            Collections.sort(results);
+            Assert.assertEquals(4, results.size());
+            Assert.assertEquals("(1,A,4)", results.get(0).toString());
+            Assert.assertEquals("(2,B,4)", results.get(1).toString());
+            Assert.assertEquals("(3,C,4)", results.get(2).toString());
+            Assert.assertEquals("(4,D,4)", results.get(3).toString());
+        } finally {
+            pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
+        }
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Thu 
Nov 27 12:49:54 2014
@@ -107,7 +107,7 @@ public class TestEvalPipelineLocal {
         File f1 = createFile(new String[]{"a:1","b:1","a:1"});
 
         pigServer.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(f1.toString()), 
pigServer.getPigContext())
+                + Util.generateURI(f1.toString(), pigServer.getPigContext())
                 + "' using " + PigStorage.class.getName() + "(':');");
         pigServer.registerQuery("b = foreach a generate 1-1/1;");
         Iterator<Tuple> iter  = pigServer.openIterator("b");
@@ -125,10 +125,10 @@ public class TestEvalPipelineLocal {
         File f2 = createFile(new String[]{"b","b","a"});
         
         pigServer.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(f1.toString()), 
pigServer.getPigContext())
+                + Util.generateURI(f1.toString(), pigServer.getPigContext())
                 + "' using " + PigStorage.class.getName() + "(':');");
         pigServer.registerQuery("b = load '"
-                + Util.generateURI(Util.encodeEscape(f2.toString()), 
pigServer.getPigContext())
+                + Util.generateURI(f2.toString(), pigServer.getPigContext())
                 + "';");
         pigServer.registerQuery("c = cogroup a by $0, b by $0;");        
         pigServer.registerQuery("d = foreach c generate 
flatten($1),flatten($2);");
@@ -151,7 +151,7 @@ public class TestEvalPipelineLocal {
         pw.println("a");
         pw.close();
         pigServer.registerQuery("a = foreach (load '"
-                + Util.generateURI(Util.encodeEscape(f.toString()), 
pigServer.getPigContext())
+                + Util.generateURI(f.toString(), pigServer.getPigContext())
                 + "') generate 1, flatten(" + MyBagFunction.class.getName()
                 + "(*));");
 //        pigServer.registerQuery("b = foreach a generate $0, flatten($1);");
@@ -340,11 +340,11 @@ public class TestEvalPipelineLocal {
         expectedResults.put("conference", 1);
         
         pigServer.registerQuery("newsArticles = LOAD '"
-                + Util.generateURI(Util.encodeEscape(newsFile.toString()), 
pigServer
+                + Util.generateURI(newsFile.toString(), pigServer
                         .getPigContext()) + "' USING "
                 + TextLoader.class.getName() + "();");
         pigServer.registerQuery("queryLog = LOAD '"
-                + Util.generateURI(Util.encodeEscape(queryLogFile.toString()), 
pigServer
+                + Util.generateURI(queryLogFile.toString(), pigServer
                         .getPigContext()) + "';");
 
         pigServer.registerQuery("titleNGrams = FOREACH newsArticles GENERATE 
flatten(" + TitleNGrams.class.getName() + "(*));");
@@ -401,7 +401,7 @@ public class TestEvalPipelineLocal {
         
         String tmpOutputFile = 
FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toString();
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         if (eliminateDuplicates){
             pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) 
PARALLEL 10;");
@@ -448,7 +448,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -488,7 +488,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -530,7 +530,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = limit A 5;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
@@ -550,7 +550,7 @@ public class TestEvalPipelineLocal {
                 new String[] {"{(f1, f2),(f3, 
f4)}\t(1,2)\t[key1#value1,key2#value2]"});
         
         pigServer.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(input.toString()), 
pigServer.getPigContext())
+                + Util.generateURI(input.toString(), pigServer.getPigContext())
                 + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
         pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, 
m#'key1', m#'key2';");
@@ -564,7 +564,7 @@ public class TestEvalPipelineLocal {
         
         //test with BinStorage
         pigServer.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(input.toString()), 
pigServer.getPigContext())
+                + Util.generateURI(input.toString(), pigServer.getPigContext())
                 + "' using PigStorage() "
                 + "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);");
         String output = "./TestEvalPipeline-testComplexData";
@@ -781,7 +781,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = foreach A generate "
                 + MapUDF.class.getName() + "($0) as mymap;"); // the argument
@@ -825,7 +825,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = foreach A generate "
                 + MapUDF.class.getName() + "($0) as mymap;"); // the argument
@@ -873,7 +873,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
@@ -916,7 +916,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = distinct A;");
         String query = "C = foreach B {"
@@ -964,7 +964,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = distinct A;");
         String query = "C = foreach B {"
@@ -1006,7 +1006,7 @@ public class TestEvalPipelineLocal {
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pigServer
+                + Util.generateURI(tmpFile.toString(), pigServer
                         .getPigContext()) + "';");
         pigServer.registerQuery("B = distinct A ;"); //the argument does not 
matter
         pigServer.registerQuery("C = foreach B generate FLATTEN(" + 
Identity.class.getName() + "($0, $1));"); //the argument does not matter
@@ -1088,7 +1088,7 @@ public class TestEvalPipelineLocal {
     @Test
     public void testSetLocationCalledInFE() throws Exception {
         File f1 = createFile(new String[]{"a","b"});
-        pigServer.registerQuery("a = load '" + 
Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext())
+        pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), 
pigServer.getPigContext())
                 + "' using " + SetLocationTestLoadFunc.class.getName()
                 + "();");
         pigServer.registerQuery("b = order a by $0;");
@@ -1101,7 +1101,7 @@ public class TestEvalPipelineLocal {
     @Test
     public void testGroupByTuple() throws Exception {
         File f1 = createFile(new String[]{"1\t2\t3","4\t5\t6"});
-        pigServer.registerQuery("a = load '" + 
Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext())
+        pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), 
pigServer.getPigContext())
                 + "' as (x:int, y:int, z:int);");
         pigServer.registerQuery("b = foreach a generate TOTUPLE(x, y) as t, 
z;");
         pigServer.registerQuery("c = group b by t;");
@@ -1115,7 +1115,7 @@ public class TestEvalPipelineLocal {
     // See PIG-3060
     public void testFlattenEmptyBag() throws Exception {
         File f1 = createFile(new String[]{"2\t{}","3\t{(1),(2)}", "4\t{}"});
-        pigServer.registerQuery("A = load '" + 
Util.generateURI(Util.encodeEscape(f1.toString()), pigServer.getPigContext())
+        pigServer.registerQuery("A = load '" + Util.generateURI(f1.toString(), 
pigServer.getPigContext())
                 + "'  as (a0:int, a1:bag{(t:chararray)});");
         pigServer.registerQuery("B = group A by a0;");
         pigServer.registerQuery("C = foreach B { c1 = foreach A generate 
FLATTEN(a1); generate COUNT(c1);};");
@@ -1231,4 +1231,30 @@ public class TestEvalPipelineLocal {
         Iterator<Tuple> iter = pigServer.openIterator("D");
         Assert.assertEquals(iter.next().toString(), "(lily)");
     }
+
+    /** UDF that returns its input tuple unchanged and does not override outputSchema. */
+    public static class TOTUPLENOINNERSCHEMA extends EvalFunc<Tuple> {
+        @Override
+        public Tuple exec(Tuple input) throws IOException {
+            return input;
+        }
+    }
+
+    // see PIG-4298 -- orders the flattened output of the UDF above descending and expects 4, 3, 2, 1
+    @Test
+    public void testBytesRawComparatorDesc() throws Exception{
+        File f1 = createFile(new String[]{"2", "1", "4", "3"});
+        pigServer.registerQuery("a = load '" + Util.generateURI(f1.toString(), pigServer.getPigContext())
+                + "' as (value:long);");
+        pigServer.registerQuery("b = foreach a generate " + TOTUPLENOINNERSCHEMA.class.getName() + "(value);");
+        pigServer.registerQuery("c = foreach b generate flatten($0);");
+        pigServer.registerQuery("d = order c by $0 desc;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("d");
+        Assert.assertEquals("(4)", iter.next().toString());
+        Assert.assertEquals("(3)", iter.next().toString());
+        Assert.assertEquals("(2)", iter.next().toString());
+        Assert.assertEquals("(1)", iter.next().toString());
+        Assert.assertFalse(iter.hasNext());
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestFetch.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFetch.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFetch.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFetch.java Thu Nov 27 
12:49:54 2014
@@ -96,7 +96,7 @@ public class TestFetch {
     public void setUp() throws Exception{
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
         // force direct fetch mode
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.OPT_FETCH,
 "true");
+        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_OPT_FETCH,
 "true");
     }
 
     @Test
@@ -127,7 +127,8 @@ public class TestFetch {
                 .compile(lp, null);
 
         boolean planFetchable = 
FetchOptimizer.isPlanFetchable(pigServer.getPigContext(), pp);
-        assertTrue(planFetchable);
+        //plan is not fetchable since limit is not pushed up to the loader
+        assertFalse(planFetchable);
 
     }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlan.java Thu 
Nov 27 12:49:54 2014
@@ -59,7 +59,7 @@ public class TestForEachNestedPlan {
     @Test
     public void testInnerOrderBy() throws Exception {
         for (int i = 0; i < nullFlags.length; i++) {
-            System.err.println("Running testInnerOrderBy with nullFlags set to 
:"
+            System.out.println("Running testInnerOrderBy with nullFlags set to 
:"
                             + nullFlags[i]);
             File tmpFile = genDataSetFile1(nullFlags[i]);
             pig.registerQuery("a = load '"

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java 
Thu Nov 27 12:49:54 2014
@@ -52,7 +52,7 @@ public class TestForEachNestedPlanLocal 
             System.err.println("Running testInnerOrderBy with nullFlags set to 
:" + nullFlags[i]);
             File tmpFile = genDataSetFile1(nullFlags[i]);
             pig.registerQuery("a = load '"
-                    + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                    + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                     + "'; ");
             pig.registerQuery("b = group a by $0; ");
             pig.registerQuery("c = foreach b { " + "     c1 = order $1 by *; "
@@ -73,7 +73,7 @@ public class TestForEachNestedPlanLocal 
     public void testInnerLimit() throws Exception {
         File tmpFile = genDataSetFileOneGroup();
         pig.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("b = group a by $0; ");
         pig.registerQuery("c = foreach b { " + "     c1 = limit $1 5; "
@@ -289,4 +289,4 @@ public class TestForEachNestedPlanLocal 
         profilePS.close();
         return new File[] { userFile, sessionFile, profileFile };
     }
-}
\ No newline at end of file
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Thu Nov 27 
12:49:54 2014
@@ -20,16 +20,16 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.builtin.GFCross;
 import org.apache.pig.impl.util.UDFContext;
 import org.junit.Test;
 
 public class TestGFCross {
-    
+
     // Test GFCross returns the correct number of default
     // join groups
     @Test
@@ -49,7 +49,7 @@ public class TestGFCross {
     @Test
     public void testSerial() throws Exception {
         Configuration cfg = new Configuration();
-        cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "1");
+        cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 
@@ -65,7 +65,7 @@ public class TestGFCross {
     @Test
     public void testParallelSet() throws Exception {
         Configuration cfg = new Configuration();
-        cfg.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + ".1", "10");
+        cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Thu Nov 27 
12:49:54 2014
@@ -50,6 +50,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.JavaCompilerHelper;
 import org.apache.pig.test.Util.ProcessReturnInfo;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.pigscript.parser.ParseException;
@@ -66,7 +67,7 @@ public class TestGrunt {
 
     @BeforeClass
     public static void oneTimeSetup() throws Exception {
-        cluster.setProperty(PigConfiguration.OPT_MULTIQUERY,"true");
+        cluster.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY,"true");
     }
 
     @AfterClass
@@ -1014,7 +1015,8 @@ public class TestGrunt {
                         +"store a into 'baz';"
                         +"cd /;"
                         +"fs -ls .;"
-                        +"fs -rmr /fstmp/foo/baz;";
+                        +"fs -rmr /fstmp/foo/baz;"
+                        +"cd";
 
         ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
         InputStreamReader reader = new InputStreamReader(cmd);
@@ -1445,4 +1447,62 @@ public class TestGrunt {
 
         assertEquals(Level.INFO.toString(),  
pc.getLog4jProperties().getProperty("log4j.logger.org.apache.pig"));
     }
+
+    /**
+     * Compiles a UDF, jars it, runs a script that uses it via org.apache.pig.Main in a child JVM with
+     * the jar on the classpath, and expects the child's stderr to mention the jar (patterns below).
+     */
+    @Test
+    public void testAutoShipUDFContainingJar() throws Throwable {
+        // createTempFile only reserves a unique name; swap the file for a directory of that name.
+        File tmpDir = File.createTempFile("test", "");
+        assertTrue(tmpDir.delete());
+        assertTrue(tmpDir.mkdir());
+        String tmpPath = tmpDir.getAbsolutePath();
+
+        File udfDir = new File(tmpPath, "com" + File.separator + "xxx" + File.separator + "udf");
+        udfDir.mkdirs();
+        String udfSrc = "package com.xxx.udf;\n" +
+                "import java.io.IOException;\n" +
+                "import org.apache.pig.EvalFunc;\n" +
+                "import org.apache.pig.data.Tuple;\n" +
+                "public class TestUDF extends EvalFunc<Integer>{\n" +
+                "public Integer exec(Tuple input) throws IOException {\n" +
+                "return 1;}\n" +
+                "}";
+
+        // compile
+        JavaCompilerHelper javaCompilerHelper = new JavaCompilerHelper();
+        javaCompilerHelper.compile(tmpPath,
+                new JavaCompilerHelper.JavaSourceFromString("com.xxx.udf.TestUDF", udfSrc));
+
+        String jarName = "TestUDFJar.jar";
+        String jarFile = tmpPath + File.separator + jarName;
+        int status = Util.executeJavaCommand("jar -cf " + jarFile +
+                " -C " + tmpPath + " " + "com");
+        assertEquals(0, status);
+
+        Util.createInputFile(cluster, "table_testAutoShipUDFContainingJar", new String[] { "1" });
+        File scriptFile = Util.createFile(new String[] {
+                "a = load 'table_testAutoShipUDFContainingJar' as (a0:int);" +
+                "b = foreach a generate com.xxx.udf.TestUDF(a0);" +
+                "store b into 'output_testAutoShipUDFContainingJar';"
+                });
+        String scriptFileName = scriptFile.getAbsolutePath();
+        String execTypeOptions = "-x " + cluster.getExecType() + " ";
+        String cmd = "java -cp " + System.getProperty("java.class.path") + File.pathSeparator + jarFile +
+                " org.apache.pig.Main " + execTypeOptions + scriptFileName;
+        ProcessReturnInfo pri = Util.executeJavaCommandAndReturnInfo(cmd);
+        assertEquals(0, pri.exitCode);
+        // MR mode is matched by an "Added jar" line, Tez mode by a "Local resource" line.
+        boolean found = false;
+        for (String line : pri.stderrContents.split("\n")) {
+            if (line.matches(".*Added jar .*" + jarName + ".*")
+                    || line.matches(".*Local resource.*" + jarName + ".*")) {
+                found = true;
+                break;
+            }
+        }
+        assertTrue("No 'Added jar' or 'Local resource' line mentioning " + jarName + " in stderr", found);
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Thu Nov 
27 12:49:54 2014
@@ -16,6 +16,7 @@
  */
 package org.apache.pig.test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -41,12 +42,14 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -99,7 +102,7 @@ public class TestHBaseStorage {
 
     @Before
     public void beforeTest() throws Exception {
-        pig = new PigServer(ExecType.LOCAL, conf);
+        pig = new PigServer(Util.getLocalTestMode(), conf);
     }
 
     @After
@@ -123,6 +126,7 @@ public class TestHBaseStorage {
             deletes.add(new Delete(row.getRow()));
         }
         table.delete(deletes);
+        table.close();
     }
 
     /**
@@ -825,6 +829,115 @@ public class TestHBaseStorage {
     }
 
     /**
+     * Test merge inner join with two tables
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testMergeJoin() throws IOException {
+        Assume.assumeTrue("Skip this test for TEZ. See PIG-4315", 
pig.getPigContext().getExecType().equals(ExecType.LOCAL));
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
+        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+                        + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                        + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + 
TESTCOLUMN_C
+                        + "','-loadKey -caster HBaseBinaryConverter') as 
(rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+        pig.registerQuery("b = load 'hbase://" + TESTTABLE_2 + "' using "
+                        + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                        + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + 
TESTCOLUMN_C
+                        + "','-loadKey -caster HBaseBinaryConverter') as 
(rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+        pig.registerQuery("c = join a by rowKey, b by rowKey USING 'merge';");
+        pig.registerQuery("d = ORDER c BY a::rowKey;");
+
+        Iterator<Tuple> it = pig.openIterator("d");
+        int count = 0;
+        LOG.info("MergeJoin Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            // the columns for both relations should be merged into one tuple
+            // left side
+            String rowKey = (String) t.get(0);
+            int col_a = (Integer) t.get(1);
+            double col_b = (Double) t.get(2);
+            String col_c = (String) t.get(3);
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+            Assert.assertEquals(count, col_a);
+            Assert.assertEquals(count + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + count, col_c);
+
+            // right side
+            String rowKey2 = (String) t.get(4);
+            int col_a2 = (Integer) t.get(5);
+            double col_b2 = (Double) t.get(6);
+            String col_c2 = (String) t.get(7);
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey2);
+            Assert.assertEquals(count, col_a2);
+            Assert.assertEquals(count + 0.0, col_b2, 1e-6);
+            Assert.assertEquals("Text_" + count, col_c2);
+
+            count++;
+        }
+        Assert.assertEquals(count, TEST_ROW_COUNT);
+        LOG.info("MergeJoin done");
+    }
+
+    /**
+     * Test collected group
+     * not much to test here since keys are unique
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testCollectedGroup() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
+        pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
+                        + "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                        + TESTCOLUMN_A + " " + TESTCOLUMN_B + " " + 
TESTCOLUMN_C
+                        + "','-loadKey -caster HBaseBinaryConverter') as 
(rowKey:chararray,col_a:int, col_b:double, col_c:chararray);");
+        pig.registerQuery("c = group a by rowKey USING 'collected';");
+        pig.registerQuery("d = ORDER c BY group;");
+
+        // do a merge group
+        Iterator<Tuple> it = pig.openIterator("d");
+        int count = 0;
+        LOG.info("CollectedGroup Starting");
+        while (it.hasNext()) {
+            Tuple t = it.next();
+
+            String rowKey = (String)t.get(0);
+
+            Assert.assertEquals("00".substring((count + "").length()) + count,
+                    rowKey);
+
+            int rowCount = 0;
+            DataBag rows = (DataBag)t.get(1);
+            for (Iterator<Tuple> iter = rows.iterator(); iter.hasNext();) {
+                Tuple row = iter.next();
+
+                // each row in the group's bag should carry all 3 columns
+                int col_a = (Integer) row.get(1);
+                double col_b = (Double) row.get(2);
+                String col_c = (String) row.get(3);
+
+                Assert.assertEquals(count, col_a);
+                Assert.assertEquals(count + 0.0, col_b, 1e-6);
+                Assert.assertEquals("Text_" + count, col_c);
+                rowCount++;
+            }
+            Assert.assertEquals(1, rowCount);
+
+            count++;
+        }
+        Assert.assertEquals(TEST_ROW_COUNT, count);
+        LOG.info("CollectedGroup done");
+    }
+
+    /**
      * Test Load from hbase using HBaseBinaryConverter
      */
     @Test
@@ -892,6 +1005,7 @@ public class TestHBaseStorage {
 
         pig.getPigContext().getProperties()
                 
.setProperty(MRConfiguration.FILEOUTPUTCOMMITTER_MARKSUCCESSFULJOBS, "false");
+        table.close();
     }
 
     /**
@@ -928,6 +1042,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(i + 0.0, col_b, 1e-6);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**
@@ -964,6 +1079,7 @@ public class TestHBaseStorage {
             Assert.assertEquals("Text_" + i, col_c);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**
@@ -1026,6 +1142,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(i + 0.0, col_b, 1e-6);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**
@@ -1061,6 +1178,7 @@ public class TestHBaseStorage {
             Assert.assertEquals(i + 0.0 + "", col_b);
         }
         Assert.assertEquals(100, i);
+        table.close();
     }
 
     /**
@@ -1101,6 +1219,43 @@ public class TestHBaseStorage {
         Assert.assertEquals(index, TEST_ROW_COUNT);
     }
 
+    @Test
+    // See PIG-4151
+    public void testStoreEmptyMap() throws IOException {
+        String tableName = "emptyMapTest";
+        HTable table;
+        try {
+            deleteAllRows(tableName);
+        } catch (Exception e) {
+            // It's ok, table might not exist.
+        }
+        byte[][] cfs = new byte[2][];
+        cfs[0] = Bytes.toBytes("info");
+        cfs[1] = Bytes.toBytes("friends");
+        try {
+            table = util.createTable(Bytes.toBytesBinary(tableName),
+                    cfs);
+        } catch (Exception e) {
+            table = new HTable(conf, Bytes.toBytesBinary(tableName));
+        }
+
+        File inputFile = Util.createInputFile("test", "tmp", new String[] 
{"row1;Homer;Morrison;[1#Silvia,2#Stacy]",
+                "row2;Sheila;Fletcher;[1#Becky,2#Salvador,3#Lois]",
+                "row4;Andre;Morton;[1#Nancy]",
+                "row3;Sonja;Webb;[]"
+        });
+        pig.registerQuery("source = LOAD '" + 
Util.generateURI(inputFile.toString(), pig.getPigContext())
+                + "' USING PigStorage(';')"
+                + " AS (row:chararray, first_name:chararray, 
last_name:chararray, friends:map[]);");
+        pig.registerQuery("STORE source INTO 'hbase://" + tableName + "' 
USING" +
+                " org.apache.pig.backend.hadoop.hbase.HBaseStorage('info:fname 
info:lname friends:*');");
+        Get get = new Get(Bytes.toBytes("row3"));
+        Result r = table.get(get);
+        Assert.assertEquals(new String(r.getValue(cfs[0], 
Bytes.toBytes("fname"))), "Sonja");
+        Assert.assertEquals(new String(r.getValue(cfs[0], 
Bytes.toBytes("lname"))), "Webb");
+        Assert.assertTrue(r.getFamilyMap(cfs[1]).isEmpty());
+    }
+
     private void scanTable1(PigServer pig, DataFormat dataFormat) throws 
IOException {
         scanTable1(pig, dataFormat, "");
     }

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobControlCompiler.java Thu 
Nov 27 12:49:54 2014
@@ -31,8 +31,10 @@ import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.JarOutputStream;
@@ -47,6 +49,7 @@ import javax.tools.ToolProvider;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
@@ -55,11 +58,12 @@ import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -72,7 +76,7 @@ public class TestJobControlCompiler {
 
     private static final Configuration CONF = new Configuration();
 
-    
+
     @BeforeClass
     public static void setupClass() throws Exception {
         // creating a hadoop-site.xml and making it visible to Pig
@@ -126,13 +130,14 @@ public class TestJobControlCompiler {
 
     // verifying the jar gets on distributed cache
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
-    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 8, 
fileClassPaths.length);
+    // guava jar is not shipped with Hadoop 2.x
+    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 
HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, 
distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hotname in front
-    Assert.assertTrue("starts with /: "+distributedCachePath, 
+    Assert.assertTrue("starts with /: "+distributedCachePath,
         distributedCachePath.toString().startsWith("/"));
-    Assert.assertTrue("jar pushed to distributed cache should contain 
testUDF", 
+    Assert.assertTrue("jar pushed to distributed cache should contain testUDF",
         jarContainsFileNamed(new File(fileClassPaths[0].toUri().getPath()), 
testUDFFileName));
   }
 
@@ -171,15 +176,9 @@ public class TestJobControlCompiler {
                 StringUtils.join(zipArchives, ","));
         pigContext.getProperties().put("pig.streaming.cache.files",
                 StringUtils.join(tarArchives, ","));
-        final JobControlCompiler jobControlCompiler = new JobControlCompiler(
-                pigContext, CONF);
 
-        final MROperPlan plan = new MROperPlan();
-        plan.add(new MapReduceOper(new OperatorKey()));
+        final JobConf jobConf = compileTestJob(pigContext, CONF);
 
-        final JobControl jobControl = jobControlCompiler.compile(plan, "test");
-        final JobConf jobConf = 
jobControl.getWaitingJobs().get(0).getJobConf();
-        
         URI[] uris = DistributedCache.getCacheFiles(jobConf);
         int sizeTxt = 0;
         for (int i = 0; i < uris.length; i++) {
@@ -193,6 +192,100 @@ public class TestJobControlCompiler {
                 ".tar.gz", ".tar");
     }
 
+    private JobConf compileTestJob(final PigContext pigContext, Configuration 
conf)
+            throws JobCreationException {
+        final JobControlCompiler jobControlCompiler = new JobControlCompiler(
+                pigContext, conf);
+
+        final MROperPlan plan = new MROperPlan();
+        plan.add(new MapReduceOper(new OperatorKey()));
+
+        final JobControl jobControl = jobControlCompiler.compile(plan, "test");
+        final JobConf jobConf = 
jobControl.getWaitingJobs().get(0).getJobConf();
+        return jobConf;
+    }
+
+    /**
+     * Tests that no duplicate jars are added to distributed cache, which 
might cause conflicts
+     * and tests with both symlinked and normal jar specification
+     */
+      @Test
+      public void testNoDuplicateJarsInDistributedCache() throws Exception {
+
+          // JobControlCompiler setup
+          final PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
+          PigContext pigContext = pigServer.getPigContext();
+          pigContext.connect();
+
+          Configuration conf = new Configuration();
+          DistributedCache.addFileToClassPath(new Path(new 
URI("/lib/udf-0.jar#udf.jar")), conf, FileSystem.get(conf));
+          DistributedCache.addFileToClassPath(new Path(new 
URI("/lib/udf1.jar#diffname.jar")), conf, FileSystem.get(conf));
+          DistributedCache.addFileToClassPath(new Path(new 
URI("/lib/udf2.jar")), conf, FileSystem.get(conf));
+          createAndAddResource("udf.jar", pigContext);
+          createAndAddResource("udf1.jar", pigContext);
+          createAndAddResource("udf2.jar", pigContext);
+          createAndAddResource("another.jar", pigContext);
+
+          final JobConf jobConf = compileTestJob(pigContext, conf);
+
+          // verifying the jar gets on distributed cache
+          URI[] cacheURIs = DistributedCache.getCacheFiles(jobConf);
+          Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
+          // expected - 1. udf.jar#udf.jar, 2. udf1.jar#diffname.jar 3. 
udf2.jar (same added twice)
+          // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
+          System.out.println("cache.files= " + Arrays.toString(cacheURIs));
+          System.out.println("classpath.files= " + 
Arrays.toString(fileClassPaths));
+          if (HadoopShims.isHadoopYARN()) {
+              // Default jars - 4 (pig, antlr, joda-time, automaton)
+              // Other jars - 5 (udf.jar#udf.jar, udf1.jar#diffname.jar, 
udf2.jar, udf1.jar, another.jar)
+              Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 
9,
+                      Arrays.asList(StringUtils.join(cacheURIs, 
",").split(",")).size());
+              Assert.assertEquals("size 9 for " + 
Arrays.toString(fileClassPaths), 9,
+                      Arrays.asList(StringUtils.join(fileClassPaths, 
",").split(",")).size());
+          } else {
+              // Default jars - 5. Has guava in addition
+              // There will be same entries duplicated for udf.jar and udf2.jar
+              Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 
12,
+                      Arrays.asList(StringUtils.join(cacheURIs, 
",").split(",")).size());
+              Assert.assertEquals("size 12 for " + 
Arrays.toString(fileClassPaths), 12,
+                      Arrays.asList(StringUtils.join(fileClassPaths, 
",").split(",")).size());
+          }
+
+          // Count occurrences of the resources
+          Map<String, Integer> occurrences = new HashMap<String, Integer>();
+
+          for (URI cacheURI : cacheURIs) {
+              Integer val = occurrences.get(cacheURI.toString());
+              val = (val == null) ? 1 : ++val;
+              occurrences.put(cacheURI.toString(), val);
+          }
+          if (HadoopShims.isHadoopYARN()) {
+              Assert.assertEquals(9, occurrences.size());
+          } else {
+              Assert.assertEquals(10, occurrences.size()); //guava jar in 
addition
+          }
+
+          for (String file : occurrences.keySet()) {
+              if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || 
file.endsWith("udf2.jar"))) {
+                  // Same path added twice which is ok. It should not be a 
shipped to hdfs temp path.
+                  // We assert path is same by checking count
+                  Assert.assertEquals("Two occurrences for " + file, 2, (int) 
occurrences.get(file));
+              } else {
+                  // check that only single occurrence even though we added 
once to dist cache (simulating via Oozie)
+                  // and second time through pig register jar when there is 
symlink
+                  Assert.assertEquals("One occurrence for " + file, 1, (int) 
occurrences.get(file));
+              }
+          }
+      }
+
+      private File createAndAddResource(String name, PigContext pigContext) 
throws IOException {
+          File f = new File(name);
+          f.createNewFile();
+          f.deleteOnExit();
+          pigContext.addJar(name);
+          return f;
+      }
+
     @Test
     public void testEstimateNumberOfReducers() throws Exception {
         Assert.assertEquals(2, JobControlCompiler.estimateNumberOfReducers(
@@ -229,7 +322,7 @@ public class TestJobControlCompiler {
     }
 
   /**
-   * checks if the given file name is in the jar 
+   * checks if the given file name is in the jar
    * @param jarFile the jar to check
    * @param name the name to find (full path in the jar)
    * @return true if the name was found

Modified: pig/branches/spark/test/org/apache/pig/test/TestJoin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJoin.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJoin.java Thu Nov 27 
12:49:54 2014
@@ -87,7 +87,7 @@ public class TestJoin {
             fileName = fileNameHint;
         } else if (execType == ExecType.LOCAL) {
             File f = Util.createInputFile("test", fileNameHint, data);
-            fileName = 
Util.generateURI(Util.encodeEscape(f.getAbsolutePath()), 
pigServer.getPigContext());
+            fileName = Util.generateURI(f.getAbsolutePath(), 
pigServer.getPigContext());
         }
         return fileName;
     }
@@ -733,4 +733,4 @@ public class TestJoin {
         }
         assertTrue("All expected tuples should have been found, remaining: 
"+expected, expected.isEmpty());
     }
-}
\ No newline at end of file
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJsonLoaderStorage.java Thu 
Nov 27 12:49:54 2014
@@ -111,7 +111,27 @@ public class TestJsonLoaderStorage {
     "\"l\":null," +
     "\"m\":null" +
     "}";
-
+  
+  private static final String bigDecimalJson =
+    "{" +
+    "\"a\":123.456," +
+    "\"b\":\"123.456\"" +
+    "}";
+       
+  private static final String badJson =
+    "{" +
+    "\"a\":\"good\"," +
+    "\"b\":\"good\"" +
+    "}\n" +
+       "{" +
+    "\"a\":bad," +
+    "\"b\":\"good\"" +
+    "}\n" +
+       "{" +
+    "\"a\":\"good\"," +
+    "\"b\":\"good\"" +
+    "}";
+      
   private static final String jsonOutput =
     "{\"f1\":\"18\",\"count\":3}";
 
@@ -214,6 +234,50 @@ public class TestJsonLoaderStorage {
     assertEquals(1, count);
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testJsonLoaderBadRow() throws IOException{
+
+    String badJsonFile = createInput(badJson);
+    pigServer.registerQuery("data = load '" + badJsonFile + "' using 
JsonLoader('a:chararray, b:chararray');");
+    Iterator<Tuple> tuples = pigServer.openIterator("data");
+    
+    Tuple t = tuples.next();
+    assertTrue(t.size()==2);
+    assertTrue(t.get(0)!=null);
+    assertTrue(t.get(1)!=null);
+    assertTrue(tuples.hasNext());
+
+    // bad row - its contents are skipped, returning a tuple of null fields.
+    t = tuples.next();
+    assertTrue(t.size()==2);
+    assertTrue(t.get(0)==null);
+    assertTrue(t.get(1)==null);
+    assertTrue(tuples.hasNext());
+
+    t = tuples.next();
+    assertTrue(t.size()==2);
+    assertTrue(t.get(0)!=null);
+    assertTrue(t.get(1)!=null);
+    assertTrue(!tuples.hasNext());
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testJsonLoaderBigDecimalFormats() throws IOException{
+
+    String bigDecimalJsonFile = createInput(bigDecimalJson);
+    pigServer.registerQuery("data = load '" + bigDecimalJsonFile + "' using 
JsonLoader('a:bigdecimal, b:bigdecimal');");
+    Iterator<Tuple> tuples = pigServer.openIterator("data");
+    
+    Tuple t = tuples.next();
+    assertTrue(t.size()==2);
+    assertTrue(t.get(0)!=null);
+    assertTrue(t.get(1)!=null);
+    assertEquals(t.get(0), t.get(1));
+    assertTrue(!tuples.hasNext());
+  }
+  
   @Test
   public void testJsonLoaderNull() throws IOException {
     Iterator<Tuple> tuples = loadJson(nullJson);
@@ -346,7 +410,7 @@ public class TestJsonLoaderStorage {
     tempJsonFile.delete();
 
     // Pig query to run
-    pigServer.registerQuery("IP = load '"+  
Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext())
+    pigServer.registerQuery("IP = load '"+  Util.generateURI(input.toString(), 
pigServer.getPigContext())
         +"' using PigStorage (';') as (ID:chararray,DETAILS:chararray);");
     pigServer.registerQuery(
         "id_details = FOREACH IP GENERATE " +

Modified: pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java Thu Nov 
27 12:49:54 2014
@@ -20,16 +20,19 @@ package org.apache.pig.test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
 import org.junit.AfterClass;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -116,6 +119,34 @@ public class TestLimitVariable {
         Util.checkQueryOutputs(itE, expectedResE);
     }
 
+    @Test
+    public void testLimitVariable4() throws IOException {
+        String query =
+            "a = load '" + inputFile.getName() + "' as (x:int, y:int);" +
+            "b = group a all;" +
+            "c = foreach b generate COUNT(a) as sum;" +
+            "d = order a by $0 DESC;" +
+            "e = filter d by $0 != 4;" +
+            "f = limit e c.sum/2;" // return top half of the tuples
+            ;
+
+        try {
+            HashSet<String> disabledOptimizerRules = new HashSet<String>();
+            disabledOptimizerRules.add("PushUpFilter");
+            
pigServer.getPigContext().getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
+                    ObjectSerializer.serialize(disabledOptimizerRules));
+            Util.registerMultiLineQuery(pigServer, query);
+            Iterator<Tuple> it = pigServer.openIterator("f");
+
+            // Even if push up filter is disabled order should be retained
+            List<Tuple> expectedRes = 
Util.getTuplesFromConstantTupleStrings(new String[] {
+                    "(6,15)", "(5,10)", "(3,10)" });
+            Util.checkQueryOutputs(it, expectedRes);
+        } finally {
+            
pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
+        }
+    }
+
     @Test(expected=FrontendException.class)
     public void testLimitVariableException1() throws Throwable {
         String query =
@@ -179,4 +210,20 @@ public class TestLimitVariable {
                 "(1,11)", "(2,3)", "(3,10)", "(6,15)" });
         Util.checkQueryOutputsAfterSort(it, expectedRes);
     }
+
+    @Test
+    public void testZeroLimitVariable() throws Throwable {
+        String query =
+            "a = load '" + inputFile.getName() + "';" +
+            "b = group a all;" +
+            "c = foreach b generate COUNT(a) as sum;" +
+            "d = limit a c.sum - c.sum; "
+            ;
+
+        Util.registerMultiLineQuery(pigServer, query);
+        Iterator<Tuple> it = pigServer.openIterator("d");
+
+        List<Tuple> emptyresult = new ArrayList<Tuple>(0);
+        Util.checkQueryOutputsAfterSort(it, emptyresult);
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestLoad.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoad.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoad.java Thu Nov 27 
12:49:54 2014
@@ -306,7 +306,7 @@ public class TestLoad {
         boolean[] multiquery = {true, false};
 
         for (boolean b : multiquery) {
-            pc.getProperties().setProperty(PigConfiguration.OPT_MULTIQUERY, "" 
+ b);
+            
pc.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + b);
 
             DataStorage dfs = pc.getDfs();
             dfs.setActiveContainer(dfs.asContainer("/tmp"));

Modified: pig/branches/spark/test/org/apache/pig/test/TestLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLocal.java Thu Nov 27 
12:49:54 2014
@@ -100,7 +100,7 @@ public class TestLocal {
     public Double bigGroupAll( File tmpFile ) throws Throwable {
 
         String query = "foreach (group (load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "') all) generate " + COUNT.class.getName() + "($1) ;";
         System.out.println(query);
         pig.registerQuery("asdf_id = " + query);
@@ -188,7 +188,7 @@ public class TestLocal {
 
     //Load, Execute and Store query
         String query = "foreach (load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "') generate $0,$1;";
         System.out.println(query);
         pig.registerQuery("asdf_id = " + query);
@@ -239,7 +239,7 @@ public class TestLocal {
 
     // Load, Execute and Store query
         String query = "foreach (load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "') generate $0,$1;";
         System.out.println(query);
         pig.registerQuery("asdf_id = " + query);
@@ -284,7 +284,7 @@ public class TestLocal {
 
         // execute query
         String query = "foreach (group (load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "' using " + MyStorage.class.getName() + "()) by "
                 + MyGroup.class.getName() + "('all')) generate flatten("
                 + MyApply.class.getName() + "($1)) ;";
@@ -323,7 +323,7 @@ public class TestLocal {
 
         // execute query
         String query = "foreach (group (load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "' using " + MyStorage.class.getName() + "()) by "
                 + MyGroup.class.getName() + "('all')) generate flatten("
                 + MyApply.class.getName() + "($1)) ;";
@@ -358,7 +358,7 @@ public class TestLocal {
         pig.registerFunction("foo",
             new FuncSpec(MyApply.class.getName()+"('foo')"));
         String query = "foreach (group (load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "' using " + MyStorage.class.getName() + "()) by "
                 + MyGroup.class.getName()
                 + "('all')) generate flatten(foo($1)) ;";
@@ -406,7 +406,7 @@ public class TestLocal {
         pig.registerFunction("foo",
             new FuncSpec(MyApply.class.getName()+"('foo')"));
         String query = "foreach (group (load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile.toString(), pig.getPigContext())
                 + "' using " + MyStorage.class.getName() + "()) by "
                 + MyGroup.class.getName()
                 + "('all')) generate flatten(foo($1)) ;";
@@ -515,4 +515,4 @@ public class TestLocal {
          return  data;
 
     }
-}
\ No newline at end of file
+}

Modified: pig/branches/spark/test/org/apache/pig/test/TestLocal2.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLocal2.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLocal2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLocal2.java Thu Nov 27 
12:49:54 2014
@@ -54,10 +54,10 @@ public class TestLocal2 {
         File tmpFile1 = genDataSetFile(false, 30 ) ;
         File tmpFile2 = genDataSetFile(false, 50 ) ;
         pig.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile1.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("b = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile2.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("c = union a, b; ") ;
 
@@ -70,10 +70,10 @@ public class TestLocal2 {
         File tmpFile1 = genDataSetFile(true, 30 ) ;
         File tmpFile2 = genDataSetFile(true, 50 ) ;
         pig.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile1.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("b = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile2.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("c = union a, b; ") ;
 
@@ -86,10 +86,10 @@ public class TestLocal2 {
         File tmpFile1 = genDataSetFile(false, 30) ;
         File tmpFile2 = genDataSetFile(false, 50) ;
         pig.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile1.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("b = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile2.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("a1 = foreach a generate $0, $1; ") ;
         pig.registerQuery("b1 = foreach b generate $0, $1; ") ;
@@ -103,10 +103,10 @@ public class TestLocal2 {
         File tmpFile1 = genDataSetFile(true, 30) ;
         File tmpFile2 = genDataSetFile(true, 50) ;
         pig.registerQuery("a = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile1.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile1.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("b = load '"
-                + Util.generateURI(Util.encodeEscape(tmpFile2.toString()), 
pig.getPigContext())
+                + Util.generateURI(tmpFile2.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("a1 = foreach a generate $0, $1; ") ;
         pig.registerQuery("b1 = foreach b generate $0, $1; ") ;
@@ -125,7 +125,7 @@ public class TestLocal2 {
         ps.close();
 
         pig.registerQuery("A = load '"
-                + Util.generateURI(Util.encodeEscape(fp1.toString()), 
pig.getPigContext())
+                + Util.generateURI(fp1.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("B = foreach A generate flatten("
                 + Pig800Udf.class.getName() + "($0));");
@@ -148,7 +148,7 @@ public class TestLocal2 {
         ps.close();
 
         pig.registerQuery("A = load '"
-                + Util.generateURI(Util.encodeEscape(fp1.toString()), 
pig.getPigContext())
+                + Util.generateURI(fp1.toString(), pig.getPigContext())
                 + "'; ");
         pig.registerQuery("B = foreach A generate flatten("
                 + Pig800Udf.class.getName() + "($0));");
@@ -171,7 +171,7 @@ public class TestLocal2 {
         ps.close();
 
         pig.registerQuery("A = load '"
-                + Util.generateURI(Util.encodeEscape(fp1.toString()), 
pig.getPigContext())
+                + Util.generateURI(fp1.toString(), pig.getPigContext())
                 + "' AS (c1:int, c2:int); ");
         pig.registerQuery("B = filter A by c1 > 0;");
         pig.registerQuery("C = filter B by c1 < 2;");
@@ -207,10 +207,10 @@ public class TestLocal2 {
 
 
         pig.registerQuery("A = load '"
-                + Util.generateURI(Util.encodeEscape(fp1.toString()), 
pig.getPigContext())
+                + Util.generateURI(fp1.toString(), pig.getPigContext())
                 + "'AS (a0:int, a1:int); ");
         pig.registerQuery("B = load '"
-                + Util.generateURI(Util.encodeEscape(fp2.toString()), 
pig.getPigContext())
+                + Util.generateURI(fp2.toString(), pig.getPigContext())
                 + "'AS (b0:int, b1:int); ");
         pig.registerQuery("C = join A by a0, B by b0;");
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMRJobStats.java Thu Nov 27 
12:49:54 2014
@@ -27,6 +27,8 @@ import java.io.PrintWriter;
 import java.io.RandomAccessFile;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
@@ -126,8 +128,8 @@ public class TestMRJobStats {
         getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
         jobStats.setSuccessful(true);
 
-        getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, 
TaskReport[].class)
-            .invoke(jobStats, mapTaskReports, reduceTaskReports);
+        getJobStatsMethod("addMapReduceStatistics", Iterator.class, 
Iterator.class)
+            .invoke(jobStats, Arrays.asList(mapTaskReports).iterator(), 
Arrays.asList(reduceTaskReports).iterator());
         String msg = (String)getJobStatsMethod("getDisplayString")
             .invoke(jobStats);
 
@@ -156,8 +158,8 @@ public class TestMRJobStats {
         getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
         jobStats.setSuccessful(true);
 
-        getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, 
TaskReport[].class)
-            .invoke(jobStats, mapTaskReports, reduceTaskReports);
+        getJobStatsMethod("addMapReduceStatistics", Iterator.class, 
Iterator.class)
+            .invoke(jobStats, Arrays.asList(mapTaskReports).iterator(), 
Arrays.asList(reduceTaskReports).iterator());
         String msg = (String)getJobStatsMethod("getDisplayString")
             .invoke(jobStats);
         System.out.println(JobStats.SUCCESS_HEADER);

Modified: pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java Thu Nov 
27 12:49:54 2014
@@ -479,7 +479,7 @@ public class TestMapSideCogroup {
 
         @Override
         public void initialize(Configuration conf) throws IOException {
-            is = FileSystem.get(conf).open(new Path(loc));
+            is = FileSystem.get(new Path(loc).toUri(), conf).open(new 
Path(loc));
         }
 
         @Override

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Thu Nov 27 
12:49:54 2014
@@ -52,7 +52,7 @@ public class TestMultiQuery {
         Util.copyFromLocalToLocal(
                 "test/org/apache/pig/test/data/passwd2", "passwd2");
         Properties props = new Properties();
-        props.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
+        props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true);
         myPig = new PigServer(ExecType.LOCAL, props);
     }
 
@@ -813,6 +813,32 @@ public class TestMultiQuery {
         }
     }
 
+    /**
+     * Regression test named after JIRA PIG-4170: two different GROUP BYs over
+     * the same load are stored in one batch, then each store is read back and
+     * its content is verified.
+     *
+     * @throws Exception if registering or executing any query fails
+     */
+    @Test
+    public void testMultiQueryJiraPig4170() throws Exception {
+
+        Storage.Data data = Storage.resetData(myPig);
+        data.set("inputLocation", Storage.tuple(1, "hello"), Storage.tuple(2, "world"));
+
+        myPig.setBatchOn();
+        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b:chararray);");
+        myPig.registerQuery("A1 = group A by a;");
+        myPig.registerQuery("A2 = group A by b;");
+        myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
+        myPig.registerQuery("store A2 into 'output2' using mock.Storage();");
+
+        myPig.executeBatch();
+
+        // The results must actually be asserted: a bare String.equals(...)
+        // call discards its boolean and can never fail the test. Rows are
+        // sorted before comparison so the check does not depend on the order
+        // in which the groups are returned.
+        myPig.registerQuery("A = load 'output1' using mock.Storage() as (a:int, c:bag{(i:int, s:chararray)});");
+        Iterator<Tuple> iter = myPig.openIterator("A");
+        java.util.List<String> rows = new java.util.ArrayList<String>();
+        while (iter.hasNext()) {
+            rows.add(iter.next().toString());
+        }
+        java.util.Collections.sort(rows);
+        org.junit.Assert.assertEquals(
+                java.util.Arrays.asList("(1,{(1,hello)})", "(2,{(2,world)})"), rows);
+
+        myPig.registerQuery("A = load 'output2' using mock.Storage() as (b:chararray, c:bag{(i:int, s:chararray)});");
+        iter = myPig.openIterator("A");
+        rows = new java.util.ArrayList<String>();
+        while (iter.hasNext()) {
+            rows.add(iter.next().toString());
+        }
+        java.util.Collections.sort(rows);
+        org.junit.Assert.assertEquals(
+                java.util.Arrays.asList("(hello,{(1,hello)})", "(world,{(2,world)})"), rows);
+    }
+
     // --------------------------------------------------------------------------
     // Helper methods
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryBasic.java Thu 
Nov 27 12:49:54 2014
@@ -69,7 +69,7 @@ public class TestMultiQueryBasic {
         Util.copyFromLocalToLocal(
                 "test/org/apache/pig/test/data/passwd2", "passwd2");
         Properties props = new Properties();
-        props.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
+        props.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true);
         myPig = new PigServer(ExecType.LOCAL, props);
     }
 

Modified: 
pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryCompiler.java Thu 
Nov 27 12:49:54 2014
@@ -82,7 +82,7 @@ public class TestMultiQueryCompiler {
 
     @Before
     public void setUp() throws Exception {
-        cluster.setProperty(PigConfiguration.OPT_MULTIQUERY, ""+true);
+        cluster.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true);
         myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         deleteOutputFiles();
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQueryLocal.java Thu 
Nov 27 12:49:54 2014
@@ -67,7 +67,7 @@ public class TestMultiQueryLocal {
     @Before
     public void setUp() throws Exception {
         PigContext context = new PigContext(ExecType.LOCAL, new Properties());
-        context.getProperties().setProperty(PigConfiguration.OPT_MULTIQUERY, 
""+true);
+        
context.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, 
""+true);
         myPig = new PigServer(context);
         
myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", 
"false");
         
myPig.getPigContext().getProperties().setProperty(PigConfiguration.PIG_TEMP_DIR,
 "build/test/tmp/");

Modified: pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestOrderBy.java Thu Nov 27 
12:49:54 2014
@@ -65,8 +65,7 @@ public class TestOrderBy {
             ps.println("1\t" + DATA[1][i] + "\t" + DATA[0][i]);
         }
         ps.close();
-        
-        
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null)));
+    
     }
     
     @After

Modified: pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPOPartialAgg.java Thu Nov 
27 12:49:54 2014
@@ -17,10 +17,18 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.FuncSpec;
@@ -38,9 +46,11 @@ import org.apache.pig.builtin.IntSum;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.Spillable;
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -50,12 +60,19 @@ import com.google.common.base.Strings;
  * Test POPartialAgg runtime
  */
 public class TestPOPartialAgg {
-    POPartialAgg partAggOp;
-    PhysicalPlan parentPlan;
-    Tuple dummyTuple = null;
+
+    private static ExecutorService executor = 
Executors.newSingleThreadExecutor();
+    private POPartialAgg partAggOp;
+    private PhysicalPlan parentPlan;
+
+    /**
+     * Shuts down the shared single-thread executor once all tests in this
+     * class have run.
+     */
+    @AfterClass
+    public static void oneTimeTearDown() {
+        executor.shutdownNow();
+    }
 
     @Before
     public void setUp() throws Exception {
+        PigMapReduce.sJobConfInternal.set(new Configuration());
         createPOPartialPlan(1);
     }
 
@@ -73,14 +90,14 @@ public class TestPOPartialAgg {
 
         // setup value plans
         List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>();
-        
+
         for (int i = 0; i < valueCount; i++) {
             // project arg for udf
             PhysicalPlan valPlan = new PhysicalPlan();
             POProject projVal1 = new POProject(GenPhyOp.getOK(), -1, i + 1);
             projVal1.setResultType(DataType.BAG);
             valPlan.add(projVal1);
-    
+
             // setup udf
             List<PhysicalOperator> udfInps = new ArrayList<PhysicalOperator>();
             udfInps.add(projVal1);
@@ -89,7 +106,7 @@ public class TestPOPartialAgg {
                     sumSpec);
             valPlan.add(sumUdf);
             valPlan.connect(projVal1, sumUdf);
-    
+
             valuePlans.add(valPlan);
         }
 
@@ -225,7 +242,7 @@ public class TestPOPartialAgg {
     public void testMultiVals() throws Exception {
         // more than one value to be aggregated
         createPOPartialPlan(2);
-        
+
         // input tuple has key, and bag containing SUM.Init output
         String[] inputTups = { "(1,(1L),(2L))", "(2,(2L),(1L))", 
"(1,(2L),(2L))" };
         String[] outputTups = { "(1,(3L),(4L))", "(2,(2L),(1L))" };
@@ -234,21 +251,21 @@ public class TestPOPartialAgg {
 
     @Test
     public void testMultiValCheckNotDisabled() throws Exception {
-        // "large" number of values per input to aggregate but good reduction 
-        // in size due to aggregation. 
-        // This case should result in a reduction from 10500 inputs to 500 
-        // outputs (factor of 20), so in-memory aggregation should not be 
-        // disabled in checkSize(). If it is disabled, too many output rows 
+        // "large" number of values per input to aggregate but good reduction
+        // in size due to aggregation.
+        // This case should result in a reduction from 10500 inputs to 500
+        // outputs (factor of 20), so in-memory aggregation should not be
+        // disabled in checkSize(). If it is disabled, too many output rows
         // will be generated.
-        
+
         int numKeys = 500;
         int numVals = 3;
 
         createPOPartialPlan(numVals);
-        
+
         // Build a string of values to use in all input tuples
         String vals = Strings.repeat(",(1L)", numVals);
-        
+
         // And input tuples.
         // We need the next multiple of numKeys over 10,000 because we need to
         // trigger the size check (at 10,000), and we want an even multiple of
@@ -258,7 +275,7 @@ public class TestPOPartialAgg {
         for (int i = 0; i < numInputs; i++) {
             inputTups[i] = "(" + (i % numKeys) + vals + ")";
         }
-        
+
         // Build expected results
         int expectedVal = numInputs / numKeys;
         vals = Strings.repeat(",(" + expectedVal + "L)", numVals);
@@ -266,17 +283,100 @@ public class TestPOPartialAgg {
         for (int i = 0; i < numKeys; i++) {
             outputTups[i] = "(" + i + vals + ")";
         }
-        
+
         // input tuple has key, and bag containing SUM.Init output
         checkInputAndOutput(inputTups, outputTups, false);
     }
-    
-    
+
+    /**
+     * Spill that only triggers aggregation: 15 tuples sharing key 1 are fed
+     * in, a spill is requested from another thread, and one more tuple with
+     * key 2 is attached. No record may be emitted until end of input, at
+     * which point exactly the aggregated key-1 tuple and the key-2 tuple are
+     * expected.
+     *
+     * NOTE(review): the isDone() checks around the 100 ms sleeps presume that
+     * spill() does not return until the operator's next getNextTuple() call
+     * has handled it -- confirm against POPartialAgg.
+     *
+     * @throws Exception if the operator or the spill task fails
+     */
+    @Test
+    public void testMemorySpill1() throws Exception {
+        // Test spill which only does aggregation
+        Result res;
+        for (long i = 1; i <= 15; i++) {
+            Tuple t = tuple(1, tuple(i));
+            partAggOp.attachInput(t);
+            res = partAggOp.getNextTuple();
+            assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        }
+        Future<Long> spilled = executor.submit(new Spill(partAggOp));
+        // give the spill task time to start on the executor thread
+        Thread.sleep(100);
+        partAggOp.attachInput(tuple(2, tuple(-1L)));
+        assertFalse(spilled.isDone());
+        res = partAggOp.getNextTuple();
+        // Since it was aggregated there should be no records emitted
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        // give the spill task time to complete
+        Thread.sleep(100);
+        assertTrue(spilled.isDone());
+        // Long.valueOf instead of the boxing constructor new Long(1)
+        assertEquals(Long.valueOf(1L), spilled.get());
+
+        List<Tuple> expectedValues = new ArrayList<Tuple>();
+        expectedValues.add(tuple(1, tuple(120L))); // aggregated result: 1 + 2 + ... + 15
+        expectedValues.add(tuple(2, tuple(-1L)));
+        // end of all input, now expecting all tuples
+        parentPlan.endOfAllInput = true;
+        res = partAggOp.getNextTuple();
+        do {
+            assertEquals(POStatus.STATUS_OK, res.returnStatus);
+            // remove() doubles as a membership check: every result must be expected
+            assertTrue(expectedValues.remove(res.result));
+            res = partAggOp.getNextTuple();
+        } while (res.returnStatus != POStatus.STATUS_EOP);
+        assertTrue(expectedValues.isEmpty());
+    }
+
+    /**
+     * Spill that makes the operator emit records: 2001 tuples with distinct
+     * keys are fed in, a spill is requested from another thread, and one more
+     * tuple is attached. All 2002 tuples are then expected back unaggregated
+     * while the spill is still pending, after which the spill must complete
+     * and return 1.
+     *
+     * NOTE(review): the isDone() checks around the 100 ms sleeps presume that
+     * spill() does not return until the operator has finished emitting --
+     * confirm against POPartialAgg.
+     *
+     * @throws Exception if the operator or the spill task fails
+     */
+    @Test
+    public void testMemorySpill2() throws Exception {
+        // Test spill which emits records as aggregation does not meet
+        // secondary tier threshold
+        Result res;
+        List<Tuple> expectedValues = new ArrayList<Tuple>();
+        //POPartialAgg.SECOND_TIER_THRESHOLD evaluates to 2000 by default
+        for (long i = 1; i <= 2001; i++) {
+            Tuple t = tuple(i, tuple(i));
+            expectedValues.add(t);
+            partAggOp.attachInput(t);
+            res = partAggOp.getNextTuple();
+            assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        }
+        Future<Long> spilled = executor.submit(new Spill(partAggOp));
+        // give the spill task time to start on the executor thread
+        Thread.sleep(100);
+        partAggOp.attachInput(tuple(2, tuple(-1L)));
+        expectedValues.add(tuple(2, tuple(-1L)));
+
+        long emitted = 0;
+        res = partAggOp.getNextTuple();
+        do {
+            // the spill must stay pending for as long as records are being emitted
+            assertFalse(spilled.isDone());
+            assertEquals(POStatus.STATUS_OK, res.returnStatus);
+            // remove() doubles as a membership check: every result must be expected
+            assertTrue(expectedValues.remove(res.result));
+            emitted++;
+            res = partAggOp.getNextTuple();
+        } while (res.returnStatus != POStatus.STATUS_EOP);
+        assertEquals(2002, emitted);
+        assertTrue(expectedValues.isEmpty());
+        // give the spill task time to complete
+        Thread.sleep(100);
+        assertTrue(spilled.isDone());
+        // Long.valueOf instead of the boxing constructor new Long(1)
+        assertEquals(Long.valueOf(1L), spilled.get());
+    }
+
+    /**
+     * Task that invokes {@link Spillable#spill()} on the wrapped object and
+     * returns its result, so a test can request a spill from another thread
+     * via an executor.
+     */
+    private static class Spill implements Callable<Long> {
+
+        // never reassigned after construction, hence final
+        private final Spillable spillable;
+
+        /**
+         * @param spillable the object whose spill() is invoked by {@link #call()}
+         */
+        public Spill(Spillable spillable) {
+            this.spillable = spillable;
+        }
+
+        /**
+         * @return the value returned by the wrapped object's spill()
+         */
+        @Override
+        public Long call() throws Exception {
+            return spillable.spill();
+        }
+
+    }
+
     /**
      * run the plan on inputTups and check if output matches outputTups if
      * isMapMemEmpty is set to true, set memory available for the hash-map to
      * zero
-     * 
+     *
      * @param inputTups
      * @param outputTups
      * @param isMapMemEmpty
@@ -287,9 +387,8 @@ public class TestPOPartialAgg {
     private void checkInputAndOutput(String[] inputTups, String[] outputTups,
             boolean isMapMemEmpty) throws Exception {
 
-        PigMapReduce.sJobConfInternal.set(new Configuration());
         if (isMapMemEmpty) {
-            
PigMapReduce.sJobConfInternal.get().set(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE,
+            
PigMapReduce.sJobConfInternal.get().set(PigConfiguration.PIG_CACHEDBAG_MEMUSAGE,
                     "0");
         }
 
@@ -324,7 +423,7 @@ public class TestPOPartialAgg {
 
             res = partAggOp.getNextTuple();
             assertEquals(POStatus.STATUS_EOP, res.returnStatus);
-            Util.compareActualAndExpectedResults(outputs, expectedOuts);
+            Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
         } else {
             while (true) {
                 Result res = partAggOp.getNextTuple();
@@ -332,7 +431,7 @@ public class TestPOPartialAgg {
                     break;
                 }
             }
-            Util.compareActualAndExpectedResults(outputs, expectedOuts);
+            Util.checkQueryOutputsAfterSort(outputs, expectedOuts);
         }
 
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java 
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPOPartialAggPlan.java Thu 
Nov 27 12:49:54 2014
@@ -64,7 +64,7 @@ public class TestPOPartialAggPlan  {
     public void testMapAggPropFalse() throws Exception{
         //test with pig.exec.mapPartAgg set to false
         String query = getGByQuery();
-        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, 
"false");
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"false");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 
@@ -75,7 +75,7 @@ public class TestPOPartialAggPlan  {
     public void testMapAggPropTrue() throws Exception{
         //test with pig.exec.mapPartAgg to true
         String query = getGByQuery();
-        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, 
"true");
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 
@@ -102,7 +102,7 @@ public class TestPOPartialAggPlan  {
         String query = "l = load 'x' as (a,b,c);" +
                 "g = group l by a;" +
                 "f = foreach g generate group;";
-        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, 
"true");
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 
@@ -115,7 +115,7 @@ public class TestPOPartialAggPlan  {
         String query = "l = load 'x' as (a,b,c);" +
                 "g = group l by a;" +
                 "f = foreach g generate group, COUNT(l.b), l.b;";
-        pc.getProperties().setProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG, 
"true");
+        pc.getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, 
"true");
         MROperPlan mrp = Util.buildMRPlan(query, pc);
         assertEquals(mrp.size(), 1);
 


Reply via email to