Added: pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java (added) +++ pig/branches/spark/test/org/apache/pig/test/TestNewPredicatePushDown.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import org.apache.pig.*; +import org.apache.pig.impl.PigContext; +import org.apache.pig.newplan.Operator; +import org.apache.pig.newplan.PredicatePushDownFilterExtractor; +import org.apache.pig.newplan.logical.expression.*; +import org.apache.pig.newplan.logical.relational.LOFilter; +import org.apache.pig.newplan.logical.relational.LogicalPlan; +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + +import static org.apache.pig.Expression.OpType.OP_NOT; +import static org.apache.pig.Expression.OpType.OP_NULL; +import static org.apache.pig.Expression.OpType.OP_AND; +import static org.apache.pig.Expression.OpType.OP_EQ; + +/** + * Unit tests for extracting new push-down filter conditions out of the filter + * condition in the filter following a load which talks to a metadata system (i.e. + * implements {@link LoadMetadata}) + */ +public class TestNewPredicatePushDown { + static PigContext pc = new PigContext(ExecType.LOCAL, new Properties()); + String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, " + + "age:int, browser:map[], location:tuple(country:chararray, zip:int));"; + + // PIG-4940 + @Test + public void testSingleUnaryExpressionUnsupportedPushdown() throws Exception { + String q = query + "b = filter a by not browser#'type' is null;" + + "store b into 'out';"; + test(q, Arrays.asList("browser"), Arrays.asList(OP_NOT, OP_NULL), null, + "(not (browser#'type' is null))", true); + } + + // PIG-4953 + @Test + public void testSingleUnaryExpressionSuccessfulPushdown() throws Exception { + String q = query + "b = filter a by mrkt is not null;" + + "store b into 'out';"; + test(q, Arrays.asList("mrkt"), Arrays.asList(OP_NOT, OP_NULL), + "((mrkt is null) not)", null, true); + } + + @Test + public void testUnaryAndExpressionSuccessfulPushdown() throws Exception { + String q = query + "b = filter a by not (mrkt is null and mrkt == 'us');" + + "store b into 'out';"; + test(q, 
Arrays.asList("mrkt"), Arrays.asList(OP_NOT, OP_NULL, OP_AND, OP_EQ), + "(((mrkt is null) and (mrkt == 'us')) not)", null, true); + } + + @Test + public void testUnaryAndExpressionUnsupportedPushdown() throws Exception { + String q = query + "b = filter a by not (mrkt is null and mrkt != 'us');" + + "store b into 'out';"; + test(q, Arrays.asList("mrkt"), Arrays.asList(OP_NOT, OP_NULL, OP_AND, OP_EQ), + null, "(not ((mrkt is null) and (mrkt != us)))", true); + } + + private PredicatePushDownFilterExtractor test(String query, List<String> predicateCols, List<Expression.OpType> supportedOpTypes, + String expPushFilterString, String expFilterString, boolean unsupportedExpression) + throws Exception { + PigServer pigServer = new PigServer( pc ); + LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query); + Operator op = newLogicalPlan.getSinks().get(0); + LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0); + PredicatePushDownFilterExtractor pushColExtractor = new PredicatePushDownFilterExtractor( + filter.getFilterPlan(), predicateCols, supportedOpTypes); + pushColExtractor.visit(); + + if (expPushFilterString == null) { + Assert.assertEquals("Checking partition column filter:", null, + pushColExtractor.getPushDownExpression()); + } else { + Assert.assertEquals("Checking partition column filter:", + expPushFilterString, + pushColExtractor.getPushDownExpression().toString()); + } + + if (expFilterString == null) { + Assert.assertTrue("Check that filter can be removed:", + pushColExtractor.isFilterRemovable()); + } else { + if (unsupportedExpression) { + String actual = TestNewPartitionFilterPushDown.getTestExpression( + (LogicalExpression)pushColExtractor.getFilteredPlan().getSources().get(0)).toString(); + Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual); + } else { + String actual = pushColExtractor.getExpression( + (LogicalExpression)pushColExtractor.getFilteredPlan().getSources().get(0)).toString(); + 
Assert.assertEquals("checking trimmed filter expression:", expFilterString, actual); + } + } + return pushColExtractor; + } + +}
Modified: pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPOGenerate.java Wed Feb 22 09:43:41 2017 @@ -21,8 +21,10 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -46,6 +48,7 @@ public class TestPOGenerate { DataBag cogroup; DataBag partialFlatten; DataBag simpleGenerate; + DataBag mapFlatten; Random r = new Random(); BagFactory bf = BagFactory.getInstance(); TupleFactory tf = TupleFactory.getInstance(); @@ -54,10 +57,25 @@ public class TestPOGenerate { public void setUp() throws Exception { Tuple [] inputA = new Tuple[4]; Tuple [] inputB = new Tuple[4]; + Tuple [] inputC = new Tuple[4]; for(int i = 0; i < 4; i++) { inputA[i] = tf.newTuple(2); inputB[i] = tf.newTuple(1); + inputC[i] = tf.newTuple(2); } + Map map0 = new HashMap<String,String>(); + Map map1 = new HashMap<String,String>(); + Map map2 = new HashMap<String,String>(); + Map map3 = new HashMap<String,String>(); + map0.put("A",""); + map0.put("B",""); + map1.put("A","a"); + map1.put("B","b"); + map2.put("A","aa"); + map2.put("B","bb"); + map3.put("A","aaa"); + map3.put("B","bbb"); + inputA[0].set(0, 'a'); inputA[0].set(1, '1'); inputA[1].set(0, 'b'); @@ -70,6 +88,15 @@ public class TestPOGenerate { inputB[1].set(0, 'b'); inputB[2].set(0, 'a'); inputB[3].set(0, 'd'); + inputC[0].set(0, 0); + inputC[0].set(1, map0); + inputC[1].set(0, 1); + inputC[1].set(1, map1); + 
inputC[2].set(0, 2); + inputC[2].set(1, map2); + inputC[3].set(0, 3); + inputC[3].set(1, map3); + DataBag cg11 = bf.newDefaultBag(); cg11.add(inputA[0]); cg11.add(inputA[2]); @@ -119,15 +146,22 @@ public class TestPOGenerate { tPartial[3].append(emptyBag); partialFlatten = bf.newDefaultBag(); - for(int i = 0; i < 4; ++i) { + for (int i = 0; i < 4; ++i) { partialFlatten.add(tPartial[i]); } simpleGenerate = bf.newDefaultBag(); - for(int i = 0; i < 4; ++i) { + for (int i = 0; i < 4; ++i) { simpleGenerate.add(inputA[i]); } + + mapFlatten = bf.newDefaultBag(); + for (int i = 0; i < inputC.length; ++i) { + mapFlatten.add(inputC[i]); + } + + //System.out.println("Cogroup : " + cogroup); //System.out.println("Partial : " + partialFlatten); //System.out.println("Simple : " + simpleGenerate); @@ -248,4 +282,49 @@ public class TestPOGenerate { assertEquals(simpleGenerate.size(), count); } + + @Test + public void testMapFlattenGenerate() throws Exception { + ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0); + ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1); + prj1.setResultType(DataType.INTEGER); + prj2.setResultType(DataType.MAP); + List<Boolean> toBeFlattened = new LinkedList<Boolean>(); + toBeFlattened.add(false); + toBeFlattened.add(true); + PhysicalPlan plan1 = new PhysicalPlan(); + plan1.add(prj1); + PhysicalPlan plan2 = new PhysicalPlan(); + plan2.add(prj2); + List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>(); + inputs.add(plan1); + inputs.add(plan2); + PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened); + + List<String> obtained = new LinkedList<String>(); + for (Tuple t : mapFlatten) { + poGen.attachInput(t); + Result output = poGen.getNextTuple(); + while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) { + //System.out.println(output.result); + obtained.add(((Tuple) output.result).toString()); + output = 
poGen.getNextTuple(); + } + } + + int count = 0; + for (Tuple t : mapFlatten) { + Tuple expected = tf.newTuple(3); + expected.set(0, t.get(0)); + for (Object entryObj : ((Map)t.get(1)).entrySet()){ + Map.Entry entry = ((Map.Entry)entryObj); + expected.set(1, entry.getKey()); + expected.set(2, entry.getValue()); + assertTrue(obtained.contains(expected.toString())); + ++count; + } + } + assertEquals(mapFlatten.size()*2, count); + + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestParamSubPreproc.java Wed Feb 22 09:43:41 2017 @@ -19,6 +19,7 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -42,6 +43,7 @@ import org.apache.pig.ExecType; import org.apache.pig.impl.PigContext; import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor; import org.apache.pig.tools.parameters.ParseException; +import org.apache.pig.tools.pigstats.PigStats; import org.junit.Test; public class TestParamSubPreproc { @@ -52,24 +54,9 @@ public class TestParamSubPreproc { private FileInputStream pigExResultStream; private String basedir = "test/org/apache/pig/test/data"; - /* Test case 1 - * Use a parameter within a pig script and provide value on the command line. 
- */ - @Test - public void testCmdlineParam() throws Exception{ - log.info("Starting test testCmdlineParam() ..."); - ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50); - pigIStream = new BufferedReader(new FileReader(basedir + "/input1.pig")); - pigOStream = new FileWriter(basedir + "/output1.pig"); - - String[] arg = {"date=20080228"}; - String[] argFiles = null; - ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles); - - FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); - pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); + private void compareResults(InputStream expected, InputStream result) throws IOException { + BufferedReader inExpected = new BufferedReader(new InputStreamReader(expected)); + BufferedReader inResult = new BufferedReader(new InputStreamReader(result)); String exLine; String resLine; @@ -89,6 +76,26 @@ public class TestParamSubPreproc { inExpected.close(); inResult.close(); + } + + /* Test case 1 + * Use a parameter within a pig script and provide value on the command line. 
+ */ + @Test + public void testCmdlineParam() throws Exception{ + log.info("Starting test testCmdlineParam() ..."); + ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50); + pigIStream = new BufferedReader(new FileReader(basedir + "/input1.pig")); + pigOStream = new FileWriter(basedir + "/output1.pig"); + + String[] arg = {"date=20080228"}; + String[] argFiles = null; + ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles); + + FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); + pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); + + compareResults(pigExResultStream, pigResultStream); log.info("Done"); @@ -110,27 +117,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution from config file failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution from config file failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); log.info("Done"); } @@ -152,27 +140,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResultDefault.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); log.info("Done"); } @@ -236,27 +205,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult4.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution within a value failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution within a value failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); log.info("Done"); } @@ -280,27 +230,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult4.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution with shell command failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } + compareResults(pigExResultStream, pigResultStream); - inExpected.close(); - inResult.close(); log.info("Done"); } @@ -321,27 +253,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResultCmdLnPriorDeclare.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution of command line arg. prior to declare stmt failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution of command line arg. prior to declare stmt failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); log.info("Done"); } @@ -364,27 +277,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult4.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution for a command with shell command failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution for a command with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } + compareResults(pigExResultStream, pigResultStream); - inExpected.close(); - inResult.close(); log.info("Done"); } @@ -406,27 +301,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Command line parameter substitution failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } + compareResults(pigExResultStream, pigResultStream); - inExpected.close(); - inResult.close(); log.info("Done"); } @@ -446,27 +323,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution from multiple config files failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution from multiple config files failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } + compareResults(pigExResultStream, pigResultStream); - inExpected.close(); - inResult.close(); log.info("Done"); } @@ -487,27 +346,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Same Parameter substitution from multiple config files failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Same Parameter substitution from multiple config files failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } + compareResults(pigExResultStream, pigResultStream); - inExpected.close(); - inResult.close(); log.info("Done"); } @@ -527,27 +368,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } + compareResults(pigExResultStream, pigResultStream); - inExpected.close(); - inResult.close(); } /* Test case 15,16 @@ -615,27 +438,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : 
"+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case 21 @@ -654,27 +458,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } @@ -693,27 +478,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if 
(!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } + compareResults(pigExResultStream, pigResultStream); - inExpected.close(); - inResult.close(); } /* Test case 23 @@ -732,27 +499,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case 24 @@ -771,27 +519,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResultMulDecs.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : 
"+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case 25 @@ -809,27 +538,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case 26 @@ -847,27 +557,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + 
compareResults(pigExResultStream, pigResultStream); } @@ -886,27 +577,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case 29 @@ -924,27 +596,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Command line parameter substitution failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case 30 @@ -962,27 +615,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult2.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case 31 @@ -1001,27 +635,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Command line parameter substitution failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); log.info("Done"); } @@ -1042,27 +657,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/inputNoVars.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; + compareResults(pigExResultStream, pigResultStream); - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Command line parameter substitution failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); log.info("Done"); } @@ -1083,27 +680,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult3.txt"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Command line parameter substitution failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); log.info("Done"); } @@ -1123,27 +701,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResultComment.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } /* Test case @@ -1212,27 +771,7 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output26.pig"); InputStream expected = new ByteArrayInputStream(expectedString.getBytes("UTF-8")); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(expected)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Parameter substitution with shell command failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Parameter substitution with shell command failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(expected, pigResultStream); log.info("Done"); } @@ -1252,27 +791,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResultDollarSign.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); } @Test @@ -1289,28 +809,8 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult6.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Command line parameter 
substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); log.info("Done"); } @@ -1326,27 +826,9 @@ public class TestParamSubPreproc { FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); pigExResultStream = new FileInputStream(basedir + "/ExpectedResult7.pig"); - BufferedReader inExpected = new BufferedReader(new InputStreamReader(pigExResultStream)); - BufferedReader inResult = new BufferedReader(new InputStreamReader(pigResultStream)); - - String exLine; - String resLine; - int lineNum=0; - - while (true) { - lineNum++; - exLine = inExpected.readLine(); - resLine = inResult.readLine(); - if (exLine==null || resLine==null) - break; - assertEquals("Command line parameter substitution failed. " + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum ,exLine.trim(), resLine.trim()); - } - if (!(exLine==null && resLine==null)) { - fail ("Command line parameter substitution failed. 
" + "Expected : "+exLine+" , but got : "+resLine+" in line num : "+lineNum); - } - inExpected.close(); - inResult.close(); + compareResults(pigExResultStream, pigResultStream); + log.info("Done"); } @Test @@ -1398,6 +880,351 @@ public class TestParamSubPreproc { assertEquals(resultContent, "daniel\t10\njenny\t20\n"); } + @Test + public void testCommandLineParamOverwritingDefault() throws Exception { + log.info("Starting test testCommandLineParamOverwritingDefault()"); + File inputFile = Util.createFile( + "runinput", + new String[] { "daniel\t10", + "jenny\t20;"}); + File output1 = File.createTempFile("output1_", ""); + output1.delete(); + + File script1 = Util.createFile("runscript1.pig", + new String[] { "%default output /invalidpathThatShouldFail;", + "a = load 'runinput';", + "store a into '$output';"}); + + PigStats stats = org.apache.pig.PigRunner.run(new String[] { + "-x", Util.getLocalTestMode().toString(), + "-p", "output=" + output1.getAbsolutePath(), + script1.getAbsolutePath()} , null); + Util.deleteDirectory(output1); + + assertTrue("job should succeed", stats.isSuccessful()); + assertTrue("Default param should be overridden by the commandline param", + output1.getAbsolutePath().endsWith(stats.getOutputNames().get(0))); + } + + @Test + public void testRunWithParamOverwritingDefault() throws Exception { + log.info("Starting test testScopeOfParamWithRunCommand()"); + File inputFile = Util.createFile( + "runinput", + new String[] { "daniel\t10", + "jenny\t20;"}); + File output1 = File.createTempFile("output1_", ""); + File output2 = File.createTempFile("output2_", ""); + output1.delete(); + output2.delete(); + + + File script1 = Util.createFile("runscript1.pig", + new String[] { "%default output '" + output2.getAbsolutePath() + "';", + "a = load 'runinput';", + "store a into '$output';"}); + + File mainscript = Util.createFile("mainscript.pig", + new String[] {"run -param output=" + output1.getAbsolutePath() + + " " + script1.getAbsolutePath() + ";"}); + 
+ + PigStats stats = org.apache.pig.PigRunner.run(new String[] { + "-x", Util.getLocalTestMode().toString(), + mainscript.getAbsolutePath()} , null); + Util.deleteDirectory(output1); + Util.deleteDirectory(output2); + + assertTrue("job should succeed", stats.isSuccessful()); + assertEquals("There should only be 1 output.", + 1, stats.getOutputNames().size()); + assertEquals("Output name should be from output1 and not output2", + output1.getAbsolutePath(), + stats.getOutputLocations().get(0)); + } + + @Test + public void testScopeOfParamWithRunCommand() throws Exception { + log.info("Starting test testScopeOfParamWithRunCommand()"); + File inputFile = Util.createFile( + "runinput", + new String[] { "daniel\t10", + "jenny\t20;"}); + File output1 = File.createTempFile("output1_", ""); + File output2 = File.createTempFile("output2_", ""); + output1.delete(); + output2.delete(); + + File script1 = Util.createFile("runscript1.pig", + new String[] { "%default output '" + output1.getAbsolutePath() + "';", + "a = load 'runinput';", + "store a into '$output';"}); + + File script2 = Util.createFile("runscript2.pig", + new String[] { "%default output '" + output2.getAbsolutePath() + "';", + "a = load 'runinput';", + "store a into '$output';"}); + + File mainscript = Util.createFile("mainscript.pig", + new String[] { "run " + script1.getAbsolutePath() + ";", + "run " + script2.getAbsolutePath() + ";" }); + + PigStats stats = org.apache.pig.PigRunner.run(new String[] { + "-x", Util.getLocalTestMode().toString(), + mainscript.getAbsolutePath()} , null); + Util.deleteDirectory(output1); + Util.deleteDirectory(output2); + + assertTrue("job should succeed", stats.isSuccessful()); + assertNotEquals("Two output paths should differ", + stats.getOutputNames().get(0), stats.getOutputNames().get(1)); + assertEquals("Each output should contain 2 records", + 2, stats.getOutputStats().get(0).getNumberRecords()); + assertEquals("Each output should contain 2 records", + 2, 
stats.getOutputStats().get(1).getNumberRecords()); + } + + @Test + public void testScopeOfParamWithNestedRunCommand() throws Exception { + log.info("Starting test testScopeOfParamWithRunCommand()"); + File inputFile = Util.createFile( + "runinput", + new String[] { "daniel\t10", + "jenny\t20;"}); + /* + * script1 sets a=1, b=2, c=3; calls script2 + * script2 sets b=22 (by -param); calls script3 + * script3 sets c=333; saves $a$b$c (122333) + * script2 saves $a$b$c (1223) + * script1 saves $a$b$c (123) + */ + File script3 = Util.createFile("runscript3.pig", + new String[] { "%declare c '333';", + "a = load 'runinput';", + "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"}); + + File script2 = Util.createFile("runscript2.pig", + new String[] { "run " + script3.getAbsolutePath() + ";", + "a = load 'runinput';", + "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"}); + + File script1 = Util.createFile("runscript1.pig", + new String[] { "%declare a '1';", + "%declare b '2';", + "%declare c '3';", + "run -param b=22 " + script2.getAbsolutePath() + ";", + "a = load 'runinput';", + "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"}); + + PigStats stats = org.apache.pig.PigRunner.run(new String[] { + "-x", Util.getLocalTestMode().toString(), + script1.getAbsolutePath()} , null); + + for( String output : stats.getOutputNames() ) { + assertTrue(output.contains("testScopeOfParamWithNestedRunCommand")); + Util.deleteDirectory(new File(output)); + } + assertTrue("job should succeed", stats.isSuccessful()); + assertEquals("There should be three outputs.", 3, stats.getOutputNames().size()); + + for( String expectedoutput : new String [] {"testScopeOfParamWithNestedRunCommand123", + "testScopeOfParamWithNestedRunCommand1223", + "testScopeOfParamWithNestedRunCommand122333"} ) { + boolean found=false; + for( String output : stats.getOutputNames() ) { + if( output.endsWith(expectedoutput) ) { + found=true; + } + } + assertTrue("Output 
" + expectedoutput + " should exist.", found); + } + } + + /* This currently does not work since PigMacro only picks the + * param setting from the root script (script1) + * To revisit after Grunt moves to ANTLR in PIG-2597. + * Tracking in PIG-5028. + * + + @Test + public void testScopeOfParamWithMacro() throws Exception { + log.info("Starting test testScopeOfParamWithMacro()"); + File inputFile = Util.createFile( + "runinput", + new String[] { "daniel\t10", + "jenny\t20;"}); + File macro = Util.createFile("testmacro.pig", + new String[] { "DEFINE mymacro (A) RETURNS void {", + "store $A into 'testScopeOfParamWithMacro${a}${b}${c}';", + "};"}); + + File script3 = Util.createFile("runscript3.pig", + new String[] { "%declare c '333';"}); + + File script2 = Util.createFile("runscript2.pig", + new String[] { "%declare b '22';", + "import '" + macro.getAbsolutePath() + "';", + "a = load 'runinput';", + "mymacro(a);", + "exec " + script3.getAbsolutePath() + ";"}); + + File script1 = Util.createFile("runscript1.pig", + new String[] { "%declare a '1';", + "%declare b '2';", + "%declare c '3';", + "exec " + script2.getAbsolutePath() + ";"}); + + PigStats stats = org.apache.pig.PigRunner.run(new String[] { + "-x", Util.getLocalTestMode().toString(), + script1.getAbsolutePath()} , null); + + assertTrue("job should succeed", stats.isSuccessful()); + Util.deleteDirectory(new File(stats.getOutputNames().get(0))); + assertEquals("There should be only 1 output.", 1, stats.getOutputNames().size()); + assertTrue("Expected output testScopeOfParamWithMacro1223 but got " + stats.getOutputNames().get(0), + stats.getOutputNames().get(0).equals("testScopeOfParamWithMacro1223")); + } + */ + + + /* + * Test parameter substition when register contains /* globbing + */ + @Test + public void testSubstitutionWithRegisterGlobbing() throws Exception{ + log.info("Starting test testSubstitutionWithRegisterGlobbing()"); + final String queryString = + "register /abc/$regdir/*.jar;\n" + + "A = LOAD 
'$input' USING PigStorage ();\n" + + "STORE A INTO '$output';\n" + + " /* comment that would make register globbing to be part of the multi-line comment */\n"; + + + ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50); + pigIStream = new BufferedReader( + new InputStreamReader(new ByteArrayInputStream(queryString.getBytes("UTF-8")))); + pigOStream = new FileWriter(basedir + "/output1.pig"); + + String[] arg = {"input = 'input.txt'", "output = 'output.txt'", "regdir = 'def'"}; + String[] argFiles = null; + ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles); + + FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); + + String expectedString = queryString.replaceAll("\\$input","input.txt") + .replaceAll("\\$output","output.txt") + .replaceAll("\\$regdir","def"); + InputStream expected = new ByteArrayInputStream(expectedString.getBytes("UTF-8")); + + compareResults(expected, pigResultStream); + + log.info("Done"); + } + + /* + * Test parameter substition when load contains /* globbing + */ + @Test + public void testSubstitutionWithLoadGlobbing() throws Exception{ + log.info("Starting test testSubstitutionWithLoadGlobbing()"); + final String queryString = + "A = LOAD '/zzz/*' USING PigStorage ();\n" + + "STORE A INTO '$output';\n" + + " /* comment that would make register globbing to be part of the multi-line comment */\n"; + + + ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50); + pigIStream = new BufferedReader( + new InputStreamReader(new ByteArrayInputStream(queryString.getBytes("UTF-8")))); + pigOStream = new FileWriter(basedir + "/output1.pig"); + + String[] arg = {"output = 'output.txt'"}; + String[] argFiles = null; + ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles); + + FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); + + String expectedString = queryString.replaceAll("\\$output","output.txt"); + InputStream expected 
= new ByteArrayInputStream(expectedString.getBytes("UTF-8")); + + compareResults(expected, pigResultStream); + + log.info("Done"); + } + + @Test + public void testSubstitutionWithRedeclaration() throws Exception{ + log.info("Starting test testSubstitutionWithRedeclaration()"); + final String queryString = + "%declare output '/tmp/abc';\n" + + "%declare actualoutput '$output.out';\n" + + "A = load 'input.txt' ;\n" + + "store A into '$actualoutput';\n" + + "%declare output '/tmp/def';\n" + + "%declare actualoutput '$output.out';\n" + + "store A into '$actualoutput';"; + + + ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50); + pigIStream = new BufferedReader( + new InputStreamReader(new ByteArrayInputStream(queryString.getBytes("UTF-8")))); + pigOStream = new FileWriter(basedir + "/output1.pig"); + + String[] arg = {"output = 'output.txt'"}; + String[] argFiles = null; + ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles); + + FileInputStream pigResultStream = new FileInputStream(basedir + "/output1.pig"); + + String expectedString = queryString.replaceAll("%declare [0-9a-zA-Z.'/\\$; ]*\n",";\n") + .replaceAll("\\$","") + .replaceFirst("actualoutput","/tmp/abc.out") + .replaceFirst("actualoutput","/tmp/def.out"); + InputStream expected = new ByteArrayInputStream(expectedString.getBytes("UTF-8")); + + compareResults(expected, pigResultStream); + + log.info("Done"); + } + + @Test + public void testSubstitutionWithRedeclaredShell() throws Exception{ + log.info("Starting test testSubstitutionWithRedeclaredShell()"); + final String queryString = + "A = load 'input.txt' ;\n" + + "%declare now `bash -c \"date +'%Y%m%d_%H:%M:%S'; sleep 1;\"`;\n" + + "store A into '$now';\n" + + "%declare now `bash -c \"date +'%Y%m%d_%H:%M:%S'; sleep 1;\"`;\n" + + "store A into '$now';\n"; + + ParameterSubstitutionPreprocessor ps = new ParameterSubstitutionPreprocessor(50); + pigIStream = new BufferedReader( + new InputStreamReader(new 
ByteArrayInputStream(queryString.getBytes("UTF-8")))); + pigOStream = new FileWriter(basedir + "/output1.pig"); + + String[] arg = {"output = 'output.txt'"}; + String[] argFiles = null; + ps.genSubstitutedFile(pigIStream , pigOStream , arg , argFiles); + + BufferedReader pigresult = new BufferedReader(new InputStreamReader(new FileInputStream(basedir + "/output1.pig"))); + + + String [] filenames = new String [2]; + int index=0; + String line; + while ((line = pigresult.readLine())!=null) { + if( line.startsWith("store A into") ) { + filenames[index++] = line.split(" ")[3]; + } + } + + assertEquals("There should be 2 store statements", 2, index); + assertNotEquals("Identical shell param should be reexecuted.", + filenames[0], + filenames[1]); + log.info("Done"); + } + @SuppressWarnings("resource") private BufferedReader WithConditionalReplacement(String filename, String orig, String dest, boolean replace) throws IOException { BufferedReader pigOrigIStream = new BufferedReader(new FileReader(filename)); Modified: pig/branches/spark/test/org/apache/pig/test/TestPigContext.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigContext.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigContext.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigContext.java Wed Feb 22 09:43:41 2017 @@ -27,6 +27,7 @@ import java.util.List; import java.util.Properties; import java.util.Random; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; @@ -270,7 +271,7 @@ public class TestPigContext { assertEquals(JOB_TRACKER, pigServer.getPigContext().getProperties().getProperty(MRConfiguration.JOB_TRACKER)); assertEquals(FS_NAME, - 
pigServer.getPigContext().getProperties().getProperty("fs.default.name")); + pigServer.getPigContext().getProperties().getProperty(FileSystem.FS_DEFAULT_NAME_KEY)); assertEquals(TMP_DIR_PROP, pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir")); } Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Wed Feb 22 09:43:41 2017 @@ -19,6 +19,7 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -35,6 +36,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.Counters; @@ -62,7 +65,10 @@ import org.junit.AfterClass; import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; public class TestPigRunner { @@ -73,6 +79,9 @@ public class TestPigRunner { private static final String OUTPUT_FILE = "output"; private static final String PIG_FILE = "test.pig"; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @BeforeClass public static void setUpBeforeClass() throws Exception { cluster = MiniGenericCluster.buildCluster(); @@ -803,10 +812,9 @@ public class TestPigRunner { } @Test + @Ignore + // Skip in hadoop 23 test, see PIG-2449 public void 
classLoaderTest() throws Exception { - // Skip in hadoop 23 test, see PIG-2449 - if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) - return; PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); w.println("register test/org/apache/pig/test/data/pigtestloader.jar"); w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();"); @@ -1147,8 +1155,13 @@ public class TestPigRunner { List<OutputStats> outputs = stats.getOutputStats(); assertEquals(2, outputs.size()); if (execType.equals("tez")) { - assertEquals(outputs.get(0).getNumberRecords(), 5); - assertEquals(outputs.get(1).getNumberRecords(), 2); + if( outputs.get(0).getLocation().endsWith("tmp/output") ) { + assertEquals(2, outputs.get(0).getNumberRecords()); + assertEquals(5, outputs.get(1).getNumberRecords()); + } else { + assertEquals(5, outputs.get(0).getNumberRecords()); + assertEquals(2, outputs.get(1).getNumberRecords()); + } } else { for (OutputStats outstats : outputs) { // the multi-output counters are disabled @@ -1214,6 +1227,77 @@ public class TestPigRunner { Util.deleteFile(cluster, "tmp/output"); } } + + @Test + public void testStoredScriptContents() throws Exception { + String scriptContents = "sh echo success;\n"; + FileUtils.writeStringToFile(new File(PIG_FILE), scriptContents); + Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE); + + Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE); + try { + runAndValidateStoredScriptContents(PIG_FILE, scriptContents); + runAndValidateStoredScriptContents(inputInDfs.toString(), scriptContents); + } finally { + FileUtils.deleteQuietly(new File(PIG_FILE)); + Util.deleteQuietly(cluster, PIG_FILE); + } + } + + @Test + public void testErrorLogUnderCustomDir() throws Exception { + try (PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE))) { + w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);"); + w.println("B = foreach A generate 
StringSize(a0);"); + w.println("store B into '" + OUTPUT_FILE + "';"); + } + Util.copyFromLocalToCluster(cluster, PIG_FILE, PIG_FILE); + + Path inputInDfs = new Path(cluster.getFileSystem().getHomeDirectory(), PIG_FILE); + try { + runAndValidateCustomErrorLogDir(PIG_FILE); + runAndValidateCustomErrorLogDir(inputInDfs.toString()); + } finally { + FileUtils.deleteQuietly(new File(PIG_FILE)); + Util.deleteQuietly(cluster, PIG_FILE); + } + } + + private void runAndValidateStoredScriptContents(String scriptPath, String expectedContents) { + PigStats stats = runPigLocally(scriptPath); + assertTrue(stats.isSuccessful()); + assertEquals(expectedContents, stats.getScript()); + + stats = runPigLocally("-f", scriptPath); + assertTrue(stats.isSuccessful()); + assertEquals(expectedContents, stats.getScript()); + } + + private void runAndValidateCustomErrorLogDir(String scriptPath) throws IOException { + File logsFolder = temporaryFolder.newFolder(); + String logsPath = logsFolder.getAbsolutePath(); + assertFileCountUnderDir(logsFolder, 0); + + PigStats stats = runPigLocally("-l", logsPath, scriptPath); + assertFalse(stats.isSuccessful()); + assertFileCountUnderDir(logsFolder, 1); + + stats = runPigLocally("-l", logsPath, "-f", scriptPath); + assertFalse(stats.isSuccessful()); + assertFileCountUnderDir(logsFolder, 2); + } + + private void assertFileCountUnderDir(File directory, int expectedFileCount) throws IOException { + String[] files = directory.list(); + assertNotNull(files); + assertEquals(expectedFileCount, files.length); + } + + private PigStats runPigLocally(String... 
extraArgs) { + String[] args = ArrayUtils.addAll(new String[]{"-x", "local"}, extraArgs); + return PigRunner.run(args, new TestNotificationListener("local")); + } + public static class TestNotificationListener implements PigProgressNotificationListener { private Map<String, int[]> numMap = new HashMap<String, int[]>(); Modified: pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigScriptParser.java Wed Feb 22 09:43:41 2017 @@ -30,6 +30,9 @@ import java.util.Properties; import org.apache.pig.ExecType; import org.apache.pig.PigServer; +import org.apache.pig.builtin.mock.Storage; +import org.apache.pig.builtin.mock.Storage.Data; +import static org.apache.pig.builtin.mock.Storage.tuple; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.FrontendException; @@ -99,7 +102,7 @@ public class TestPigScriptParser { @Test public void testDefineUDF() throws Exception { - PigServer ps = new PigServer(ExecType.LOCAL); + PigServer ps = new PigServer(Util.getLocalTestMode()); String inputData[] = { "dshfdskfwww.xyz.com/sportsjoadfjdslpdshfdskfwww.xyz.com/sportsjoadfjdsl" , "kas;dka;sd" , @@ -156,6 +159,79 @@ public class TestPigScriptParser { } } + @Test + public void testBackSlashOnly() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); + Data data = Storage.resetData(pig); + data.set("input", tuple("abc"), tuple("\\bcd"), tuple("'cde"), tuple("def\\\\")); + + String query = + "A = load 'input' USING mock.Storage() as (a0:chararray);\n" + // java String is escaping "\" so the following line is equivalent of + // B = 
FILTER A by STARTSWITH(a0,'\\'); in the pig script + + "B = FILTER A by STARTSWITH(a0,'\\\\');\n" + + "store B into 'out' using mock.Storage;" ; + + Util.registerMultiLineQuery(pig, query); + List<Tuple> list = data.get("out"); + + assertEquals("There should be only one match", 1, list.size()); + Tuple t = list.get(0); + assertEquals("result should have only one field", 1, t.size() ); + assertEquals("\\bcd",(String) t.get(0)); + } + + + @Test + public void testBackSlashSingleQuote() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); + Data data = Storage.resetData(pig); + data.set("input", tuple("abc"), tuple("\\bcd"), tuple("'cde"), tuple("def\\\\")); + + String query = + "A = load 'input' USING mock.Storage() as (a0:chararray);\n" + // java String is escaping "\" so the following line is equivalent of + // B = FILTER A by STARTSWITH(a0,'\''); in the pig script + + "B = FILTER A by STARTSWITH(a0,'\\'');\n" + + "store B into 'out' using mock.Storage;" ; + + Util.registerMultiLineQuery(pig, query); + List<Tuple> list = data.get("out"); + + assertEquals("There should be only one match", 1, list.size()); + Tuple t = list.get(0); + assertEquals("result should have only one field", 1, t.size() ); + assertEquals("'cde",(String) t.get(0)); + } + + @Test + public void testBackSlashReplace() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); + Data data = Storage.resetData(pig); + //After java escaping, these tuples have + //'abc', '\bcd' and 'def\\' respectively + data.set("input", tuple("abc"), tuple("\\bcd"), tuple("def\\\\")); + + String query = + "A = load 'input' USING mock.Storage() as (a0:chararray);\n" + // java String is escaping "\" so the following line is equivalent of + //"B = FOREACH A GENERATE REPLACE(a0,'\\\\','+');\n" + + "B = FOREACH A GENERATE REPLACE(a0,'\\\\\\\\','+');\n" + + "store B into 'out' using mock.Storage;" ; + + // REPLACE(a0,'\\\\','+') + // --> Pig parser unescape and pass "\\" to REPLACE 
UDF. + // --> REPLACE UDF calls, Pattern.compile("\\"); which + // matches "\" + + Util.registerMultiLineQuery(pig, query); + List<Tuple> list = data.get("out"); + + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] {"('abc')","('+bcd')", "('def++')"}); + Util.checkQueryOutputsAfterSort(list, expectedRes); + } private void checkParsedConstContent(PigServer pigServer, PigContext pigContext, String query, Modified: pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestPigStatsMR.java Wed Feb 22 09:43:41 2017 @@ -103,11 +103,7 @@ public class TestPigStatsMR extends Test private static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception { MapReduceLauncher launcher = new MapReduceLauncher(); - java.lang.reflect.Method compile = launcher.getClass() - .getDeclaredMethod("compile", - new Class[] { PhysicalPlan.class, PigContext.class }); - compile.setAccessible(true); - return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx }); + return launcher.compile(pp,ctx); } private static String getAlias(MapReduceOper mro) throws Exception {