Added: pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java (added) +++ pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher; + +public class SparkMiniCluster extends MiniGenericCluster { + private static final File CONF_DIR = new File("build/classes"); + private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml"); + private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml"); + private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml"); + private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml"); + + private Configuration m_dfs_conf = null; + protected MiniMRYarnCluster m_mr = null; + private Configuration m_mr_conf = null; + + private static final Log LOG = LogFactory + .getLog(SparkMiniCluster.class); + private ExecType spark = new SparkExecType(); + SparkMiniCluster() { + + } + + @Override + public ExecType getExecType() { + return spark; + } + + @Override + protected void setupMiniDfsAndMrClusters() { + try { + deleteConfFiles(); + CONF_DIR.mkdirs(); + + // Build mini DFS cluster + Configuration hdfsConf = new Configuration(); + m_dfs = new MiniDFSCluster.Builder(hdfsConf) + .numDataNodes(2) + .format(true) + .racks(null) + .build(); + m_fileSys = m_dfs.getFileSystem(); + m_dfs_conf = m_dfs.getConfiguration(0); + + //Create user home directory + m_fileSys.mkdirs(m_fileSys.getWorkingDirectory()); + // Write core-site.xml + Configuration core_site = new Configuration(false); + core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + core_site.writeXml(new FileOutputStream(CORE_CONF_FILE)); + + Configuration hdfs_site = new Configuration(false); + for (Map.Entry<String, String> conf : m_dfs_conf) { + if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) { + hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey())); + } + } + hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE)); + + // Build mini YARN cluster + m_mr = new MiniMRYarnCluster("PigMiniCluster", 2); + m_mr.init(m_dfs_conf); + m_mr.start(); + m_mr_conf = m_mr.getConfig(); + m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + System.getProperty("java.class.path")); + + Configuration mapred_site = new Configuration(false); + Configuration yarn_site = new Configuration(false); + for (Map.Entry<String, String> conf : m_mr_conf) { + if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) { + if (conf.getKey().contains("yarn")) { + yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey())); + } else if (!conf.getKey().startsWith("dfs")){ + mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey())); + } + } + } + + mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE)); + yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE)); + + m_conf = m_mr_conf; + System.setProperty("junit.hadoop.conf", CONF_DIR.getPath()); + System.setProperty("hadoop.log.dir", "build/test/logs"); + } catch (IOException e) { + throw new RuntimeException(e); + + } + } + + @Override + protected void shutdownMiniMrClusters() { + deleteConfFiles(); + if (m_mr != null) { + m_mr.stop(); + m_mr = null; + } + } + + private void deleteConfFiles() { + + if(CORE_CONF_FILE.exists()) { + CORE_CONF_FILE.delete(); + } + if(HDFS_CONF_FILE.exists()) { + HDFS_CONF_FILE.delete(); + } + if(MAPRED_CONF_FILE.exists()) { + MAPRED_CONF_FILE.delete(); + } + if(YARN_CONF_FILE.exists()) { + YARN_CONF_FILE.delete(); + } + } + + static public Launcher getLauncher() { + return new SparkLauncher(); + } +}
Modified: pig/branches/spark/test/org/apache/pig/test/TestBZip.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBZip.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestBZip.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestBZip.java Wed Feb 22 09:43:41 2017 @@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -67,16 +66,10 @@ public class TestBZip { @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.") public static Iterable<Object[]> data() { - if ( HadoopShims.isHadoopYARN() ) { - return Arrays.asList(new Object[][] { - { false }, - { true } - }); - } else { - return Arrays.asList(new Object[][] { - { false } - }); - } + return Arrays.asList(new Object[][] { + { false }, + { true } + }); } public TestBZip (Boolean useBzipFromHadoop) { Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Wed Feb 22 09:43:41 2017 @@ -130,6 +130,7 @@ import org.apache.pig.data.DefaultBagFac import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.ReadToEndLoader; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -3206,29 +3207,31 @@ public class TestBuiltin { @Test public void testUniqueID() throws Exception { Util.resetStateForExecModeSwitch(); - String inputFileName = "testUniqueID.txt"; - Util.createInputFile(cluster, inputFileName, new String[] - {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"}); Properties copyproperties = new Properties(); copyproperties.putAll(cluster.getProperties()); PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties); - pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10"); + + // running with 2 mappers each taking 5 records + String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath(); + Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"}); + Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"}); pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true"); - pigServer.registerQuery("A = load '" + inputFileName + "' as (name);"); + + pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);"); pigServer.registerQuery("B = foreach A generate name, UniqueID();"); Iterator<Tuple> iter = pigServer.openIterator("B"); if (!Util.isSparkExecType(cluster.getExecType())) { - assertEquals(iter.next().get(1), "0-0"); - assertEquals(iter.next().get(1), "0-1"); - assertEquals(iter.next().get(1), "0-2"); - assertEquals(iter.next().get(1), "0-3"); - assertEquals(iter.next().get(1), "0-4"); - assertEquals(iter.next().get(1), "1-0"); - assertEquals(iter.next().get(1), "1-1"); - assertEquals(iter.next().get(1), "1-2"); - assertEquals(iter.next().get(1), "1-3"); - assertEquals(iter.next().get(1), "1-4"); - } else{ + assertEquals("0-0", iter.next().get(1)); + assertEquals("0-1", iter.next().get(1)); + assertEquals("0-2", iter.next().get(1)); + assertEquals("0-3", iter.next().get(1)); + assertEquals("0-4", iter.next().get(1)); + assertEquals("1-0", iter.next().get(1)); + assertEquals("1-1", iter.next().get(1)); + assertEquals("1-2", iter.next().get(1)); + assertEquals("1-3", iter.next().get(1)); + assertEquals("1-4", iter.next().get(1)); + } else { //there will be 2 InputSplits when mapred.max.split.size is 10(byte) for the testUniqueID.txt(20 bytes) //Split0: // 1\n @@ -3244,34 +3247,35 @@ public class TestBuiltin { // 5\n //The size of Split0 is 12 not 10 because LineRecordReader#nextKeyValue will read one more line //More detail see PIG-4383 - assertEquals(iter.next().get(1), "0-0"); - assertEquals(iter.next().get(1), "0-1"); - assertEquals(iter.next().get(1), "0-2"); - assertEquals(iter.next().get(1), "0-3"); - assertEquals(iter.next().get(1), "0-4"); - assertEquals(iter.next().get(1), "0-5"); - assertEquals(iter.next().get(1), "1-0"); - assertEquals(iter.next().get(1), "1-1"); - assertEquals(iter.next().get(1), "1-2"); - assertEquals(iter.next().get(1), "1-3"); + assertEquals("0-0", iter.next().get(1)); + assertEquals("0-1", iter.next().get(1)); + assertEquals("0-2", iter.next().get(1)); + assertEquals("0-3", iter.next().get(1)); + assertEquals("0-4", iter.next().get(1)); + assertEquals("0-5", iter.next().get(1)); + assertEquals("1-0", iter.next().get(1)); + assertEquals("1-1", iter.next().get(1)); + assertEquals("1-2", iter.next().get(1)); + assertEquals("1-3", iter.next().get(1)); } - Util.deleteFile(cluster, inputFileName); + Util.deleteFile(cluster, TMP_DIR + "/input1.txt"); + Util.deleteFile(cluster, TMP_DIR + "/input2.txt"); } @Test public void testRANDOMWithJob() throws Exception { Util.resetStateForExecModeSwitch(); - String inputFileName = "testRANDOM.txt"; - Util.createInputFile(cluster, inputFileName, new String[] - {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"}); - Properties copyproperties = new Properties(); copyproperties.putAll(cluster.getProperties()); PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties); - // running with two mappers - pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10"); + + // running with 2 mappers each taking 5 records + String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath(); + Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"}); + Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"}); pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true"); - pigServer.registerQuery("A = load '" + inputFileName + "' as (name);"); + + pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);"); pigServer.registerQuery("B = foreach A generate name, RANDOM();"); Iterator<Tuple> iter = pigServer.openIterator("B"); double [] mapper1 = new double[5]; @@ -3294,7 +3298,8 @@ public class TestBuiltin { for( int i = 0; i < 5; i++ ){ assertNotEquals(mapper1[i], mapper2[i], 0.0001); } - Util.deleteFile(cluster, inputFileName); + Util.deleteFile(cluster, TMP_DIR + "/input1.txt"); + Util.deleteFile(cluster, TMP_DIR + "/input2.txt"); } Added: pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java (added) +++ pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.test; + + +import java.util.Properties; + +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; + +import org.junit.Assert; +import org.junit.Test; + +public class TestConfigurationUtil { + + @Test + public void testExpandForAlternativeNames() { + Properties properties = null; + properties = ConfigurationUtil.expandForAlternativeNames("fs.df.interval", "500"); + Assert.assertEquals(1,properties.size()); + Assert.assertEquals("500",properties.get("fs.df.interval")); + + properties = ConfigurationUtil.expandForAlternativeNames("dfs.df.interval", "600"); + Assert.assertEquals(2,properties.size()); + Assert.assertEquals("600",properties.get("fs.df.interval")); + Assert.assertEquals("600",properties.get("dfs.df.interval")); + + properties = ConfigurationUtil.expandForAlternativeNames("", ""); + Assert.assertEquals(1,properties.size()); + Assert.assertEquals("",properties.get("")); + + } +} Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Wed Feb 22 09:43:41 2017 @@ -30,17 +30,17 @@ import java.util.Map; import java.util.Random; import org.apache.hadoop.fs.Path; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStats.JobGraph; -import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,8 +49,8 @@ import org.junit.runners.JUnit4; public class TestCounters { String file = "input.txt"; - static MiniCluster cluster = MiniCluster.buildCluster(); - + static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + final int MAX = 100*1000; Random r = new Random(); @@ -59,7 +59,7 @@ public class TestCounters { public static void oneTimeTearDown() throws Exception { cluster.shutDown(); } - + @Test public void testMapOnly() throws IOException, ExecException { int count = 0; @@ -70,13 +70,13 @@ public class TestCounters { if(t > 50) count ++; } pw.close(); - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = filter a by $0 > 50;"); pigServer.registerQuery("c = foreach b generate $0 - 50;"); ExecJob job = pigServer.store("c", "output_map_only"); PigStats pigStats = job.getStatistics(); - + //counting the no. of bytes in the output file //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen(); InputStream is = FileLocalizer.open(FileLocalizer.fullPath( @@ -85,9 +85,9 @@ public class TestCounters { long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output_map_only"), true); @@ -98,7 +98,7 @@ public class TestCounters { JobGraph jg = pigStats.getJobGraph(); Iterator<JobStats> iter = jg.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); @@ -123,20 +123,20 @@ public class TestCounters { count ++; } pw.close(); - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = filter a by $0 > 50;"); pigServer.registerQuery("c = foreach b generate $0 - 50;"); ExecJob job = pigServer.store("c", "output_map_only", "BinStorage"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath( "output_map_only", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); @@ -149,8 +149,8 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); - + JobStats js = iter.next(); + System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -158,7 +158,7 @@ public class TestCounters { assertEquals(0, js.getReduceInputRecords()); assertEquals(0, js.getReduceOutputRecords()); } - + System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten()); assertEquals(filesize, pigStats.getBytesWritten()); } @@ -183,7 +183,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group;"); @@ -195,7 +195,7 @@ public class TestCounters { long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); @@ -208,7 +208,7 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -242,7 +242,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group;"); @@ -253,9 +253,9 @@ public class TestCounters { pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); @@ -266,7 +266,7 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -300,7 +300,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);"); @@ -311,20 +311,20 @@ public class TestCounters { pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); System.out.println("============================================"); System.out.println("Test case MapCombineReduce"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -337,7 +337,7 @@ public class TestCounters { System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten()); assertEquals(filesize, pigStats.getBytesWritten()); } - + @Test public void testMapCombineReduceBinStorage() throws IOException, ExecException { int count = 0; @@ -358,20 +358,20 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);"); ExecJob job = pigServer.store("c", "output", "BinStorage"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); @@ -379,11 +379,11 @@ public class TestCounters { System.out.println("============================================"); System.out.println("Test case MapCombineReduce"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -399,6 +399,8 @@ public class TestCounters { @Test public void testMultipleMRJobs() throws IOException, ExecException { + Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job", + Util.isMapredExecType(cluster.getExecType())); int count = 0; PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file)); int [] nos = new int[10]; @@ -413,38 +415,38 @@ public class TestCounters { } pw.close(); - for(int i = 0; i < 10; i++) { + for(int i = 0; i < 10; i++) { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = order a by $0;"); pigServer.registerQuery("c = group b by $0;"); pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);"); ExecJob job = pigServer.store("d", "output"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); - + System.out.println("============================================"); System.out.println("Test case MultipleMRJobs"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); - MRJobStats js = (MRJobStats)jp.getSinks().get(0); - + JobStats js = (JobStats)jp.getSinks().get(0); + System.out.println("Job id: " + js.getName()); System.out.println(jp.toString()); - + System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -453,12 +455,12 @@ public class TestCounters { assertEquals(count, js.getReduceInputRecords()); System.out.println("Reduce output records : " + js.getReduceOutputRecords()); assertEquals(count, js.getReduceOutputRecords()); - + System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten()); assertEquals(filesize, js.getHdfsBytesWritten()); } - + @Test public void testMapOnlyMultiQueryStores() throws Exception { PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file)); @@ -467,8 +469,8 @@ public class TestCounters { pw.println(t); } pw.close(); - - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, + + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file + "';"); @@ -479,22 +481,22 @@ public class TestCounters { List<ExecJob> jobs = pigServer.executeBatch(); PigStats stats = jobs.get(0).getStatistics(); assertTrue(stats.getOutputLocations().size() == 2); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("/tmp/outout1"), true); cluster.getFileSystem().delete(new Path("/tmp/outout2"), true); - MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0); - + JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0); + Map<String, Long> entry = js.getMultiStoreCounters(); long counter = 0; for (Long val : entry.values()) { counter += val; } - - assertEquals(MAX, counter); - } - + + assertEquals(MAX, counter); + } + @Test public void testMultiQueryStores() throws Exception { int[] nums = new int[100]; @@ -505,13 +507,13 @@ public class TestCounters { nums[t]++; } pw.close(); - + int groups = 0; for (int i : nums) { if (i > 0) groups++; } - - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, + + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file + "';"); @@ -525,29 +527,29 @@ public class TestCounters { pigServer.registerQuery("store g into '/tmp/outout2';"); List<ExecJob> jobs = pigServer.executeBatch(); PigStats stats = jobs.get(0).getStatistics(); - + assertTrue(stats.getOutputLocations().size() == 2); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("/tmp/outout1"), true); cluster.getFileSystem().delete(new Path("/tmp/outout2"), true); - MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0); - + JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0); + Map<String, Long> entry = js.getMultiStoreCounters(); long counter = 0; for (Long val : entry.values()) { counter += val; } - - assertEquals(groups, counter); - } - - /* + + assertEquals(groups, counter); + } + + /* * IMPORTANT NOTE: * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE - * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED - */ + */ // @Test // public void testLocal() throws IOException, ExecException { // int count = 0; @@ -566,7 +568,7 @@ public class TestCounters { // } // pw.close(); // -// for(int i = 0; i < 10; i++) +// for(int i = 0; i < 10; i++) // if(nos[i] > 0) // count ++; // @@ -580,56 +582,56 @@ public class TestCounters { // pigServer.registerQuery("c = group b by $0;"); // pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);"); // PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics(); -// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs()); +// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs()); // long filesize = 0; // while(is.read() != -1) filesize++; -// +// // is.close(); // out.delete(); -// +// // //Map<String, Map<String, String>> stats = pigStats.getPigStats(); -// +// // assertEquals(10, pigStats.getRecordsWritten()); // assertEquals(110, pigStats.getBytesWritten()); // // } @Test - public void testJoinInputCounters() throws Exception { + public void testJoinInputCounters() throws Exception { testInputCounters("join"); } - + @Test - public void testCogroupInputCounters() throws Exception { + public void testCogroupInputCounters() throws Exception { testInputCounters("cogroup"); } - + @Test - public void testSkewedInputCounters() throws Exception { + public void testSkewedInputCounters() throws Exception { testInputCounters("skewed"); } - + @Test - public void testSelfJoinInputCounters() throws Exception { + public void testSelfJoinInputCounters() throws Exception { testInputCounters("self-join"); } - + private static boolean multiInputCreated = false; - + private static int count = 0; - - private void testInputCounters(String keyword) throws Exception { + + private void testInputCounters(String keyword) throws Exception { String file1 = "multi-input1.txt"; String file2 = "multi-input2.txt"; - + String output = keyword; - + if (keyword.equals("self-join")) { file2 = file1; keyword = "join"; } - - final int MAX_NUM_RECORDS = 100; + + final int MAX_NUM_RECORDS = 100; if (!multiInputCreated) { PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1)); for (int i = 0; i < MAX_NUM_RECORDS; i++) { @@ -637,7 +639,7 @@ public class TestCounters { pw.println(t); } pw.close(); - + PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2)); for (int i = 0; i < MAX_NUM_RECORDS; i++) { int t = r.nextInt(100); @@ -649,8 +651,8 @@ public class TestCounters { pw2.close(); multiInputCreated = true; } - - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, + + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file1 + "';"); @@ -661,7 +663,7 @@ public class TestCounters { pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';"); } ExecJob job = pigServer.store("c", output + "_output"); - + PigStats stats = job.getStatistics(); assertTrue(stats.isSuccessful()); List<InputStats> inputs = stats.getInputStats(); @@ -680,4 +682,46 @@ public class TestCounters { } } } + + @Test + public void testSplitUnionOutputCounters() throws Exception { + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input")); + for (int i = 0; i < 10; i++) { + pw.println(i); + } + pw.close(); + String query = + "a = load 'splitunion-input';" + + "split a into b if $0 < 5, c otherwise;" + + "d = union b, c;"; + + pigServer.registerQuery(query); + + ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage"); + PigStats stats1 = job.getStatistics(); + + query = + "a = load 'splitunion-input';" + + "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" + + "e = distinct d;" + + "f = union b, c, e;"; + + pigServer.registerQuery(query); + + job = pigServer.store("f", "splitunion-output-1", "PigStorage"); + PigStats stats2 = job.getStatistics(); + + PigStats[] pigStats = new PigStats[]{stats1, stats2}; + for (int i = 0; i < 2; i++) { + PigStats stats = pigStats[i]; + assertTrue(stats.isSuccessful()); + List<OutputStats> outputs = stats.getOutputStats(); + assertEquals(1, outputs.size()); + OutputStats output = outputs.get(0); + assertEquals("splitunion-output-" + i, output.getName()); + assertEquals(10, output.getNumberRecords()); + assertEquals(20, output.getBytes()); + } + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestDataBag.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestDataBag.java Wed Feb 22 09:43:41 2017 @@ -17,17 +17,36 @@ */ package org.apache.pig.test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; -import java.util.*; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; - - -import org.apache.pig.data.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Iterator; +import java.util.PriorityQueue; +import java.util.Random; +import java.util.TreeSet; + +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DefaultDataBag; +import org.apache.pig.data.DefaultTuple; +import org.apache.pig.data.DistinctDataBag; +import org.apache.pig.data.InternalCachedBag; +import org.apache.pig.data.InternalDistinctBag; +import org.apache.pig.data.InternalSortedBag; +import org.apache.pig.data.NonSpillableDataBag; +import org.apache.pig.data.SingleTupleBag; +import org.apache.pig.data.SortedDataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.util.Spillable; import org.junit.After; import org.junit.Test; @@ -36,7 +55,7 @@ import org.junit.Test; /** * This class will exercise the basic Pig data model and members. It tests for proper behavior in * assignment and comparison, as well as function application. - * + * * @author dnm */ public class TestDataBag { @@ -590,7 +609,7 @@ public class TestDataBag { } mgr.forceSpill(); } - + assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size()); // Read tuples back, hopefully they come out in the same order. @@ -719,14 +738,14 @@ public class TestDataBag { @Test public void testDefaultBagFactory() throws Exception { BagFactory f = BagFactory.getInstance(); - + DataBag bag = f.newDefaultBag(); DataBag sorted = f.newSortedBag(null); DataBag distinct = f.newDistinctBag(); assertTrue("Expected a default bag", (bag instanceof DefaultDataBag)); assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag)); - assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag)); + assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag)); } @Test @@ -756,7 +775,7 @@ public class TestDataBag { try { BagFactory f = BagFactory.getInstance(); } catch (RuntimeException re) { - assertEquals("Expected does not extend BagFactory message", + assertEquals("Expected does not extend BagFactory message", "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!", re.getMessage()); caughtIt = true; @@ -775,7 +794,7 @@ public class TestDataBag { BagFactory.resetSelf(); } - + @Test public void testNonSpillableDataBagEquals1() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -789,7 +808,7 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - + @Test public void testNonSpillableDataBagEquals2() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -804,7 +823,7 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - + @Test public void testDefaultDataBagEquals1() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -820,7 +839,7 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - + @Test public void testDefaultDataBagEquals2() throws Exception { String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; @@ -837,35 +856,35 @@ public class TestDataBag { } assertEquals(bg1, bg2); } - - public void testInternalCachedBag() throws Exception { + + public void testInternalCachedBag() throws Exception { // check adding empty tuple DataBag bg0 = new InternalCachedBag(); bg0.add(TupleFactory.getInstance().newTuple()); bg0.add(TupleFactory.getInstance().newTuple()); assertEquals(bg0.size(), 2); - + // check equal of bags DataBag bg1 = new InternalCachedBag(1, 0.5f); assertEquals(bg1.size(), 0); - + String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} }; for (int i = 0; i < tupleContents.length; i++) { bg1.add(Util.createTuple(tupleContents[i])); } - + // check size, and isSorted(), isDistinct() assertEquals(bg1.size(), 3); assertFalse(bg1.isSorted()); assertFalse(bg1.isDistinct()); - + tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} }; DataBag bg2 = new InternalCachedBag(1, 0.5f); for (int i = 0; i < tupleContents.length; i++) { bg2.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg2); - + // check bag with data written to disk DataBag bg3 = new InternalCachedBag(1, 0.0f); tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}}; @@ -873,7 +892,7 @@ public class TestDataBag { bg3.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg3); - + // check iterator Iterator<Tuple> iter = bg3.iterator(); DataBag bg4 = new InternalCachedBag(1, 0.0f); @@ -881,7 +900,7 @@ public class TestDataBag { bg4.add(iter.next()); } assertEquals(bg3, bg4); - + // call iterator methods with irregular order iter = bg3.iterator(); assertTrue(iter.hasNext()); @@ -894,46 +913,46 @@ public class TestDataBag { assertFalse(iter.hasNext()); assertFalse(iter.hasNext()); assertEquals(bg3, bg5); - - + + bg4.clear(); - assertEquals(bg4.size(), 0); + assertEquals(bg4.size(), 0); } - - public void testInternalSortedBag() throws Exception { - + + public void testInternalSortedBag() throws Exception { + // check adding empty tuple DataBag bg0 = new InternalSortedBag(); bg0.add(TupleFactory.getInstance().newTuple()); bg0.add(TupleFactory.getInstance().newTuple()); assertEquals(bg0.size(), 2); - + // check equal of bags DataBag bg1 = new InternalSortedBag(); assertEquals(bg1.size(), 0); - + String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }}; for (int i = 0; i < tupleContents.length; i++) { bg1.add(Util.createTuple(tupleContents[i])); } - + // check size, and isSorted(), isDistinct() assertEquals(bg1.size(), 3); assertTrue(bg1.isSorted()); assertFalse(bg1.isDistinct()); - + tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} }; DataBag bg2 = new InternalSortedBag(); for (int i = 0; i < tupleContents.length; i++) { bg2.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg2); - + Iterator<Tuple> iter = bg1.iterator(); iter.next().equals(Util.createTuple(new String[] {"a", "b"})); iter.next().equals(Util.createTuple(new String[] {"c", "d"})); iter.next().equals(Util.createTuple(new String[] {"e", "f"})); - + // check bag with data written to disk DataBag bg3 = new InternalSortedBag(1, 0.0f, null); tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}}; @@ -941,17 +960,17 @@ public class TestDataBag { bg3.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg3); - + iter = bg3.iterator(); iter.next().equals(Util.createTuple(new String[] {"a", "b"})); iter.next().equals(Util.createTuple(new String[] {"c", "d"})); - iter.next().equals(Util.createTuple(new String[] {"e", "f"})); - + iter.next().equals(Util.createTuple(new String[] {"e", "f"})); + // call iterator methods with irregular order iter = bg3.iterator(); assertTrue(iter.hasNext()); assertTrue(iter.hasNext()); - + DataBag bg4 = new InternalSortedBag(1, 0.0f, null); bg4.add(iter.next()); bg4.add(iter.next()); @@ -959,21 +978,21 @@ public class TestDataBag { bg4.add(iter.next()); assertFalse(iter.hasNext()); assertFalse(iter.hasNext()); - assertEquals(bg3, bg4); - + assertEquals(bg3, bg4); + // check clear bg3.clear(); assertEquals(bg3.size(), 0); - + // test with all data spill out - DataBag bg5 = new InternalSortedBag(); + DataBag bg5 = new InternalSortedBag(); for(int j=0; j<3; j++) { for (int i = 0; i < tupleContents.length; i++) { bg5.add(Util.createTuple(tupleContents[i])); - } + } bg5.spill(); } - + assertEquals(bg5.size(), 9); iter = bg5.iterator(); for(int i=0; i<3; i++) { @@ -983,21 +1002,21 @@ public class TestDataBag { iter.next().equals(Util.createTuple(new String[] {"c", "d"})); } for(int i=0; i<3; i++) { - iter.next().equals(Util.createTuple(new String[] {"e", "f"})); + iter.next().equals(Util.createTuple(new String[] {"e", "f"})); } - + // test with most data spill out, with some data in memory // and merge of spill files - DataBag bg6 = new InternalSortedBag(); + DataBag bg6 = new InternalSortedBag(); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg6.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg6.spill(); } } - + assertEquals(bg6.size(), 104*3); iter = bg6.iterator(); for(int i=0; i<104; i++) { @@ -1007,55 +1026,55 @@ public class TestDataBag { iter.next().equals(Util.createTuple(new String[] {"c", "d"})); } for(int i=0; i<104; i++) { - iter.next().equals(Util.createTuple(new String[] {"e", "f"})); + iter.next().equals(Util.createTuple(new String[] {"e", "f"})); } - + // check two implementation of sorted bag can compare correctly - DataBag bg7 = new SortedDataBag(null); + DataBag bg7 = new SortedDataBag(null); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg7.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg7.spill(); } } assertEquals(bg6, bg7); } - - public void testInternalDistinctBag() throws Exception { + + public void testInternalDistinctBag() throws Exception { // check adding empty tuple DataBag bg0 = new InternalDistinctBag(); bg0.add(TupleFactory.getInstance().newTuple()); bg0.add(TupleFactory.getInstance().newTuple()); assertEquals(bg0.size(), 1); - + // check equal of bags DataBag bg1 = new InternalDistinctBag(); assertEquals(bg1.size(), 0); - + String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}}; for (int i = 0; i < tupleContents.length; i++) { bg1.add(Util.createTuple(tupleContents[i])); } - + // check size, and isSorted(), isDistinct() assertEquals(bg1.size(), 3); assertFalse(bg1.isSorted()); assertTrue(bg1.isDistinct()); - + tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} }; DataBag bg2 = new InternalDistinctBag(); for (int i = 0; i < tupleContents.length; i++) { bg2.add(Util.createTuple(tupleContents[i])); } assertEquals(bg1, bg2); - + Iterator<Tuple> iter = bg1.iterator(); iter.next().equals(Util.createTuple(new String[] {"a", "b"})); iter.next().equals(Util.createTuple(new String[] {"c", "d"})); iter.next().equals(Util.createTuple(new String[] {"e", "f"})); - + // check bag with data written to disk DataBag bg3 = new InternalDistinctBag(1, 0.0f); tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}}; @@ -1064,13 +1083,13 @@ public class TestDataBag { } assertEquals(bg2, bg3); assertEquals(bg3.size(), 3); - - + + // call iterator methods with irregular order iter = bg3.iterator(); assertTrue(iter.hasNext()); assertTrue(iter.hasNext()); - + DataBag bg4 = new InternalDistinctBag(1, 0.0f); bg4.add(iter.next()); bg4.add(iter.next()); @@ -1078,73 +1097,73 @@ public class TestDataBag { bg4.add(iter.next()); assertFalse(iter.hasNext()); assertFalse(iter.hasNext()); - assertEquals(bg3, bg4); - + assertEquals(bg3, bg4); + // check clear bg3.clear(); assertEquals(bg3.size(), 0); - + // test with all data spill out - DataBag bg5 = new InternalDistinctBag(); + DataBag bg5 = new InternalDistinctBag(); for(int j=0; j<3; j++) { for (int i = 0; i < tupleContents.length; i++) { bg5.add(Util.createTuple(tupleContents[i])); - } + } bg5.spill(); } - + assertEquals(bg5.size(), 3); - - + + // test with most data spill out, with some data in memory // and merge of spill files - DataBag bg6 = new InternalDistinctBag(); + DataBag bg6 = new InternalDistinctBag(); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg6.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg6.spill(); } } - - assertEquals(bg6.size(), 3); - + + assertEquals(bg6.size(), 3); + // check two implementation of sorted bag can compare correctly - DataBag bg7 = new DistinctDataBag(); + DataBag bg7 = new DistinctDataBag(); for(int j=0; j<104; j++) { for (int i = 0; i < tupleContents.length; i++) { bg7.add(Util.createTuple(tupleContents[i])); - } + } if (j != 103) { bg7.spill(); } } assertEquals(bg6, bg7); } - + // See PIG-1231 @Test public void testDataBagIterIdempotent() throws Exception { DataBag bg0 = new DefaultDataBag(); processDataBag(bg0, true); - + DataBag bg1 = new DistinctDataBag(); processDataBag(bg1, true); - + DataBag bg2 = new InternalDistinctBag(); processDataBag(bg2, true); - + DataBag bg3 = new InternalSortedBag(); processDataBag(bg3, true); - + DataBag bg4 = new SortedDataBag(null); processDataBag(bg4, true); - + DataBag bg5 = new InternalCachedBag(0, 0); processDataBag(bg5, false); } - + // See PIG-1285 @Test public void testSerializeSingleTupleBag() throws Exception { @@ -1159,7 +1178,7 @@ public class TestDataBag { dfBag.readFields(dis); assertTrue(dfBag.equals(stBag)); } - + // See PIG-2550 static class MyCustomTuple extends DefaultTuple { private static final long serialVersionUID = 8156382697467819543L; @@ -1184,7 +1203,23 @@ public class TestDataBag { Tuple t2 = iter.next(); assertTrue(t2.equals(t)); } - + + // See PIG-4260 + @Test + public void testSpillArrayBackedList() throws Exception { + Tuple[] tuples = new Tuple[2]; + tuples[0] = TupleFactory.getInstance().newTuple(1); + tuples[0].set(0, "first"); + tuples[1] = TupleFactory.getInstance().newTuple(1); + tuples[1].set(0, "second"); + DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples)); + bag.spill(); + Iterator<Tuple> iter = bag.iterator(); + assertEquals(tuples[0], iter.next()); + assertEquals(tuples[1], iter.next()); + assertFalse(iter.hasNext()); + } + void processDataBag(DataBag bg, boolean doSpill) { Tuple t = TupleFactory.getInstance().newTuple(new Integer(0)); bg.add(t); @@ -1194,7 +1229,7 @@ public class TestDataBag { assertTrue(iter.hasNext()); iter.next(); assertFalse(iter.hasNext()); - assertFalse("hasNext should be idempotent", iter.hasNext()); + assertFalse("hasNext should be idempotent", iter.hasNext()); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestDivide.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDivide.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestDivide.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestDivide.java Wed Feb 22 09:43:41 2017 @@ -20,6 +20,9 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import java.math.BigDecimal; +import java.math.MathContext; +import java.math.RoundingMode; import java.util.Map; import java.util.Random; @@ -53,7 +56,7 @@ public class TestDivide { public void testOperator() throws ExecException { // int TRIALS = 10; byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY, - DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, + DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.BIGDECIMAL, DataType.DATETIME, DataType.MAP, DataType.TUPLE }; // Map<Byte,String> map = GenRandomData.genTypeToNameMap(); System.out.println("Testing DIVIDE operator"); @@ -250,6 +253,33 @@ public class TestDivide { assertEquals(null, (Long)resl.result); break; } + case DataType.BIGDECIMAL: { + MathContext mc = new MathContext(Divide.BIGDECIMAL_MINIMAL_SCALE, RoundingMode.HALF_UP); + BigDecimal inpf1 = new BigDecimal(r.nextDouble(),mc); + BigDecimal inpf2 = new BigDecimal(r.nextDouble(),mc); + lt.setValue(inpf1); + rt.setValue(inpf2); + Result resf = op.getNextBigDecimal(); + BigDecimal expected = inpf1.divide(inpf2, 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1, RoundingMode.HALF_UP); + assertEquals(expected, (BigDecimal)resf.result); + + // test with null in lhs + lt.setValue(null); + rt.setValue(inpf2); + resf = op.getNextBigDecimal(); + assertEquals(null, (BigDecimal)resf.result); + // test with null in rhs + lt.setValue(inpf1); + rt.setValue(null); + resf = op.getNextBigDecimal(); + assertEquals(null, (BigDecimal)resf.result); + // test divide by 0 + lt.setValue(inpf1); + rt.setValue(new BigDecimal(0.0f,mc)); + resf = op.getNextBigDecimal(); + assertEquals(null, (BigDecimal)resf.result); + break; + } case DataType.DATETIME: DateTime inpdt1 = new DateTime(r.nextLong()); DateTime inpdt2 = new DateTime(r.nextLong()); Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Wed Feb 22 09:43:41 2017 @@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru import java.io.File; import java.io.FileWriter; +import java.io.IOException; import java.io.PrintWriter; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pig.PigRunner; -import org.apache.pig.tools.pigstats.JobStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.junit.AfterClass; @@ -38,16 +38,15 @@ import org.junit.Test; public class TestEmptyInputDir { - private static MiniCluster cluster; + private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); private static final String EMPTY_DIR = "emptydir"; private static final String INPUT_FILE = "input"; private static final String OUTPUT_FILE = "output"; private static final String PIG_FILE = "test.pig"; - + @BeforeClass public static void setUpBeforeClass() throws Exception { - cluster = MiniCluster.buildCluster(); FileSystem fs = cluster.getFileSystem(); if (!fs.mkdirs(new Path(EMPTY_DIR))) { throw new Exception("failed to create empty dir"); @@ -64,7 +63,35 @@ public class TestEmptyInputDir { public static void tearDownAfterClass() throws Exception { cluster.shutDown(); } - + + @Test + public void testGroupBy() throws Exception { + PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); + w.println("A = load '" + EMPTY_DIR + "';"); + w.println("B = group A by $0;"); + w.println("store B into '" + OUTPUT_FILE + "';"); + w.close(); + + try { + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + PigStats stats = PigRunner.run(args, null); + + assertTrue(stats.isSuccessful()); + + // This assert fails on 205 due to MAPREDUCE-3606 + if (Util.isMapredExecType(cluster.getExecType()) + && !Util.isHadoop205() && !Util.isHadoop1_x()) { + MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); + assertEquals(0, js.getNumberMaps()); + } + + assertEmptyOutputFile(); + } finally { + new File(PIG_FILE).delete(); + Util.deleteFile(cluster, OUTPUT_FILE); + } + } + @Test public void testSkewedJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -73,31 +100,28 @@ public class TestEmptyInputDir { w.println("C = join B by $0, A by $0 using 'skewed';"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { PIG_FILE }; + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; PigStats stats = PigRunner.run(args, null); - + assertTrue(stats.isSuccessful()); - // the sampler job has zero maps - MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0); - + // This assert fails on 205 due to MAPREDUCE-3606 - if (!Util.isHadoop205()&&!Util.isHadoop1_x()) - assertEquals(0, js.getNumberMaps()); - - FileSystem fs = cluster.getFileSystem(); - FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); - assertTrue(status.isDir()); - assertEquals(0, status.getLen()); - // output directory isn't empty - assertTrue(fs.listStatus(status.getPath()).length > 0); + if (Util.isMapredExecType(cluster.getExecType()) + && !Util.isHadoop205() && !Util.isHadoop1_x()) { + // the sampler job has zero maps + MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); + assertEquals(0, js.getNumberMaps()); + } + + assertEmptyOutputFile(); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testMergeJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -106,32 +130,28 @@ public class TestEmptyInputDir { w.println("C = join A by $0, B by $0 using 'merge';"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { PIG_FILE }; + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - // the indexer job has zero maps - MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0); - + + assertTrue(stats.isSuccessful()); + // This assert fails on 205 due to MAPREDUCE-3606 - if (!Util.isHadoop205()&&!Util.isHadoop1_x()) - assertEquals(0, js.getNumberMaps()); - - FileSystem fs = cluster.getFileSystem(); - FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); - assertTrue(status.isDir()); - assertEquals(0, status.getLen()); - - // output directory isn't empty - assertTrue(fs.listStatus(status.getPath()).length > 0); + if (Util.isMapredExecType(cluster.getExecType()) + && !Util.isHadoop205() && !Util.isHadoop1_x()) { + // the indexer job has zero maps + MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); + assertEquals(0, js.getNumberMaps()); + } + + assertEmptyOutputFile(); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testFRJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -140,55 +160,44 @@ public class TestEmptyInputDir { w.println("C = join A by $0, B by $0 using 'repl';"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { PIG_FILE }; + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - // the indexer job has zero maps - MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0); - + + assertTrue(stats.isSuccessful()); + // This assert fails on 205 due to MAPREDUCE-3606 - if (!Util.isHadoop205()&&!Util.isHadoop1_x()) - assertEquals(0, js.getNumberMaps()); - - FileSystem fs = cluster.getFileSystem(); - FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); - assertTrue(status.isDir()); - assertEquals(0, status.getLen()); - - // output directory isn't empty - assertTrue(fs.listStatus(status.getPath()).length > 0); + if (Util.isMapredExecType(cluster.getExecType()) + && !Util.isHadoop205() && !Util.isHadoop1_x()) { + MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0); + assertEquals(0, js.getNumberMaps()); + } + + assertEmptyOutputFile(); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testRegularJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); w.println("A = load '" + INPUT_FILE + "';"); w.println("B = load '" + EMPTY_DIR + "';"); - w.println("C = join B by $0, A by $0;"); + w.println("C = join B by $0, A by $0 PARALLEL 0;"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { PIG_FILE }; + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - - FileSystem fs = cluster.getFileSystem(); - FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); - assertTrue(status.isDir()); - assertEquals(0, status.getLen()); - - // output directory isn't empty - assertTrue(fs.listStatus(status.getPath()).length > 0); - + + assertTrue(stats.isSuccessful()); + + assertEmptyOutputFile(); + } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); @@ -203,19 +212,19 @@ public class TestEmptyInputDir { w.println("C = join B by $0 right outer, A by $0;"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + try { - String[] args = { PIG_FILE }; + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - assertEquals(2, stats.getNumberRecords(OUTPUT_FILE)); + + assertTrue(stats.isSuccessful()); + assertEquals(2, stats.getNumberRecords(OUTPUT_FILE)); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); } } - + @Test public void testLeftOuterJoin() throws Exception { PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); @@ -224,16 +233,88 @@ public class TestEmptyInputDir { w.println("C = join B by $0 left outer, A by $0;"); w.println("store C into '" + OUTPUT_FILE + "';"); w.close(); - + + try { + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + PigStats stats = PigRunner.run(args, null); + + assertTrue(stats.isSuccessful()); + assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); + } finally { + new File(PIG_FILE).delete(); + Util.deleteFile(cluster, OUTPUT_FILE); + } + } + + @Test + public void testBloomJoin() throws Exception { + PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); + w.println("A = load '" + INPUT_FILE + "' as (x:int);"); + w.println("B = load '" + EMPTY_DIR + "' as (x:int);"); + w.println("C = join B by $0, A by $0 using 'bloom';"); + w.println("D = join A by $0, B by $0 using 'bloom';"); + w.println("store C into '" + OUTPUT_FILE + "';"); + w.println("store D into 'output1';"); + w.close(); + + try { + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; + PigStats stats = PigRunner.run(args, null); + + assertTrue(stats.isSuccessful()); + assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); + assertEquals(0, stats.getNumberRecords("output1")); + assertEmptyOutputFile(); + } finally { + new File(PIG_FILE).delete(); + Util.deleteFile(cluster, OUTPUT_FILE); + Util.deleteFile(cluster, "output1"); + } + } + + @Test + public void testBloomJoinOuter() throws Exception { + PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); + w.println("A = load '" + INPUT_FILE + "' as (x:int);"); + w.println("B = load '" + EMPTY_DIR + "' as (x:int);"); + w.println("C = join B by $0 left outer, A by $0 using 'bloom';"); + w.println("D = join A by $0 left outer, B by $0 using 'bloom';"); + w.println("E = join B by $0 right outer, A by $0 using 'bloom';"); + w.println("F = join A by $0 right outer, B by $0 using 'bloom';"); + w.println("store C into '" + OUTPUT_FILE + "';"); + w.println("store D into 'output1';"); + w.println("store E into 'output2';"); + w.println("store F into 'output3';"); + w.close(); + try { - String[] args = { PIG_FILE }; + String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, }; PigStats stats = PigRunner.run(args, null); - - assertTrue(stats.isSuccessful()); - assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); + + assertTrue(stats.isSuccessful()); + assertEquals(0, stats.getNumberRecords(OUTPUT_FILE)); + assertEquals(2, stats.getNumberRecords("output1")); + assertEquals(2, stats.getNumberRecords("output2")); + assertEquals(0, stats.getNumberRecords("output3")); + assertEmptyOutputFile(); } finally { new File(PIG_FILE).delete(); Util.deleteFile(cluster, OUTPUT_FILE); + Util.deleteFile(cluster, "output1"); + Util.deleteFile(cluster, "output2"); + Util.deleteFile(cluster, "output3"); } } + + private void assertEmptyOutputFile() throws IllegalArgumentException, IOException { + FileSystem fs = cluster.getFileSystem(); + FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE)); + assertTrue(status.isDir()); + assertEquals(0, status.getLen()); + // output directory isn't empty. Has one empty file + FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter()); + assertEquals(1, files.length); + assertEquals(0, files[0].getLen()); + assertTrue(files[0].getPath().getName().startsWith("part-")); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Wed Feb 22 09:43:41 2017 @@ -200,11 +200,11 @@ public class TestErrorHandlingStoreFunc private void updatePigProperties(boolean allowErrors, long minErrors, double errorThreshold) { Properties properties = pigServer.getPigContext().getProperties(); - properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS, + properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, Boolean.toString(allowErrors)); - properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS, + properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, Long.toString(minErrors)); - properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, + properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, Double.toString(errorThreshold)); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Wed Feb 22 09:43:41 2017 @@ -291,7 +291,7 @@ public class TestEvalPipeline { myMap.put("long", new Long(1)); myMap.put("float", new Float(1.0)); myMap.put("double", new Double(1.0)); - myMap.put("dba", new DataByteArray(new String("bytes").getBytes())); + myMap.put("dba", new DataByteArray(new String("1234").getBytes())); myMap.put("map", mapInMap); myMap.put("tuple", tuple); myMap.put("bag", bag); @@ -794,32 +794,31 @@ public class TestEvalPipeline { } @Test - public void testMapUDFfail() throws Exception{ + public void testMapUDFWithImplicitTypeCast() throws Exception{ int LOOP_COUNT = 2; File tmpFile = Util.createTempFileDelOnExit("test", "txt"); PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); for(int i = 0; i < LOOP_COUNT; i++) { - for(int j=0;j<LOOP_COUNT;j+=2){ - ps.println(i+"\t"+j); - ps.println(i+"\t"+j); - } + ps.println(i); } ps.close(); pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString(), pigContext) + "';"); pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter - String query = "C = foreach B {" - + "generate mymap#'dba' * 10;" - + "};"; + String query = "C = foreach B generate mymap#'dba' * 10; "; pigServer.registerQuery(query); - try { - pigServer.openIterator("C"); - Assert.fail("Error expected."); - } catch (Exception e) { - e.getMessage().contains("Cannot determine"); + + Iterator<Tuple> iter = pigServer.openIterator("C"); + if(!iter.hasNext()) Assert.fail("No output found"); + int numIdentity = 0; + while(iter.hasNext()){ + Tuple t = iter.next(); + Assert.assertEquals(new Integer(12340), (Integer)t.get(0)); + ++numIdentity; } + Assert.assertEquals(LOOP_COUNT, numIdentity); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Wed Feb 22 09:43:41 2017 @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalMap; @@ -38,10 +37,10 @@ import org.apache.pig.impl.builtin.FindQ import org.junit.Test; public class TestFindQuantiles { - + private static TupleFactory tFact = TupleFactory.getInstance(); private static final float epsilon = 0.0001f; - + @Test public void testFindQuantiles() throws Exception { final int numSamples = 97778; @@ -50,7 +49,7 @@ public class TestFindQuantiles { System.out.println("sum: " + sum); assertTrue(sum > (1-epsilon) && sum < (1+epsilon)); } - + @Test public void testFindQuantiles2() throws Exception { final int numSamples = 30000; @@ -86,7 +85,7 @@ public class TestFindQuantiles { } private float[] getProbVec(Tuple values) throws Exception { - float[] probVec = new float[values.size()]; + float[] probVec = new float[values.size()]; for(int i = 0; i < values.size(); i++) { probVec[i] = (Float)values.get(i); } @@ -95,7 +94,7 @@ public class TestFindQuantiles { private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception { Random rand = new Random(1000); - List<Tuple> samples = new ArrayList<Tuple>(); + List<Tuple> samples = new ArrayList<Tuple>(); for (int i=0; i<numSamples; i++) { Tuple t = tFact.newTuple(1); t.set(0, rand.nextInt(max)); @@ -106,7 +105,7 @@ public class TestFindQuantiles { } private DataBag generateUniqueSamples(int numSamples) throws Exception { - DataBag samples = BagFactory.getInstance().newDefaultBag(); + DataBag samples = BagFactory.getInstance().newDefaultBag(); for (int i=0; i<numSamples; i++) { Tuple t = tFact.newTuple(1); t.set(0, new Integer(23)); @@ -121,9 +120,9 @@ public class TestFindQuantiles { in.set(0, new Integer(numReduceres)); in.set(1, samples); - + FindQuantiles fq = new FindQuantiles(); - + Map<String, Object> res = fq.exec(in); return res; } @@ -135,12 +134,11 @@ public class TestFindQuantiles { InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS); Iterator<Object> it = weightedPartsData.values().iterator(); float[] probVec = getProbVec((Tuple)it.next()); - new DiscreteProbabilitySampleGenerator(probVec); float sum = 0.0f; for (float f : probVec) { sum += f; } return sum; } - + } Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed Feb 22 09:43:41 2017 @@ -30,6 +30,7 @@ import java.util.List; import java.util.Random; import org.apache.pig.PigServer; +import org.apache.pig.builtin.mock.Storage; import org.apache.pig.data.Tuple; import org.apache.pig.test.utils.TestHelper; import org.junit.Test; @@ -105,6 +106,31 @@ public class TestForEachNestedPlanLocal } @Test + public void testNestedCrossTwoRelationsLimit() throws Exception { + Storage.Data data = Storage.resetData(pig); + data.set("input", + Storage.tuple(Storage.bag(Storage.tuple(1, 1), Storage.tuple(1, 2)), Storage.bag(Storage.tuple(1, 3), Storage.tuple(1, 4))), + Storage.tuple(Storage.bag(Storage.tuple(2, 1), Storage.tuple(2, 2)), Storage.bag(Storage.tuple(2, 3))), + Storage.tuple(Storage.bag(Storage.tuple(3, 1)), Storage.bag(Storage.tuple(3, 2)))); + + pig.setBatchOn(); + pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag{tup1:tuple(f1:int, f2:int)}, bag2:bag{tup2:tuple(f3:int, f4:int)});"); + pig.registerQuery("B = foreach A {" + + "crossed = cross bag1, bag2;" + + "filtered = filter crossed by f1 == f3;" + + "lmt = limit filtered 1;" + + "generate FLATTEN(lmt);" + "}"); + pig.registerQuery("store B into 'output' using mock.Storage();"); + + pig.executeBatch(); + + List<Tuple> actualResults = data.get("output"); + List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings( + new String[] {"(1, 1, 1, 3)", "(2, 1, 2, 3)", "(3, 1, 3, 2)"}); + Util.checkQueryOutputs(actualResults.iterator(), expectedResults); + } + + @Test public void testNestedCrossTwoRelationsComplex() throws Exception { File[] tmpFiles = generateDataSetFilesForNestedCross(); List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] { Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Wed Feb 22 09:43:41 2017 @@ -20,6 +20,7 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; @@ -50,6 +51,7 @@ public class TestGFCross { public void testSerial() throws Exception { Configuration cfg = new Configuration(); cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1"); + cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2); @@ -66,6 +68,7 @@ public class TestGFCross { public void testParallelSet() throws Exception { Configuration cfg = new Configuration(); cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10"); + cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000"); UDFContext.getUDFContext().addJobConf(cfg); Tuple t = TupleFactory.getInstance().newTuple(2); Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Wed Feb 22 09:43:41 2017 @@ -28,6 +28,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileReader; import java.io.FileWriter; +import java.io.FilenameFilter; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; @@ -970,7 +971,6 @@ public class TestGrunt { @Test public void testStopOnFailure() throws Throwable { - Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType())); PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties()); PigContext context = server.getPigContext(); context.getProperties().setProperty("stop.on.failure", ""+true); @@ -1569,4 +1569,20 @@ public class TestGrunt { } assertTrue(found); } + + @Test + public void testGruntUtf8() throws Throwable { + String command = "mkdir æµè¯\n" + + "quit\n"; + System.setProperty("jline.WindowsTerminal.directConsole", "false"); + System.setIn(new ByteArrayInputStream(command.getBytes())); + org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null); + File[] partFiles = new File(".").listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + return name.equals("æµè¯"); + } + }); + assertEquals(partFiles.length, 1); + new File("æµè¯").delete(); + } }
