Modified: pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java Thu Nov 27 12:49:54 2014 @@ -33,10 +33,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.Shell; +import org.apache.pig.ExecType; +import org.apache.pig.impl.PigContext; import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor; import org.apache.pig.tools.parameters.ParseException; import org.junit.Test; @@ -1352,9 +1355,11 @@ public class TestParamSubPreproc { File inputFile = Util.createFile(new String[]{"daniel\t10","jenny\t20"}); File outputFile = File.createTempFile("tmp", ""); outputFile.delete(); - String command = "a = load '" + Util.encodeEscape(inputFile.toString()) + "' as ($param1:chararray, $param2:int);\n" - + "store a into '" + outputFile.toString() + "';\n" + PigContext pc = new PigContext(ExecType.LOCAL, new Properties()); + String command = "a = load '" + Util.generateURI(inputFile.toString(), pc) + "' as ($param1:chararray, $param2:int);\n" + + "store a into '" + Util.generateURI(outputFile.toString(), pc) + "';\n" + "quit\n"; + System.setProperty("jline.WindowsTerminal.directConsole", "false"); System.setIn(new ByteArrayInputStream(command.getBytes())); org.apache.pig.PigRunner.run(new String[] {"-x", "local", "-p", "param1=name", "-p", "param2=age"}, null); File[] partFiles = outputFile.listFiles(new FilenameFilter() { @@ -1387,4 +1392,4 @@ public class 
TestParamSubPreproc { } return result; } -} \ No newline at end of file +}
Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Thu Nov 27 12:49:54 2014 @@ -45,23 +45,25 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.newplan.Operator; +import org.apache.pig.tools.pigstats.EmptyPigStats; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigProgressNotificationListener; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStatsUtil; -import org.apache.pig.tools.pigstats.EmptyPigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; public class TestPigRunner { - private static MiniCluster cluster; + private static MiniGenericCluster cluster; + private static String execType; private static final String INPUT_FILE = "input"; private static final String OUTPUT_FILE = "output"; @@ -69,7 +71,8 @@ public class TestPigRunner { @BeforeClass public static void setUpBeforeClass() throws Exception { - cluster = MiniCluster.buildCluster(); + cluster = MiniGenericCluster.buildCluster(); + execType = cluster.getExecType().name().toLowerCase(); PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE)); w.println("1\t2\t3"); w.println("5\t3\t4"); @@ -89,6 +92,7 @@ public class TestPigRunner { @Before 
public void setUp() { deleteAll(new File(OUTPUT_FILE)); + Util.resetStateForExecModeSwitch(); } @Test @@ -154,8 +158,8 @@ public class TestPigRunner { w.close(); try { - String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Dopt.fetch=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); @@ -171,7 +175,7 @@ public class TestPigRunner { Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties()); assertTrue(conf.getBoolean("stop.on.failure", false)); assertTrue(!conf.getBoolean("aggregate.warning", true)); - assertTrue(!conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true)); + assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true)); assertTrue(!conf.getBoolean("opt.fetch", true)); } finally { new File(PIG_FILE).delete(); @@ -189,8 +193,8 @@ public class TestPigRunner { w.close(); try { - String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-Dstop.on.failure=true", "-Dopt.multiquery=false", "-Daggregate.warning=false", "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats instanceof EmptyPigStats); assertTrue(stats.isSuccessful()); @@ -200,7 +204,7 @@ public class TestPigRunner { Configuration conf = ConfigurationUtil.toConfiguration(stats.getPigProperties()); assertTrue(conf.getBoolean("stop.on.failure", false)); assertTrue(!conf.getBoolean("aggregate.warning", true)); - assertTrue(!conf.getBoolean(PigConfiguration.OPT_MULTIQUERY, true)); + 
assertTrue(!conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true)); assertTrue(conf.getBoolean("opt.fetch", true)); } finally { new File(PIG_FILE).delete(); @@ -220,8 +224,8 @@ public class TestPigRunner { Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE); try { - String[] args = { inputInDfs.toString() }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, inputInDfs.toString() }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); @@ -248,11 +252,17 @@ public class TestPigRunner { w.println("C = limit B 2;"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - String[] args = { PIG_FILE }; + String[] args = { "-x", execType, PIG_FILE }; try { - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); - assertTrue(stats.getJobGraph().size() == 4); + if (execType.equals("tez")) { + assertEquals(stats.getJobGraph().size(), 1); + // 5 vertices + assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5); + } else { + assertEquals(stats.getJobGraph().size(), 4); + } assertTrue(stats.getJobGraph().getSinks().size() == 1); assertTrue(stats.getJobGraph().getSources().size() == 1); JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0); @@ -263,11 +273,20 @@ public class TestPigRunner { assertEquals(2, stats.getRecordWritten()); assertEquals(12, stats.getBytesWritten()); - assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get( - 0)).getAlias()); - assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors( - js).get(0)).getAlias()); - assertEquals("B", js.getAlias()); + if (execType.equals("tez")) { + assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get( + 0)).getAlias()); + // TODO: alias is not set for 
sample-aggregation/partition/sort job. + // Need to investigate + // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors( + // js).get(0)).getAlias()); + } else { + assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get( + 0)).getAlias()); + assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors( + js).get(0)).getAlias()); + assertEquals("B", js.getAlias()); + } } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); @@ -287,8 +306,8 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); assertTrue(stats.getJobGraph().size() == 1); // Each output file should include the following: @@ -336,10 +355,11 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); - assertTrue(stats.getJobGraph().size() == 1); + assertEquals(stats.getJobGraph().size(), 1); + // Each output file should include the following: // output: // 5\t3\t4\n @@ -383,15 +403,19 @@ public class TestPigRunner { w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); try { - String[] args = { PIG_FILE }; + String[] args = { "-x", execType, PIG_FILE }; PigStats stats = PigRunner.run(args, null); Iterator<JobStats> iter = stats.getJobGraph().iterator(); while (iter.hasNext()) { JobStats js=iter.next(); - if(js.getState().name().equals("FAILED")) { - List<Operator> ops=stats.getJobGraph().getSuccessors(js); - for(Operator op : ops ) { - assertEquals(((JobStats)op).getState().toString(), "UNKNOWN"); + if (execType.equals("tez")) { + 
assertEquals(js.getState().name(), "FAILED"); + } else { + if(js.getState().name().equals("FAILED")) { + List<Operator> ops=stats.getJobGraph().getSuccessors(js); + for(Operator op : ops ) { + assertEquals(((JobStats)op).getState().toString(), "UNKNOWN"); + } } } } @@ -410,7 +434,7 @@ public class TestPigRunner { w.println("C = foreach B generate group, COUNT(A);"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - String[] args = { "-c", PIG_FILE }; + String[] args = { "-x", execType, "-c", PIG_FILE }; PigStats stats = PigRunner.run(args, null); assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION); // TODO: error message has changed. Need to catch the new message generated from the @@ -422,22 +446,23 @@ public class TestPigRunner { @Test public void simpleNegativeTest2() throws Exception { - String[] args = { "-c", "-e", "this is a test" }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, "-c", "-e", "this is a test" }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS); } @Test public void simpleNegativeTest3() throws Exception { - String[] args = { "-c", "-y" }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, "-c", "-y" }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION); - assertEquals("Found unknown option (-y) at position 2", + assertEquals("Found unknown option (-y) at position 4", stats.getErrorMessage()); } @Test - public void NagetiveTest() throws Exception { + public void streamNegativeTest() throws Exception { + Assume.assumeTrue("Skip this test for TEZ temporarily as it hangs", Util.isMapredExecType(cluster.getExecType())); final String OUTPUT_FILE_2 = "output2"; PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); 
w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);"); @@ -451,21 +476,32 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; + String[] args = { "-x", execType, PIG_FILE }; PigStats stats = PigRunner.run(args, null); assertTrue(!stats.isSuccessful()); - assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE); - assertTrue(stats.getJobGraph().size() == 2); - JobStats job = (JobStats)stats.getJobGraph().getSources().get(0); - assertTrue(job.isSuccessful()); - job = (JobStats)stats.getJobGraph().getSinks().get(0); - assertTrue(!job.isSuccessful()); - assertTrue(stats.getOutputStats().size() == 3); - for (OutputStats output : stats.getOutputStats()) { - if (output.getName().equals("ee")) { + if (execType.equals("tez")) { + assertTrue(stats.getReturnCode() == ReturnCode.FAILURE); + assertTrue(stats.getJobGraph().size() == 1); + JobStats job = (JobStats)stats.getJobGraph().getSinks().get(0); + assertTrue(!job.isSuccessful()); + assertTrue(stats.getOutputStats().size() == 3); + for (OutputStats output : stats.getOutputStats()) { assertTrue(!output.isSuccessful()); - } else { - assertTrue(output.isSuccessful()); + } + } else { + assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE); + assertTrue(stats.getJobGraph().size() == 2); + JobStats job = (JobStats)stats.getJobGraph().getSources().get(0); + assertTrue(job.isSuccessful()); + job = (JobStats)stats.getJobGraph().getSinks().get(0); + assertTrue(!job.isSuccessful()); + assertTrue(stats.getOutputStats().size() == 3); + for (OutputStats output : stats.getOutputStats()) { + if (output.getName().equals("ee")) { + assertTrue(!output.isSuccessful()); + } else { + assertTrue(output.isSuccessful()); + } } } } finally { @@ -519,8 +555,8 @@ public class TestPigRunner { w1.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = 
PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); @@ -554,8 +590,8 @@ public class TestPigRunner { w1.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); @@ -586,8 +622,8 @@ public class TestPigRunner { w1.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); @@ -613,9 +649,9 @@ public class TestPigRunner { String jarName = Util.findPigJarName(); String[] args = { "-Dpig.additional.jars=" + jarName, - "-Dmapred.job.queue.name=default", + "-Dmapred.job.queue.name=default", "-x", execType, "-e", "A = load '" + INPUT_FILE + "';store A into '" + OUTPUT_FILE + "';\n" }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); Util.deleteFile(cluster, OUTPUT_FILE); PigContext ctx = stats.getPigContext(); @@ -633,7 +669,7 @@ public class TestPigRunner { @Test public void classLoaderTest() throws Exception { // Skip in hadoop 23 test, see PIG-2449 - if (Util.isHadoop23() || Util.isHadoop2_0()) + if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) return; PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); w.println("register test/org/apache/pig/test/data/pigtestloader.jar"); @@ -642,8 +678,8 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new 
TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); } finally { new File(PIG_FILE).delete(); @@ -658,8 +694,8 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(!stats.isSuccessful()); assertTrue(stats.getReturnCode() == PigRunner.ReturnCode.IO_EXCEPTION); @@ -670,7 +706,7 @@ public class TestPigRunner { @Test // PIG-2006 public void testEmptyFile() throws IOException { - File f1 = new File( PIG_FILE ); + File f1 = new File(PIG_FILE); FileWriter fw1 = new FileWriter(f1); fw1.close(); @@ -698,7 +734,7 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; + String[] args = { "-x", execType, PIG_FILE }; PigStats stats = PigRunner.run(args, null); assertTrue(!stats.isSuccessful()); @@ -721,7 +757,7 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; + String[] args = { "-x", execType, PIG_FILE }; PigStats stats = PigRunner.run(args, null); assertTrue(!stats.isSuccessful()); @@ -751,8 +787,8 @@ public class TestPigRunner { w1.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); @@ -783,8 +819,8 @@ public class TestPigRunner { w1.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); @@ -815,17 +851,22 @@ public class TestPigRunner { w1.close(); try { - String[] args = { "-Dpig.disable.counter=true", PIG_FILE }; - 
PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = {"-Dpig.disable.counter=true", "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); assertEquals(1, stats.getNumberJobs()); List<InputStats> inputs = stats.getInputStats(); assertEquals(2, inputs.size()); - for (InputStats instats : inputs) { - // the multi-input counters are disabled - assertEquals(-1, instats.getNumberRecords()); + if (execType.equals("tez")) { + assertEquals(5, inputs.get(0).getNumberRecords()); + assertEquals(5, inputs.get(1).getNumberRecords()); + } else { + for (InputStats instats : inputs) { + // the multi-input counters are disabled + assertEquals(-1, instats.getNumberRecords()); + } } List<OutputStats> outputs = stats.getOutputStats(); @@ -853,27 +894,44 @@ public class TestPigRunner { w.close(); try { - String[] args = { PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); + + String TASK_COUNTER_GROUP = execType.equals("tez") ? "org.apache.tez.common.counters.TaskCounter" : MRPigStatsUtil.TASK_COUNTER_GROUP; + String FS_COUNTER_GROUP = execType.equals("tez") ? 
"org.apache.tez.common.counters.FileSystemCounter" : MRPigStatsUtil.FS_COUNTER_GROUP; - Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters(); - assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( - MRPigStatsUtil.MAP_INPUT_RECORDS).getValue()); - assertEquals(3, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( - MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue()); - assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( - MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue()); - assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( - MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue()); - assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName( - MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue()); - - // Skip for hadoop 20.203+, See PIG-2446 - if (Util.isHadoop203plus()) - return; + if (execType.equals("tez")) { + Counters counter= ((JobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters(); + assertEquals(5, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName( + "INPUT_RECORDS_PROCESSED").getValue()); + assertEquals(2, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue()); + assertEquals(7, counter.getGroup(TASK_COUNTER_GROUP).getCounterForName( + "OUTPUT_RECORDS").getValue()); + assertEquals(20,counter.getGroup(FS_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue()); + assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.HDFS_BYTES_READ).getValue()); + } else { + Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters(); + assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.MAP_INPUT_RECORDS).getValue()); + assertEquals(3, 
counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.MAP_OUTPUT_RECORDS).getValue()); + assertEquals(2, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.REDUCE_INPUT_RECORDS).getValue()); + assertEquals(0, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.REDUCE_OUTPUT_RECORDS).getValue()); + assertEquals(20,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue()); + + // Skip for hadoop 20.203+, See PIG-2446 + if (Util.isHadoop203plus()) + return; - assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName( - MRPigStatsUtil.HDFS_BYTES_READ).getValue()); + assertEquals(30,counter.getGroup(MRPigStatsUtil.FS_COUNTER_GROUP).getCounterForName( + MRPigStatsUtil.HDFS_BYTES_READ).getValue()); + } } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); @@ -892,17 +950,22 @@ public class TestPigRunner { w1.close(); try { - String[] args = { "-Dpig.disable.counter=true", PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-Dpig.disable.counter=true", "-x", execType, PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(stats.isSuccessful()); assertEquals(1, stats.getNumberJobs()); List<OutputStats> outputs = stats.getOutputStats(); assertEquals(2, outputs.size()); - for (OutputStats outstats : outputs) { - // the multi-output counters are disabled - assertEquals(-1, outstats.getNumberRecords()); + if (execType.equals("tez")) { + assertEquals(outputs.get(0).getNumberRecords(), 5); + assertEquals(outputs.get(1).getNumberRecords(), 2); + } else { + for (OutputStats outstats : outputs) { + // the multi-output counters are disabled + assertEquals(-1, outstats.getNumberRecords()); + } } List<InputStats> inputs = stats.getInputStats(); @@ -937,8 +1000,8 @@ public 
class TestPigRunner { w1.close(); try { - String[] args = { "-F", PIG_FILE }; - PigStats stats = PigRunner.run(args, new TestNotificationListener()); + String[] args = { "-x", execType, "-F", PIG_FILE }; + PigStats stats = PigRunner.run(args, new TestNotificationListener(execType)); assertTrue(!stats.isSuccessful()); @@ -967,6 +1030,15 @@ public class TestPigRunner { private static final int JobsSubmitted = 1; private static final int JobStarted = 2; private static final int JobFinished = 3; + private String execType; + + public TestNotificationListener(String execType) { + this.execType = execType; + } + + public TestNotificationListener() { + this.execType = "mr"; + } @Override public void initialPlanNotification(String id, OperatorPlan<?> plan) { Modified: pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java Thu Nov 27 12:49:54 2014 @@ -115,7 +115,7 @@ public class TestPigScriptParser { // backslash - hence 4. 
In a pig script in a file, this would be // www\\.xyz\\.com "define minelogs org.apache.pig.test.RegexGroupCount('www\\\\.xyz\\\\.com/sports');" , - "A = load '" + Util.generateURI(Util.encodeEscape(f.getAbsolutePath()), ps.getPigContext()) + "' using PigStorage() as (source : chararray);" , + "A = load '" + Util.generateURI(f.getAbsolutePath(), ps.getPigContext()) + "' using PigStorage() as (source : chararray);" , "B = foreach A generate minelogs(source) as sportslogs;" }; for (String line : queryLines) { ps.registerQuery(line); Modified: pig/branches/spark/test/org/apache/pig/test/TestPigServer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigServer.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigServer.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigServer.java Thu Nov 27 12:49:54 2014 @@ -867,7 +867,7 @@ public class TestPigServer { assertEquals("999", properties.getProperty("pig.exec.reducers.max")); assertEquals("true", properties.getProperty("aggregate.warning")); - assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY)); + assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY)); assertEquals("false", properties.getProperty("stop.on.failure")); //Test with properties file @@ -877,7 +877,7 @@ public class TestPigServer { assertEquals("999", properties.getProperty("pig.exec.reducers.max")); assertEquals("true", properties.getProperty("aggregate.warning")); - assertEquals("true", properties.getProperty(PigConfiguration.OPT_MULTIQUERY)); + assertEquals("true", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY)); assertEquals("false", properties.getProperty("stop.on.failure")); PrintWriter out = new PrintWriter(new FileWriter(propertyFile)); @@ -889,7 +889,7 @@ public class TestPigServer { properties = 
PropertiesUtil.loadDefaultProperties(); assertEquals("false", properties.getProperty("aggregate.warning")); - assertEquals("false", properties.getProperty(PigConfiguration.OPT_MULTIQUERY)); + assertEquals("false", properties.getProperty(PigConfiguration.PIG_OPT_MULTIQUERY)); assertEquals("true", properties.getProperty("stop.on.failure")); propertyFile.delete(); @@ -968,13 +968,13 @@ public class TestPigServer { pigServer.setValidateEachStatement(true); pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); - pigServer.registerQuery("store A into '" + tempDir + "/testGruntValidation1';"); + pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation1';"); pigServer.registerQuery("B = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); - pigServer.registerQuery("store B into '" + tempDir + "/testGruntValidation2';"); // This should pass + pigServer.registerQuery("store B into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation2';"); // This should pass boolean validationExceptionCaptured = false; try { // This should fail due to output validation - pigServer.registerQuery("store A into '" + tempDir + "/testGruntValidation1';"); + pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(),pigServer.getPigContext()) + "/testGruntValidation1';"); } catch (FrontendException e) { validationExceptionCaptured = true; } Modified: pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPredeployedJar.java Thu Nov 
27 12:49:54 2014 @@ -34,13 +34,12 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.JarManager; -import org.apache.pig.newplan.logical.rules.ColumnPruneVisitor; import org.junit.Assert; import org.junit.Test; /** - * Ensure that jars marked as predeployed are not included in the generated - * job jar. + * Ensure that jars marked as predeployed are not included in the generated + * job jar. */ public class TestPredeployedJar { static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); @@ -53,44 +52,44 @@ public class TestPredeployedJar { File logFile = File.createTempFile("log", ""); FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0); logger.addAppender(appender); - + PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getConfiguration()); - pigServer.getPigContext().getProperties().put(PigConfiguration.OPT_FETCH, "false"); + pigServer.getPigContext().getProperties().put(PigConfiguration.PIG_OPT_FETCH, "false"); String[] inputData = new String[] { "hello", "world" }; Util.createInputFile(cluster, "a.txt", inputData); - String jacksonJar = JarManager.findContainingJar(org.codehaus.jackson.JsonParser.class); + String jodaTimeJar = JarManager.findContainingJar(org.joda.time.DateTime.class); pigServer.registerQuery("a = load 'a.txt' as (line:chararray);"); Iterator<Tuple> it = pigServer.openIterator("a"); String content = FileUtils.readFileToString(logFile); - Assert.assertTrue(content.contains(jacksonJar)); - + Assert.assertTrue(content.contains(jodaTimeJar)); + logFile = File.createTempFile("log", ""); - - // Now let's mark the jackson jar as predeployed. - pigServer.getPigContext().markJarAsPredeployed(jacksonJar); + + // Now let's mark the joda-time jar as predeployed.
+ pigServer.getPigContext().markJarAsPredeployed(jodaTimeJar); it = pigServer.openIterator("a"); content = FileUtils.readFileToString(logFile); - Assert.assertFalse(content.contains(jacksonJar)); + Assert.assertFalse(content.contains(jodaTimeJar)); } - + @Test public void testPredeployedJarsProperty() throws ExecException { Properties p = new Properties(); p.setProperty("pig.predeployed.jars", "zzz"); PigServer pigServer = new PigServer(ExecType.LOCAL, p); - + Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("zzz")); - + p = new Properties(); p.setProperty("pig.predeployed.jars", "aaa" + File.pathSeparator + "bbb"); pigServer = new PigServer(ExecType.LOCAL, p); - + Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("aaa")); Assert.assertTrue(pigServer.getPigContext().predeployedJars.contains("bbb")); - + Assert.assertFalse(pigServer.getPigContext().predeployedJars.contains("zzz")); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Thu Nov 27 12:49:54 2014 @@ -282,7 +282,7 @@ public class TestPruneColumn { @Test public void testLoadForEach1() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); pigServer.registerQuery("B = foreach A generate a1, a2;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -305,7 +305,7 @@ public class TestPruneColumn { @Test public void 
testLoadForEach2() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); pigServer.registerQuery("B = foreach A generate a0, a2;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -328,7 +328,7 @@ public class TestPruneColumn { @Test public void testLoadForEach3() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); pigServer.registerQuery("B = foreach A generate a0, a1;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -351,8 +351,8 @@ public class TestPruneColumn { @Test public void testJoin1() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);"); pigServer.registerQuery("C = join A by a1, B by b1;"); pigServer.registerQuery("D = foreach C generate a1, a2, b0, b1;"); @@ -373,8 +373,8 @@ public class TestPruneColumn { @Test public void testJoin2() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), 
pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);"); pigServer.registerQuery("C = join A by a1, B by b1;"); pigServer.registerQuery("D = foreach C generate a1, a2, b1;"); @@ -395,7 +395,7 @@ public class TestPruneColumn { @Test public void testForEachFilter() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); pigServer.registerQuery("B = filter A by a2==3;"); pigServer.registerQuery("C = foreach B generate a0, a1;"); @@ -414,7 +414,7 @@ public class TestPruneColumn { @Test public void testForEach1() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); pigServer.registerQuery("B = foreach A generate a0, a1+a2;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -438,7 +438,7 @@ public class TestPruneColumn { @Test public void testForEach2() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), 
pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); pigServer.registerQuery("B = foreach A generate a0 as b0, *;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -466,7 +466,7 @@ public class TestPruneColumn { @Test public void testSplit1() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);"); pigServer.registerQuery("split A into B if $0<=1, C if $0>1;"); pigServer.registerQuery("D = foreach B generate $1;"); @@ -484,7 +484,7 @@ public class TestPruneColumn { @Test public void testSplit2() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);"); pigServer.registerQuery("split A into B if $0<=1, C if $0>1;"); pigServer.registerQuery("D = foreach B generate $1;"); @@ -502,7 +502,7 @@ public class TestPruneColumn { @Test public void testForeachNoSchema1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';"); pigServer.registerQuery("B = foreach A generate $1, $2;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -525,7 +525,7 @@ public class TestPruneColumn { @Test public void testForeachNoSchema2() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "';"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + 
"';"); pigServer.registerQuery("B = foreach A generate $1, 'aoeuaoeu';"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -548,8 +548,8 @@ public class TestPruneColumn { @Test public void testCoGroup1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);"); pigServer.registerQuery("C = cogroup A by $1, B by $1;"); pigServer.registerQuery("D = foreach C generate AVG($1.$1);"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -579,7 +579,7 @@ public class TestPruneColumn { @Test public void testCoGroup2() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); pigServer.registerQuery("B = group A all;"); pigServer.registerQuery("C = foreach B generate $1;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -595,7 +595,7 @@ public class TestPruneColumn { @Test public void testCoGroup3() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); pigServer.registerQuery("B = group A by $1;"); pigServer.registerQuery("C = foreach B generate $1, 
'1';"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -621,8 +621,8 @@ public class TestPruneColumn { @Test public void testCoGroup4() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);"); pigServer.registerQuery("C = cogroup A by ($1), B by ($1);"); pigServer.registerQuery("D = foreach C generate $1.$1, $2.$1;"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -655,7 +655,7 @@ public class TestPruneColumn { @Test public void testCoGroup5() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = group A by (a0, a1);"); pigServer.registerQuery("C = foreach B generate flatten(group);"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -681,7 +681,7 @@ public class TestPruneColumn { @Test public void testDistinct1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = distinct A;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ 
-699,7 +699,7 @@ public class TestPruneColumn { @Test public void testStream1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = stream A through `" + simpleEchoStreamingCommand + "`;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -723,7 +723,7 @@ public class TestPruneColumn { @Test public void testBinCond1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);"); pigServer.registerQuery("B = foreach A generate ($1 == '2'? 
$2 : $3);"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -747,8 +747,8 @@ public class TestPruneColumn { @Test public void testCoGroup6() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); pigServer.registerQuery("C = cogroup A by ($1), B by ($1);"); pigServer.registerQuery("D = foreach C generate A, flatten(B.($0, $1));"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -776,8 +776,8 @@ public class TestPruneColumn { @Test public void testCoGroup7() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); pigServer.registerQuery("C = cogroup A by ($1), B by ($1);"); pigServer.registerQuery("D = foreach C {B = order B by $0;generate FLATTEN(A), B.($1);};"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -807,8 +807,8 @@ public class TestPruneColumn { @Test public void testCross1() throws Exception { - pigServer.registerQuery("A = load '"+ 
Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); pigServer.registerQuery("C = cross A, B;"); pigServer.registerQuery("D = foreach C generate $0, $3;"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -851,8 +851,8 @@ public class TestPruneColumn { @Test public void testUnion1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); pigServer.registerQuery("C = union A, B;"); pigServer.registerQuery("D = foreach C generate $0, $2;"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -892,8 +892,8 @@ public class TestPruneColumn { @Test public void testFRJoin1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, 
a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';"); pigServer.registerQuery("D = foreach C generate $0, $3;"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -920,7 +920,7 @@ public class TestPruneColumn { @Test public void testFilter1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = order A by a1;"); pigServer.registerQuery("C = limit B 10;"); pigServer.registerQuery("D = foreach C generate $0;"); @@ -945,7 +945,7 @@ public class TestPruneColumn { @Test public void testFilter2() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = filter A by a0+a2 == 4;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -969,7 +969,7 @@ public class TestPruneColumn { @Test public void testOrderBy1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = order A by $0;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -993,7 +993,7 @@ public class TestPruneColumn { @Test public void 
testOrderBy2() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = order A by *;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -1017,7 +1017,7 @@ public class TestPruneColumn { @Test public void testCogroup8() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = group A by *;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -1041,8 +1041,8 @@ public class TestPruneColumn { @Test public void testJoin3() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile4.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); pigServer.registerQuery("C = join A by *, B by * using 'replicated';"); pigServer.registerQuery("D = foreach C generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -1066,7 +1066,7 @@ public class TestPruneColumn { @Test public void testLoadForEach4() throws Exception { - pigServer.registerQuery("A = load '"+ 
Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = foreach A generate *;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -1090,7 +1090,7 @@ public class TestPruneColumn { @Test public void testForEachUDF() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);"); pigServer.registerQuery("B = foreach A generate StringSize(*);"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -1114,8 +1114,8 @@ public class TestPruneColumn { @Test public void testOutJoin1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile6.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile6.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);"); pigServer.registerQuery("C = join A by $0 left, B by $0;"); pigServer.registerQuery("D = foreach C generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("D"); @@ -1144,7 +1144,7 @@ public 
class TestPruneColumn { @Test public void testFilter3() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = filter A by " + MyFilterFunc.class.getName() + "(*) ;"); pigServer.registerQuery("C = foreach B generate $0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -1168,7 +1168,7 @@ public class TestPruneColumn { @Test public void testMapKey1() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); pigServer.registerQuery("B = foreach A generate a0, a1#'key1';"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -1192,7 +1192,7 @@ public class TestPruneColumn { @Test public void testMapKey2() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); pigServer.registerQuery("B = foreach A generate a1, a1#'key1';"); pigServer.registerQuery("C = foreach B generate $0#'key2', $1;"); @@ -1218,7 +1218,7 @@ public class TestPruneColumn { @Test public void testMapKey3() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); 
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';"); pigServer.registerQuery("C = group B all;"); @@ -1235,7 +1235,7 @@ public class TestPruneColumn { @Test public void testMapKey4() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); pigServer.registerQuery("B = limit A 10;"); pigServer.registerQuery("C = foreach B generate $0, $1#'key1';"); @@ -1260,7 +1260,7 @@ public class TestPruneColumn { @Test public void testMapKey5() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);"); pigServer.registerQuery("B = foreach A generate $0, $1#'key1';"); pigServer.registerQuery("C = stream B through `" + simpleEchoStreamingCommand + "`;"); @@ -1285,7 +1285,7 @@ public class TestPruneColumn { @Test public void testMapKeyInSplit1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), pigServer.getPigContext()) + "' as (m:map[]);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);"); pigServer.registerQuery("B = foreach A generate m#'key1' as key1;"); pigServer.registerQuery("C = foreach A generate m#'key2' as key2;"); pigServer.registerQuery("D = join B by key1, C by key2;"); @@ -1306,7 +1306,7 @@ public class TestPruneColumn { @SuppressWarnings("rawtypes") @Test public void testMapKeyInSplit2() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile12.toString()), 
pigServer.getPigContext()) + "' as (m:map[]);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);"); pigServer.registerQuery("B = filter A by m#'cond'==1;"); pigServer.registerQuery("C = filter B by m#'key1'==1;"); pigServer.registerQuery("D = filter B by m#'key2'==2;"); @@ -1331,7 +1331,7 @@ public class TestPruneColumn { @Test public void testConstantPlan() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);"); pigServer.registerQuery("B = foreach A generate 1, a2;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -1355,7 +1355,7 @@ public class TestPruneColumn { @Test public void testPlainPlan() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);"); pigServer.registerQuery("B = order A by $0;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -1386,7 +1386,7 @@ public class TestPruneColumn { intermediateFile.delete(); // delete since we don't want the file to be present String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath()); - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);"); pigServer.store("A", clusterPath, "BinStorage()"); pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath) @@ -1417,7 +1417,7 @@ public class TestPruneColumn { 
intermediateFile.delete(); // delete since we don't want the file to be present String clusterPath = Util.removeColon(intermediateFile.getAbsolutePath()); - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);"); pigServer.store("A", clusterPath, "BinStorage()"); pigServer.registerQuery("A = load '"+ Util.encodeEscape(clusterPath) @@ -1448,7 +1448,7 @@ public class TestPruneColumn { @Test public void testProjectCastKeyLookup() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0, a1);"); pigServer.registerQuery("B = foreach A generate a1#'key1';"); @@ -1474,7 +1474,7 @@ public class TestPruneColumn { @Test public void testRelayFlattenMap() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile3.toString()), pigServer.getPigContext()) + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0, a1:map[]);"); pigServer.registerQuery("B = foreach A generate flatten(a1);"); @@ -1500,8 +1500,8 @@ public class TestPruneColumn { @Test public void testCrossAtLeastOneColumnOneInput() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' as (b0:int, b1:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);"); + 
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);"); pigServer.registerQuery("C = cross A, B;"); pigServer.registerQuery("D = foreach C generate $0;"); @@ -1538,8 +1538,8 @@ public class TestPruneColumn { @Test public void testComplex1() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile7.toString()), pigServer.getPigContext()) + "' as (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile8.toString()), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile7.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile8.toString(), pigServer.getPigContext()) + "' as (b0, b1, b2, b3);"); pigServer.registerQuery("B1 = foreach B generate b2, b0+b3;"); pigServer.registerQuery("C = join A by $0, B1 by $0;"); pigServer.registerQuery("D = order C by $4;"); @@ -1568,7 +1568,7 @@ public class TestPruneColumn { pigServer.getPigContext().getProperties().setProperty( PigImplConstants.PIG_OPTIMIZER_RULES_KEY, ObjectSerializer.serialize(optimizerRules)); - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile13.toString()), pigServer.getPigContext()) + "' as (a:int, b:chararray);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile13.toString(), pigServer.getPigContext()) + "' as (a:int, b:chararray);"); pigServer.registerQuery("B = FOREACH A generate a;"); pigServer.registerQuery("C = GROUP B by a;"); pigServer.registerQuery("D = filter C by group > 0 and group < 100;"); @@ -1598,8 +1598,8 @@ public class TestPruneColumn { @Test public void testCoGroup8() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); - 
pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);"); pigServer.registerQuery("C = cogroup A by ($1), B by ($1);"); pigServer.registerQuery("D = foreach C generate $0, $1;"); @@ -1631,7 +1631,7 @@ public class TestPruneColumn { // See PIG-1128 @Test public void testUserDefinedSchema() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);"); pigServer.registerQuery("B = foreach A generate c1 as c1 : chararray, c2 as c2 : int, 'CA' as state : chararray;"); pigServer.registerQuery("C = foreach B generate c1 as c1 : chararray;"); @@ -1653,7 +1653,7 @@ public class TestPruneColumn { // See PIG-1127 @Test public void testSharedSchemaObject() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile10.toString()), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile10.toString(), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);"); pigServer.registerQuery("B = foreach A generate a1;"); pigServer.registerQuery("C = limit B 10;"); @@ -1671,8 +1671,8 @@ public class TestPruneColumn { // See PIG-1142 @Test public void testJoin4() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ 
Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); pigServer.registerQuery("C = join A by a2, B by b2;"); pigServer.registerQuery("D = foreach C generate $0, $1, $2;"); @@ -1696,7 +1696,7 @@ public class TestPruneColumn { @Test public void testFilter4() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);"); pigServer.registerQuery("B = filter A by a2==3;"); pigServer.registerQuery("C = foreach B generate $2;"); @@ -1713,7 +1713,7 @@ public class TestPruneColumn { @Test public void testSplit3() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);"); pigServer.registerQuery("split A into B if a2==3, C if a2<3;"); pigServer.registerQuery("C = foreach B generate $2;"); @@ -1730,7 +1730,7 @@ public class TestPruneColumn { @Test public void testOrderBy3() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = order A by a2;"); pigServer.registerQuery("C = foreach B generate a2;"); Iterator<Tuple> iter 
= pigServer.openIterator("C"); @@ -1754,9 +1754,9 @@ public class TestPruneColumn { @Test public void testCogroup9() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); - pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (c0, c1, c2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);"); + pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (c0, c1, c2);"); pigServer.registerQuery("D = cogroup A by a2, B by b2, C by c2;"); pigServer.registerQuery("E = foreach D generate $1, $2;"); Iterator<Tuple> iter = pigServer.openIterator("E"); @@ -1781,8 +1781,8 @@ public class TestPruneColumn { // See PIG-1165 @Test public void testOrderbyWrongSignature() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b1);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);"); pigServer.registerQuery("C = order A by a1;"); pigServer.registerQuery("D = join C by a1, B by b0;"); pigServer.registerQuery("E = foreach D generate a1, b0, b1;"); 
@@ -1802,8 +1802,8 @@ public class TestPruneColumn { // See PIG-1146 @Test public void testUnionMixedPruning() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);"); - pigServer.registerQuery("B = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (b0, b2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);"); + pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);"); pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;"); pigServer.registerQuery("D = union A, C;"); pigServer.registerQuery("E = foreach D generate $0, $2;"); @@ -1846,9 +1846,9 @@ public class TestPruneColumn { // See PIG-1176 @Test public void testUnionMixedSchemaPruning() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = foreach A generate a0;;"); - pigServer.registerQuery("C = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "';"); + pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';"); pigServer.registerQuery("D = foreach C generate $0;"); pigServer.registerQuery("E = union B, D;"); Iterator<Tuple> iter = pigServer.openIterator("E"); @@ -1921,7 +1921,7 @@ public class TestPruneColumn { // See PIG-1210 @Test public void testFieldsToReadDuplicatedEntry() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), 
pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = foreach A generate a0+a0, a1, a2;"); Iterator<Tuple> iter = pigServer.openIterator("B"); @@ -1941,7 +1941,7 @@ public class TestPruneColumn { // See PIG-1272 @Test public void testSplit4() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);"); pigServer.registerQuery("B = foreach A generate a0;"); pigServer.registerQuery("C = join A by a0, B by a0;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -1959,7 +1959,7 @@ public class TestPruneColumn { @Test public void testSplit5() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile11.toString()), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile11.toString(), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);"); pigServer.registerQuery("B = foreach A generate a0, a1;"); pigServer.registerQuery("C = join A by a0, B by a0;"); pigServer.registerQuery("D = filter C by A::a1>=B::a1;"); @@ -1981,7 +1981,7 @@ public class TestPruneColumn { // See PIG-1493 @Test public void testInconsistentPruning() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2);"); pigServer.registerQuery("B = foreach A generate CONCAT(a0,a1) as b0, a0, a2;"); pigServer.registerQuery("C = 
foreach B generate a0, a2;"); Iterator<Tuple> iter = pigServer.openIterator("C"); @@ -2003,12 +2003,12 @@ public class TestPruneColumn { Path output1 = FileLocalizer.getTemporaryPath(pigServer.getPigContext()); Path output2 = FileLocalizer.getTemporaryPath(pigServer.getPigContext()); pigServer.setBatchOn(); - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile5.toString()), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);"); pigServer.registerQuery("B = foreach A generate a0, a1, a2;"); - pigServer.registerQuery("store B into '" + Util.generateURI(Util.encodeEscape(output1.toString()), pigServer.getPigContext()) + "';"); + pigServer.registerQuery("store B into '" + Util.generateURI(output1.toString(), pigServer.getPigContext()) + "';"); pigServer.registerQuery("C = order B by a2;"); pigServer.registerQuery("D = foreach C generate a2;"); - pigServer.registerQuery("store D into '" + Util.generateURI(Util.encodeEscape(output2.toString()), pigServer.getPigContext()) + "';"); + pigServer.registerQuery("store D into '" + Util.generateURI(output2.toString(), pigServer.getPigContext()) + "';"); pigServer.executeBatch(); BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties()))); @@ -2093,7 +2093,7 @@ public class TestPruneColumn { } public void testAliasInRequiredFieldList() throws Exception{ - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile1.toString()), pigServer.getPigContext()) + "' using " + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using " + PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);"); pigServer.registerQuery("B = foreach A generate a1, a2;"); Iterator<Tuple> iter = pigServer.openIterator("B"); 
@@ -2110,7 +2110,7 @@ public class TestPruneColumn { @Test public void testCogroup10() throws Exception { - pigServer.registerQuery("A = load '"+ Util.generateURI(Util.encodeEscape(tmpFile2.toString()), pigServer.getPigContext()) + "' AS (a0, a1:double);"); + pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0, a1:double);"); pigServer.registerQuery("B = foreach A generate a0, a1, 0 as joinField;"); pigServer.registerQuery("C = group B all;"); pigServer.registerQuery("D = foreach C generate 0 as joinField, SUM(B.a1) as total;"); @@ -2179,4 +2179,4 @@ public class TestPruneColumn { assertTrue(checkLogFileMessage(new String[]{"Map key required for event_serve: $0->[event_guid, filter_key, receive_time]", "Map key required for raw: $0->[cm_serve_id, cm_serve_timestamp_ms, p_url, source, type]"})); } -} \ No newline at end of file +} Modified: pig/branches/spark/test/org/apache/pig/test/TestRank3.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestRank3.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestRank3.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestRank3.java Thu Nov 27 12:49:54 2014 @@ -53,6 +53,12 @@ public class TestRank3 { data = resetData(pigServer); data.set("empty"); + data.set("testsplit", + tuple(1, 2), + tuple(1, 2), + tuple(3, 1), + tuple(2, 4), + tuple(2, 3)); data.set( "testcascade", tuple(3,2,3), @@ -156,6 +162,39 @@ public class TestRank3 { verifyExpected(data.get("empty_result"), expected); } + @Test + public void testRankWithSplitInMap() throws Exception { + String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);" + + "R2 = rank R1 by a ;" + + "R3 = rank R1 ;" + + "R4 = union R2, R3;" + + "store R4 into 'R4' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); 
+ List<Tuple> expectedResults = Util + .getTuplesFromConstantTupleStrings(new String[] { "(1L,1,2)", + "(2L,1,2)", "(3L,3,1)", "(4L,2,4)", "(5L,2,3)", "(1L,1,2)", + "(1L,1,2)", "(3L,2,3)", "(3L,2,4)", "(5L,3,1)" }); + Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults); + } + + @Test + public void testRankWithSplitInReduce() throws Exception { + String query = "R1 = LOAD 'testsplit' USING mock.Storage() AS (a:int,b:int);" + + "R1 = ORDER R1 by b;" + + "R2 = rank R1 by a ;" + + "R3 = rank R1;" + + "R4 = union R2, R3;" + + "store R4 into 'R4' using mock.Storage();"; + + Util.registerMultiLineQuery(pigServer, query); + List<Tuple> expectedResults = Util + .getTuplesFromConstantTupleStrings(new String[] { "(1L,3,1)", + "(2L,1,2)", "(3L,1,2)", "(4L,2,3)", "(5L,2,4)", "(1L,1,2)", + "(1L,1,2)", "(3L,2,4)", "(3L,2,3)", "(5L,3,1)" }); + Util.checkQueryOutputsAfterSort(data.get("R4"), expectedResults); + } + public void verifyExpected(List<Tuple> out, Set<Tuple> expected) { for (Tuple tup : out) { assertTrue(expected + " contains " + tup, expected.contains(tup));
