Added: pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java (added) +++ pig/trunk/test/org/apache/pig/spark/TestIndexedKey.java Mon May 29 15:00:39 2017 @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.spark; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import static org.junit.Assert.assertEquals; + +@RunWith(JUnit4.class) +public class TestIndexedKey { + + /**Case1:Compare IndexedKeys with same index value + * key1 key2 equal? hashCode1 hashCode2 + * foo null N hashCode(foo) index + * null foo N index hashCode(foo) + * foo foo Y hashCode(foo) hashCode(foo) + * null null Y index index + * (1,1) (1,1) Y hashCode((1,1)) hashCode((1,1)) + * (1,) (1,) Y hashCode((1,)) hashCode((1,)) + * (1,1) (1,2) N hashCode((1,1)) hashCode((1,2)) + */ + @Test + public void testIndexedKeyWithSameIndexValue() throws Exception { + IndexedKey a0 = new IndexedKey(new Byte("0"), "foo"); + IndexedKey a1 = new IndexedKey(new Byte("0"), null); + assertEquals(a0.equals(a1), false); + assertEquals(a0.hashCode()==a1.hashCode(),false); + + IndexedKey a2 = new IndexedKey(new Byte("0"), null); + IndexedKey a3 = new IndexedKey(new Byte("0"), "foo"); + assertEquals(a2.equals(a3),false); + assertEquals(a2.hashCode()==a3.hashCode(),false); + + IndexedKey a4 = new IndexedKey(new Byte("0"), "foo"); + IndexedKey a5 = new IndexedKey(new Byte("0"), "foo"); + assertEquals(a4.equals(a5),true); + assertEquals(a4.hashCode()==a5.hashCode(),true); + + IndexedKey a6 = new IndexedKey(new Byte("0"), null); + IndexedKey a7 = new IndexedKey(new Byte("0"), null); + assertEquals(a6.equals(a7),true); + assertEquals(a6.hashCode()==a7.hashCode(),true); + + Tuple t1 = TupleFactory.getInstance().newTuple(2); + t1.set(0,"1"); + t1.set(1,"1"); + Tuple t2 = TupleFactory.getInstance().newTuple(2); + t2.set(0,"1"); + t2.set(1,"1"); + IndexedKey a8 = new IndexedKey(new Byte("0"), t1); + IndexedKey a9 = new IndexedKey(new Byte("0"), t2); + assertEquals(a8.equals(a9),true); + assertEquals(a8.hashCode()==a9.hashCode(),true); + + Tuple t3 = TupleFactory.getInstance().newTuple(2); + t3.set(0,"1"); + t3.set(1,null); + Tuple t4 = TupleFactory.getInstance().newTuple(2); + t4.set(0,"1"); + t4.set(1,null); + IndexedKey a10 = new IndexedKey(new Byte("0"), t3); + IndexedKey a11 = new IndexedKey(new Byte("0"), t4); + assertEquals(a10.equals(a11),true); + assertEquals(a10.hashCode()==a11.hashCode(),true); + + Tuple t5 = TupleFactory.getInstance().newTuple(2); + t5.set(0,"1"); + t5.set(1,"1"); + Tuple t6 = TupleFactory.getInstance().newTuple(2); + t6.set(0,"1"); + t6.set(1,"2"); + IndexedKey a12 = new IndexedKey(new Byte("0"), t5); + IndexedKey a13 = new IndexedKey(new Byte("0"), t6); + assertEquals(a12.equals(a13),false); + assertEquals(a12.hashCode()==a13.hashCode(),false); + } + + /* + * Case2:Compare IndexedKeys with different index value + * key1 key2 equal? hashCode1 hashCode2 + * foo null N hashCode(foo) index2 + * null foo N index1 hashCode(foo) + * foo foo Y hashCode(foo) hashCode(foo) + * null null N index1 index2 + * (1,1) (1,1) Y hashCode((1,1)) hashCode((1,1)) + * (1,) (1,) N hashCode((1,)) hashCode((1,)) + * (1,1) (1,2) N hashCode((1,1)) hashCode((1,2)) + */ + @Test + public void testIndexedKeyWithDifferentIndexValue() throws Exception { + IndexedKey a0 = new IndexedKey(new Byte("0"), "foo"); + IndexedKey a1 = new IndexedKey(new Byte("1"), null); + assertEquals(a0.equals(a1), false); + assertEquals(a0.hashCode() == a1.hashCode(), false); + + IndexedKey a2 = new IndexedKey(new Byte("0"), null); + IndexedKey a3 = new IndexedKey(new Byte("1"), "foo"); + assertEquals(a2.equals(a3), false); + assertEquals(a2.hashCode() == a3.hashCode(), false); + + IndexedKey a4 = new IndexedKey(new Byte("0"), "foo"); + IndexedKey a5 = new IndexedKey(new Byte("1"), "foo"); + assertEquals(a4.equals(a5), true); + assertEquals(a4.hashCode() == a5.hashCode(), true); + + IndexedKey a6 = new IndexedKey(new Byte("0"), null); + IndexedKey a7 = new IndexedKey(new Byte("1"), null); + assertEquals(a6.equals(a7), false); + assertEquals(a6.hashCode() == a7.hashCode(), false); + + Tuple t1 = TupleFactory.getInstance().newTuple(2); + t1.set(0, "1"); + t1.set(1, "1"); + Tuple t2 = TupleFactory.getInstance().newTuple(2); + t2.set(0, "1"); + t2.set(1, "1"); + IndexedKey a8 = new IndexedKey(new Byte("0"), t1); + IndexedKey a9 = new IndexedKey(new Byte("1"), t2); + assertEquals(a8.equals(a9), true); + assertEquals(a8.hashCode() == a9.hashCode(), true); + + Tuple t3 = TupleFactory.getInstance().newTuple(2); + t3.set(0, "1"); + t3.set(1, null); + Tuple t4 = TupleFactory.getInstance().newTuple(2); + t4.set(0, "1"); + t4.set(1, null); + IndexedKey a10 = new IndexedKey(new Byte("0"), t3); + IndexedKey a11 = new IndexedKey(new Byte("1"), t4); + assertEquals(a10.equals(a11), false); + assertEquals(a10.hashCode() == a11.hashCode(), true); //hashcode of a10 and a11 are equal but they are not equal + + Tuple t5 = TupleFactory.getInstance().newTuple(2); + t5.set(0, "1"); + t5.set(1, "1"); + Tuple t6 = TupleFactory.getInstance().newTuple(2); + t6.set(0, "1"); + t6.set(1, "2"); + IndexedKey a12 = new IndexedKey(new Byte("0"), t5); + IndexedKey a13 = new IndexedKey(new Byte("1"), t6); + assertEquals(a12.equals(a13), false); + assertEquals(a12.hashCode() == a13.hashCode(), false); + } +}
Added: pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java (added) +++ pig/trunk/test/org/apache/pig/spark/TestSecondarySortSpark.java Mon May 29 15:00:39 2017 @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.spark; + +import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.SecondaryKeyOptimizerSpark; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.test.MiniGenericCluster; +import org.apache.pig.test.TestSecondarySort; +import org.apache.pig.test.Util; + +/** + * TestSecondarySortSpark. + */ +public class TestSecondarySortSpark extends TestSecondarySort { + + public TestSecondarySortSpark() { + super(); + } + + @Override + public MiniGenericCluster getCluster() { + return MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_SPARK); + } + + @Override + public SecondaryKeyOptimizer visitSecondaryKeyOptimizer(String query) throws Exception { + PhysicalPlan pp = Util.buildPp(pigServer, query); + SparkCompiler comp = new SparkCompiler(pp, pc); + comp.compile(); + SparkOperPlan sparkPlan = comp.getSparkPlan(); + SecondaryKeyOptimizerSpark optimizer = new SecondaryKeyOptimizerSpark(sparkPlan); + optimizer.visit(); + return optimizer; + } +} Added: pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java (added) +++ pig/trunk/test/org/apache/pig/spark/TestSparkCompiler.java Mon May 29 15:00:39 2017 @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.spark; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.PrintStream; +import java.util.Properties; +import java.util.Random; + +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.TransformerException; + +import org.apache.commons.io.FileUtils; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkLocalExecType; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.DotSparkPrinter; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkPrinter; +import org.apache.pig.backend.hadoop.executionengine.spark.plan.XMLSparkPrinter; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.plan.NodeIdGenerator; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.test.Util; +import org.apache.pig.test.utils.TestHelper; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test cases to test the SparkCompiler. VERY IMPORTANT NOTE: The tests here + * compare results with a "golden" set of outputs. In each test case here, the + * operators generated have a random operator key which uses Java's Random + * class. So if there is a code change which changes the number of operators + * created in a plan, then the "golden" file for that test case + * need to be changed. + */ + +public class TestSparkCompiler { + private static PigContext pc; + private static PigServer pigServer; + private static final int MAX_SIZE = 100000; + + private enum PlanPrinter { + TEXT, + DOT, + XML; + + public void doPrint(PrintStream ps, SparkOperPlan plan) throws VisitorException, ParserConfigurationException, TransformerException { + switch (this) { + case DOT: + throw new RuntimeException("Testing in DOT format not supported yet"); + //(new DotSparkPrinter(plan, ps)).dump(); + //break; + case XML: + XMLSparkPrinter printer = new XMLSparkPrinter(ps, plan); + printer.visit(); + printer.closePlan(); + break; + case TEXT: + default: + (new SparkPrinter(ps, plan)).visit(); + break; + } + } + } + + // If for some reason, the golden files need to be regenerated, set this to + // true - THIS WILL OVERWRITE THE GOLDEN FILES - So use this with caution + // and only for the test cases you need and are sure of. + private boolean generate = false; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + resetFileLocalizer(); + pc = new PigContext(new SparkLocalExecType(), new Properties()); + FileUtils.deleteDirectory(new File("/tmp/pigoutput")); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + resetFileLocalizer(); + } + + @Before + public void setUp() throws ExecException { + resetScope(); + pigServer = new PigServer(pc); + } + + private void resetScope() { + NodeIdGenerator.reset(); + PigServer.resetScope(); + } + + private static void resetFileLocalizer() { + FileLocalizer.deleteTempFiles(); + FileLocalizer.setInitialized(false); + // Set random seed to generate deterministic temporary paths + FileLocalizer.setR(new Random(1331L)); + } + + @Test + public void testStoreLoad() throws Exception { + String query = + "a = load 'file:///tmp/input' as (x:int, y:int);" + + "store a into 'file:///tmp/pigoutput';" + + "b = load 'file:///tmp/pigoutput' as (x:int, y:int);" + + "store b into 'file:///tmp/pigoutput1';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-text.gld", PlanPrinter.TEXT); + run(query, "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-xml.gld", PlanPrinter.XML); + //TODO: enable this when DOT file comparison is supported + //run(query, "test/org/apache/pig/test/data/GoldenFiles/spark/SPARKC-LoadStore-1-dot.gld", PlanPrinter.DOT); + } + + private void run(String query, String expectedFile, PlanPrinter planPrinter) throws Exception { + PhysicalPlan pp = Util.buildPp(pigServer, query); + SparkLauncher launcher = new SparkLauncher(); + pc.inExplain = true; + SparkOperPlan sparkOperPlan = launcher.compile(pp, pc); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + planPrinter.doPrint(ps, sparkOperPlan); + String compiledPlan = baos.toString(); + System.out.println(); + System.out.println("<<<" + compiledPlan + ">>>"); + + if (generate) { + FileOutputStream fos = new FileOutputStream(expectedFile); + fos.write(baos.toByteArray()); + fos.close(); + return; + } + FileInputStream fis = new FileInputStream(expectedFile); + byte[] b = new byte[MAX_SIZE]; + int len = fis.read(b); + fis.close(); + String goldenPlan = new String(b, 0, len); + if (goldenPlan.charAt(len-1) == '\n') { + goldenPlan = goldenPlan.substring(0, len-1); + } + + System.out.println("-------------"); + System.out.println("Golden"); + System.out.println("<<<" + goldenPlan + ">>>"); + System.out.println("-------------"); + + String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim(); + String compiledPlanClean = Util.standardizeNewline(compiledPlan).trim(); + assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)), + TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean))); + } + +} + Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original) +++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Mon May 29 15:00:39 2017 @@ -48,6 +48,7 @@ abstract public class MiniGenericCluster public static String EXECTYPE_MR = "mr"; public static String EXECTYPE_TEZ = "tez"; + public static String EXECTYPE_SPARK = "spark"; /** * Returns the single instance of class MiniGenericCluster that represents @@ -75,6 +76,8 @@ abstract public class MiniGenericCluster INSTANCE = new MiniCluster(); } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) { INSTANCE = new TezMiniCluster(); + } else if (execType.equalsIgnoreCase(EXECTYPE_SPARK)) { + INSTANCE = new SparkMiniCluster(); } else { throw new RuntimeException("Unknown test.exec.type: " + execType); } @@ -157,7 +160,9 @@ abstract public class MiniGenericCluster return MiniCluster.getLauncher(); } else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) { return TezMiniCluster.getLauncher(); - } else { + } else if(execType.equalsIgnoreCase(EXECTYPE_SPARK)){ + return SparkMiniCluster.getLauncher(); + } else{ throw new RuntimeException("Unknown test.exec.type: " + execType); } } Added: pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java?rev=1796639&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java (added) +++ pig/trunk/test/org/apache/pig/test/SparkMiniCluster.java Mon May 29 15:00:39 2017 @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher; + +public class SparkMiniCluster extends YarnMiniCluster { + + private static final Log LOG = LogFactory + .getLog(SparkMiniCluster.class); + private ExecType spark = new SparkExecType(); + + @Override + public ExecType getExecType() { + return spark; + } + + static public Launcher getLauncher() { + return new SparkLauncher(); + } +} Modified: pig/trunk/test/org/apache/pig/test/TestAssert.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAssert.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestAssert.java (original) +++ pig/trunk/test/org/apache/pig/test/TestAssert.java Mon May 29 15:00:39 2017 @@ -116,7 +116,8 @@ public class TestAssert { try { pigServer.openIterator("A"); } catch (FrontendException fe) { - if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")) { + if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ") + || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) { Assert.assertTrue(fe.getCause().getMessage().contains( "Assertion violated: i should be greater than 1")); } else { @@ -147,7 +148,8 @@ public class TestAssert { try { pigServer.openIterator("A"); } catch (FrontendException fe) { - if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")) { + if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ") + || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) { Assert.assertTrue(fe.getCause().getMessage().contains( "Assertion violated: i should be greater than 1")); } else { Modified: pig/trunk/test/org/apache/pig/test/TestCase.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCase.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestCase.java (original) +++ pig/trunk/test/org/apache/pig/test/TestCase.java Mon May 29 15:00:39 2017 @@ -29,6 +29,7 @@ import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.Tuple; import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.schema.Schema; import org.junit.Test; public class TestCase { @@ -267,10 +268,15 @@ public class TestCase { pigServer.registerQuery("STORE C INTO 'bar' USING mock.Storage();"); List<Tuple> out = data.get("bar"); - assertEquals(3, out.size()); - assertEquals(tuple(1, "3n+1", bag(tuple("a","x"), tuple("a","y"))), out.get(0)); - assertEquals(tuple(2, "3n+2", bag(tuple("b","x"), tuple("b","y"))), out.get(1)); - assertEquals(tuple(3, "3n", bag(tuple("c","x"), tuple("c","y"))), out.get(2)); + + String[] expected = new String[] { + "(1,3n+1,{(a,x),(a,y)})", + "(2,3n+2,{(b,x),(b,y)})", + "(3,3n,{(c,x),(c,y)})" + }; + Schema s = pigServer.dumpSchema("C"); + Util.checkQueryOutputs(out.iterator(), expected, org.apache.pig.newplan.logical.Util.translateSchema(s), + Util.isSparkExecType(Util.getLocalTestMode())); } /** Modified: pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original) +++ pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Mon May 29 15:00:39 2017 @@ -25,7 +25,6 @@ import java.util.Iterator; import java.util.List; import org.apache.pig.CollectableLoadFunc; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException; @@ -296,13 +295,13 @@ public class TestCollectedGroup { @Test public void testMapsideGroupWithMergeJoin() throws IOException{ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); - pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);"); - pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using "+DummyCollectableLoader.class.getName() +"() as (id, name, grade);"); + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using " + DummyCollectableLoader.class.getName() + "() as (id, name, grade);"); try { DataBag dbfrj = BagFactory.getInstance().newDefaultBag(); DataBag dbshj = BagFactory.getInstance().newDefaultBag(); { - pigServer.registerQuery("C = join A by id, B by id using 'merge';"); + pigServer.registerQuery("C = join A by id, B by id using 'merge';"); pigServer.registerQuery("D = group C by A::id using 'collected';"); pigServer.registerQuery("E = foreach D generate group, COUNT(C);"); Iterator<Tuple> iter = pigServer.openIterator("E"); @@ -312,7 +311,7 @@ public class TestCollectedGroup { } } { - pigServer.registerQuery("F = join A by id, B by id;"); + pigServer.registerQuery("F = join A by id, B by id;"); pigServer.registerQuery("G = group F by A::id;"); pigServer.registerQuery("H = foreach G generate group, COUNT(F);"); Iterator<Tuple> iter = pigServer.openIterator("H"); @@ -321,7 +320,7 @@ public class TestCollectedGroup { dbshj.add(iter.next()); } } - Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0); + Assert.assertTrue(dbfrj.size() > 0 && dbshj.size() > 0); Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj)); } catch (Exception e) { @@ -337,4 +336,4 @@ public class TestCollectedGroup { } } -} +} \ No newline at end of file Modified: pig/trunk/test/org/apache/pig/test/TestCombiner.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCombiner.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestCombiner.java (original) +++ pig/trunk/test/org/apache/pig/test/TestCombiner.java Mon May 29 15:00:39 2017 @@ -116,13 +116,17 @@ public class TestCombiner { inputLines.add("a,c,1"); String inputFileName = loadWithTestLoadFunc("A", pig, inputLines); - pig.registerQuery("B = group A by ($0, $1);"); - pig.registerQuery("C = foreach B generate flatten(group), COUNT($1);"); - Iterator<Tuple> resultIterator = pig.openIterator("C"); - Tuple tuple = resultIterator.next(); - assertEquals("(a,b,2)", tuple.toString()); - tuple = resultIterator.next(); - assertEquals("(a,c,1)", tuple.toString()); + pig.registerQuery("B = foreach A generate $0 as (c0:chararray), $1 as (c1:chararray), $2 as (c2:int);"); + pig.registerQuery("C = group B by ($0, $1);"); + pig.registerQuery("D = foreach C generate flatten(group), COUNT($1) as int;"); + // Since the input has no schema, using Util.getTuplesFromConstantTupleStrings fails assert. + List<Tuple> resultTuples = Util.getTuplesFromConstantTupleStrings( + new String[]{ + "('a','b',2)", + "('a','c',1)", + }); + Iterator<Tuple> resultIterator = pig.openIterator("D"); + Util.checkQueryOutputsAfterSort(resultIterator, resultTuples); return inputFileName; } @@ -185,7 +189,7 @@ public class TestCombiner { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); pigServer.explain("c", ps); - assertTrue(baos.toString().matches("(?si).*combine plan.*")); + checkCombinerUsed(pigServer, "c", true); Iterator<Tuple> it = pigServer.openIterator("c"); Tuple t = it.next(); @@ -235,7 +239,7 @@ public class TestCombiner { ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); pigServer.explain("c", ps); - assertTrue(baos.toString().matches("(?si).*combine plan.*")); + checkCombinerUsed(pigServer, "c", true); HashMap<String, Object[]> results = new HashMap<String, Object[]>(); results.put("pig1", new Object[] { "pig1", 3L, 57L, 5.2, 75L, 9.4, 3L, 3L, 57L }); @@ -256,6 +260,56 @@ public class TestCombiner { } @Test + public void testGroupAndUnion() throws Exception { + // test use of combiner when group elements are accessed in the foreach + String input1[] = { + "ABC\t1\ta\t1", + "ABC\t1\tb\t2", + "ABC\t1\ta\t3", + "ABC\t2\tb\t4", + }; + + Util.createInputFile(cluster, "testGroupElements1.txt", input1); + PigServer pigServer = new PigServer(cluster.getExecType(), properties); + pigServer.debugOn(); + pigServer.registerQuery("a1 = load 'testGroupElements1.txt' " + + "as (str:chararray, num1:int, alph : chararray, num2 : int);"); + pigServer.registerQuery("b1 = group a1 by str;"); + + // check if combiner is present or not for various forms of foreach + pigServer.registerQuery("c1 = foreach b1 generate flatten(group), COUNT(a1.alph), SUM(a1.num2); "); + + String input2[] = { + "DEF\t2\ta\t3", + "DEF\t2\td\t5", + }; + + Util.createInputFile(cluster, "testGroupElements2.txt", input2); + pigServer.registerQuery("a2 = load 'testGroupElements2.txt' " + + "as (str:chararray, num1:int, alph : chararray, num2 : int);"); + pigServer.registerQuery("b2 = group a2 by str;"); + + // check if combiner is present or not for various forms of foreach + pigServer.registerQuery("c2 = foreach b2 generate flatten(group), COUNT(a2.alph), SUM(a2.num2); "); + + pigServer.registerQuery("c = union c1, c2;"); + + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[]{ + "('ABC',4L,10L)", + "('DEF',2L,8L)", + }); + + Iterator<Tuple> it = pigServer.openIterator("c"); + Util.checkQueryOutputsAfterSort(it, expectedRes); + + Util.deleteFile(cluster, "testGroupElements1.txt"); + Util.deleteFile(cluster, "testGroupElements2.txt"); + pigServer.shutdown(); + } + + @Test public void testGroupElements() throws Exception { // test use of combiner when group elements are accessed in the foreach String input[] = { @@ -352,13 +406,19 @@ public class TestCombiner { pigServer.shutdown(); } - private void checkCombinerUsed(PigServer pigServer, String string, boolean combineExpected) + private void checkCombinerUsed(PigServer pigServer, String alias, boolean combineExpected) throws IOException { // make sure there is a combine plan in the explain output ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); - pigServer.explain("c", ps); - boolean combinerFound = baos.toString().matches("(?si).*combine plan.*"); + pigServer.explain(alias, ps); + boolean combinerFound; + if (pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) { + combinerFound = baos.toString().contains("Reduce By"); + } else { + combinerFound = baos.toString().matches("(?si).*combine plan.*"); + } + System.out.println(baos.toString()); assertEquals("is combiner present as expected", combineExpected, combinerFound); } Modified: pig/trunk/test/org/apache/pig/test/TestCubeOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCubeOperator.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestCubeOperator.java (original) +++ pig/trunk/test/org/apache/pig/test/TestCubeOperator.java Mon May 29 15:00:39 2017 @@ -566,8 +566,9 @@ public class TestCubeOperator { public void testIllustrate() throws Exception { // test for illustrate Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", !Util.getLocalTestMode().toString().startsWith("TEZ")); - String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " - + "b = cube a by cube(a1,b1);"; + Assume.assumeTrue("illustrate does not work in spark (PIG-4621)", !Util.getLocalTestMode().toString().startsWith("SPARK")); + String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " + + "b = cube a by cube(a1,b1);"; Util.registerMultiLineQuery(pigServer, query); Map<Operator, DataBag> examples = pigServer.getExamples("b"); Modified: pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java (original) +++ pig/trunk/test/org/apache/pig/test/TestEmptyInputDir.java Mon May 29 15:00:39 2017 @@ -33,6 +33,7 @@ import org.apache.pig.PigRunner; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; @@ -85,6 +86,8 @@ public class TestEmptyInputDir { assertEquals(0, js.getNumberMaps()); } + //Spark doesn't create an empty result file part-*, only a _SUCCESS file if input dir was empty + Assume.assumeTrue("Skip this test for Spark. See PIG-5140", !Util.isSparkExecType(cluster.getExecType())); assertEmptyOutputFile(); } finally { new File(PIG_FILE).delete(); Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Mon May 29 15:00:39 2017 @@ -24,6 +24,7 @@ import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringWriter; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -422,9 +423,15 @@ public class TestEvalPipeline { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); - int numIdentity = 0; + List<Tuple> actualResList = new ArrayList<Tuple>(); while(iter.hasNext()){ - Tuple t = iter.next(); + actualResList.add(iter.next()); + } + + Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType())); + + int numIdentity = 0; + for (Tuple t : actualResList) { Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0)); Assert.assertEquals((Long)5L, (Long)t.get(2)); Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01); @@ -467,9 +474,15 @@ public class TestEvalPipeline { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); - int numIdentity = 0; + List<Tuple> actualResList = new ArrayList<Tuple>(); while(iter.hasNext()){ - Tuple t = iter.next(); + actualResList.add(iter.next()); + } + + Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType())); + + int numIdentity = 0; + for (Tuple t : actualResList) { Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0)); Assert.assertEquals((Long)5L, (Long)t.get(2)); Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01); @@ -842,9 +855,15 @@ public class TestEvalPipeline { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); - int numIdentity = 0; + List<Tuple> actualResList = new ArrayList<Tuple>(); while(iter.hasNext()){ - Tuple t = iter.next(); + actualResList.add(iter.next()); + } + + Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType())); + + int numIdentity = 0; + for (Tuple t : actualResList) { Assert.assertEquals((Integer)((numIdentity + 1) * 10), (Integer)t.get(0)); Assert.assertEquals((Long)10L, (Long)t.get(1)); Assert.assertEquals((Long)5L, (Long)t.get(2)); @@ -873,6 +892,10 @@ public class TestEvalPipeline { pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString(), pigContext) + "';"); pigServer.registerQuery("B = distinct A;"); + if (Util.isSparkExecType(cluster.getExecType())) { + pigServer.registerQuery("B = order B by *;"); + } + String query = "C = foreach B {" + "C1 = $1 - $0;" + "C2 = $1%2;" @@ -883,7 +906,6 @@ public class TestEvalPipeline { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); - int numRows = 0; for(int i = 0; i < LOOP_COUNT; i++) { for(int j = 0; j < LOOP_COUNT; j+=2){ @@ -920,6 +942,10 @@ public class TestEvalPipeline { pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString(), pigContext) + "';"); pigServer.registerQuery("B = distinct A;"); + if (Util.isSparkExecType(cluster.getExecType())) { + pigServer.registerQuery("B = order B by *;"); + } + String query = "C = foreach B {" + "C1 = $0 + $1;" + "C2 = C1 + $0;" @@ -929,7 +955,6 @@ public class TestEvalPipeline { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); - int numRows = 0; for(int i = 0; i < LOOP_COUNT; i++) { for(int j = 0; j < LOOP_COUNT; j+=2){ Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original) +++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Mon May 29 15:00:39 2017 @@ -138,9 +138,15 @@ public class TestEvalPipeline2 { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); - int numIdentity = 0; + List<Tuple> actualResList = new ArrayList<Tuple>(); while(iter.hasNext()){ - Tuple tuple = iter.next(); + actualResList.add(iter.next()); + } + + Util.sortQueryOutputsIfNeed(actualResList,Util.isSparkExecType(cluster.getExecType())); + + int numIdentity = 0; + for (Tuple tuple : actualResList) { Tuple t = (Tuple)tuple.get(0); Assert.assertEquals(DataByteArray.class, t.get(0).getClass()); int group = Integer.parseInt(new String(((DataByteArray)t.get(0)).get())); @@ -476,17 +482,24 @@ public class TestEvalPipeline2 { pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;"); Iterator<Tuple> iter = pigServer.openIterator("D"); - Assert.assertTrue(iter.hasNext()); - Tuple t = iter.next(); + if (Util.isSparkExecType(cluster.getExecType())) { + String[] expectedResults = + new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" }; + Util.checkQueryOutputs(iter, expectedResults, + org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), Util.isSparkExecType(cluster.getExecType())); + } else { + Assert.assertTrue(iter.hasNext()); + Tuple t = iter.next(); - Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})")); + Assert.assertTrue(t.toString().equals("(2,{(2,2)},{(2,5,2)})")); - Assert.assertTrue(iter.hasNext()); - t = iter.next(); + Assert.assertTrue(iter.hasNext()); + t = iter.next(); - Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})")); + Assert.assertTrue(t.toString().equals("(1,{(1,1)},{(1,2,3)})")); - Assert.assertFalse(iter.hasNext()); + Assert.assertFalse(iter.hasNext()); + } } // See PIG-1195 @@ -739,16 +752,10 @@ public class TestEvalPipeline2 { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("EventsPerMinute"); - Tuple t = iter.next(); - Assert.assertTrue( (Long)t.get(0) == 60000 && (Long)t.get(1) == 2 && (Long)t.get(2) == 3 ); - - t = iter.next(); - Assert.assertTrue( (Long)t.get(0) == 120000 && (Long)t.get(1) == 2 && (Long)t.get(2) == 2 ); - - t = iter.next(); - Assert.assertTrue( (Long)t.get(0) == 240000 && (Long)t.get(1) == 1 && (Long)t.get(2) == 1 ); + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", "(240000L,1L,1L)"}); - Assert.assertFalse(iter.hasNext()); + Util.checkQueryOutputs(iter, expectedResults, Util.isSparkExecType(cluster.getExecType())); } // See PIG-1729 @@ -1580,6 +1587,9 @@ public class TestEvalPipeline2 { pigServer.registerQuery("data = load 'table_testLimitFlatten' as (k,v);"); pigServer.registerQuery("grouped = GROUP data BY k;"); + if (Util.isSparkExecType(cluster.getExecType())) { + pigServer.registerQuery("grouped = ORDER grouped BY group;"); + } pigServer.registerQuery("selected = LIMIT grouped 2;"); pigServer.registerQuery("flattened = FOREACH selected GENERATE FLATTEN (data);"); @@ -1587,7 +1597,9 @@ public class TestEvalPipeline2 { String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"}; - Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened"))); + Util.checkQueryOutputs(iter, expected, + org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")), + Util.isSparkExecType(cluster.getExecType())); } // See PIG-2237 @@ -1650,8 +1662,14 @@ public class TestEvalPipeline2 { return false; } }); - // auto-parallelism is 2 in MR, 20 in Tez, so check >=2 - Assert.assertTrue(partFiles.length >= 2); + + if (Util.isSparkExecType(cluster.getExecType())) { + // TODO: Fix this when we implement auto-parallelism in Spark + Assert.assertTrue(partFiles.length == 1); + } else { + // auto-parallelism is 2 in MR, 20 in Tez, so check >=2 + Assert.assertTrue(partFiles.length >= 2); + } // Check the output Iterator<Tuple> iter = job.getResults(); List<Tuple> results = new ArrayList<Tuple>(); Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java (original) +++ pig/trunk/test/org/apache/pig/test/TestEvalPipelineLocal.java Mon May 29 15:00:39 2017 @@ -37,6 +37,7 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.StringTokenizer; +import java.util.Collections; import org.apache.hadoop.mapreduce.Job; import org.apache.log4j.Appender; @@ -884,7 +885,7 @@ public class TestEvalPipelineLocal { int LOOP_COUNT = 10; File tmpFile = File.createTempFile("test", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); - for(int i = 0; i < LOOP_COUNT; i++) { + for(int i=0; i<LOOP_COUNT; i++) { for(int j=0;j<LOOP_COUNT;j+=2){ ps.println(i+"\t"+j); ps.println(i+"\t"+j); @@ -908,18 +909,29 @@ public class TestEvalPipelineLocal { Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); int numIdentity = 0; + // When running with spark, output can be in a different order than that + // when running in mr mode. + List<Tuple> resList = new ArrayList<Tuple>(); while(iter.hasNext()){ - Tuple t = iter.next(); - Assert.assertEquals((Integer)((numIdentity + 1) * 10), (Integer)t.get(0)); + resList.add(iter.next()); + } + + numIdentity = resList.size(); + Util.sortQueryOutputsIfNeed(resList, Util.isSparkExecType(Util.getLocalTestMode())); + Assert.assertEquals(LOOP_COUNT, numIdentity); + // Since delta differences in some cases are allowed, utility function + // to compare tuple-lists cannot be used here. + // This loop generates sorted expected data + for (int i=0; i<numIdentity; i++) { + Tuple t = resList.get(i); + Assert.assertEquals((Integer)((i + 1) * 10), (Integer)t.get(0)); Assert.assertEquals((Long)10L, (Long)t.get(1)); Assert.assertEquals((Long)5L, (Long)t.get(2)); Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01); Assert.assertEquals(8.0, (Double)t.get(5), 0.01); Assert.assertEquals(5L, ((DataBag)t.get(6)).size()); Assert.assertEquals(7, t.size()); - ++numIdentity; } - Assert.assertEquals(LOOP_COUNT, numIdentity); } @Test @@ -927,12 +939,25 @@ public class TestEvalPipelineLocal { int LOOP_COUNT = 10; File tmpFile = File.createTempFile("test", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); + + List<Tuple> expectedList = new ArrayList<Tuple>(); for(int i = 0; i < LOOP_COUNT; i++) { for(int j=0;j<LOOP_COUNT;j+=2){ ps.println(i+"\t"+j); ps.println(i+"\t"+j); + // Generating expected data + Tuple t = mTf.newTuple(); + t.append(new Double(j - i)); + t.append((Integer)(j%2)); + if(j == 0) { + t.append(0.0); + } else { + t.append((Double)((double)i/j)); + } + expectedList.add(t); } } + Util.sortQueryOutputsIfNeed(expectedList, Util.isSparkExecType(Util.getLocalTestMode())); ps.close(); pigServer.registerQuery("A = LOAD '" @@ -949,25 +974,30 @@ public class TestEvalPipelineLocal { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); + // When ruuning with spark, output can be in a different order than when + // running in mr mode. + List<Tuple> resList = new ArrayList<Tuple>(); + while(iter.hasNext()){ + resList.add(iter.next()); + } - int numRows = 0; + Util.sortQueryOutputsIfNeed(resList, Util.isSparkExecType(Util.getLocalTestMode())); + Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, resList.size()); + + // Since delta difference in some cases is allowed, utility function + // to compare tuple-lists cannot be used here. for(int i = 0; i < LOOP_COUNT; i++) { for(int j = 0; j < LOOP_COUNT; j+=2){ - Tuple t = null; - if(iter.hasNext()) t = iter.next(); - Assert.assertEquals(3, t.size()); - Assert.assertEquals(new Double(j - i), (Double)t.get(0), 0.01); - Assert.assertEquals((Integer)(j%2), (Integer)t.get(1)); - if(j == 0) { - Assert.assertEquals(0.0, (Double)t.get(2), 0.01); - } else { - Assert.assertEquals((Double)((double)i/j), (Double)t.get(2), 0.01); - } - ++numRows; + int k = i*LOOP_COUNT/2 + j/2; + Tuple res_t = resList.get(k); + Tuple expec_t = expectedList.get(k); + + Assert.assertEquals(expec_t.size(), res_t.size()); + Assert.assertEquals((Double)expec_t.get(0), (Double)res_t.get(0), 0.01); + Assert.assertEquals((Integer)expec_t.get(1), (Integer)res_t.get(1)); + Assert.assertEquals((Double)expec_t.get(2), (Double)res_t.get(2), 0.01); } } - - Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows); } @Test @@ -975,10 +1005,16 @@ public class TestEvalPipelineLocal { int LOOP_COUNT = 10; File tmpFile = File.createTempFile("test", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); + List<Tuple> expectedList = new ArrayList<Tuple>(); for(int i = 0; i < LOOP_COUNT; i++) { for(int j=0;j<LOOP_COUNT;j+=2){ ps.println(i+"\t"+j); ps.println(i+"\t"+j); + // Generating expected data. + Tuple t = mTf.newTuple(); + t.append(new Double(i+j)); + t.append(new Double(i + j + i)); + expectedList.add(t); } } ps.close(); @@ -996,20 +1032,9 @@ public class TestEvalPipelineLocal { pigServer.registerQuery(query); Iterator<Tuple> iter = pigServer.openIterator("C"); if(!iter.hasNext()) Assert.fail("No output found"); - - int numRows = 0; - for(int i = 0; i < LOOP_COUNT; i++) { - for(int j = 0; j < LOOP_COUNT; j+=2){ - Tuple t = null; - if(iter.hasNext()) t = iter.next(); - Assert.assertEquals(2, t.size()); - Assert.assertEquals(new Double(i + j), (Double)t.get(0), 0.01); - Assert.assertEquals(new Double(i + j + i), (Double)t.get(1)); - ++numRows; - } - } - - Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows); + // When ruuning with spark, output can be in a different order than that + // when running in mr mode. + Util.checkQueryOutputs(iter, expectedList, Util.isSparkExecType(Util.getLocalTestMode())); } @Test @@ -1128,9 +1153,15 @@ public class TestEvalPipelineLocal { pigServer.registerQuery("b = foreach a generate TOTUPLE(x, y) as t, z;"); pigServer.registerQuery("c = group b by t;"); Iterator<Tuple> iter = pigServer.openIterator("c"); - Assert.assertTrue(iter.next().toString().equals("((1,2),{((1,2),3)})")); - Assert.assertTrue(iter.next().toString().equals("((4,5),{((4,5),6)})")); - Assert.assertFalse(iter.hasNext()); + // When ruuning with spark, output can be in a different order than that + // when running in mr mode. + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "((1,2),{((1,2),3)})", + "((4,5),{((4,5),6)})" + }); + Util.checkQueryOutputs(iter, expectedRes, Util.isSparkExecType(Util.getLocalTestMode())); } @Test @@ -1297,7 +1328,7 @@ public class TestEvalPipelineLocal { logger.removeAppender(appender); - Assert.assertTrue(bos.toString().contains("New For Each(false,false)[tuple]")); + Assert.assertTrue(bos.toString().contains("New For Each")); } @Test Modified: pig/trunk/test/org/apache/pig/test/TestFinish.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFinish.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestFinish.java (original) +++ pig/trunk/test/org/apache/pig/test/TestFinish.java Mon May 29 15:00:39 2017 @@ -45,7 +45,7 @@ public class TestFinish { BagFactory mBf = BagFactory.getInstance(); File f1; - static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); static public class MyEvalFunction extends EvalFunc<Tuple> { String execType; Modified: pig/trunk/test/org/apache/pig/test/TestFlatten.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestFlatten.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestFlatten.java (original) +++ pig/trunk/test/org/apache/pig/test/TestFlatten.java Mon May 29 15:00:39 2017 @@ -95,6 +95,6 @@ public class TestFlatten { List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( new String[] { "('a', 'b', '1', '2')", "('a', 'b', '3', '4')", "('c', 'd', '1', '2')", "('c', 'd', '3', '4')" }); - Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); } } Modified: pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original) +++ pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Mon May 29 15:00:39 2017 @@ -101,14 +101,8 @@ public class TestForEachNestedPlanLocal pig.registerQuery("D = foreach C {" + "crossed = cross user, session;" + "generate crossed;" + "}"); - Iterator<Tuple> expectedItr = expectedResults.iterator(); Iterator<Tuple> actualItr = pig.openIterator("D"); - while (expectedItr.hasNext() && actualItr.hasNext()) { - Tuple expectedTuple = expectedItr.next(); - Tuple actualTuple = actualItr.next(); - assertEquals(expectedTuple, actualTuple); - } - assertEquals(expectedItr.hasNext(), actualItr.hasNext()); + Util.checkQueryOutputsAfterSort(actualItr, expectedResults); } @Test @@ -156,14 +150,8 @@ public class TestForEachNestedPlanLocal + "crossed = cross user, distinct_session;" + "filtered = filter crossed by user::region == distinct_session::region;" + "generate filtered;" + "}"); - Iterator<Tuple> expectedItr = expectedResults.iterator(); Iterator<Tuple> actualItr = pig.openIterator("D"); - while (expectedItr.hasNext() && actualItr.hasNext()) { - Tuple expectedTuple = expectedItr.next(); - Tuple actualTuple = actualItr.next(); - assertEquals(expectedTuple, actualTuple); - } - assertEquals(expectedItr.hasNext(), actualItr.hasNext()); + Util.checkQueryOutputsAfterSort(actualItr, expectedResults); } @Test @@ -187,14 +175,8 @@ public class TestForEachNestedPlanLocal pig.registerQuery("D = foreach C {" + "crossed = cross user, session, profile;" + "generate crossed;" + "}"); - Iterator<Tuple> expectedItr = expectedResults.iterator(); Iterator<Tuple> actualItr = pig.openIterator("D"); - while (expectedItr.hasNext() && actualItr.hasNext()) { - Tuple expectedTuple = expectedItr.next(); - Tuple actualTuple = actualItr.next(); - assertEquals(expectedTuple, actualTuple); - } - assertEquals(expectedItr.hasNext(), actualItr.hasNext()); + Util.checkQueryOutputsAfterSort(actualItr, expectedResults); } /* Modified: pig/trunk/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGrunt.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/trunk/test/org/apache/pig/test/TestGrunt.java Mon May 29 15:00:39 2017 @@ -70,7 +70,6 @@ import org.junit.BeforeClass; import org.junit.Test; public class TestGrunt { - static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); private String basedir = "test/org/apache/pig/test/data"; @@ -915,6 +914,15 @@ public class TestGrunt { @Test public void testKeepGoigFailed() throws Throwable { + // in mr mode, the output file 'baz' will be automatically deleted if the mr job fails + // when "cat baz;" is executed, it throws "Encountered IOException. Directory baz does not exist" + // in GruntParser#processCat() and variable "caught" is true + // in spark mode, the output file 'baz' will not be automatically deleted even the job fails(see SPARK-7953) + // when "cat baz;" is executed, it does not throw exception and the variable "caught" is false + // TODO: Enable this for Spark when SPARK-7953 is resolved + Assume.assumeTrue( + "Skip this test for Spark until SPARK-7953 is resolved!", + !Util.isSparkExecType(cluster.getExecType())); PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); PigContext context = server.getPigContext(); Util.copyFromLocalToCluster(cluster, "test/org/apache/pig/test/data/passwd", "passwd"); @@ -936,7 +944,6 @@ public class TestGrunt { InputStreamReader reader = new InputStreamReader(cmd); Grunt grunt = new Grunt(new BufferedReader(reader), context); - boolean caught = false; try { grunt.exec(); @@ -1004,7 +1011,13 @@ public class TestGrunt { grunt.exec(); } catch (PigException e) { caught = true; - assertTrue(e.getErrorCode() == 6017); + if (!Util.isSparkExecType(cluster.getExecType())) { + assertTrue(e.getErrorCode() == 6017); + } else { + //In spark mode, We wrap ExecException to RunTimeException and is thrown out in JobGraphBuilder#sparkOperToRDD, + //So unwrap the exception here + assertTrue(((ExecException) e.getCause()).getErrorCode() == 6017); + } } if (Util.isMapredExecType(cluster.getExecType())) { @@ -1621,7 +1634,7 @@ public class TestGrunt { boolean found = false; for (String line : lines) { if (line.matches(".*Added jar .*" + jarName + ".*")) { - // MR mode + // MR and Spark mode found = true; } else if (line.matches(".*Local resource.*" + jarName + ".*")) { // Tez mode Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original) +++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Mon May 29 15:00:39 2017 @@ -18,6 +18,7 @@ package org.apache.pig.test; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -1443,23 +1445,20 @@ public class TestHBaseStorage { Iterator<Tuple> it = pig.openIterator("c"); int index = 0; - while (it.hasNext()) { - Tuple t = it.next(); - String rowKey = (String) t.get(0); - int col_a = (Integer) t.get(1); - Assert.assertNotNull(t.get(2)); - double col_b = (Double) t.get(2); - String col_c = (String) t.get(3); - - Assert.assertEquals("00".substring((index + "").length()) + index, - rowKey); - Assert.assertEquals(index, col_a); - Assert.assertEquals(index + 0.0, col_b, 1e-6); - Assert.assertEquals("Text_" + index, col_c); + List<Tuple> expected = new ArrayList<Tuple>(); + while (index<TEST_ROW_COUNT) { + Tuple t = TupleFactory.getInstance().newTuple(); + t.append("00".substring((index + "").length()) + index); + t.append(index); + t.append(index + 0.0); + t.append("Text_" + index); + t.append(index); + t.append(new DataByteArray("Text_" + index)); index++; + expected.add(t); } - Assert.assertEquals(index, TEST_ROW_COUNT); - } + Util.checkQueryOutputsAfterSort(it, expected); +} @Test // See PIG-4151 Modified: pig/trunk/test/org/apache/pig/test/TestLimitVariable.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLimitVariable.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestLimitVariable.java (original) +++ pig/trunk/test/org/apache/pig/test/TestLimitVariable.java Mon May 29 15:00:39 2017 @@ -80,15 +80,17 @@ public class TestLimitVariable { Iterator<Tuple> it = pigServer.openIterator("g"); List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { - "(5.0,36)"}); - Util.checkQueryOutputs(it, expectedRes); + "(5.0,36L)"}); + Util.checkQueryOutputsAfterSort(it, expectedRes); pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG); } @Test public void testLimitVariable2() throws IOException { + //add field type here to use Util.checkQueryOutputsAfterSort comparing the expected and actual + //results String query = - "a = load '" + inputFile.getName() + "' as (id, num);" + + "a = load '" + inputFile.getName() + "' as (id:int, num:int);" + "b = filter a by id == 2;" + // only 1 tuple returned (2,3) "c = order a by id ASC;" + "d = limit c b.num;" + // test bytearray to long implicit cast @@ -99,18 +101,20 @@ public class TestLimitVariable { Iterator<Tuple> itD = pigServer.openIterator("d"); List<Tuple> expectedResD = Util.getTuplesFromConstantTupleStrings(new String[] { "(1,11)", "(2,3)", "(3,10)" }); - Util.checkQueryOutputs(itD, expectedResD); + Util.checkQueryOutputsAfterSort(itD, expectedResD); Iterator<Tuple> itE = pigServer.openIterator("e"); List<Tuple> expectedResE = Util.getTuplesFromConstantTupleStrings(new String[] { "(1,11)", "(2,3)", "(3,10)", "(4,11)", "(5,10)", "(6,15)" }); - Util.checkQueryOutputs(itE, expectedResE); + Util.checkQueryOutputsAfterSort(itE, expectedResE); } @Test public void testLimitVariable3() throws IOException { + //add field type here to use Util.checkQueryOutputsAfterSort comparing the expected and actual + //results String query = - "a = load '" + inputFile.getName() + "' ;" + + "a = load '" + inputFile.getName() + "' as (id:int, num:int);" + "b = group a all;" + "c = foreach b generate COUNT(a) as sum;" + "d = order a by $0 ASC;" + @@ -121,7 +125,7 @@ public class TestLimitVariable { Iterator<Tuple> itE = pigServer.openIterator("e"); List<Tuple> expectedResE = Util.getTuplesFromConstantTupleStrings(new String[] { "(1,11)", "(2,3)", "(3,10)", "(4,11)", "(5,10)", "(6,15)" }); - Util.checkQueryOutputs(itE, expectedResE); + Util.checkQueryOutputsAfterSort(itE, expectedResE); } @Test Modified: pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java (original) +++ pig/trunk/test/org/apache/pig/test/TestLineageFindRelVisitor.java Mon May 29 15:00:39 2017 @@ -259,9 +259,10 @@ public class TestLineageFindRelVisitor { pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n"); Iterator<Tuple> iter = pig.openIterator("E"); - Assert.assertEquals("123", iter.next().get(0)); - Assert.assertEquals("456", iter.next().get(0)); - Assert.assertEquals("789", iter.next().get(0)); + Util.checkQueryOutputs(iter, + new String[]{"(123)", "(456)", "(789)"}, + org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("E")), Util.isSparkExecType(Util + .getLocalTestMode())); } @Test Modified: pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java (original) +++ pig/trunk/test/org/apache/pig/test/TestMapSideCogroup.java Mon May 29 15:00:39 2017 @@ -311,12 +311,11 @@ public class TestMapSideCogroup { pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4 + "' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray,c2:int);"); pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray,c2:int);"); - DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag(); + List<Tuple> dbMergeCogrp = new ArrayList<Tuple>(); pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';"); Iterator<Tuple> iter = pigServer.openIterator("C"); - while(iter.hasNext()) { Tuple t = iter.next(); dbMergeCogrp.add(t); @@ -335,12 +334,29 @@ public class TestMapSideCogroup { "(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})" }; - assertEquals(9, dbMergeCogrp.size()); + List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results); + + //We need sort dbMergeCogrp because the result is different in sequence between spark and other mode when + //multiple files are loaded(LOAD INPUT_FILE1,INPUT_FILE4...) + for (Tuple t : dbMergeCogrp) { + Util.convertBagToSortedBag(t); + } + for (Tuple t : expected) { + Util.convertBagToSortedBag(t); + } + + Collections.sort(dbMergeCogrp); + Collections.sort(expected); + assertEquals(dbMergeCogrp.size(), expected.size()); + + //Since TestMapSideCogroup.DummyIndexableLoader.getNext() does not + //apply schema for each input tuple,Util#checkQueryOutputsAfterSortRecursive fails to assert. + // The schema for C is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,int),(chararray,int),(chararray,int)}). + //But the schema for result "dbMergeCogrp" is (int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,chararray),(chararray,chararray),(chararray,chararray)}) Iterator<Tuple> itr = dbMergeCogrp.iterator(); - for(int i=0; i<9; i++){ - assertEquals(itr.next().toString(), results[i]); + for (int i = 0; i < dbMergeCogrp.size(); i++) { + assertEquals(itr.next().toString(), expected.get(i).toString()); } - assertFalse(itr.hasNext()); } @Test Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java (original) +++ pig/trunk/test/org/apache/pig/test/TestMergeJoinOuter.java Mon May 29 15:00:39 2017 @@ -168,7 +168,7 @@ public class TestMergeJoinOuter { @Test public void testLeftOuter() throws IOException { - + pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);"); pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);"); @@ -197,7 +197,7 @@ public class TestMergeJoinOuter { @Test public void testRightOuter() throws IOException{ - + pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);"); pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);"); pigServer.registerQuery("C = join A by c1 right, B by c1 using 'merge';"); @@ -224,7 +224,7 @@ public class TestMergeJoinOuter { @Test public void testFullOuter() throws IOException{ - + pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);"); pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);"); pigServer.registerQuery("C = join A by c1 full, B by c1 using 'merge';"); Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original) +++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon May 29 15:00:39 2017 @@ -31,6 +31,9 @@ import org.apache.pig.backend.executione import org.apache.pig.builtin.mock.Storage; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.tools.pigstats.InputStats; + import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -106,33 +109,25 @@ public class TestMultiQuery { myPig.registerQuery("E = load 'output1' as (a:int, b:int);"); Iterator<Tuple> iter = myPig.openIterator("E"); - List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( - new String[] { + String[] expectedResults = new String[]{ "(1,2)", "(2,3)" - }); + }; + Schema s = myPig.dumpSchema("E"); + Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util + .isSparkExecType(Util.getLocalTestMode())); - int counter = 0; - while (iter.hasNext()) { - assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); - } - assertEquals(expectedResults.size(), counter); myPig.registerQuery("E = load 'output2' as (a:int, b:int);"); iter = myPig.openIterator("E"); - expectedResults = Util.getTuplesFromConstantTupleStrings( - new String[] { + expectedResults = new String[]{ "(2,3)", "(3,4)" - }); - - counter = 0; - while (iter.hasNext()) { - assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); - } - - assertEquals(expectedResults.size(), counter); + }; + s = myPig.dumpSchema("E"); + Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util + .isSparkExecType(Util.getLocalTestMode())); } @Test @@ -165,20 +160,15 @@ public class TestMultiQuery { Iterator<Tuple> iter = myPig.openIterator("F"); - List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( - new String[] { + String[] expectedResults = new String[]{ "(1,2)", "(2,3)", "(3,5)", "(5,6)" - }); - - int counter = 0; - while (iter.hasNext()) { - assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); - } - - assertEquals(expectedResults.size(), counter); + }; + Schema s = myPig.dumpSchema("F"); + Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util + .isSparkExecType(Util.getLocalTestMode())); } @Test @@ -299,19 +289,14 @@ public class TestMultiQuery { Iterator<Tuple> iter = myPig.openIterator("E"); - List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( - new String[] { - "(1L,'apple',3,1L,'apple',1L,{(1L)})", - "(2L,'orange',4,2L,'orange',2L,{(2L)})", - "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})" - }); - - int counter = 0; - while (iter.hasNext()) { - assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); - } - - assertEquals(expectedResults.size(), counter); + String[] expectedResults = new String[]{ + "(1L,apple,3,1L,apple,1L,{(1L)})", + "(2L,orange,4,2L,orange,2L,{(2L)})", + "(3L,persimmon,5,3L,persimmon,3L,{(3L)})" + }; + Schema s = myPig.dumpSchema("E"); + Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util + .isSparkExecType(Util.getLocalTestMode())); } @Test @@ -345,19 +330,14 @@ public class TestMultiQuery { Iterator<Tuple> iter = myPig.openIterator("joined_session_info"); - List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( - new String[] { - "('apple',{},{('apple','jar',1L)})", - "('orange',{},{('orange','box',1L)})", - "('strawberry',{(30,'strawberry','quit','bot')},{})" - }); - - int counter = 0; - while (iter.hasNext()) { - assertEquals(expectedResults.get(counter++).toString(), iter.next().toString()); - } - - assertEquals(expectedResults.size(), counter); + String[] expectedResults = new String[]{ + "(apple,{},{(apple,jar,1L)})", + "(orange,{},{(orange,box,1L)})", + "(strawberry,{(30,strawberry,quit,bot)},{})"}; + + Schema s = myPig.dumpSchema("joined_session_info"); + Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util + .isSparkExecType(Util.getLocalTestMode())); } @Test @@ -861,12 +841,12 @@ public class TestMultiQuery { List<Tuple> actualResults = data.get("output1"); List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( new String[] {"(1)", "(2)"}); - Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); actualResults = data.get("output2"); expectedResults = Util.getTuplesFromConstantTupleStrings( new String[] {"(1, 'world')", "(2, 'world')"}); - Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults); } @Test @@ -908,6 +888,27 @@ public class TestMultiQuery { } @Test + public void testMultiQueryJiraPig4899() throws Exception { + myPig.setBatchOn(); + + myPig.registerQuery("a = load 'passwd' " + + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);"); + myPig.registerQuery("b1 = foreach a generate uname;"); + myPig.registerQuery("b2 = foreach a generate uid;"); + myPig.registerQuery("store b1 into 'output1';"); + myPig.registerQuery("store b2 into 'output2';"); + + List<ExecJob> jobs = myPig.executeBatch(); + for (ExecJob job : jobs) { + assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED); + List<InputStats> stats = job.getStatistics().getInputStats(); + assertEquals(1,stats.size()); + InputStats stat = stats.get(0); + assertEquals("Number of records in passwd file is 14",14,stat.getNumberRecords()); + } + } + + @Test public void testMultiQueryJiraPig4883() throws Exception { Storage.Data data = Storage.resetData(myPig); data.set("inputLocation", @@ -934,20 +935,24 @@ public class TestMultiQuery { List<Tuple> actualResults = data.get("output1"); String[] expectedResults = new String[]{"(12, 1)"}; - Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B1"))); + Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan + .logical.Util.translateSchema(myPig.dumpSchema("B1")), Util.isSparkExecType(Util.getLocalTestMode())); actualResults = data.get("output2"); expectedResults = new String[]{"(c,1)"}; - Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B2"))); + Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util + .translateSchema(myPig.dumpSchema("B2")), Util.isSparkExecType(Util.getLocalTestMode())); actualResults = data.get("output3"); expectedResults = new String[]{"(-12, 1)"}; - Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C1"))); + Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util + .translateSchema(myPig.dumpSchema("C1")), Util.isSparkExecType(Util.getLocalTestMode())); actualResults = data.get("output4"); expectedResults = new String[]{"(d,1)"}; - Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C2"))); + Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util + .translateSchema(myPig.dumpSchema("C2")), Util.isSparkExecType(Util.getLocalTestMode())); } // -------------------------------------------------------------------------- Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original) +++ pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Mon May 29 15:00:39 2017 @@ -487,6 +487,7 @@ public class TestMultiQueryLocal { public void testMultiQueryWithIllustrate() throws Exception { Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", !Util.getLocalTestMode().toString().startsWith("TEZ")); + Assume.assumeTrue("illustrate does not work in spark (PIG-4621)", !Util.getLocalTestMode().toString().startsWith("SPARK")); System.out.println("===== test multi-query with illustrate ====="); try { Modified: pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original) +++ pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Mon May 29 15:00:39 2017 @@ -29,6 +29,7 @@ import java.util.List; import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; @@ -210,6 +211,13 @@ public class TestNativeMapReduce { } catch (JobCreationException e) { // Running in Tez mode throw exception assertTrue(e.getCause() instanceof FileAlreadyExistsException); + } catch (ExecException e) { + // Running in spark mode throw exception + if (e.getCause() instanceof RuntimeException) { + RuntimeException re = (RuntimeException) e.getCause(); + JobCreationException jce = (JobCreationException) re.getCause(); + assertTrue(jce.getCause() instanceof FileAlreadyExistsException); + } } finally{ // We have to manually delete intermediate mapreduce files Modified: pig/trunk/test/org/apache/pig/test/TestNullConstant.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNullConstant.java?rev=1796639&r1=1796638&r2=1796639&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestNullConstant.java (original) +++ pig/trunk/test/org/apache/pig/test/TestNullConstant.java Mon May 29 15:00:39 2017 @@ -109,16 +109,7 @@ public class TestNullConstant { pigServer.registerQuery("d = foreach c generate flatten((SIZE(a) == 0 ? null: a)), flatten((SIZE(b) == 0 ? null : b));"); Iterator<Tuple> it = pigServer.openIterator("d"); Object[][] results = new Object[][]{{10, "will_join", 10, "will_join"}, {11, "will_not_join", null}, {null, 12, "will_not_join"}}; - int i = 0; - while(it.hasNext()) { - - Tuple t = it.next(); - Object[] result = results[i++]; - assertEquals(result.length, t.size()); - for (int j = 0; j < result.length; j++) { - assertEquals(result[j], t.get(j)); - } - } + Util.checkQueryOutputsAfterSort(it,results); } @Test