Modified: pig/branches/spark/test/e2e/pig/tests/turing_jython.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/turing_jython.conf?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/turing_jython.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/turing_jython.conf Fri Feb 24 08:19:42 2017 @@ -50,7 +50,7 @@ d = filter b by age < 50; e = cogroup c by (name, age), d by (name, age) ; f = foreach e generate flatten(c), flatten(d); g = group f by registration; -h = foreach g generate group, SUM(f.d::contributions); +h = foreach g generate group, (float) ROUND(SUM(f.d::contributions) * 100) / 100.0; i = order h by $1; store i into '$out'; """).bind({'in1':input1,'in2':input2, 'out':output}).runSingle() @@ -68,7 +68,7 @@ else: e = cogroup c by (name, age), d by (name, age) ; f = foreach e generate flatten(c), flatten(d); g = group f by registration; - h = foreach g generate group, SUM(f.d::contributions); + h = foreach g generate group, (float) ROUND(SUM(f.d::contributions) * 100) / 100.0; i = order h by $1; store i into ':OUTPATH:'; \, @@ -92,38 +92,12 @@ hdfs = FileSystem.get(config) } ] - }, - { - 'name' => 'Jython_Embedded', - 'tests' => [ - { - 'num' => 1, - ,'pig' => q\#!/usr/bin/python -# JYTHON COMMENT -from org.apache.pig.scripting import Pig - -P = Pig.compile("""A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -store A into ':OUTPATH:';""") - -Q = P.bind() - -result = Q.runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\, - 'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); - store A into ':OUTPATH:';\, - } - ] }, { 'name' => 'Jython_CompileBindRun' ,'tests' => [ - { # bind() with no parameters, runSingle + { + # bind no parameters, runSingle 'num' => 1 ,'pig' => q\#!/usr/bin/python # JYTHON COMMENT @@ -150,7 
+124,7 @@ else: ,'delimiter' => ' ' },{ -# 9.2 1 bind single input parameter and no output parameters + # bind single input parameter 'num' => 2 ,'pig' => q\#!/usr/bin/python @@ -179,7 +153,7 @@ else: # ,'expected_out_regex' => "Pig job PASSED" },{ -# bind parallel execution with a multiple entries + # bind parallel execution with a multiple entries 'num' => 3 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -231,9 +205,7 @@ for i in [0, 1, 2]: \, },{ -# 8.6 compile pig script file with no input and no output parameters -#12.2 import python modules -# + # compile pig script file with no parameters 'num' => 4 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -243,6 +215,7 @@ pig_script = ":TMP:/script.pig" pigfile = open( pig_script, 'w+') pigfile.write(""" A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); +-- a comment store A into ':OUTPATH:'; """) pigfile.close() @@ -263,7 +236,7 @@ else: ,'floatpostprocess' => 1 ,'delimiter' => ' ' },{ -# 8.7 compile pig script file with no input and with output parameters + # compile pig script file with parameters 'num' => 5 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -300,7 +273,7 @@ else: ,'delimiter' => ' ' },{ - # 11.15 1 results.getResults(alias) for null results + # results.getResults(alias) for null results 'num' => 6 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -318,7 +291,7 @@ result = P.bind().runSingle() store EMPTY into ':OUTPATH:';\ }, { - # bind reading from python context + # bind parameters from python context 'num' => 7 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -340,7 +313,7 @@ result = P.bind().runSingle() store B into ':OUTPATH:';\ },{ - # bind multiple times + # bind multiple times 'num' => 8 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -367,56 +340,8 @@ for i in [1,2,3]: B= foreach A generate age + 3; store B into 
':OUTPATH:.3';\, - }, - { - # invoke .run() on a non-parallel pig script - 'num' => 9 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -P = Pig.compile(""" -A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -store A into ':OUTPATH:'; -""") -result = P.bind().run() -\, - 'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); - store A into ':OUTPATH:';\, - }, - { -# 8.6 compile pig script file with no input and no output parameters -#12.2 import python modules -# - 'num' => 10 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -#create pig script -pig_script = ":TMP:/script.pig" -pigfile = open( pig_script, 'w+') -pigfile.write(""" -A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); --- a comment -store A into ':OUTPATH:'; -""") -pigfile.close() - -#execute pig script - -result = Pig.compileFromFile( pig_script ).bind().runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" -else: - raise "Pig job FAILED" -\, - - 'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); - store A into ':OUTPATH:'; -\ - ,'floatpostprocess' => 1 - ,'delimiter' => ' ' },{ + # python script with parameters 'num' => 11 ,'pig_params' => ['-p', qq(loadfile='studenttab10k')], ,'pig' => q\#!/usr/bin/python @@ -441,6 +366,7 @@ else: ,'floatpostprocess' => 1 ,'delimiter' => ' ' },{ + # python script with parameter file 'num' => 12 ,'pig_params' => ['-m', ":PARAMPATH:/params_3"], ,'pig' => q\#!/usr/bin/python @@ -465,6 +391,7 @@ else: ,'floatpostprocess' => 1 ,'delimiter' => ' ' },{ + # python script with command line arguments 'num' => 13 ,'additional_cmd_args' => ['studenttab10k'] ,'pig' => q\#!/usr/bin/python @@ -495,7 +422,7 @@ else: 'name' => 'Jython_Diagnostics' ,'tests' => [ { -# 11.23 1 explain() on a complex query + # explain() on a complex query 'num' => 1 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import 
Pig @@ -525,7 +452,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'rc'=> 0 }, { -#11.22 1 illustrate() on a complex query + # illustrate() on a complex query 'num' => 2 ,'execonly' => 'mapred,local' #TODO: PIG-3993: Illustrate is yet to be implemented in Tez ,'pig' => q\#!/usr/bin/python @@ -555,7 +482,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'rc'=> 0 ,'expected_out_regex' => "A.*name:bytearray.*age:bytearray.*gpa:bytearray" }, { -# 11.24 1 describe() on an alias + # describe() on an alias 'num' => 3 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -583,7 +510,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'rc'=> 0 ,'expected_out_regex' => "A:.*{name:.*bytearray,age:.*bytearray,gpa:.*bytearray}" }, { -#11.29 1 describe() on an undefined alias + # describe() on an undefined alias 'num' => 4 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -613,7 +540,7 @@ result = P.bind({'in1':input1, 'in2':inp }, { -# 11.27 1 illustrate(alias) + # illustrate(alias) 'num' => 5 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -643,7 +570,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'expected_err_regex' => "ERROR 1121" }, { -# 11.28 1 explain(alias) + # explain(alias) 'num' => 6 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -710,14 +637,10 @@ Pig.fs("-copyFromLocal :TMP:/iterator_ou }, ] }, { -# 12.2 import python modules -# 12.1 python comments -# 12.6 fs lists a file - - 'name' => 'Jython_Misc' ,'tests' => [ { + # fs commands: lists a file 'num' => 1 ,'pig' => q\#!/usr/bin/python # JYTHON COMMENT @@ -778,8 +701,8 @@ P.bind().runSingle() 'name' => 'Jython_Properties', 'tests' => [ { + # check if property is passed to Pig 'num' => 1 - ,'ignore' => 1 # This is a good test except that we can't verify it. 
,'pig' => q\#!/usr/bin/python # JYTHON COMMENT from org.apache.pig.scripting import Pig @@ -791,7 +714,7 @@ store A into ':OUTPATH:';""") Q = P.bind() prop = Properties() -prop.put("mapred.job.name", "friendship") +prop.put("pig.default.load.func", "wrong") result = Q.runSingle(prop) if result.isSuccessful(): @@ -799,10 +722,8 @@ if result.isSuccessful(): else: raise "Pig job FAILED" \ - - ,'sql' => "select name, age, gpa+0.00 from studenttab10k;" - ,'floatpostprocess' => 1 - ,'delimiter' => ' ' + ,'rc'=> 6 + ,'expected_err_regex' => "ERROR 1070: Could not resolve wrong using imports" } ] @@ -811,7 +732,7 @@ else: 'name' => 'Jython_Error', 'tests' => [ { - # run a script that returns single negative result + # run a script that returns single negative result 'num' => 1 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -834,103 +755,18 @@ else: ,'rc' => 6 ,'expected_err_regex' => "ERROR 1121" - }, - { - # run a script that returns single negative result - 'num' => 2 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -input= ":INPATH:/singlefile/studenttab10k" -output = ":OUTPATH:" - -P = Pig.compile("""A = load '$in' as (name, age, gpa); store A into '$out';""") - -Q = P.bind({'in':input, 'out':bad_output}) - -result = Q.runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\ - - ,'rc' => 6 - ,'expected_err_regex' => "name 'bad_output' is not defined" },{ - # bind an undefined input parameter - 'num' => 3 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -input= ":INPATH:/singlefile/studenttab10k" -output = ":OUTPATH:" - -P = Pig.compile("""A = load '$in' as (name, age, gpa); store A into '$out';""") - -Q = P.bind({'in':invalid_parameter, 'out':output}) - -result = Q.runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\ - - ,'expected_err_regex' => "ERROR 1121" - ,'rc'=> 6 - - }, - { - # 
compileFromFile for pig script file that does not exist throws IOException + # compileFromFile for pig script file that does not exist throws IOException 'num' => 4 ,'pig' => q\#!/usr/bin/python +import os from org.apache.pig.scripting import Pig # intentionally don't create pig script -pig_script = tmp_dir + "/script.pig" - -#execute pig script -input1= ":INPATH:/singlefile/studenttab10k" -input2= ":INPATH:/singlefile/votertab10k" -output1= ":OUTPATH:.1" -output2= ":OUTPATH:.2" - -result = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\ - - ,'expected_err_regex' => "ERROR 1121" - ,'rc'=> 6 - }, - { - # compileFromFile for pig script file that does not have read permissions throws IOException - 'num' => 5 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -#create pig script - pig_script = ":TMP:/script.pig" -pigfile = open( pig_script, 'w') -#no read permissions and file is left open until afer compile statement -pigfile.write(""" -A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -B = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); -store A into '$out1'; -store B into '$out2'; -""") -pigfile.close() + +os.remove(pig_script) #execute pig script input1= ":INPATH:/singlefile/studenttab10k" @@ -938,11 +774,9 @@ input2= ":INPATH:/singlefile/votertab10k output1= ":OUTPATH:.1" output2= ":OUTPATH:.2" -result = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run() - -pigfile.close() +results = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run() -if result.isSuccessful(): +if results[0].isSuccessful(): print "Pig job PASSED" else: @@ -977,13 +811,16 @@ else: ,'expected_err_regex' => "ERROR 1121" }, { - # 11.10 iter.next for an alias that is undefined 
+ # iter.next for an alias that is undefined 'num' => 7 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig #create pig script +out1= ":OUTPATH:.1" +out2= ":OUTPATH:.2" + P = Pig.compile("""A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); B= filter A by age < 50; store B into '$out1'; @@ -992,9 +829,10 @@ D = filter C by name matches '^fred*'; store D into '$out2'; """) -result = P.bind().run() +results = P.bind().run() +iter = results[0].result("E").iterator() -if result.isSuccessful(): +if results[0].isSuccessful(): print "Pig job PASSED" else: @@ -1010,30 +848,6 @@ else: 'tests' => [ { # sql command - 'num' => 1 - ,'java_params' => ['-Dhcat.bin=:HCATBIN:'] - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -#create pig script - -Pig.sql("""sql drop table if exists pig_script_hcat_ddl_1;""") -ret = Pig.sql("""sql create table pig_script_hcat_ddl_1(name string, -age int, -gpa double) -stored as textfile; -""") - -if ret==0: - print "SQL command PASSED" - -else: - raise "SQL command FAILED" -\ - ,'rc' => 0 - }, - { - # sql command 'num' => 2 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig
Modified: pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl (original) +++ pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl Fri Feb 24 08:19:42 2017 @@ -41,7 +41,6 @@ our @lastName = ("allen", "brown", "cars # rankaacd: RANK BY a ASC , c DESC # rankaaba: RANK BY a ASC , b ASC # a,b,c: values -# tail: long value in order to create multiple mappers ############################################################################ our @rankedTuples = ( "1,21,5,7,1,1,0,8,8","2,26,2,3,2,5,1,9,10","3,30,24,21,2,3,1,3,10","4,6,10,8,3,4,1,7,2", @@ -501,22 +500,10 @@ sub getBulkCopyCmd(){ my $randf = rand(10); printf HDFS "%d:%d:%d:%d:%d:%dL:%.2ff:%.2f\n", $tid, $i, $rand5, $rand100, $rand1000, $rand1000, $randf, $randf; } - } elsif ($filetype eq "ranking") { + } elsif ($filetype eq "ranking") { for (my $i = 0; $i < $numRows; $i++) { my $tuple = $rankedTuples[int($i)]; - printf HDFS "$tuple,"; - for my $j ( 0 .. 1000000) { - printf HDFS "%d",$j; - } - printf HDFS "\n"; - } - } elsif ($filetype eq "biggish") { - for (my $i = 1; $i < $numRows; $i++) { - printf HDFS "$i,$i,"; - for my $j ( 0 .. 
1000) { - printf HDFS "%d",$j; - } - printf HDFS "\n"; + printf HDFS "$tuple\n"; } } elsif ($filetype eq "utf8Student") { srand(3.14159 + $numRows); Modified: pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original) +++ pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Fri Feb 24 08:19:42 2017 @@ -360,7 +360,7 @@ public class TestLoadStoreFuncLifeCycle // result, the number of StoreFunc instances is greater by 1 in // Hadoop-2.0.x. assertTrue("storer instanciation count increasing: " + Storer.count, - Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4)); + Storer.count <= 5); } } Modified: pig/branches/spark/test/org/apache/pig/TestMain.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestMain.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/TestMain.java (original) +++ pig/branches/spark/test/org/apache/pig/TestMain.java Fri Feb 24 08:19:42 2017 @@ -24,8 +24,10 @@ import static org.junit.Assert.assertTru import static org.junit.Assert.fail; import java.io.BufferedWriter; +import java.io.BufferedReader; import java.io.File; import java.io.FileWriter; +import java.io.FileReader; import java.io.IOException; import java.util.Properties; @@ -35,6 +37,7 @@ import org.apache.pig.impl.logicalLayer. 
import org.apache.pig.parser.ParserException; import org.apache.pig.parser.SourceLocation; import org.apache.pig.test.TestPigRunner.TestNotificationListener; +import org.apache.pig.test.Util; import org.apache.pig.tools.parameters.ParameterSubstitutionException; import org.apache.pig.tools.pigstats.PigStats; import org.junit.Test; @@ -152,6 +155,36 @@ public class TestMain { } + @Test + public void testParseInputScript() throws Exception { + File input = Util.createInputFile("tmp", "", + new String[]{"{(1,1.0)}\ttestinputstring1", + "{(2,2.0)}\ttestinputstring1", + "{(3,3.0)}\ttestinputstring1", + "{(4,4.0)}\ttestinputstring1"} + ); + File out = new File(System.getProperty("java.io.tmpdir")+"/testParseInputScriptOut"); + File scriptFile = Util.createInputFile("pigScript", "", + new String[]{"A = load '"+input.getAbsolutePath()+"' as (a:{(x:chararray, y:float)}, b:chararray);", + "B = foreach A generate\n" + + " b,\n" + + " (bag{tuple(long)}) a.x as ax:{(x:long)};", + "store B into '"+out.getAbsolutePath()+"';"} + ); + + Main.run(new String[]{"-x", "local", scriptFile.getAbsolutePath()}, null); + int count = 0; + // Close the reader before deleting the output directory: an open file handle can block deletion (e.g. on Windows). + try (BufferedReader file = new BufferedReader(new FileReader(new File(out, "part-m-00000")))) { + while (file.readLine() != null) { + count++; + } + } + assertEquals(4, count); + Util.deleteDirectory(out); + assertTrue(!out.exists()); + } + public static class TestNotificationListener2 extends TestNotificationListener { protected boolean hadArgs = false; public TestNotificationListener2() {} Modified: pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java Fri Feb 24 08:19:42 2017 @@ -709,6 +709,19 @@ public class TestAvroStorage { } @Test + public void testGroupWithRepeatedSubRecords() throws Exception { + final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro"; + final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro"; + testAvroStorage(true, basedir + "code/pig/group_test.pig", + ImmutableMap.of( + "INFILE", input, + "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc", + "OUTFILE", createOutputName()) + ); + verifyResults(createOutputName(),check); + } + + @Test public void testLoadDirectory() throws Exception { final String input = basedir + "data/avro/uncompressed/testdirectory"; final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro"; Modified: pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java Fri Feb 24 08:19:42 2017 @@ -195,7 +195,7 @@ public class TestOrcStorage { Reader reader = OrcFile.createReader(fs, Util.getFirstPartFile(new Path(OUTPUT1))); assertEquals(reader.getNumberOfRows(), 2); - RecordReader rows = reader.rows(null); + RecordReader rows = reader.rows(); Object row = rows.next(null); StructObjectInspector soi = (StructObjectInspector)reader.getObjectInspector(); IntWritable intWritable = (IntWritable)soi.getStructFieldData(row, @@ -291,7 +291,7 @@ public class TestOrcStorage { ObjectInspector oi = orcReader.getObjectInspector(); StructObjectInspector soi = (StructObjectInspector) oi; - RecordReader reader = 
orcReader.rows(null); + RecordReader reader = orcReader.rows(); Object row = null; while (reader.hasNext()) { @@ -326,9 +326,9 @@ public class TestOrcStorage { Reader orcReaderActual = OrcFile.createReader(fs, orcFile); StructObjectInspector soiActual = (StructObjectInspector) orcReaderActual.getObjectInspector(); - RecordReader readerExpected = orcReaderExpected.rows(null); + RecordReader readerExpected = orcReaderExpected.rows(); Object expectedRow = null; - RecordReader readerActual = orcReaderActual.rows(null); + RecordReader readerActual = orcReaderActual.rows(); Object actualRow = null; while (readerExpected.hasNext()) { Added: pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig (added) +++ pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig Fri Feb 24 08:19:42 2017 @@ -0,0 +1,5 @@ +in = LOAD '$INFILE' USING AvroStorage(); +grouped = GROUP in BY (value1.thing); +flattened = FOREACH grouped GENERATE flatten(in) as (key: chararray,value1: (thing: chararray,count: int),value2: (thing: chararray,count: int)); +RMF $OUTFILE; +STORE flattened INTO '$OUTFILE' USING AvroStorage(); Modified: pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java (original) +++ pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java Fri Feb 24 08:19:42 2017 @@ -17,9 +17,9 @@ */ package org.apache.pig.data; -import static 
junit.framework.Assert.assertEquals; import static org.apache.pig.builtin.mock.Storage.resetData; import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -599,33 +599,33 @@ public class TestSchemaTuple { Data data = resetData(pigServer); data.set("foo1", - tuple(0), - tuple(1), - tuple(2), - tuple(3), - tuple(4), - tuple(5), - tuple(6), - tuple(7), - tuple(8), - tuple(9) + tuple(0, 0), + tuple(1, 1), + tuple(2, 2), + tuple(3, 3), + tuple(4, 4), + tuple(5, 5), + tuple(6, 6), + tuple(7, 7), + tuple(8, 8), + tuple(9, 9) ); data.set("foo2", - tuple(0), - tuple(1), - tuple(2), - tuple(3), - tuple(4), - tuple(5), - tuple(6), - tuple(7), - tuple(8), - tuple(9) + tuple(0, 0), + tuple(1, 1), + tuple(2, 2), + tuple(3, 3), + tuple(4, 4), + tuple(5, 5), + tuple(6, 6), + tuple(7, 7), + tuple(8, 8), + tuple(9, 9) ); - pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int);"); - pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int);"); + pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int, y:int);"); + pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int, y:int);"); if (preSort) { pigServer.registerQuery("A = ORDER A BY x ASC;"); pigServer.registerQuery("B = ORDER B BY x ASC;"); @@ -638,20 +638,24 @@ public class TestSchemaTuple { if (!out.hasNext()) { throw new Exception("Output should have had more elements! 
Failed on element: " + i); } - assertEquals(tuple(i, i), out.next()); + assertEquals(tuple(i, i, i, i), out.next()); } assertFalse(out.hasNext()); - pigServer.registerQuery("STORE D INTO 'bar' USING mock.Storage();"); + pigServer.registerQuery("STORE D INTO 'bar1' USING mock.Storage();"); + pigServer.registerQuery("E = JOIN A by (x, y), B by (x, y) using '"+joinType+"';"); + pigServer.registerQuery("F = ORDER E BY $0 ASC;"); + pigServer.registerQuery("STORE F INTO 'bar2' USING mock.Storage();"); - List<Tuple> tuples = data.get("bar"); + List<Tuple> bar1 = data.get("bar1"); + List<Tuple> bar2 = data.get("bar2"); - if (tuples.size() != 10) { - throw new Exception("Output does not have enough elements! List: " + tuples); - } + assertEquals("Output does not have enough elements! List: " + bar1, 10, bar1.size()); + assertEquals("Output does not have enough elements! List: " + bar2, 10, bar2.size()); for (int i = 0; i < 10; i++) { - assertEquals(tuple(i, i), tuples.get(i)); + assertEquals(tuple(i, i, i, i), bar1.get(i)); + assertEquals(tuple(i, i, i, i), bar2.get(i)); } } Added: pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java (added) +++ pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.builtin; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.MiniGenericCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.Lists; + +import static org.apache.pig.builtin.mock.Storage.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHiveUDTF { + private static PigServer pigServer = null; + private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + + @BeforeClass + public static void oneTimeSetup() throws ExecException { + pigServer = new PigServer(ExecType.LOCAL); + } + + @AfterClass + public static void oneTimeTearDown() throws Exception { + cluster.shutDown(); + } + + @Test + public void testHiveUDTFOnBagInput() throws IOException { + Data data = resetData(pigServer); + + Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c"))); + + data.set("TestHiveUDTF", tuple); + + pigServer.registerQuery("define posexplode HiveUDTF('posexplode');"); + pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});"); + pigServer.registerQuery("B = foreach A generate posexplode(a0);"); + + Iterator<Tuple> result = pigServer.openIterator("B"); + List<Tuple> out = 
Lists.newArrayList(result); + + assertEquals(2, out.size()); + assertTrue("Result doesn't contain the HiveUDTF output", + out.contains(tuple(bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c"))))); + assertTrue("Result doesn't contain an empty bag", + out.contains(tuple(bag()))); + } + + @Test + public void testHiveUDTFOnBagInputWithTwoProjection() throws IOException { + Data data = resetData(pigServer); + + Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c"))); + + data.set("TestHiveUDTF", tuple); + + pigServer.registerQuery("define posexplode HiveUDTF('posexplode');"); + pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});"); + pigServer.registerQuery("B = foreach A generate a0, posexplode(a0);"); + + Iterator<Tuple> result = pigServer.openIterator("B"); + List<Tuple> out = Lists.newArrayList(result); + + assertEquals(2, out.size()); + assertTrue("Result doesn't contain the HiveUDTF output", + out.contains(tuple(bag(tuple("a"), tuple("b"), tuple("c")), bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c"))))); + assertTrue("Result doesn't contain an empty bag", + out.contains(tuple(null, bag()))); + } + + @Test + public void testHiveUDTFOnClose() throws IOException { + Data data = resetData(pigServer); + + List<Tuple> tuples = Arrays.asList(tuple("a", 1), tuple("a", 2), tuple("a", 3)); + + data.set("TestHiveUDTF", tuples); + + pigServer.registerQuery("define COUNT2 HiveUDTF('org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount2');"); + pigServer.registerQuery("a = load 'TestHiveUDTF' USING mock.Storage() as (name:chararray, id:int);"); + pigServer.registerQuery("b = foreach a generate flatten(COUNT2(name));"); + + Iterator<Tuple> result = pigServer.openIterator("b"); + List<Tuple> out = Lists.newArrayList(result); + + assertEquals(2, out.size()); + assertEquals(tuple(3), out.get(0)); + assertEquals(tuple(3), out.get(1)); + } + +} Modified: pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java (original) +++ pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java Fri Feb 24 08:19:42 2017 @@ -652,4 +652,14 @@ public class TestQueryParser { public void testSplit2() throws Exception { shouldPass("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';"); } + + @Test + public void testBigDecimalParsing() throws Exception { + shouldPass("B = FILTER A BY $1 < 1234567890.123456789BD;"); + } + + @Test + public void testBigIntegerParsing() throws Exception { + shouldPass("B = FILTER A BY $1 < 1234567890123456789BI;"); + } } Modified: pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java (original) +++ pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java Fri Feb 24 08:19:42 2017 @@ -19,10 +19,20 @@ package org.apache.pig.parser; import static org.junit.Assert.assertEquals; +import java.io.IOException; import java.util.Properties; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; import org.apache.pig.ExecType; +import org.apache.pig.LoadFunc; +import org.apache.pig.NonFSLoadFunc; +import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import 
org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.test.Util; import org.junit.Test; @@ -72,43 +82,76 @@ public class TestQueryParserUtils { QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc); assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) { - // webhdfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc); - assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - // har with webhfs - QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc); - assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - //viewfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("viewfs:/tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("viewfs:///tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc); - assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - //har with viewfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - 
QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc); - assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + // webhdfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc); + assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + + // har with webhfs + QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc); + assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + //viewfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("viewfs:/tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("viewfs:///tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc); + assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - } + //har with viewfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + 
QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc); + assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + + } + @Test + public void testNonFSLoadFunc() throws Exception { + PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); + pigServer.registerQuery("A = load 'hbase://query/SELECT ID, NAME, DATE FROM HIRES WHERE DATE > TO_DATE(\"1990-12-21 05:55:00.000\")' using org.apache.pig.parser.TestQueryParserUtils$DummyNonFSLoader();"); + pigServer.shutdown(); } + /** + * Test class for testNonFSLoadFuncNoSetHdfsServersCall test case + */ + public static class DummyNonFSLoader extends LoadFunc implements NonFSLoadFunc { + + @Override + public void setLocation(String location, Job job) throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public InputFormat getInputFormat() throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public Tuple getNext() throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public String relativeToAbsolutePath(String location, Path curDir) throws IOException { + return location; + } + } } Added: pig/branches/spark/test/org/apache/pig/test/MiniCluster.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/MiniCluster.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/MiniCluster.java (added) +++ pig/branches/spark/test/org/apache/pig/test/MiniCluster.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; + +/** + * This class builds a single instance of itself with the Singleton + * design pattern. While building the single instance, it sets up a + * mini cluster that actually consists of a mini DFS cluster and a + * mini MapReduce cluster on the local machine and also sets up the + * environment for Pig to run on top of the mini cluster. 
+ */ +public class MiniCluster extends MiniGenericCluster { + private static final File CONF_DIR = new File("build/classes"); + private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml"); + + protected MiniMRYarnCluster m_mr = null; + private Configuration m_dfs_conf = null; + private Configuration m_mr_conf = null; + + /** + * @deprecated use {@link org.apache.pig.test.MiniGenericCluster.buildCluster() instead. + */ + @Deprecated + public static MiniCluster buildCluster() { + System.setProperty("test.exec.type", "mr"); + return (MiniCluster)MiniGenericCluster.buildCluster("mr"); + } + + @Override + public ExecType getExecType() { + return ExecType.MAPREDUCE; + } + + @Override + protected void setupMiniDfsAndMrClusters() { + try { + final int dataNodes = 4; // There will be 4 data nodes + final int taskTrackers = 4; // There will be 4 task tracker nodes + + System.setProperty("hadoop.log.dir", "build/test/logs"); + // Create the dir that holds hadoop-site.xml file + // Delete if hadoop-site.xml exists already + CONF_DIR.mkdirs(); + if(CONF_FILE.exists()) { + CONF_FILE.delete(); + } + + // Builds and starts the mini dfs and mapreduce clusters + Configuration config = new Configuration(); + config.set("yarn.scheduler.capacity.root.queues", "default"); + config.set("yarn.scheduler.capacity.root.default.capacity", "100"); + m_dfs = new MiniDFSCluster(config, dataNodes, true, null); + m_fileSys = m_dfs.getFileSystem(); + m_dfs_conf = m_dfs.getConfiguration(0); + + //Create user home directory + m_fileSys.mkdirs(m_fileSys.getWorkingDirectory()); + + m_mr = new MiniMRYarnCluster("PigMiniCluster", taskTrackers); + m_mr.init(m_dfs_conf); + m_mr.start(); + + // Write the necessary config info to hadoop-site.xml + m_mr_conf = new Configuration(m_mr.getConfig()); + + m_conf = m_mr_conf; + m_conf.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + m_conf.unset(MRConfiguration.JOB_CACHE_FILES); + + 
m_conf.setInt(MRConfiguration.IO_SORT_MB, 200); + m_conf.set(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx512m"); + + m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2); + m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2); + m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2); + m_conf.set("dfs.datanode.address", "0.0.0.0:0"); + m_conf.set("dfs.datanode.http.address", "0.0.0.0:0"); + m_conf.set("pig.jobcontrol.sleep", "100"); + m_conf.writeXml(new FileOutputStream(CONF_FILE)); + m_fileSys.copyFromLocalFile(new Path(CONF_FILE.getAbsoluteFile().toString()), + new Path("/pigtest/conf/hadoop-site.xml")); + DistributedCache.addFileToClassPath(new Path("/pigtest/conf/hadoop-site.xml"), m_conf); + + System.err.println("XXX: Setting " + FileSystem.FS_DEFAULT_NAME_KEY + " to: " + m_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + // Set the system properties needed by Pig + System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER)); + System.setProperty("namenode", m_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + System.setProperty("junit.hadoop.conf", CONF_DIR.getPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void shutdownMiniMrClusters() { + // Delete hadoop-site.xml on shutDown + if(CONF_FILE.exists()) { + CONF_FILE.delete(); + } + if (m_mr != null) { m_mr.stop(); } + m_mr = null; + } + + static public Launcher getLauncher() { + return new MapReduceLauncher(); + } +} Added: pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java (added) +++ pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType; +import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher; + +public class SparkMiniCluster extends MiniGenericCluster { + private static final File CONF_DIR = new File("build/classes"); + private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml"); + private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml"); + private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml"); + private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml"); + + private Configuration m_dfs_conf = null; + 
protected MiniMRYarnCluster m_mr = null; + private Configuration m_mr_conf = null; + + private static final Log LOG = LogFactory + .getLog(SparkMiniCluster.class); + private ExecType spark = new SparkExecType(); + SparkMiniCluster() { + + } + + @Override + public ExecType getExecType() { + return spark; + } + + @Override + protected void setupMiniDfsAndMrClusters() { + try { + deleteConfFiles(); + CONF_DIR.mkdirs(); + + // Build mini DFS cluster + Configuration hdfsConf = new Configuration(); + m_dfs = new MiniDFSCluster.Builder(hdfsConf) + .numDataNodes(2) + .format(true) + .racks(null) + .build(); + m_fileSys = m_dfs.getFileSystem(); + m_dfs_conf = m_dfs.getConfiguration(0); + + //Create user home directory + m_fileSys.mkdirs(m_fileSys.getWorkingDirectory()); + // Write core-site.xml + Configuration core_site = new Configuration(false); + core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + core_site.writeXml(new FileOutputStream(CORE_CONF_FILE)); + + Configuration hdfs_site = new Configuration(false); + for (Map.Entry<String, String> conf : m_dfs_conf) { + if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) { + hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey())); + } + } + hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE)); + + // Build mini YARN cluster + m_mr = new MiniMRYarnCluster("PigMiniCluster", 2); + m_mr.init(m_dfs_conf); + m_mr.start(); + m_mr_conf = m_mr.getConfig(); + m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + System.getProperty("java.class.path")); + + Configuration mapred_site = new Configuration(false); + Configuration yarn_site = new Configuration(false); + for (Map.Entry<String, String> conf : m_mr_conf) { + if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) { + if (conf.getKey().contains("yarn")) { + yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey())); + } else if 
(!conf.getKey().startsWith("dfs")){ + mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey())); + } + } + } + + mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE)); + yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE)); + + m_conf = m_mr_conf; + System.setProperty("junit.hadoop.conf", CONF_DIR.getPath()); + System.setProperty("hadoop.log.dir", "build/test/logs"); + } catch (IOException e) { + throw new RuntimeException(e); + + } + } + + @Override + protected void shutdownMiniMrClusters() { + deleteConfFiles(); + if (m_mr != null) { + m_mr.stop(); + m_mr = null; + } + } + + private void deleteConfFiles() { + + if(CORE_CONF_FILE.exists()) { + CORE_CONF_FILE.delete(); + } + if(HDFS_CONF_FILE.exists()) { + HDFS_CONF_FILE.delete(); + } + if(MAPRED_CONF_FILE.exists()) { + MAPRED_CONF_FILE.delete(); + } + if(YARN_CONF_FILE.exists()) { + YARN_CONF_FILE.delete(); + } + } + + static public Launcher getLauncher() { + return new SparkLauncher(); + } +} Modified: pig/branches/spark/test/org/apache/pig/test/TestBZip.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBZip.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestBZip.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestBZip.java Fri Feb 24 08:19:42 2017 @@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -67,16 +66,10 @@ public class TestBZip { @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.") public static Iterable<Object[]> data() { - if 
( HadoopShims.isHadoopYARN() ) { - return Arrays.asList(new Object[][] { - { false }, - { true } - }); - } else { - return Arrays.asList(new Object[][] { - { false } - }); - } + return Arrays.asList(new Object[][] { + { false }, + { true } + }); } public TestBZip (Boolean useBzipFromHadoop) { Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Feb 24 08:19:42 2017 @@ -130,6 +130,7 @@ import org.apache.pig.data.DefaultBagFac import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.ReadToEndLoader; import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.impl.logicalLayer.schema.Schema; @@ -3206,72 +3207,47 @@ public class TestBuiltin { @Test public void testUniqueID() throws Exception { Util.resetStateForExecModeSwitch(); - String inputFileName = "testUniqueID.txt"; - Util.createInputFile(cluster, inputFileName, new String[] - {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"}); Properties copyproperties = new Properties(); copyproperties.putAll(cluster.getProperties()); PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties); - pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10"); + + // running with 2 mappers each taking 5 records + String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath(); + Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"}); + Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"}); 
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true"); - pigServer.registerQuery("A = load '" + inputFileName + "' as (name);"); + + pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);"); pigServer.registerQuery("B = foreach A generate name, UniqueID();"); Iterator<Tuple> iter = pigServer.openIterator("B"); - if (!Util.isSparkExecType(cluster.getExecType())) { - assertEquals(iter.next().get(1), "0-0"); - assertEquals(iter.next().get(1), "0-1"); - assertEquals(iter.next().get(1), "0-2"); - assertEquals(iter.next().get(1), "0-3"); - assertEquals(iter.next().get(1), "0-4"); - assertEquals(iter.next().get(1), "1-0"); - assertEquals(iter.next().get(1), "1-1"); - assertEquals(iter.next().get(1), "1-2"); - assertEquals(iter.next().get(1), "1-3"); - assertEquals(iter.next().get(1), "1-4"); - } else{ - //there will be 2 InputSplits when mapred.max.split.size is 10(byte) for the testUniqueID.txt(20 bytes) - //Split0: - // 1\n - // 2\n - // 3\n - // 4\n - // 5\n - // 1\n - //Split1: - // 2\n - // 3\n - // 4\n - // 5\n - //The size of Split0 is 12 not 10 because LineRecordReader#nextKeyValue will read one more line - //More detail see PIG-4383 - assertEquals(iter.next().get(1), "0-0"); - assertEquals(iter.next().get(1), "0-1"); - assertEquals(iter.next().get(1), "0-2"); - assertEquals(iter.next().get(1), "0-3"); - assertEquals(iter.next().get(1), "0-4"); - assertEquals(iter.next().get(1), "0-5"); - assertEquals(iter.next().get(1), "1-0"); - assertEquals(iter.next().get(1), "1-1"); - assertEquals(iter.next().get(1), "1-2"); - assertEquals(iter.next().get(1), "1-3"); - } - Util.deleteFile(cluster, inputFileName); + assertEquals("0-0",iter.next().get(1)); + assertEquals("0-1",iter.next().get(1)); + assertEquals("0-2",iter.next().get(1)); + assertEquals("0-3",iter.next().get(1)); + assertEquals("0-4",iter.next().get(1)); + assertEquals("1-0",iter.next().get(1)); + assertEquals("1-1",iter.next().get(1)); + 
assertEquals("1-2",iter.next().get(1)); + assertEquals("1-3",iter.next().get(1)); + assertEquals("1-4",iter.next().get(1)); + Util.deleteFile(cluster, TMP_DIR + "/input1.txt"); + Util.deleteFile(cluster, TMP_DIR + "/input2.txt"); } @Test public void testRANDOMWithJob() throws Exception { Util.resetStateForExecModeSwitch(); - String inputFileName = "testRANDOM.txt"; - Util.createInputFile(cluster, inputFileName, new String[] - {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"}); - Properties copyproperties = new Properties(); copyproperties.putAll(cluster.getProperties()); PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties); - // running with two mappers - pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10"); + + // running with 2 mappers each taking 5 records + String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath(); + Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"}); + Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"}); pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true"); - pigServer.registerQuery("A = load '" + inputFileName + "' as (name);"); + + pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);"); pigServer.registerQuery("B = foreach A generate name, RANDOM();"); Iterator<Tuple> iter = pigServer.openIterator("B"); double [] mapper1 = new double[5]; @@ -3294,7 +3270,8 @@ public class TestBuiltin { for( int i = 0; i < 5; i++ ){ assertNotEquals(mapper1[i], mapper2[i], 0.0001); } - Util.deleteFile(cluster, inputFileName); + Util.deleteFile(cluster, TMP_DIR + "/input1.txt"); + Util.deleteFile(cluster, TMP_DIR + "/input2.txt"); } Added: pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java?rev=1784237&view=auto 
============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java (added) +++ pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test; + + +import java.util.Properties; + +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; + +import org.junit.Assert; +import org.junit.Test; + +public class TestConfigurationUtil { + + @Test + public void testExpandForAlternativeNames() { + Properties properties = null; + properties = ConfigurationUtil.expandForAlternativeNames("fs.df.interval", "500"); + Assert.assertEquals(1,properties.size()); + Assert.assertEquals("500",properties.get("fs.df.interval")); + + properties = ConfigurationUtil.expandForAlternativeNames("dfs.df.interval", "600"); + Assert.assertEquals(2,properties.size()); + Assert.assertEquals("600",properties.get("fs.df.interval")); + Assert.assertEquals("600",properties.get("dfs.df.interval")); + + properties = ConfigurationUtil.expandForAlternativeNames("", ""); + Assert.assertEquals(1,properties.size()); + Assert.assertEquals("",properties.get("")); + + } +} Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Fri Feb 24 08:19:42 2017 @@ -30,17 +30,17 @@ import java.util.Map; import java.util.Random; import org.apache.hadoop.fs.Path; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.tools.pigstats.InputStats; import org.apache.pig.tools.pigstats.JobStats; +import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStats.JobGraph; -import 
org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -49,8 +49,8 @@ import org.junit.runners.JUnit4; public class TestCounters { String file = "input.txt"; - static MiniCluster cluster = MiniCluster.buildCluster(); - + static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + final int MAX = 100*1000; Random r = new Random(); @@ -59,7 +59,7 @@ public class TestCounters { public static void oneTimeTearDown() throws Exception { cluster.shutDown(); } - + @Test public void testMapOnly() throws IOException, ExecException { int count = 0; @@ -70,13 +70,13 @@ public class TestCounters { if(t > 50) count ++; } pw.close(); - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = filter a by $0 > 50;"); pigServer.registerQuery("c = foreach b generate $0 - 50;"); ExecJob job = pigServer.store("c", "output_map_only"); PigStats pigStats = job.getStatistics(); - + //counting the no. 
of bytes in the output file //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen(); InputStream is = FileLocalizer.open(FileLocalizer.fullPath( @@ -85,9 +85,9 @@ public class TestCounters { long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output_map_only"), true); @@ -98,7 +98,7 @@ public class TestCounters { JobGraph jg = pigStats.getJobGraph(); Iterator<JobStats> iter = jg.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); @@ -123,20 +123,20 @@ public class TestCounters { count ++; } pw.close(); - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = filter a by $0 > 50;"); pigServer.registerQuery("c = foreach b generate $0 - 50;"); ExecJob job = pigServer.store("c", "output_map_only", "BinStorage"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath( "output_map_only", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); @@ -149,8 +149,8 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); - + JobStats js = iter.next(); + System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -158,7 +158,7 @@ public class 
TestCounters { assertEquals(0, js.getReduceInputRecords()); assertEquals(0, js.getReduceOutputRecords()); } - + System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten()); assertEquals(filesize, pigStats.getBytesWritten()); } @@ -183,7 +183,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group;"); @@ -195,7 +195,7 @@ public class TestCounters { long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); @@ -208,7 +208,7 @@ public class TestCounters { JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -242,7 +242,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group;"); @@ -253,9 +253,9 @@ public class TestCounters { pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); @@ -266,7 +266,7 @@ public class TestCounters { JobGraph jp = 
pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -300,7 +300,7 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);"); @@ -311,20 +311,20 @@ public class TestCounters { pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); System.out.println("============================================"); System.out.println("Test case MapCombineReduce"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -337,7 +337,7 @@ public class TestCounters { System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten()); assertEquals(filesize, pigStats.getBytesWritten()); } - + @Test public void testMapCombineReduceBinStorage() throws IOException, ExecException { int count = 0; @@ -358,20 +358,20 @@ public class TestCounters { if(nos[i] > 0) count ++; } - PigServer pigServer = new 
PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = group a by $0;"); pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);"); ExecJob job = pigServer.store("c", "output", "BinStorage"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); @@ -379,11 +379,11 @@ public class TestCounters { System.out.println("============================================"); System.out.println("Test case MapCombineReduce"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); Iterator<JobStats> iter = jp.iterator(); while (iter.hasNext()) { - MRJobStats js = (MRJobStats) iter.next(); + JobStats js = iter.next(); System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -399,6 +399,8 @@ public class TestCounters { @Test public void testMultipleMRJobs() throws IOException, ExecException { + Assume.assumeTrue("Skip this test for TEZ. 
Assert is done only for first MR job", + Util.isMapredExecType(cluster.getExecType())); int count = 0; PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file)); int [] nos = new int[10]; @@ -413,38 +415,38 @@ public class TestCounters { } pw.close(); - for(int i = 0; i < 10; i++) { + for(int i = 0; i < 10; i++) { if(nos[i] > 0) count ++; } - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.registerQuery("a = load '" + file + "';"); pigServer.registerQuery("b = order a by $0;"); pigServer.registerQuery("c = group b by $0;"); pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);"); ExecJob job = pigServer.store("d", "output"); PigStats pigStats = job.getStatistics(); - + InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), pigServer.getPigContext()); long filesize = 0; while(is.read() != -1) filesize++; - + is.close(); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("output"), true); - + System.out.println("============================================"); System.out.println("Test case MultipleMRJobs"); System.out.println("============================================"); - + JobGraph jp = pigStats.getJobGraph(); - MRJobStats js = (MRJobStats)jp.getSinks().get(0); - + JobStats js = (JobStats)jp.getSinks().get(0); + System.out.println("Job id: " + js.getName()); System.out.println(jp.toString()); - + System.out.println("Map input records : " + js.getMapInputRecords()); assertEquals(MAX, js.getMapInputRecords()); System.out.println("Map output records : " + js.getMapOutputRecords()); @@ -453,12 +455,12 @@ public class TestCounters { assertEquals(count, js.getReduceInputRecords()); System.out.println("Reduce output records : " + js.getReduceOutputRecords()); assertEquals(count, js.getReduceOutputRecords()); - + 
System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten()); assertEquals(filesize, js.getHdfsBytesWritten()); } - + @Test public void testMapOnlyMultiQueryStores() throws Exception { PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file)); @@ -467,8 +469,8 @@ public class TestCounters { pw.println(t); } pw.close(); - - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, + + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file + "';"); @@ -479,22 +481,22 @@ public class TestCounters { List<ExecJob> jobs = pigServer.executeBatch(); PigStats stats = jobs.get(0).getStatistics(); assertTrue(stats.getOutputLocations().size() == 2); - + cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("/tmp/outout1"), true); cluster.getFileSystem().delete(new Path("/tmp/outout2"), true); - MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0); - + JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0); + Map<String, Long> entry = js.getMultiStoreCounters(); long counter = 0; for (Long val : entry.values()) { counter += val; } - - assertEquals(MAX, counter); - } - + + assertEquals(MAX, counter); + } + @Test public void testMultiQueryStores() throws Exception { int[] nums = new int[100]; @@ -505,13 +507,13 @@ public class TestCounters { nums[t]++; } pw.close(); - + int groups = 0; for (int i : nums) { if (i > 0) groups++; } - - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, + + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file + "';"); @@ -525,29 +527,29 @@ public class TestCounters { pigServer.registerQuery("store g into '/tmp/outout2';"); List<ExecJob> jobs = pigServer.executeBatch(); PigStats stats = jobs.get(0).getStatistics(); - + assertTrue(stats.getOutputLocations().size() == 2); - + 
cluster.getFileSystem().delete(new Path(file), true); cluster.getFileSystem().delete(new Path("/tmp/outout1"), true); cluster.getFileSystem().delete(new Path("/tmp/outout2"), true); - MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0); - + JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0); + Map<String, Long> entry = js.getMultiStoreCounters(); long counter = 0; for (Long val : entry.values()) { counter += val; } - - assertEquals(groups, counter); - } - - /* + + assertEquals(groups, counter); + } + + /* * IMPORTANT NOTE: * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE - * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED - */ + */ // @Test // public void testLocal() throws IOException, ExecException { // int count = 0; @@ -566,7 +568,7 @@ public class TestCounters { // } // pw.close(); // -// for(int i = 0; i < 10; i++) +// for(int i = 0; i < 10; i++) // if(nos[i] > 0) // count ++; // @@ -580,56 +582,56 @@ public class TestCounters { // pigServer.registerQuery("c = group b by $0;"); // pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);"); // PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics(); -// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs()); +// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs()); // long filesize = 0; // while(is.read() != -1) filesize++; -// +// // is.close(); // out.delete(); -// +// // //Map<String, Map<String, String>> stats = pigStats.getPigStats(); -// +// // assertEquals(10, pigStats.getRecordsWritten()); // assertEquals(110, pigStats.getBytesWritten()); // // } @Test - public void testJoinInputCounters() throws Exception { + public void testJoinInputCounters() throws Exception { testInputCounters("join"); } - + 
@Test - public void testCogroupInputCounters() throws Exception { + public void testCogroupInputCounters() throws Exception { testInputCounters("cogroup"); } - + @Test - public void testSkewedInputCounters() throws Exception { + public void testSkewedInputCounters() throws Exception { testInputCounters("skewed"); } - + @Test - public void testSelfJoinInputCounters() throws Exception { + public void testSelfJoinInputCounters() throws Exception { testInputCounters("self-join"); } - + private static boolean multiInputCreated = false; - + private static int count = 0; - - private void testInputCounters(String keyword) throws Exception { + + private void testInputCounters(String keyword) throws Exception { String file1 = "multi-input1.txt"; String file2 = "multi-input2.txt"; - + String output = keyword; - + if (keyword.equals("self-join")) { file2 = file1; keyword = "join"; } - - final int MAX_NUM_RECORDS = 100; + + final int MAX_NUM_RECORDS = 100; if (!multiInputCreated) { PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1)); for (int i = 0; i < MAX_NUM_RECORDS; i++) { @@ -637,7 +639,7 @@ public class TestCounters { pw.println(t); } pw.close(); - + PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2)); for (int i = 0; i < MAX_NUM_RECORDS; i++) { int t = r.nextInt(100); @@ -649,8 +651,8 @@ public class TestCounters { pw2.close(); multiInputCreated = true; } - - PigServer pigServer = new PigServer(ExecType.MAPREDUCE, + + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); pigServer.setBatchOn(); pigServer.registerQuery("a = load '" + file1 + "';"); @@ -661,7 +663,7 @@ public class TestCounters { pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';"); } ExecJob job = pigServer.store("c", output + "_output"); - + PigStats stats = job.getStatistics(); assertTrue(stats.isSuccessful()); List<InputStats> inputs = stats.getInputStats(); @@ -680,4 +682,46 @@ public class TestCounters { } } } + + 
@Test + public void testSplitUnionOutputCounters() throws Exception { + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input")); + for (int i = 0; i < 10; i++) { + pw.println(i); + } + pw.close(); + String query = + "a = load 'splitunion-input';" + + "split a into b if $0 < 5, c otherwise;" + + "d = union b, c;"; + + pigServer.registerQuery(query); + + ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage"); + PigStats stats1 = job.getStatistics(); + + query = + "a = load 'splitunion-input';" + + "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" + + "e = distinct d;" + + "f = union b, c, e;"; + + pigServer.registerQuery(query); + + job = pigServer.store("f", "splitunion-output-1", "PigStorage"); + PigStats stats2 = job.getStatistics(); + + PigStats[] pigStats = new PigStats[]{stats1, stats2}; + for (int i = 0; i < 2; i++) { + PigStats stats = pigStats[i]; + assertTrue(stats.isSuccessful()); + List<OutputStats> outputs = stats.getOutputStats(); + assertEquals(1, outputs.size()); + OutputStats output = outputs.get(0); + assertEquals("splitunion-output-" + i, output.getName()); + assertEquals(10, output.getNumberRecords()); + assertEquals(20, output.getBytes()); + } + } }
