Modified: pig/trunk/test/org/apache/pig/test/TestPigServer.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServer.java?rev=1653391&r1=1653390&r2=1653391&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigServer.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigServer.java Tue Jan 20 22:16:33 2015 @@ -18,15 +18,12 @@ package org.apache.pig.test; -import static org.apache.pig.builtin.mock.Storage.resetData; -import static org.apache.pig.builtin.mock.Storage.tuple; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; @@ -36,15 +33,11 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintStream; import java.io.PrintWriter; -import java.io.StringReader; import java.lang.reflect.Method; import java.net.URL; import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.Random; @@ -54,15 +47,10 @@ import javax.xml.parsers.DocumentBuilder import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; import org.apache.pig.ExecType; import org.apache.pig.PigConfiguration; import org.apache.pig.PigServer; -import org.apache.pig.ResourceSchema; -import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; -import org.apache.pig.builtin.mock.Storage; -import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -72,8 +60,6 @@ import org.apache.pig.impl.logicalLayer. import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.PropertiesUtil; import org.apache.pig.impl.util.Utils; -import org.apache.pig.tools.grunt.Grunt; -import org.apache.pig.tools.grunt.GruntParser; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -93,7 +79,6 @@ public class TestPigServer { @Before public void setUp() throws Exception{ - Util.resetStateForExecModeSwitch(); tempDir = Files.createTempDir(); tempDir.deleteOnExit(); registerNewResource(tempDir.getAbsolutePath()); @@ -665,94 +650,6 @@ public class TestPigServer { assertFalse(iter.hasNext()); } - @Test - public void testParamSubstitution() throws Exception{ - // using params map - PigServer pig=new PigServer(ExecType.LOCAL); - Map<String,String> params=new HashMap<String, String>(); - params.put("input", "test/org/apache/pig/test/data/passwd"); - File scriptFile=Util.createFile(new String[]{"a = load '$input' using PigStorage(':');"}); - pig.registerScript(scriptFile.getAbsolutePath(),params); - Iterator<Tuple> iter=pig.openIterator("a"); - int index=0; - List<Tuple> expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); - while(iter.hasNext()){ - Tuple tuple=iter.next(); - assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); - index++; - } - - // using param file - pig=new PigServer(ExecType.LOCAL); - List<String> paramFile=new ArrayList<String>(); - paramFile.add(Util.createFile(new String[]{"input=test/org/apache/pig/test/data/passwd2"}).getAbsolutePath()); - pig.registerScript(scriptFile.getAbsolutePath(),paramFile); - iter=pig.openIterator("a"); - index=0; - expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd2", ":"); - while(iter.hasNext()){ - Tuple tuple=iter.next(); - assertEquals(tuple.get(0).toString(), 
expectedTuples.get(index).get(0).toString()); - index++; - } - - // using both param value and param file, param value should override param file - pig=new PigServer(ExecType.LOCAL); - pig.registerScript(scriptFile.getAbsolutePath(),params,paramFile); - iter=pig.openIterator("a"); - index=0; - expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); - while(iter.hasNext()){ - Tuple tuple=iter.next(); - assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); - index++; - } - } - - // build the pig script from in-memory, and wrap it as ByteArrayInputStream - @Test - public void testRegisterScriptFromStream() throws Exception{ - // using params map - PigServer pig=new PigServer(ExecType.LOCAL); - Map<String,String> params=new HashMap<String, String>(); - params.put("input", "test/org/apache/pig/test/data/passwd"); - String script="a = load '$input' using PigStorage(':');"; - pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params); - Iterator<Tuple> iter=pig.openIterator("a"); - int index=0; - List<Tuple> expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); - while(iter.hasNext()){ - Tuple tuple=iter.next(); - assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); - index++; - } - - // using param file - pig=new PigServer(ExecType.LOCAL); - List<String> paramFile=new ArrayList<String>(); - paramFile.add(Util.createFile(new String[]{"input=test/org/apache/pig/test/data/passwd2"}).getAbsolutePath()); - pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),paramFile); - iter=pig.openIterator("a"); - index=0; - expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd2", ":"); - while(iter.hasNext()){ - Tuple tuple=iter.next(); - assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); - index++; - } - - // using both param value and param file, param value should override 
param file - pig=new PigServer(ExecType.LOCAL); - pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params,paramFile); - iter=pig.openIterator("a"); - index=0; - expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); - while(iter.hasNext()){ - Tuple tuple=iter.next(); - assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); - index++; - } - } @Test public void testPigProperties() throws Throwable { @@ -896,93 +793,6 @@ public class TestPigServer { } @Test - public void testSecondarySort() throws Exception { - PigServer pigServer = new PigServer(ExecType.LOCAL); - Data data = resetData(pigServer); - - data.set("foo", - tuple("a", 1, "b"), - tuple("b", 2, "c"), - tuple("c", 3, "d") - ); - - pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); - pigServer.registerQuery("B = order A by f1,f2,f3 DESC;"); - pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();"); - - List<Tuple> out = data.get("bar"); - assertEquals(tuple("a", 1, "b"), out.get(0)); - assertEquals(tuple("b", 2, "c"), out.get(1)); - assertEquals(tuple("c", 3, "d"), out.get(2)); - } - - @Test(expected = RuntimeException.class) - public void testLocationStrictCheck() throws ExecException, IOException { - Properties properties = PropertiesUtil.loadDefaultProperties(); - properties.setProperty("pig.location.check.strict", "true"); - PigServer pigServer = new PigServer(ExecType.LOCAL, properties); - Data data = resetData(pigServer); - - data.set("foo", - tuple("a", 1, "b"), - tuple("b", 2, "c"), - tuple("c", 3, "d")); - - pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); - pigServer.registerQuery("B = order A by f1,f2,f3 DESC;"); - pigServer.registerQuery("C = order A by f1,f2,f3;"); - // Storing to same location 'bar' should throw a RuntimeException - pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();"); 
- pigServer.registerQuery("STORE C INTO 'bar' USING mock.Storage();"); - - List<Tuple> out = data.get("bar"); - assertEquals(tuple("a", 1, "b"), out.get(0)); - assertEquals(tuple("b", 2, "c"), out.get(1)); - assertEquals(tuple("c", 3, "d"), out.get(2)); - } - - @Test - public void testSkipParseInRegisterForBatch() throws Throwable { - // numTimesInitiated = 10. 4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader, - // InputSizeReducerEstimator, getSplits->RandomSampleLoader, - // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) - // numTimesSchemaCalled = 4 (once per registerQuery) - _testSkipParseInRegisterForBatch(false, 10, 4); - // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader, - // InputSizeReducerEstimator, getSplits->RandomSampleLoader, - // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) - // numTimesSchemaCalled = 1 (parseAndBuild) - _testSkipParseInRegisterForBatch(true, 7, 1); - } - - @Test - // See PIG-3967 - public void testGruntValidation() throws IOException { - PigServer pigServer = new PigServer(ExecType.LOCAL); - Data data = resetData(pigServer); - - data.set("foo", - tuple("a", 1, "b"), - tuple("b", 2, "c"), - tuple("c", 3, "d")); - - pigServer.setValidateEachStatement(true); - pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); - pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation1';"); - pigServer.registerQuery("B = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); - pigServer.registerQuery("store B into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation2';"); // This should pass - boolean validationExceptionCaptured = false; - try { - // This should fail due to output validation - pigServer.registerQuery("store A into '" + 
Util.generateURI(tempDir.toString(),pigServer.getPigContext()) + "/testGruntValidation1';"); - } catch (FrontendException e) { - validationExceptionCaptured = true; - } - - assertTrue(validationExceptionCaptured); - } - - @Test // See PIG-4109 public void testRegisterJarRemoteScript() throws Throwable { if (Util.WINDOWS) { @@ -992,57 +802,4 @@ public class TestPigServer { pig.registerJar(jarName); } } - - private void _testSkipParseInRegisterForBatch(boolean skipParseInRegisterForBatch, - int numTimesInitiated, int numTimesSchemaCalled) throws Throwable { - MockTrackingStorage.numTimesInitiated = 0; - MockTrackingStorage.numTimesSchemaCalled = 0; - String query = "A = LOAD 'foo' USING " + MockTrackingStorage.class.getName() + "();\n" + - "B = order A by $0,$1,$2;\n" + - "C = LIMIT B 2;\n" + - "STORE C INTO 'bar' USING mock.Storage();\n"; - BufferedReader in = new BufferedReader(new StringReader(query)); - Properties properties = new Properties(); - properties.setProperty("io.sort.mb", "2"); - PigContext pigContext = new PigContext(ExecType.LOCAL, properties); - Data data; - if (skipParseInRegisterForBatch) { - data = resetData(pigContext); - data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d")); - Grunt grunt = new Grunt(in, pigContext); - grunt.exec(); // Calls grunt.parseStopOnError(); which executes as batch - } - else { - PigServer pigServer = new PigServer(pigContext); - data = resetData(pigServer); - data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d")); - GruntParser grunt = new GruntParser(in, pigServer); - grunt.setInteractive(false); - grunt.parseStopOnError(true); //not batch - } - - assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated); - assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled); - List<Tuple> out = data.get("bar"); - assertEquals(2, out.size()); - assertEquals(tuple("a", 1, "b"), out.get(0)); - assertEquals(tuple("b", 2, "c"), out.get(1)); - } - - public 
static class MockTrackingStorage extends Storage { - - public static int numTimesInitiated = 0; - public static int numTimesSchemaCalled = 0; - - public MockTrackingStorage() { - super(); - numTimesInitiated++; - } - - @Override - public ResourceSchema getSchema(String location, Job job) throws IOException { - numTimesSchemaCalled++; - return super.getSchema(location, job); - } - } }
Added: pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java?rev=1653391&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java (added) +++ pig/trunk/test/org/apache/pig/test/TestPigServerLocal.java Tue Jan 20 22:16:33 2015 @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.test; + +import static org.apache.pig.builtin.mock.Storage.resetData; +import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.StringReader; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.pig.PigServer; +import org.apache.pig.ResourceSchema; +import org.apache.pig.builtin.mock.Storage; +import org.apache.pig.builtin.mock.Storage.Data; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.util.PropertiesUtil; +import org.apache.pig.tools.grunt.Grunt; +import org.apache.pig.tools.grunt.GruntParser; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.io.Files; + +public class TestPigServerLocal { + private File tempDir; + + @Before + public void setUp() throws Exception{ + tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + registerNewResource(tempDir.getAbsolutePath()); + } + + // dynamically add more resources to the system class loader + private static void registerNewResource(String file) throws Exception { + URL urlToAdd = new File(file).toURI().toURL(); + URLClassLoader sysLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); + Method addMethod = URLClassLoader.class. 
+ getDeclaredMethod("addURL", new Class[]{URL.class}); + addMethod.setAccessible(true); + addMethod.invoke(sysLoader, new Object[]{urlToAdd}); + } + + @Test + public void testParamSubstitution() throws Exception{ + // using params map + PigServer pig=new PigServer(Util.getLocalTestMode()); + Map<String,String> params=new HashMap<String, String>(); + params.put("input", "test/org/apache/pig/test/data/passwd"); + File scriptFile=Util.createFile(new String[]{"a = load '$input' using PigStorage(':');"}); + pig.registerScript(scriptFile.getAbsolutePath(),params); + Iterator<Tuple> iter=pig.openIterator("a"); + int index=0; + List<Tuple> expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); + while(iter.hasNext()){ + Tuple tuple=iter.next(); + assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); + index++; + } + + // using param file + pig=new PigServer(Util.getLocalTestMode()); + List<String> paramFile=new ArrayList<String>(); + paramFile.add(Util.createFile(new String[]{"input=test/org/apache/pig/test/data/passwd2"}).getAbsolutePath()); + pig.registerScript(scriptFile.getAbsolutePath(),paramFile); + iter=pig.openIterator("a"); + index=0; + expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd2", ":"); + while(iter.hasNext()){ + Tuple tuple=iter.next(); + assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); + index++; + } + + // using both param value and param file, param value should override param file + pig=new PigServer(Util.getLocalTestMode()); + pig.registerScript(scriptFile.getAbsolutePath(),params,paramFile); + iter=pig.openIterator("a"); + index=0; + expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); + while(iter.hasNext()){ + Tuple tuple=iter.next(); + assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); + index++; + } + } + + // build the pig script from in-memory, and wrap it as 
ByteArrayInputStream + @Test + public void testRegisterScriptFromStream() throws Exception{ + // using params map + PigServer pig=new PigServer(Util.getLocalTestMode()); + Map<String,String> params=new HashMap<String, String>(); + params.put("input", "test/org/apache/pig/test/data/passwd"); + String script="a = load '$input' using PigStorage(':');"; + pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params); + Iterator<Tuple> iter=pig.openIterator("a"); + int index=0; + List<Tuple> expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); + while(iter.hasNext()){ + Tuple tuple=iter.next(); + assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); + index++; + } + + // using param file + pig=new PigServer(Util.getLocalTestMode()); + List<String> paramFile=new ArrayList<String>(); + paramFile.add(Util.createFile(new String[]{"input=test/org/apache/pig/test/data/passwd2"}).getAbsolutePath()); + pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),paramFile); + iter=pig.openIterator("a"); + index=0; + expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd2", ":"); + while(iter.hasNext()){ + Tuple tuple=iter.next(); + assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); + index++; + } + + // using both param value and param file, param value should override param file + pig=new PigServer(Util.getLocalTestMode()); + pig.registerScript(new ByteArrayInputStream(script.getBytes("UTF-8")),params,paramFile); + iter=pig.openIterator("a"); + index=0; + expectedTuples=Util.readFile2TupleList("test/org/apache/pig/test/data/passwd", ":"); + while(iter.hasNext()){ + Tuple tuple=iter.next(); + assertEquals(tuple.get(0).toString(), expectedTuples.get(index).get(0).toString()); + index++; + } + } + + @Test + public void testSecondarySort() throws Exception { + PigServer pigServer = new PigServer(Util.getLocalTestMode()); + Data data = 
resetData(pigServer); + + data.set("foo", + tuple("a", 1, "b"), + tuple("b", 2, "c"), + tuple("c", 3, "d") + ); + + pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); + pigServer.registerQuery("B = order A by f1,f2,f3 DESC;"); + pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();"); + + List<Tuple> out = data.get("bar"); + assertEquals(tuple("a", 1, "b"), out.get(0)); + assertEquals(tuple("b", 2, "c"), out.get(1)); + assertEquals(tuple("c", 3, "d"), out.get(2)); + } + + @Test(expected = RuntimeException.class) + public void testLocationStrictCheck() throws Exception { + Properties properties = PropertiesUtil.loadDefaultProperties(); + properties.setProperty("pig.location.check.strict", "true"); + PigServer pigServer = new PigServer(Util.getLocalTestMode(), properties); + Data data = resetData(pigServer); + + data.set("foo", + tuple("a", 1, "b"), + tuple("b", 2, "c"), + tuple("c", 3, "d")); + + pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); + pigServer.registerQuery("B = order A by f1,f2,f3 DESC;"); + pigServer.registerQuery("C = order A by f1,f2,f3;"); + // Storing to same location 'bar' should throw a RuntimeException + pigServer.registerQuery("STORE B INTO 'bar' USING mock.Storage();"); + pigServer.registerQuery("STORE C INTO 'bar' USING mock.Storage();"); + + List<Tuple> out = data.get("bar"); + assertEquals(tuple("a", 1, "b"), out.get(0)); + assertEquals(tuple("b", 2, "c"), out.get(1)); + assertEquals(tuple("c", 3, "d"), out.get(2)); + } + + @Test + public void testSkipParseInRegisterForBatch() throws Throwable { + // numTimesInitiated = 10. 
4 (once per registerQuery) + 6 (launchPlan->RandomSampleLoader, + // InputSizeReducerEstimator, getSplits->RandomSampleLoader, + // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) + // numTimesSchemaCalled = 4 (once per registerQuery) + if (Util.getLocalTestMode().toString().startsWith("TEZ")) { + _testSkipParseInRegisterForBatch(false, 6, 4); + _testSkipParseInRegisterForBatch(true, 3, 1); + } else { + _testSkipParseInRegisterForBatch(false, 10, 4); + _testSkipParseInRegisterForBatch(true, 7, 1); + } + // numTimesInitiated = 7 (parseAndBuild, launchPlan->RandomSampleLoader, + // InputSizeReducerEstimator, getSplits->RandomSampleLoader, + // createRecordReader->RandomSampleLoader, getSplits, createRecordReader) + // numTimesSchemaCalled = 1 (parseAndBuild) + } + + @Test + // See PIG-3967 + public void testGruntValidation() throws Exception { + PigServer pigServer = new PigServer(Util.getLocalTestMode()); + Data data = resetData(pigServer); + + data.set("foo", + tuple("a", 1, "b"), + tuple("b", 2, "c"), + tuple("c", 3, "d")); + + pigServer.setValidateEachStatement(true); + pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); + pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation1';"); + pigServer.registerQuery("B = LOAD 'foo' USING mock.Storage() AS (f1:chararray,f2:int,f3:chararray);"); + pigServer.registerQuery("store B into '" + Util.generateURI(tempDir.toString(), pigServer.getPigContext()) + "/testGruntValidation2';"); // This should pass + boolean validationExceptionCaptured = false; + try { + // This should fail due to output validation + pigServer.registerQuery("store A into '" + Util.generateURI(tempDir.toString(),pigServer.getPigContext()) + "/testGruntValidation1';"); + } catch (FrontendException e) { + validationExceptionCaptured = true; + } + + assertTrue(validationExceptionCaptured); + } + + private void 
_testSkipParseInRegisterForBatch(boolean skipParseInRegisterForBatch, + int numTimesInitiated, int numTimesSchemaCalled) throws Throwable { + MockTrackingStorage.numTimesInitiated = 0; + MockTrackingStorage.numTimesSchemaCalled = 0; + String query = "A = LOAD 'foo' USING " + MockTrackingStorage.class.getName() + "();\n" + + "B = order A by $0,$1,$2;\n" + + "C = LIMIT B 2;\n" + + "STORE C INTO 'bar' USING mock.Storage();\n"; + BufferedReader in = new BufferedReader(new StringReader(query)); + Properties properties = new Properties(); + properties.setProperty("io.sort.mb", "2"); + PigContext pigContext = new PigContext(Util.getLocalTestMode(), properties); + Data data; + if (skipParseInRegisterForBatch) { + data = resetData(pigContext); + data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d")); + Grunt grunt = new Grunt(in, pigContext); + grunt.exec(); // Calls grunt.parseStopOnError(); which executes as batch + } + else { + PigServer pigServer = new PigServer(pigContext); + data = resetData(pigServer); + data.set("foo", tuple("a", 1, "b"), tuple("b", 2, "c"), tuple("c", 3, "d")); + GruntParser grunt = new GruntParser(in, pigServer); + grunt.setInteractive(false); + grunt.parseStopOnError(true); //not batch + } + + assertEquals(numTimesInitiated, MockTrackingStorage.numTimesInitiated); + assertEquals(numTimesSchemaCalled, MockTrackingStorage.numTimesSchemaCalled); + List<Tuple> out = data.get("bar"); + assertEquals(2, out.size()); + assertEquals(tuple("a", 1, "b"), out.get(0)); + assertEquals(tuple("b", 2, "c"), out.get(1)); + } + + public static class MockTrackingStorage extends Storage { + + public static int numTimesInitiated = 0; + public static int numTimesSchemaCalled = 0; + + public MockTrackingStorage() { + super(); + numTimesInitiated++; + } + + @Override + public ResourceSchema getSchema(String location, Job job) throws IOException { + numTimesSchemaCalled++; + return super.getSchema(location, job); + } + } +} Modified: 
pig/trunk/test/org/apache/pig/test/TestPigStats.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=1653391&r1=1653390&r2=1653391&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigStats.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigStats.java Tue Jan 20 22:16:33 2015 @@ -33,27 +33,23 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; +import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.newplan.logical.relational.LogicalPlan; import org.apache.pig.tools.pigstats.PigStats; -import org.apache.pig.tools.pigstats.PigStats.JobGraph; -import org.apache.pig.tools.pigstats.ScriptState; -import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; +import org.junit.Ignore; import org.junit.Test; -public class TestPigStats { +@Ignore +abstract public class TestPigStats { private static final Log LOG = LogFactory.getLog(TestPigStats.class); + abstract public void addSettingsToConf(Configuration conf, String scriptFileName); + @Test public void testPigScriptInConf() throws Exception { PrintWriter w = new PrintWriter(new FileWriter("test.pig")); @@ 
-63,11 +59,8 @@ public class TestPigStats { w.println("register /mydir/lib/jackson-mapper-asl-1.4.2.jar"); w.close(); - MRScriptState ss = MRScriptState.get(); - ss.setScript(new File("test.pig")); Configuration conf = new Configuration(); - MapReduceOper mro = new MapReduceOper(new OperatorKey()); - ss.addSettingsToConf(mro, conf); + addSettingsToConf(conf, "test.pig"); String s = conf.get("pig.script"); String script = new String(Base64.decodeBase64(s.getBytes())); @@ -100,11 +93,8 @@ public class TestPigStats { Util.createLocalInputFile( "testScript.py", script); - MRScriptState ss = MRScriptState.get(); - ss.setScript(new File("testScript.py")); Configuration conf = new Configuration(); - MapReduceOper mro = new MapReduceOper(new OperatorKey()); - ss.addSettingsToConf(mro, conf); + addSettingsToConf(conf, "testScript.py"); String s = conf.get("pig.script"); String actual = new String(Base64.decodeBase64(s.getBytes())); @@ -127,7 +117,7 @@ public class TestPigStats { } @Test - public void testBytesWritten_JIRA_1027() { + public void testBytesWritten_JIRA_1027() throws Exception { File outputFile = null; try { @@ -135,11 +125,12 @@ public class TestPigStats { outputFile = File.createTempFile(fileName, ".out"); String filePath = outputFile.getAbsolutePath(); outputFile.delete(); - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); pig.registerQuery("A = load 'test/org/apache/pig/test/data/passwd';"); ExecJob job = pig.store("A", filePath); PigStats stats = job.getStatistics(); - File dataFile = new File( outputFile.getAbsoluteFile() + File.separator + "part-00000" ); + File dataFile = Util.getFirstPartFile(outputFile); + // This check fails in MR due to lack of counters in local mode assertEquals(dataFile.length(), stats.getBytesWritten()); } catch (IOException e) { LOG.error("Error while generating file", e); @@ -152,11 +143,13 @@ public class TestPigStats { } } } - + + abstract public void 
checkPigStatsAlias(PhysicalPlan pp, PigContext pc) throws Exception; + @Test public void testPigStatsAlias() throws Exception { try { - PigServer pig = new PigServer(ExecType.LOCAL); + PigServer pig = new PigServer(Util.getLocalTestMode()); pig.setBatchOn(); pig.registerQuery("A = load 'input' as (name, age, gpa);"); pig.registerQuery("B = group A by name;"); @@ -167,19 +160,9 @@ public class TestPigStats { LogicalPlan lp = getLogicalPlan(pig); lp.optimize(pig.getPigContext()); - PhysicalPlan pp = ((MRExecutionEngine)pig.getPigContext().getExecutionEngine()).compile(lp, + PhysicalPlan pp = ((HExecutionEngine)pig.getPigContext().getExecutionEngine()).compile(lp, null); - MROperPlan mp = getMRPlan(pp, pig.getPigContext()); - assertEquals(4, mp.getKeys().size()); - - MapReduceOper mro = mp.getRoots().get(0); - assertEquals("A,B,C", getAlias(mro)); - - mro = mp.getSuccessors(mro).get(0); - assertEquals("D", getAlias(mro)); - - mro = mp.getSuccessors(mro).get(0); - assertEquals("D", getAlias(mro)); + checkPigStatsAlias(pp, pig.getPigContext()); } finally { File outputfile = new File("alias_output"); if (outputfile.exists()) { @@ -189,25 +172,25 @@ public class TestPigStats { } } } - + + abstract public void checkPigStats(ExecJob job); + @Test - public void testPigStatsGetList() { + public void testPigStatsGetList() throws Exception { File outputFile = null; try { String filename = this.getClass().getSimpleName() + "_" + "testPigStatsGetList"; outputFile = File.createTempFile(filename, ".out"); String filePath = outputFile.getAbsolutePath(); outputFile.delete(); - PigServer pigServer = new PigServer(ExecType.LOCAL); + PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerQuery("a = load 'test/org/apache/pig/test/data/passwd';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group, COUNT(a) as cnt;"); pigServer.registerQuery("d = group c by cnt;"); pigServer.registerQuery("e = foreach d generate 
group;"); ExecJob job = pigServer.store("e", filePath); - JobGraph jobGraph = job.getStatistics().getJobGraph(); - assertEquals(2, jobGraph.getJobList().size()); - + checkPigStats(job); } catch (IOException e) { LOG.error("IOException while creating file ", e); fail("Encountered IOException"); @@ -232,23 +215,5 @@ public class TestPigStats { buildLp.setAccessible(true); return (LogicalPlan ) buildLp.invoke( pig ); } - - public static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception { - MapReduceLauncher launcher = new MapReduceLauncher(); - java.lang.reflect.Method compile = launcher.getClass() - .getDeclaredMethod("compile", - new Class[] { PhysicalPlan.class, PigContext.class }); - compile.setAccessible(true); - return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx }); - } - - public static String getAlias(MapReduceOper mro) throws Exception { - ScriptState ss = ScriptState.get(); - java.lang.reflect.Method getAlias = ss.getClass() - .getDeclaredMethod("getAlias", - new Class[] { MapReduceOper.class }); - getAlias.setAccessible(true); - return (String)getAlias.invoke(ss, new Object[] { mro }); - } - + } Added: pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java?rev=1653391&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java (added) +++ pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java Tue Jan 20 22:16:33 2015 @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.PigStats.JobGraph; +import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; + +public class TestPigStatsMR extends TestPigStats { + @Override + public void addSettingsToConf(Configuration conf, String scriptFileName) { + MRScriptState ss = MRScriptState.get(); + ss.setScript(new File(scriptFileName)); + MapReduceOper mro = new MapReduceOper(new OperatorKey()); + ss.addSettingsToConf(mro, conf); + } + + @Override + public void checkPigStats(ExecJob job) { + JobGraph jobGraph = job.getStatistics().getJobGraph(); + assertEquals(2, jobGraph.getJobList().size()); + } + + @Override + public void checkPigStatsAlias(PhysicalPlan pp, PigContext pc) throws Exception { + MROperPlan mp = getMRPlan(pp, pc); + assertEquals(4, mp.getKeys().size()); + + MapReduceOper mro = mp.getRoots().get(0); + assertEquals("A,B,C", getAlias(mro)); + + mro = 
mp.getSuccessors(mro).get(0); + assertEquals("D", getAlias(mro)); + + mro = mp.getSuccessors(mro).get(0); + assertEquals("D", getAlias(mro)); + } + + private static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception { + MapReduceLauncher launcher = new MapReduceLauncher(); + java.lang.reflect.Method compile = launcher.getClass() + .getDeclaredMethod("compile", + new Class[] { PhysicalPlan.class, PigContext.class }); + compile.setAccessible(true); + return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx }); + } + + private static String getAlias(MapReduceOper mro) throws Exception { + ScriptState ss = ScriptState.get(); + java.lang.reflect.Method getAlias = ss.getClass() + .getDeclaredMethod("getAlias", + new Class[] { MapReduceOper.class }); + getAlias.setAccessible(true); + return (String)getAlias.invoke(ss, new Object[] { mro }); + } +} Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1653391&r1=1653390&r2=1653391&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Tue Jan 20 22:16:33 2015 @@ -109,7 +109,7 @@ public class TestPruneColumn { FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0); logger.addAppender(appender); - pigServer = new PigServer("local"); + pigServer = new PigServer(Util.getLocalTestMode()); tmpFile1 = File.createTempFile("prune", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile1)); ps.println("1\t2\t3"); Modified: pig/trunk/test/org/apache/pig/test/TestRank3.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRank3.java?rev=1653391&r1=1653390&r2=1653391&view=diff ============================================================================== --- 
pig/trunk/test/org/apache/pig/test/TestRank3.java (original) +++ pig/trunk/test/org/apache/pig/test/TestRank3.java Tue Jan 20 22:16:33 2015 @@ -49,7 +49,7 @@ public class TestRank3 { @Before public void setUp() throws Exception { try { - pigServer = new PigServer("local"); + pigServer = new PigServer(Util.getLocalTestMode()); data = resetData(pigServer); data.set("empty"); Modified: pig/trunk/test/org/apache/pig/test/TestScalarAliases.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestScalarAliases.java?rev=1653391&r1=1653390&r2=1653391&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestScalarAliases.java (original) +++ pig/trunk/test/org/apache/pig/test/TestScalarAliases.java Tue Jan 20 22:16:33 2015 @@ -17,7 +17,6 @@ */ package org.apache.pig.test; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -26,30 +25,16 @@ import java.io.File; import java.io.IOException; import java.util.Iterator; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.junit.AfterClass; import org.junit.Assume; -import org.junit.Before; import org.junit.Test; public class TestScalarAliases { - private static final String BUILD_TEST_TMP = "build/test/tmp/"; static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); private PigServer pigServer; - TupleFactory mTf = TupleFactory.getInstance(); - BagFactory mBf = BagFactory.getInstance(); - - @Before - public void setUp() throws Exception{ - Util.resetStateForExecModeSwitch(); - pigServer = new PigServer(ExecType.LOCAL); - } - @AfterClass public static void oneTimeTearDown() throws Exception { cluster.shutDown(); @@ -69,417 +54,7 @@ public class TestScalarAliases { // 
See PIG-1434 @Test - public void testScalarAliasesBatchNobatch() throws Exception{ - String[] input = { - "1\t5", - "2\t10", - "3\t20" - }; - - String output = BUILD_TEST_TMP+"table_testScalarAliasesDir"; - TestScalarAliases.deleteDirectory(new File(output)); - // Test the use of scalars in expressions - String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesBatch"; - TestScalarAliases.createLocalInputFile(inputPath, input); - // Test in script mode - pigServer.setBatchOn(); - pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); - pigServer.registerQuery("B = group A all;"); - pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); - pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);"); - pigServer.registerQuery("Store Y into '" + output + "';"); - pigServer.executeBatch(); - // Check output - pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);"); - - Iterator<Tuple> iter; - Tuple t; - iter = pigServer.openIterator("Z"); - - t = iter.next(); - assertTrue(t.toString().equals("(3,0.25)")); - - t = iter.next(); - assertTrue(t.toString().equals("(6,0.5)")); - - t = iter.next(); - assertTrue(t.toString().equals("(9,1.0)")); - - assertFalse(iter.hasNext()); - - iter = pigServer.openIterator("Y"); - - t = iter.next(); - assertTrue(t.toString().equals("(3,0.25)")); - - t = iter.next(); - assertTrue(t.toString().equals("(6,0.5)")); - - t = iter.next(); - assertTrue(t.toString().equals("(9,1.0)")); - - assertFalse(iter.hasNext()); - - - } - - // See PIG-1434 - @Test - public void testUseScalarMultipleTimes() throws Exception{ - String[] input = { - "1\t5", - "2\t10", - "3\t20" - }; - - String outputY = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutY"; - TestScalarAliases.deleteDirectory(new File(outputY)); - String outputZ = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutZ"; - TestScalarAliases.deleteDirectory(new File(outputZ)); - // Test the use of 
scalars in expressions - String inputPath = BUILD_TEST_TMP+"table_testUseScalarMultipleTimes"; - TestScalarAliases.createLocalInputFile(inputPath, input); - pigServer.setBatchOn(); - pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); - pigServer.registerQuery("B = group A all;"); - pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); - pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);"); - pigServer.registerQuery("Store Y into '" + outputY + "';"); - pigServer.registerQuery("Z = foreach A generate (a1 + C.count), (a0 * C.max);"); - pigServer.registerQuery("Store Z into '" + outputZ + "';"); - // Test Multiquery store - pigServer.executeBatch(); - - // Check output - pigServer.registerQuery("M = LOAD '" + outputY + "' as (a0: int, a1: double);"); - - Iterator<Tuple> iter; - Tuple t; - iter = pigServer.openIterator("M"); - - t = iter.next(); - assertTrue(t.toString().equals("(3,0.25)")); - - t = iter.next(); - assertTrue(t.toString().equals("(6,0.5)")); - - t = iter.next(); - assertTrue(t.toString().equals("(9,1.0)")); - - assertFalse(iter.hasNext()); - - // Check output - pigServer.registerQuery("N = LOAD '" + outputZ + "' as (a0: double, a1: double);"); - - iter = pigServer.openIterator("N"); - - t = iter.next(); - assertTrue(t.toString().equals("(8.0,20.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(13.0,40.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(23.0,60.0)")); - - assertFalse(iter.hasNext()); - - // Non batch mode - iter = pigServer.openIterator("Y"); - - t = iter.next(); - assertTrue(t.toString().equals("(3,0.25)")); - - t = iter.next(); - assertTrue(t.toString().equals("(6,0.5)")); - - t = iter.next(); - assertTrue(t.toString().equals("(9,1.0)")); - - assertFalse(iter.hasNext()); - - // Check in non-batch mode - iter = pigServer.openIterator("Z"); - - t = iter.next(); - assertTrue(t.toString().equals("(8.0,20.0)")); - - t = 
iter.next(); - assertTrue(t.toString().equals("(13.0,40.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(23.0,60.0)")); - - assertFalse(iter.hasNext()); - - - } - - // See PIG-1434 - @Test - public void testScalarWithNoSchema() throws Exception{ - String[] scalarInput = { - "1\t5" - }; - String[] input = { - "1\t5", - "2\t10", - "3\t20" - }; - String inputPath = BUILD_TEST_TMP+"table_testScalarWithNoSchema"; - TestScalarAliases.createLocalInputFile(inputPath, input); - String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaScalar"; - TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput); - // Load A as a scalar - pigServer.registerQuery("A = LOAD '" + inputPath + "';"); - pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "' as (count, total);"); - pigServer.registerQuery("B = foreach A generate 5 / scalar.total;"); - - Iterator<Tuple> iter = pigServer.openIterator("B"); - - Tuple t = iter.next(); - assertTrue(t.get(0).toString().equals("1")); - - t = iter.next(); - assertTrue(t.get(0).toString().equals("1")); - - t = iter.next(); - assertTrue(t.get(0).toString().equals("1")); - - assertFalse(iter.hasNext()); - - } - - // See PIG-1434 - @Test - public void testScalarWithTwoBranches() throws Exception{ - String[] inputA = { - "1\t5", - "2\t10", - "3\t20" - }; - - String[] inputX = { - "pig", - "hadoop", - "rocks" - }; - - String output = BUILD_TEST_TMP+"testScalarWithTwoBranchesDir"; - TestScalarAliases.deleteDirectory(new File(output)); - // Test the use of scalars in expressions - String inputPathA = BUILD_TEST_TMP+"testScalarWithTwoBranchesA"; - TestScalarAliases.createLocalInputFile(inputPathA, inputA); - String inputPathX = BUILD_TEST_TMP+"testScalarWithTwoBranchesX"; - TestScalarAliases.createLocalInputFile(inputPathX, inputX); - // Test in script mode - pigServer.setBatchOn(); - pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0: long, a1: double);"); - pigServer.registerQuery("B = group A 
all;"); - pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); - pigServer.registerQuery("X = LOAD '" + inputPathX + "' as (names: chararray);"); - pigServer.registerQuery("Y = foreach X generate names, C.max;"); - pigServer.registerQuery("Store Y into '" + output + "';"); - pigServer.executeBatch(); - // Check output - pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: chararray, a1: double);"); - - Iterator<Tuple> iter = pigServer.openIterator("Z"); - - Tuple t = iter.next(); - assertTrue(t.toString().equals("(pig,20.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(hadoop,20.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(rocks,20.0)")); - - assertFalse(iter.hasNext()); - - // Check in non-batch mode - iter = pigServer.openIterator("Y"); - - t = iter.next(); - assertTrue(t.toString().equals("(pig,20.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(hadoop,20.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(rocks,20.0)")); - - assertFalse(iter.hasNext()); - - - } - - // See PIG-1434 - @Test - public void testFilteredScalarDollarProj() throws Exception{ - String output = BUILD_TEST_TMP+"table_testFilteredScalarDollarProjDir"; - TestScalarAliases.deleteDirectory(new File(output)); - String[] input = { - "1\t5\t[state#maine,city#portland]\t{(a),(b)}\t(a,b)", - "2\t10\t\t\t", - "3\t20\t\t\t" - }; - - // Test the use of scalars in expressions - String inputPath = BUILD_TEST_TMP+"table_testFilteredScalarDollarProj"; - TestScalarAliases.createLocalInputFile(inputPath, input); - // Test in script mode - pigServer.setBatchOn(); - pigServer.registerQuery("A = LOAD '" + inputPath + "'" + " as (a0: long, a1: double, a2 : bytearray, " + "a3: bag{ t : tuple(tc : chararray)}, " + "a4: tuple(c1 : chararray, c2 : chararray) );"); - pigServer.registerQuery("B = filter A by $1 < 8;"); - pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 / B.$1), B.$2, B.$2#'state', B.$3, 
B.a4;"); - pigServer.registerQuery("Store Y into '" + output + "';"); - pigServer.explain("Y", System.err); - pigServer.executeBatch(); - // Check output - pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);"); - pigServer.explain("Z", System.err); - - Iterator<Tuple> iter = pigServer.openIterator("Z"); - - Tuple t = iter.next(); - assertTrue(t.toString().equals("(1,1.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(2,2.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(3,4.0)")); - - assertFalse(iter.hasNext()); - - // Check in non-batch mode - iter = pigServer.openIterator("Y"); - - t = iter.next(); - assertEquals(t.toString(),"(1,1.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); - - t = iter.next(); - assertEquals(t.toString(),"(2,2.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); - - t = iter.next(); - assertEquals(t.toString(),"(3,4.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); - - assertFalse(iter.hasNext()); - - - } - - // See PIG-1434 - @Test - public void testScalarWithNoSchemaDollarProj() throws Exception{ - String[] scalarInput = { - "1\t5" - }; - String[] input = { - "1\t5", - "2\t10", - "3\t20" - }; - String inputPath = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProj"; - TestScalarAliases.createLocalInputFile(inputPath, input); - String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProjScalar"; - TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput); - // Load A as a scalar - pigServer.registerQuery("A = LOAD '" + inputPath + "';"); - pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "';"); - pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;"); - - Iterator<Tuple> iter = pigServer.openIterator("B"); - - Tuple t = iter.next(); - assertTrue(t.get(0).toString().equals("1")); - - t = iter.next(); - assertTrue(t.get(0).toString().equals("1")); - - t = iter.next(); - assertTrue(t.get(0).toString().equals("1")); - - 
assertFalse(iter.hasNext()); - - } - - // See PIG-1434 - @Test - public void testScalarAliasesJoinClause() throws Exception{ - String[] inputA = { - "1\t5", - "2\t10", - "3\t20" - }; - String[] inputB = { - "Total3\tthree", - "Total2\ttwo", - "Total1\tone" - }; - - // Test the use of scalars in expressions - String inputPathA = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseA"; - TestScalarAliases.createLocalInputFile(inputPathA, inputA); - String inputPathB = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseB"; - TestScalarAliases.createLocalInputFile(inputPathB, inputB); - // Test in script mode - pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0, a1);"); - pigServer.registerQuery("G = group A all;"); - pigServer.registerQuery("C = foreach G generate COUNT(A) as count;"); - - pigServer.registerQuery("B = LOAD '" + inputPathB + "' as (b0:chararray, b1:chararray);"); - pigServer.registerQuery("Y = join A by CONCAT('Total', (chararray)C.count), B by $0;"); - - Iterator<Tuple> iter = pigServer.openIterator("Y"); - - String[] expected = new String[] { - "(1,5,Total3,three)", - "(2,10,Total3,three)", - "(3,20,Total3,three)" - }; - - Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("Y"))); - } - - // See PIG-1434 - @Test - public void testScalarAliasesFilterClause() throws Exception{ - String[] input = { - "1\t5", - "2\t10", - "3\t20", - "4\t12", - "5\t8" - }; - - // Test the use of scalars in expressions - String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesFilterClause"; - TestScalarAliases.createLocalInputFile(inputPath, input); - // Test in script mode - pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0, a1);"); - pigServer.registerQuery("G = group A all;"); - pigServer.registerQuery("C = foreach G generate AVG(A.$1) as average;"); - - pigServer.registerQuery("Y = filter A by a1 > C.average;"); - - Iterator<Tuple> iter = pigServer.openIterator("Y"); - - // 
Average is 11 - Tuple t = iter.next(); - assertTrue(t.toString().equals("(3,20)")); - - t = iter.next(); - assertTrue(t.toString().equals("(4,12)")); - - assertFalse(iter.hasNext()); - } - - // See PIG-1434 - @Test public void testScalarAliasesSplitClause() throws Exception{ - Util.resetStateForExecModeSwitch(); pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); String[] input = { "1\t5", @@ -540,94 +115,4 @@ public class TestScalarAliases { ); } } - - - // See PIG-1434 - @Test - public void testScalarAliasesGrammarNegative() throws Exception{ - String[] input = { - "1\t5", - "2\t10", - "3\t20" - }; - - String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesGrammar"; - TestScalarAliases.createLocalInputFile(inputPath, input); - - try { - pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); - pigServer.registerQuery("B = group A all;"); - pigServer.registerQuery("C = foreach B generate COUNT(A);"); - // Only projections of C are supported - pigServer.registerQuery("Y = foreach A generate C;"); - pigServer.openIterator( "Y" ); - //Control should not reach here - fail("Scalar projections are only supported"); - } catch (IOException pe){ - assertTrue(pe.getMessage().contains("Invalid scalar projection: C")); - } - } - - // See PIG-1636 - @Test - public void testScalarAliasesLimit() throws Exception{ - String[] input = { - "a\t1", - "b\t2", - "c\t3", - "a\t4", - "c\t5" - }; - - // Test the use of scalars in expressions - String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesLimit"; - TestScalarAliases.createLocalInputFile(inputPath, input); - // Test in script mode - pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0:chararray, a1: int);"); - pigServer.registerQuery("G = group A all;"); - pigServer.registerQuery("C = foreach G generate SUM(A.$1) as total;"); - pigServer.registerQuery("C1 = limit C 1;"); - pigServer.registerQuery("Y = foreach A generate a0, a1 * (double)C1.total;"); - - 
Iterator<Tuple> iter = pigServer.openIterator("Y"); - - // Average is 11 - Tuple t = iter.next(); - assertTrue(t.toString().equals("(a,15.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(b,30.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(c,45.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(a,60.0)")); - - t = iter.next(); - assertTrue(t.toString().equals("(c,75.0)")); - - assertFalse(iter.hasNext()); - } - - /** - * Test that a specific string is included in the error message when an - * exception is thrown for using a relation in a - * scalar context without projecting any columns out of it - */ - // See PIG-1788 - @Test - public void testScalarWithNoProjection() throws Exception{ - String query = - " A = load 'table_testScalarWithNoProjection' as (x, y);" + - " B = group A by x;" + - // B is unintentionally being used as scalar, - // the user intends it to be COUNT(A) - " C = foreach B generate COUNT(B);"; - - Util.checkExceptionMessage(query, "C", - "A column needs to be projected from a relation" + - " for it to be used as a scalar" - ); - } } Added: pig/trunk/test/org/apache/pig/test/TestScalarAliasesLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestScalarAliasesLocal.java?rev=1653391&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestScalarAliasesLocal.java (added) +++ pig/trunk/test/org/apache/pig/test/TestScalarAliasesLocal.java Tue Jan 20 22:16:33 2015 @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.pig.PigServer; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.junit.Before; +import org.junit.Test; + +public class TestScalarAliasesLocal { + private static final String BUILD_TEST_TMP = "build/test/tmp/"; + private PigServer pigServer; + + TupleFactory mTf = TupleFactory.getInstance(); + BagFactory mBf = BagFactory.getInstance(); + + @Before + public void setUp() throws Exception{ + pigServer = new PigServer(Util.getLocalTestMode()); + } + + public static void deleteDirectory(File file) { + if (file.exists()) { + Util.deleteDirectory(file); + } + } + + public static File createLocalInputFile(String filename, String[] inputData) + throws IOException { + new File(filename).getParentFile().mkdirs(); + return Util.createLocalInputFile(filename, inputData); + } + + // See PIG-1434 + @Test + public void testScalarAliasesBatchNobatch() throws Exception{ + String[] input = { + "1\t5", + "2\t10", + "3\t20" + }; + + String output = BUILD_TEST_TMP+"table_testScalarAliasesDir"; + TestScalarAliases.deleteDirectory(new File(output)); + // Test the use of scalars in expressions + String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesBatch"; + 
TestScalarAliases.createLocalInputFile(inputPath, input); + // Test in script mode + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); + pigServer.registerQuery("B = group A all;"); + pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); + pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);"); + pigServer.registerQuery("Store Y into '" + output + "';"); + pigServer.executeBatch(); + // Check output + pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);"); + + Iterator<Tuple> iter; + Tuple t; + iter = pigServer.openIterator("Z"); + + t = iter.next(); + assertTrue(t.toString().equals("(3,0.25)")); + + t = iter.next(); + assertTrue(t.toString().equals("(6,0.5)")); + + t = iter.next(); + assertTrue(t.toString().equals("(9,1.0)")); + + assertFalse(iter.hasNext()); + + iter = pigServer.openIterator("Y"); + + t = iter.next(); + assertTrue(t.toString().equals("(3,0.25)")); + + t = iter.next(); + assertTrue(t.toString().equals("(6,0.5)")); + + t = iter.next(); + assertTrue(t.toString().equals("(9,1.0)")); + + assertFalse(iter.hasNext()); + + + } + + // See PIG-1434 + @Test + public void testUseScalarMultipleTimes() throws Exception{ + String[] input = { + "1\t5", + "2\t10", + "3\t20" + }; + + String outputY = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutY"; + TestScalarAliases.deleteDirectory(new File(outputY)); + String outputZ = BUILD_TEST_TMP+"table_testUseScalarMultipleTimesOutZ"; + TestScalarAliases.deleteDirectory(new File(outputZ)); + // Test the use of scalars in expressions + String inputPath = BUILD_TEST_TMP+"table_testUseScalarMultipleTimes"; + TestScalarAliases.createLocalInputFile(inputPath, input); + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); + pigServer.registerQuery("B = group A all;"); + pigServer.registerQuery("C = foreach B generate COUNT(A) 
as count, MAX(A.$1) as max;"); + pigServer.registerQuery("Y = foreach A generate (a0 * C.count), (a1 / C.max);"); + pigServer.registerQuery("Store Y into '" + outputY + "';"); + pigServer.registerQuery("Z = foreach A generate (a1 + C.count), (a0 * C.max);"); + pigServer.registerQuery("Store Z into '" + outputZ + "';"); + // Test Multiquery store + pigServer.executeBatch(); + + // Check output + pigServer.registerQuery("M = LOAD '" + outputY + "' as (a0: int, a1: double);"); + + Iterator<Tuple> iter; + Tuple t; + iter = pigServer.openIterator("M"); + + t = iter.next(); + assertTrue(t.toString().equals("(3,0.25)")); + + t = iter.next(); + assertTrue(t.toString().equals("(6,0.5)")); + + t = iter.next(); + assertTrue(t.toString().equals("(9,1.0)")); + + assertFalse(iter.hasNext()); + + // Check output + pigServer.registerQuery("N = LOAD '" + outputZ + "' as (a0: double, a1: double);"); + + iter = pigServer.openIterator("N"); + + t = iter.next(); + assertTrue(t.toString().equals("(8.0,20.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(13.0,40.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(23.0,60.0)")); + + assertFalse(iter.hasNext()); + + // Non batch mode + iter = pigServer.openIterator("Y"); + + t = iter.next(); + assertTrue(t.toString().equals("(3,0.25)")); + + t = iter.next(); + assertTrue(t.toString().equals("(6,0.5)")); + + t = iter.next(); + assertTrue(t.toString().equals("(9,1.0)")); + + assertFalse(iter.hasNext()); + + // Check in non-batch mode + iter = pigServer.openIterator("Z"); + + t = iter.next(); + assertTrue(t.toString().equals("(8.0,20.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(13.0,40.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(23.0,60.0)")); + + assertFalse(iter.hasNext()); + + + } + + // See PIG-1434 + @Test + public void testScalarWithNoSchema() throws Exception{ + String[] scalarInput = { + "1\t5" + }; + String[] input = { + "1\t5", + "2\t10", + "3\t20" + }; + String inputPath 
= BUILD_TEST_TMP+"table_testScalarWithNoSchema"; + TestScalarAliases.createLocalInputFile(inputPath, input); + String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaScalar"; + TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput); + // Load A as a scalar + pigServer.registerQuery("A = LOAD '" + inputPath + "';"); + pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "' as (count, total);"); + pigServer.registerQuery("B = foreach A generate 5 / scalar.total;"); + + Iterator<Tuple> iter = pigServer.openIterator("B"); + + Tuple t = iter.next(); + assertTrue(t.get(0).toString().equals("1")); + + t = iter.next(); + assertTrue(t.get(0).toString().equals("1")); + + t = iter.next(); + assertTrue(t.get(0).toString().equals("1")); + + assertFalse(iter.hasNext()); + + } + + // See PIG-1434 + @Test + public void testScalarWithTwoBranches() throws Exception{ + String[] inputA = { + "1\t5", + "2\t10", + "3\t20" + }; + + String[] inputX = { + "pig", + "hadoop", + "rocks" + }; + + String output = BUILD_TEST_TMP+"testScalarWithTwoBranchesDir"; + TestScalarAliases.deleteDirectory(new File(output)); + // Test the use of scalars in expressions + String inputPathA = BUILD_TEST_TMP+"testScalarWithTwoBranchesA"; + TestScalarAliases.createLocalInputFile(inputPathA, inputA); + String inputPathX = BUILD_TEST_TMP+"testScalarWithTwoBranchesX"; + TestScalarAliases.createLocalInputFile(inputPathX, inputX); + // Test in script mode + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0: long, a1: double);"); + pigServer.registerQuery("B = group A all;"); + pigServer.registerQuery("C = foreach B generate COUNT(A) as count, MAX(A.$1) as max;"); + pigServer.registerQuery("X = LOAD '" + inputPathX + "' as (names: chararray);"); + pigServer.registerQuery("Y = foreach X generate names, C.max;"); + pigServer.registerQuery("Store Y into '" + output + "';"); + pigServer.executeBatch(); + // Check output + 
pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: chararray, a1: double);"); + + Iterator<Tuple> iter = pigServer.openIterator("Z"); + + Tuple t = iter.next(); + assertTrue(t.toString().equals("(pig,20.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(hadoop,20.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(rocks,20.0)")); + + assertFalse(iter.hasNext()); + + // Check in non-batch mode + iter = pigServer.openIterator("Y"); + + t = iter.next(); + assertTrue(t.toString().equals("(pig,20.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(hadoop,20.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(rocks,20.0)")); + + assertFalse(iter.hasNext()); + + pigServer.getPigContext().getProperties().remove("tez.am.inline.task.execution.max-tasks"); + + } + + // See PIG-1434 + @Test + public void testFilteredScalarDollarProj() throws Exception{ + String output = BUILD_TEST_TMP+"table_testFilteredScalarDollarProjDir"; + TestScalarAliases.deleteDirectory(new File(output)); + String[] input = { + "1\t5\t[state#maine,city#portland]\t{(a),(b)}\t(a,b)", + "2\t10\t\t\t", + "3\t20\t\t\t" + }; + + // Test the use of scalars in expressions + String inputPath = BUILD_TEST_TMP+"table_testFilteredScalarDollarProj"; + TestScalarAliases.createLocalInputFile(inputPath, input); + // Test in script mode + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD '" + inputPath + "'" + " as (a0: long, a1: double, a2 : bytearray, " + "a3: bag{ t : tuple(tc : chararray)}, " + "a4: tuple(c1 : chararray, c2 : chararray) );"); + pigServer.registerQuery("B = filter A by $1 < 8;"); + pigServer.registerQuery("Y = foreach A generate (a0 * B.$0), (a1 / B.$1), B.$2, B.$2#'state', B.$3, B.a4;"); + pigServer.registerQuery("Store Y into '" + output + "';"); + pigServer.explain("Y", System.err); + pigServer.executeBatch(); + // Check output + pigServer.registerQuery("Z = LOAD '" + output + "' as (a0: int, a1: double);"); + pigServer.explain("Z", 
System.err); + + Iterator<Tuple> iter = pigServer.openIterator("Z"); + + Tuple t = iter.next(); + assertTrue(t.toString().equals("(1,1.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(2,2.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(3,4.0)")); + + assertFalse(iter.hasNext()); + + // Check in non-batch mode + iter = pigServer.openIterator("Y"); + + t = iter.next(); + assertEquals(t.toString(),"(1,1.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); + + t = iter.next(); + assertEquals(t.toString(),"(2,2.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); + + t = iter.next(); + assertEquals(t.toString(),"(3,4.0,[state#maine,city#portland],maine,{(a),(b)},(a,b))"); + + assertFalse(iter.hasNext()); + + + } + + // See PIG-1434 + @Test + public void testScalarWithNoSchemaDollarProj() throws Exception{ + String[] scalarInput = { + "1\t5" + }; + String[] input = { + "1\t5", + "2\t10", + "3\t20" + }; + String inputPath = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProj"; + TestScalarAliases.createLocalInputFile(inputPath, input); + String inputPathScalar = BUILD_TEST_TMP+"table_testScalarWithNoSchemaDollarProjScalar"; + TestScalarAliases.createLocalInputFile(inputPathScalar, scalarInput); + // Load A as a scalar + pigServer.registerQuery("A = LOAD '" + inputPath + "';"); + pigServer.registerQuery("scalar = LOAD '" + inputPathScalar + "';"); + pigServer.registerQuery("B = foreach A generate 5 / scalar.$1;"); + + Iterator<Tuple> iter = pigServer.openIterator("B"); + + Tuple t = iter.next(); + assertTrue(t.get(0).toString().equals("1")); + + t = iter.next(); + assertTrue(t.get(0).toString().equals("1")); + + t = iter.next(); + assertTrue(t.get(0).toString().equals("1")); + + assertFalse(iter.hasNext()); + + } + + // See PIG-1434 + @Test + public void testScalarAliasesJoinClause() throws Exception{ + String[] inputA = { + "1\t5", + "2\t10", + "3\t20" + }; + String[] inputB = { + "Total3\tthree", + "Total2\ttwo", + "Total1\tone" + }; 
+ + // Test the use of scalars in expressions + String inputPathA = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseA"; + TestScalarAliases.createLocalInputFile(inputPathA, inputA); + String inputPathB = BUILD_TEST_TMP+"table_testScalarAliasesJoinClauseB"; + TestScalarAliases.createLocalInputFile(inputPathB, inputB); + // Test in script mode + pigServer.registerQuery("A = LOAD '" + inputPathA + "' as (a0, a1);"); + pigServer.registerQuery("G = group A all;"); + pigServer.registerQuery("C = foreach G generate COUNT(A) as count;"); + + pigServer.registerQuery("B = LOAD '" + inputPathB + "' as (b0:chararray, b1:chararray);"); + pigServer.registerQuery("Y = join A by CONCAT('Total', (chararray)C.count), B by $0;"); // C.count is 3, so every row of A joins only B's Total3 row + + Iterator<Tuple> iter = pigServer.openIterator("Y"); + + String[] expected = new String[] { + "(1,5,Total3,three)", + "(2,10,Total3,three)", + "(3,20,Total3,three)" + }; + + Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("Y"))); + } + + // See PIG-1434 + @Test + public void testScalarAliasesFilterClause() throws Exception{ + String[] input = { + "1\t5", + "2\t10", + "3\t20", + "4\t12", + "5\t8" + }; + + // Test the use of scalars in expressions + String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesFilterClause"; + TestScalarAliases.createLocalInputFile(inputPath, input); + // Test in script mode + pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0, a1);"); + pigServer.registerQuery("G = group A all;"); + pigServer.registerQuery("C = foreach G generate AVG(A.$1) as average;"); + + pigServer.registerQuery("Y = filter A by a1 > C.average;"); + + Iterator<Tuple> iter = pigServer.openIterator("Y"); + + // AVG(a1) = (5+10+20+12+8)/5 = 11, so only rows with a1 > 11 pass the filter + Tuple t = iter.next(); + assertTrue(t.toString().equals("(3,20)")); + + t = iter.next(); + assertTrue(t.toString().equals("(4,12)")); + + assertFalse(iter.hasNext()); + } + + // See PIG-1434 + @Test + public void
testScalarAliasesGrammarNegative() throws Exception{ + String[] input = { + "1\t5", + "2\t10", + "3\t20" + }; + + String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesGrammar"; + TestScalarAliases.createLocalInputFile(inputPath, input); + + try { + pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0: long, a1: double);"); + pigServer.registerQuery("B = group A all;"); + pigServer.registerQuery("C = foreach B generate COUNT(A);"); + // C is a whole relation; only a projected column of it (e.g. C.$0) may be used as a scalar + pigServer.registerQuery("Y = foreach A generate C;"); + pigServer.openIterator( "Y" ); + // Control should not reach here: using C without a projection must be rejected + fail("Scalar projections are only supported"); + } catch (IOException pe){ + assertTrue(pe.getMessage().contains("Invalid scalar projection: C")); + } + } + + // See PIG-1636 + @Test + public void testScalarAliasesLimit() throws Exception{ + String[] input = { + "a\t1", + "b\t2", + "c\t3", + "a\t4", + "c\t5" + }; + + // Test the use of scalars in expressions + String inputPath = BUILD_TEST_TMP+"table_testScalarAliasesLimit"; + TestScalarAliases.createLocalInputFile(inputPath, input); + // Test in script mode + pigServer.registerQuery("A = LOAD '" + inputPath + "' as (a0:chararray, a1: int);"); + pigServer.registerQuery("G = group A all;"); + pigServer.registerQuery("C = foreach G generate SUM(A.$1) as total;"); + pigServer.registerQuery("C1 = limit C 1;"); + pigServer.registerQuery("Y = foreach A generate a0, a1 * (double)C1.total;"); + + Iterator<Tuple> iter = pigServer.openIterator("Y"); + + // SUM(a1) = 1+2+3+4+5 = 15, so each row yields a1 * 15.0 + Tuple t = iter.next(); + assertTrue(t.toString().equals("(a,15.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(b,30.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(c,45.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(a,60.0)")); + + t = iter.next(); + assertTrue(t.toString().equals("(c,75.0)")); + + assertFalse(iter.hasNext()); + } + + /** + * Test that a specific string is included in the error message when an
+ * exception is thrown for using a relation in a + * scalar context without projecting any columns out of it + */ + // See PIG-1788 + @Test + public void testScalarWithNoProjection() throws Exception{ + String query = + " A = load 'table_testScalarWithNoProjection' as (x, y);" + + " B = group A by x;" + + // B is unintentionally being used as scalar, + // the user intends it to be COUNT(A) + " C = foreach B generate COUNT(B);"; + + Util.checkExceptionMessage(query, "C", + "A column needs to be projected from a relation" + + " for it to be used as a scalar" + ); + } + +} Modified: pig/trunk/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1653391&r1=1653390&r2=1653391&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/Util.java (original) +++ pig/trunk/test/org/apache/pig/test/Util.java Tue Jan 20 22:16:33 2015 @@ -1380,4 +1380,24 @@ public class Util { }); return parts[0].getPath(); } + + /** + * Returns the first {@code part-*} file, in pathname order, directly under {@code dir}. + * + * @throws java.io.FileNotFoundException if {@code dir} cannot be listed or holds no part files + */ + public static File getFirstPartFile(File dir) throws Exception { + File[] parts = dir.listFiles(new FilenameFilter() { + public boolean accept(File parent, String name) { + return name.startsWith("part-"); + } + }); + // listFiles() returns null if dir is not a readable directory; empty if nothing matched + if (parts == null || parts.length == 0) { + throw new java.io.FileNotFoundException("No part-* file found in " + dir); + } + // listFiles() order is unspecified; sort so the result is deterministic + java.util.Arrays.sort(parts); + return parts[0]; + } } Added: pig/trunk/test/org/apache/pig/tez/TestPigStatsTez.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestPigStatsTez.java?rev=1653391&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TestPigStatsTez.java (added) +++ pig/trunk/test/org/apache/pig/tez/TestPigStatsTez.java Tue Jan 20 22:16:33 2015 @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.tez; + +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.executionengine.ExecJob; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; +import org.apache.pig.impl.PigContext; +import org.apache.pig.test.TestPigStats; +import org.apache.pig.tools.pigstats.PigStats.JobGraph; +import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; +import org.apache.pig.tools.pigstats.tez.TezScriptState; +import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo; + +public class TestPigStatsTez extends TestPigStats { // Tez-specific implementations of the TestPigStats hook methods + @Override + public void addSettingsToConf(Configuration conf, String scriptFileName) { + TezScriptState ss = TezScriptState.get(); + ss.setScript(new File(scriptFileName)); + ss.addDAGSettingsToConf(conf); + } + + @Override + public void checkPigStats(ExecJob job) { + JobGraph jobGraph = job.getStatistics().getJobGraph(); +
assertEquals(1, jobGraph.getJobList().size()); + } + + @Override + public void checkPigStatsAlias(PhysicalPlan pp, PigContext pc) throws Exception { + TezPlanContainer planContainer = new TezLauncher().compile(pp, pc); + assertEquals(1, planContainer.size()); // expect exactly one container node (one DAG) + TezPlanContainerNode containerNode = planContainer.iterator().next(); + TezOperPlan tezPlan = containerNode.getTezOperPlan(); + assertEquals(6, tezPlan.size()); // six TezOperators in the compiled plan + + TezPigScriptStats tezStats = new TezPigScriptStats(pc); + tezStats.initialize(planContainer); + + TezScriptState ss = TezScriptState.get(); + TezDAGScriptInfo scriptInfo = ss.getDAGScriptInfo(containerNode.getOperatorKey().toString()); + + TezOperator tezOper = tezPlan.getRoots().get(0); + assertEquals("A,B,C", getAlias(scriptInfo, tezOper)); + tezOper = tezPlan.getSuccessors(tezOper).get(0); + assertEquals("C,D", getAlias(scriptInfo, tezOper)); + TezOperator partitionerOper = null; + for (TezOperator succ : tezPlan.getSuccessors(tezOper)) { + if (succ.isSampleAggregation()) { // the sample-aggregation operator carries no alias + assertEquals("", getAlias(scriptInfo, succ)); + } else { + assertEquals("D", getAlias(scriptInfo, succ)); + partitionerOper = succ; + } + } + tezOper = partitionerOper; + assertEquals("D", getAlias(scriptInfo, tezOper)); + tezOper = tezPlan.getSuccessors(tezOper).get(0); + assertEquals("", getAlias(scriptInfo, tezOper)); + tezOper = tezPlan.getSuccessors(tezOper).get(0); + assertEquals("", getAlias(scriptInfo, tezOper)); + } + + public static String getAlias(TezDAGScriptInfo scriptInfo, TezOperator oper) throws Exception { // comma-separated aliases recorded for oper, e.g. A,B,C + String alias = scriptInfo.getAlias(oper); + return alias; + } +} Modified: pig/trunk/test/tez-local-tests URL: http://svn.apache.org/viewvc/pig/trunk/test/tez-local-tests?rev=1653391&r1=1653390&r2=1653391&view=diff ============================================================================== --- pig/trunk/test/tez-local-tests (original) +++ pig/trunk/test/tez-local-tests Tue Jan 20 22:16:33 2015 @@ -71,3 +71,12 @@ **/TestPigServerWithMacros.java
**/TestMultiQueryBasic.java **/TestLoadStoreFuncLifeCycle.java +**/TestCubeOperator.java +**/TestJoinLocal.java +**/TestMultiQuery.java +**/TestNewPlanColumnPrune.java +**/TestPigServerLocal.java +**/TestPruneColumn.java +**/TestRank3.java +**/TestScalarAliasesLocal.java +**/TestPigStatsTez.java
