Modified: pig/branches/spark/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/nightly.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/nightly.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/nightly.conf Wed Feb 22 09:43:41 2017 @@ -567,7 +567,6 @@ store c into ':OUTPATH:';\, { 'num' => 9, 'floatpostprocess' => 1, - 'ignore23' => 'I cannot get it right due to float precision, temporarily disable', 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = group a by name; c = foreach b generate group, AVG(a.gpa); @@ -1518,8 +1517,8 @@ store i into ':OUTPATH:';\, { # Union + operators 'num' => 12, - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:double); -b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age:int, gpa:double); + 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa:double); c = union a, b; -- Exercise all expression operators -- d = foreach c generate (name is not NULL? UPPER(name) : 'FNU LNU') as name, (age < 30 ? -1 : age) as age, (gpa is NULL ? 0.0 : ((gpa > 0.5 AND gpa < 1.0) ? 1 : gpa)) as gpa; @@ -2186,7 +2185,7 @@ store d into ':OUTPATH:';\, b = order a by $0, $1, $2; c = limit b 100; store c into ':OUTPATH:';\, - 'sortArgs' => ['-t', ' ', '-k', '1,3'], + 'sortArgs' => ['-t', ' ', '-k', '1,2'], }, { # Make sure that limit higher than number of rows doesn't mess stuff up @@ -2206,6 +2205,7 @@ store c into ':OUTPATH:';\, }, { 'num' => 5, + 'execonly' => 'mapred,local', #tez may pick either input as part of the optimization so cannot be tested easily 'pig' =>q\a = load ':INPATH:/singlefile/studenttab10k'; b = load ':INPATH:/singlefile/votertab10k'; a1 = foreach a generate $0, $1; @@ -2285,7 +2285,21 @@ store d into ':OUTPATH:';\, 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); b = limit a 2000; store b into ':OUTPATH:';\, - } + }, + { + 'num' => 12, + 'execonly' => 'tez', #Limit_5 was not able to test on tez. + 'pig' =>q\a = load ':INPATH:/singlefile/studenttab10k'; +b = load ':INPATH:/singlefile/studenttab10k'; +a1 = foreach a generate $0, $1; +b1 = foreach b generate $0, $1; +c = union a1, b1; +d = limit c 100; +store d into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int); +b = limit a 100; +store b into ':OUTPATH:';\, + } ] }, { @@ -2736,6 +2750,41 @@ store c into ':OUTPATH:';\, }, ], }, + { + 'name' => 'StoreLoad', + 'tests' => [ + { + 'num' => 1, + 'floatpostprocess' => 1, + 'delimiter' => ' ', + 'pig' => q\ +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int, gpa: double); +b = filter a by age < 25; +c = filter a by age > 70; +store b into ':OUTPATH:.intermediate1' using PigStorage(','); +store c into ':OUTPATH:.intermediate2' using PigStorage(','); +d = load ':OUTPATH:.intermediate1' using PigStorage(',') as (name:chararray, age:int, gpa: double); +e = load ':OUTPATH:.intermediate2' using PigStorage(',') as (name:chararray, age:int, gpa: double); +f = join d by name, e by name; +store f into ':OUTPATH:';\, + 'notmq' => 1, + }, + { + # Self join + 'num' => 2, + 'floatpostprocess' => 1, + 'delimiter' => ' ', + 'pig' => q\ +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int, gpa: double); +b = filter a by name == 'nick miller'; +store b into ':OUTPATH:.intermediate' using PigStorage(','); +c = load ':OUTPATH:.intermediate' using PigStorage(',') as (name:chararray, age:int, gpa: double); +d = join a by name, c by name; +store d into ':OUTPATH:';\, + 'notmq' => 1, + }, + ], + }, { 'name' => 'MergeJoin', @@ -3171,6 +3220,25 @@ e = join a by name full outer, b by name store e into ':OUTPATH:';\, }, + # skew join with tuple key + { + 'num' => 15, + 'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100'], + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +c = group a by (name, age); +d = group b by (name, age); +e = join c by $0, d by $0 using 'skewed' parallel 5; +f = foreach e generate c::group, flatten(c::a), d::group, flatten(d::b); +store f into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +c = group a by (name, age); +d = group b by (name, age); +e = join c by $0, d by $0; +f = foreach e generate c::group, flatten(c::a), d::group, flatten(d::b); +store f into ':OUTPATH:';\ + } ] }, @@ -4243,40 +4311,32 @@ store e into ':OUTPATH:';\, # test common 'num' => 1, 'pig' => q\ -rmf table_testNativeMRJobSimple_input -rmf table_testNativeMRJobSimple_output a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); -b = native ':MAPREDJARS:/hadoop-examples.jar' Store a into 'table_testNativeMRJobSimple_input' Load 'table_testNativeMRJobSimple_output' `wordcount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`; +b = native ':MAPREDJARS:/hadoop-examples.jar' Store a into ':OUTPATH:.intermediate.1' Load ':OUTPATH:.intermediate.2' `wordcount :OUTPATH:.intermediate.1 :OUTPATH:.intermediate.2`; store b into ':OUTPATH:';\, 'notmq' => 1, 'verify_pig_script' => q\ -rmf table_testNativeMRJobSimple_input -rmf table_testNativeMRJobSimple_output a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); -b = mapreduce ':MAPREDJARS:/hadoop-examples.jar' Store a into 'table_testNativeMRJobSimple_input' Load 'table_testNativeMRJobSimple_output' `wordcount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`; +b = mapreduce ':MAPREDJARS:/hadoop-examples.jar' Store a into ':OUTPATH:.intermediate.1' Load ':OUTPATH:.intermediate.2' `wordcount :OUTPATH:.intermediate.1 :OUTPATH:.intermediate.2`; store b into ':OUTPATH:';\, }, { # test complex 'num' => 2, 'pig' => q\ -rmf table_testNativeMRJobSimple_input -rmf table_testNativeMRJobSimple_output a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); b = foreach a generate name; c = distinct b; -d = native ':MAPREDJARS:/hadoop-examples.jar' Store c into 'table_testNativeMRJobSimple_input' Load 'table_testNativeMRJobSimple_output' as (name:chararray, count: int) `wordcount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`; +d = native ':MAPREDJARS:/hadoop-examples.jar' Store c into ':OUTPATH:.intermediate.1' Load ':OUTPATH:.intermediate.2' as (name:chararray, count: int) `wordcount :OUTPATH:.intermediate.1 :OUTPATH:.intermediate.2`; e = order d by name; store e into ':OUTPATH:';\, 'sortArgs' => ['-t', ' '], 'notmq' => 1, 'verify_pig_script' => q\ -rmf table_testNativeMRJobSimple_input -rmf table_testNativeMRJobSimple_output a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); b = foreach a generate name; c = distinct b; -d = mapreduce ':MAPREDJARS:/hadoop-examples.jar' Store c into 'table_testNativeMRJobSimple_input' Load 'table_testNativeMRJobSimple_output' as (name:chararray, count: int) `wordcount table_testNativeMRJobSimple_input table_testNativeMRJobSimple_output`; +d = mapreduce ':MAPREDJARS:/hadoop-examples.jar' Store c into ':OUTPATH:.intermediate.1' Load ':OUTPATH:.intermediate.2' as (name:chararray, count: int) `wordcount :OUTPATH:.intermediate.1 :OUTPATH:.intermediate.2`; e = order d by name; store e into ':OUTPATH:';\, }, @@ -4284,16 +4344,8 @@ store e into ':OUTPATH:';\, # test streaming 'num' => 3, 'pig' => q\ -rmf table_testNativeMRJobSimple_input -rmf table_testNativeMRJobSimple_output a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); -b = mapreduce ':MAPREDJARS:/hadoop-streaming.jar' Store a into 'table_testNativeMRJobSimple_input' Load 'table_testNativeMRJobSimple_output' as (name:chararray, count: int) `-input table_testNativeMRJobSimple_input -output table_testNativeMRJobSimple_output -mapper cat -reducer wc`; -store b into ':OUTPATH:';\, - 'pig23' => q\ -rmf table_testNativeMRJobSimple_input -rmf table_testNativeMRJobSimple_output -a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); -b = mapreduce ':MAPREDJARS:/hadoop-0.23.0-streaming.jar' Store a into 'table_testNativeMRJobSimple_input' Load 'table_testNativeMRJobSimple_output' as (name:chararray, count: int) `-input table_testNativeMRJobSimple_input -output table_testNativeMRJobSimple_output -mapper cat -reducer wc`; +b = mapreduce ':MAPREDJARS:/hadoop-streaming.jar' Store a into ':OUTPATH:.intermediate.1' Load ':OUTPATH:.intermediate.2' as (name:chararray, count: int) `-input :OUTPATH:.intermediate.1 -output :OUTPATH:.intermediate.2 -mapper cat -reducer wc`; store b into ':OUTPATH:';\, 'notmq' => 1, }, @@ -4884,21 +4936,6 @@ a = load ':INPATH:/singlefile/allscalar1 b = load ':INPATH:/singlefile/allscalar10k' using PigStorage() as (name:chararray, age:int, gpa:double, instate:chararray); C = union a, b; store C into ':OUTPATH:';\, - }, - { - # Test Union using merge with incompatible types. float->bytearray and chararray->bytearray - 'num' => 8, - 'delimiter' => ' ', - 'pig' => q\ -A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int); -B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:chararray); -C = union onschema A, B; -store C into ':OUTPATH:';\, - 'verify_pig_script' => q\ -A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray); -B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray); -C = union A, B; -store C into ':OUTPATH:';\, } ] @@ -4927,7 +4964,6 @@ store C into ':OUTPATH:';\, 'tests' => [ { 'num' => 1, - 'ignore23' => 'guava version of Pig is higher than hadoop 23', 'pig' => q?register :FUNCPATH:/testudf.jar; define gm org.apache.pig.test.udf.evalfunc.GoodMonitored(); a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); @@ -5297,6 +5333,26 @@ store C into ':OUTPATH:';\, C = UNION A,B; D = filter C by name == 'alice allen'; store D into ':OUTPATH:';", + },{ + 'num' => 5, + 'pig' => "set pig.optimizer.rules.disabled PushUpFilter; + define bb BuildBloom('Hash.JENKINS_HASH', 'fixed', '128', '3'); + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + B = filter A by name == 'alice allen'; + C = group B all; + D = foreach C generate bb(B.name) as bloomfilter; + E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + F = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + G = union E, F; + -- PushUpFilter is disabled to avoid filter being pushed before union + H = filter G by Bloom(D.bloomfilter, name); + store H into ':OUTPATH:';", + 'verify_pig_script' => " + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double); + B = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double); + C = UNION A,B; + D = filter C by name == 'alice allen'; + store D into ':OUTPATH:';", } ], },{ @@ -5637,13 +5693,15 @@ store a into ':OUTPATH:';\, 'execonly' => 'mapred,tez', 'pig' => q\ SET default_parallel 7; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = rank A; C = foreach B generate rank_A,a,b,c; store C into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = foreach A generate rownumber,a,b,c; store C into ':OUTPATH:'; \, @@ -5652,13 +5710,15 @@ store a into ':OUTPATH:';\, 'execonly' => 'mapred,tez', 'pig' => q\ SET default_parallel 9; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = rank A by b DESC,a ASC; C = foreach B generate rank_A,b,a; store C into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = foreach A generate rankbdaa,b,a; store C into ':OUTPATH:'; \, @@ -5667,13 +5727,15 @@ store a into ':OUTPATH:';\, 'execonly' => 'mapred,tez', 'pig' => q\ SET default_parallel 7; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = rank A by c ASC,b DESC; C = foreach B generate rank_A,c,b; store C into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = foreach A generate rankcabd,c,b; store C into ':OUTPATH:'; \, @@ -5681,26 +5743,29 @@ store a into ':OUTPATH:';\, 'num' => 4, 'execonly' => 'mapred,tez', 'pig' => q\ - SET default_parallel 25; - A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray); + SET default_parallel 5; + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = rank A; C = order B by rank_A; - D = foreach C generate rank_A,rownumber; + D = foreach C generate rank_A,a,b,c; store D into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray); - D = foreach A generate idx,rownumber; + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); + D = foreach A generate rownumber,a,b,c; store D into ':OUTPATH:'; \, }, { 'num' => 5, 'execonly' => 'mapred,tez', 'pig' => q\ - SET default_parallel 11; + SET default_parallel 5; + SET mapreduce.input.fileinputformat.split.maxsize '300'; SET pig.splitCombination false; - A = LOAD ':INPATH:/singlefile/biggish' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray); - B = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); + B = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = join A by rownumber, B by rownumber; D = order C by B::rankcabd,B::rankbdca,B::rankaaba; E = rank D; @@ -5710,7 +5775,7 @@ store a into ':OUTPATH:';\, store H into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,idx:long,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,idx:long); B = foreach A generate rownumber,1; C = order B by rownumber; store C into ':OUTPATH:'; @@ -5719,14 +5784,16 @@ store a into ':OUTPATH:';\, 'num' => 6, 'execonly' => 'mapred,tez', 'pig' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); split A into M if rownumber > 15, N if rownumber < 25; C = rank N; D = foreach C generate $0, a, b, c; store D into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = filter A by rownumber < 25; D = foreach B generate rownumber, a, b, c; store D into ':OUTPATH:'; @@ -5741,14 +5808,16 @@ store a into ':OUTPATH:';\, 'num' => 1, 'execonly' => 'mapred,tez', 'pig' => q\ + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; SET default_parallel 9; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = rank A by a ASC,b ASC DENSE; C = foreach B generate rank_A,a,b; store C into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = foreach A generate rankaaba,a,b; store C into ':OUTPATH:'; \, @@ -5756,14 +5825,16 @@ store a into ':OUTPATH:';\, 'num' => 2, 'execonly' => 'mapred,tez', 'pig' => q\ + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; SET default_parallel 9; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = rank A by a ASC,c DESC DENSE; C = foreach B generate rank_A,a,c; store C into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = foreach A generate rankaacd,a,c; store C into ':OUTPATH:'; \, @@ -5771,14 +5842,16 @@ store a into ':OUTPATH:';\, 'num' => 3, 'execonly' => 'mapred,tez', 'pig' => q\ + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; SET default_parallel 7; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = rank A by b DESC,c ASC DENSE; C = foreach B generate rank_A,b,c; store C into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = foreach A generate rankbdca,b,c; store C into ':OUTPATH:'; \, @@ -5786,9 +5859,11 @@ store a into ':OUTPATH:';\, 'num' => 4, 'execonly' => 'mapred,tez', 'pig' => q\ + SET mapreduce.input.fileinputformat.split.maxsize '300'; + SET pig.splitCombination false; SET default_parallel 7; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); - B = foreach A generate a,b,c,tail; + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); + B = foreach A generate a,b,c; C = rank B by a ASC,b ASC DENSE; D = rank C by a ASC,c DESC DENSE; E = rank D by b DESC,c ASC DENSE; @@ -5796,7 +5871,7 @@ store a into ':OUTPATH:';\, store F into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = foreach A generate rankbdca,rankaacd,rankaaba,a,b,c; store B into ':OUTPATH:'; \, @@ -5805,8 +5880,9 @@ store a into ':OUTPATH:';\, 'execonly' => 'mapred,tez', 'pig' => q\ SET default_parallel 9; + SET mapreduce.input.fileinputformat.split.maxsize '300'; SET pig.splitCombination false; - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); B = foreach A generate a,b,c; C = rank B by a ASC,b ASC DENSE; D = rank B by a ASC,c DESC DENSE; @@ -5816,7 +5892,7 @@ store a into ':OUTPATH:';\, store H into ':OUTPATH:'; \, 'verify_pig_script' => q\ - A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int,tail:bytearray); + A = LOAD ':INPATH:/singlefile/prerank' using PigStorage(',') as (rownumber:long,rankcabd:long,rankbdaa:long,rankbdca:long,rankaacd:long,rankaaba:long,a:int,b:int,c:int); C = foreach A generate rankaaba,a,b,c; E = order C by a ASC,b ASC; D = foreach A generate rankaacd,a,b,c;
Modified: pig/branches/spark/test/e2e/pig/tests/orc.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/orc.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/orc.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/orc.conf Wed Feb 22 09:43:41 2017 @@ -1,3 +1,21 @@ +#!/usr/bin/env perl +############################################################################ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +############################################################################### $cfg = { 'driver' => 'Pig', 'nummachines' => 5, Modified: pig/branches/spark/test/e2e/pig/tests/turing_jython.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/turing_jython.conf?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/turing_jython.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/turing_jython.conf Wed Feb 22 09:43:41 2017 @@ -50,7 +50,7 @@ d = filter b by age < 50; e = cogroup c by (name, age), d by (name, age) ; f = foreach e generate flatten(c), flatten(d); g = group f by registration; -h = foreach g generate group, SUM(f.d::contributions); +h = foreach g generate group, (float) ROUND(SUM(f.d::contributions) * 100) / 100.0; i = order h by $1; store i into '$out'; """).bind({'in1':input1,'in2':input2, 'out':output}).runSingle() @@ -68,7 +68,7 @@ else: e = cogroup c by (name, age), d by (name, age) ; f = foreach e generate flatten(c), flatten(d); g = group f by registration; - h = foreach g generate group, SUM(f.d::contributions); + h = foreach g generate group, (float) ROUND(SUM(f.d::contributions) * 100) / 100.0; i = order h by $1; store i into ':OUTPATH:'; \, @@ -92,38 +92,12 @@ hdfs = FileSystem.get(config) } ] - }, - { - 'name' => 'Jython_Embedded', - 'tests' => [ - { - 'num' => 1, - ,'pig' => q\#!/usr/bin/python -# JYTHON COMMENT -from org.apache.pig.scripting import Pig - -P = Pig.compile("""A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -store A into ':OUTPATH:';""") - -Q = P.bind() - -result = Q.runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\, - 'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); - store A into ':OUTPATH:';\, - } - ] }, { 'name' => 'Jython_CompileBindRun' ,'tests' => [ - { # bind() with no parameters, runSingle + { + # bind no parameters, runSingle 'num' => 1 ,'pig' => q\#!/usr/bin/python # JYTHON COMMENT @@ -150,7 +124,7 @@ else: ,'delimiter' => ' ' },{ -# 9.2 1 bind single input parameter and no output parameters + # bind single input parameter 'num' => 2 ,'pig' => q\#!/usr/bin/python @@ -179,7 +153,7 @@ else: # ,'expected_out_regex' => "Pig job PASSED" },{ -# bind parallel execution with a multiple entries + # bind parallel execution with a multiple entries 'num' => 3 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -231,9 +205,7 @@ for i in [0, 1, 2]: \, },{ -# 8.6 compile pig script file with no input and no output parameters -#12.2 import python modules -# + # compile pig script file with no parameters 'num' => 4 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -243,6 +215,7 @@ pig_script = ":TMP:/script.pig" pigfile = open( pig_script, 'w+') pigfile.write(""" A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); +-- a comment store A into ':OUTPATH:'; """) pigfile.close() @@ -263,7 +236,7 @@ else: ,'floatpostprocess' => 1 ,'delimiter' => ' ' },{ -# 8.7 compile pig script file with no input and with output parameters + # compile pig script file with parameters 'num' => 5 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -300,7 +273,7 @@ else: ,'delimiter' => ' ' },{ - # 11.15 1 results.getResults(alias) for null results + # results.getResults(alias) for null results 'num' => 6 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -318,7 +291,7 @@ result = P.bind().runSingle() store EMPTY into ':OUTPATH:';\ }, { - # bind reading from python context + # bind parameters from python context 'num' => 7 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -340,7 +313,7 @@ result = P.bind().runSingle() store B into ':OUTPATH:';\ },{ - # bind multiple times + # bind multiple times 'num' => 8 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -367,56 +340,8 @@ for i in [1,2,3]: B= foreach A generate age + 3; store B into ':OUTPATH:.3';\, - }, - { - # invoke .run() on a non-parallel pig script - 'num' => 9 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -P = Pig.compile(""" -A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -store A into ':OUTPATH:'; -""") -result = P.bind().run() -\, - 'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); - store A into ':OUTPATH:';\, - }, - { -# 8.6 compile pig script file with no input and no output parameters -#12.2 import python modules -# - 'num' => 10 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -#create pig script -pig_script = ":TMP:/script.pig" -pigfile = open( pig_script, 'w+') -pigfile.write(""" -A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); --- a comment -store A into ':OUTPATH:'; -""") -pigfile.close() - -#execute pig script - -result = Pig.compileFromFile( pig_script ).bind().runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" -else: - raise "Pig job FAILED" -\, - - 'verify_pig_script' => q\A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); - store A into ':OUTPATH:'; -\ - ,'floatpostprocess' => 1 - ,'delimiter' => ' ' },{ + # python script with parameters 'num' => 11 ,'pig_params' => ['-p', qq(loadfile='studenttab10k')], ,'pig' => q\#!/usr/bin/python @@ -441,6 +366,7 @@ else: ,'floatpostprocess' => 1 ,'delimiter' => ' ' },{ + # python script with parameter file 'num' => 12 ,'pig_params' => ['-m', ":PARAMPATH:/params_3"], ,'pig' => q\#!/usr/bin/python @@ -465,6 +391,7 @@ else: ,'floatpostprocess' => 1 ,'delimiter' => ' ' },{ + # python script with command line arguments 'num' => 13 ,'additional_cmd_args' => ['studenttab10k'] ,'pig' => q\#!/usr/bin/python @@ -495,7 +422,7 @@ else: 'name' => 'Jython_Diagnostics' ,'tests' => [ { -# 11.23 1 explain() on a complex query + # explain() on a complex query 'num' => 1 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -525,7 +452,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'rc'=> 0 }, { -#11.22 1 illustrate() on a complex query + # illustrate() on a complex query 'num' => 2 ,'execonly' => 'mapred,local' #TODO: PIG-3993: Illustrate is yet to be implemented in Tez ,'pig' => q\#!/usr/bin/python @@ -555,7 +482,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'rc'=> 0 ,'expected_out_regex' => "A.*name:bytearray.*age:bytearray.*gpa:bytearray" }, { -# 11.24 1 describe() on an alias + # describe() on an alias 'num' => 3 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -583,7 +510,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'rc'=> 0 ,'expected_out_regex' => "A:.*{name:.*bytearray,age:.*bytearray,gpa:.*bytearray}" }, { -#11.29 1 describe() on an undefined alias + # describe() on an undefined alias 'num' => 4 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -613,7 +540,7 @@ result = P.bind({'in1':input1, 'in2':inp }, { -# 11.27 1 illustrate(alias) + # illustrate(alias) 'num' => 5 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -643,7 +570,7 @@ result = P.bind({'in1':input1, 'in2':inp ,'expected_err_regex' => "ERROR 1121" }, { -# 11.28 1 explain(alias) + # explain(alias) 'num' => 6 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -710,14 +637,10 @@ Pig.fs("-copyFromLocal :TMP:/iterator_ou }, ] }, { -# 12.2 import python modules -# 12.1 python comments -# 12.6 fs lists a file - - 'name' => 'Jython_Misc' ,'tests' => [ { + # fs commands: lists a file 'num' => 1 ,'pig' => q\#!/usr/bin/python # JYTHON COMMENT @@ -778,8 +701,8 @@ P.bind().runSingle() 'name' => 'Jython_Properties', 'tests' => [ { + # check if property is passed to Pig 'num' => 1 - ,'ignore' => 1 # This is a good test except that we can't verify it. ,'pig' => q\#!/usr/bin/python # JYTHON COMMENT from org.apache.pig.scripting import Pig @@ -791,7 +714,7 @@ store A into ':OUTPATH:';""") Q = P.bind() prop = Properties() -prop.put("mapred.job.name", "friendship") +prop.put("pig.default.load.func", "wrong") result = Q.runSingle(prop) if result.isSuccessful(): @@ -799,10 +722,8 @@ if result.isSuccessful(): else: raise "Pig job FAILED" \ - - ,'sql' => "select name, age, gpa+0.00 from studenttab10k;" - ,'floatpostprocess' => 1 - ,'delimiter' => ' ' + ,'rc'=> 6 + ,'expected_err_regex' => "ERROR 1070: Could not resolve wrong using imports" } ] @@ -811,7 +732,7 @@ else: 'name' => 'Jython_Error', 'tests' => [ { - # run a script that returns single negative result + # run a script that returns single negative result 'num' => 1 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig @@ -834,103 +755,18 @@ else: ,'rc' => 6 ,'expected_err_regex' => "ERROR 1121" - }, - { - # run a script that returns single negative result - 'num' => 2 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -input= ":INPATH:/singlefile/studenttab10k" -output = ":OUTPATH:" - -P = Pig.compile("""A = load '$in' as (name, age, gpa); store A into '$out';""") - -Q = P.bind({'in':input, 'out':bad_output}) - -result = Q.runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\ - - ,'rc' => 6 - ,'expected_err_regex' => "name 'bad_output' is not defined" },{ - # bind an undefined input parameter - 'num' => 3 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -input= ":INPATH:/singlefile/studenttab10k" -output = ":OUTPATH:" - -P = Pig.compile("""A = load '$in' as (name, age, gpa); store A into '$out';""") - -Q = P.bind({'in':invalid_parameter, 'out':output}) - -result = Q.runSingle() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\ - - ,'expected_err_regex' => "ERROR 1121" - ,'rc'=> 6 - - }, - { - # compileFromFile for pig script file that does not exist throws IOException + # compileFromFile for pig script file that does not exist throws IOException 'num' => 4 ,'pig' => q\#!/usr/bin/python +import os from org.apache.pig.scripting import Pig # intentionally don't create pig script -pig_script = tmp_dir + "/script.pig" - -#execute pig script -input1= ":INPATH:/singlefile/studenttab10k" -input2= ":INPATH:/singlefile/votertab10k" -output1= ":OUTPATH:.1" -output2= ":OUTPATH:.2" - -result = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run() - -if result.isSuccessful(): - print "Pig job PASSED" - -else: - raise "Pig job FAILED" -\ - - ,'expected_err_regex' => "ERROR 1121" - ,'rc'=> 6 - }, - { - # compileFromFile for pig script file that does not have read permissions throws IOException - 'num' => 5 - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -#create pig script - pig_script = ":TMP:/script.pig" -pigfile = open( pig_script, 'w') -#no read permissions and file is left open until afer compile statement -pigfile.write(""" -A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); -B = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); -store A into '$out1'; -store B into '$out2'; -""") -pigfile.close() + +os.remove(pig_script) #execute pig script input1= ":INPATH:/singlefile/studenttab10k" @@ -938,11 +774,9 @@ input2= ":INPATH:/singlefile/votertab10k output1= ":OUTPATH:.1" output2= ":OUTPATH:.2" -result = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run() - -pigfile.close() +results = Pig.compileFromFile(pig_script).bind({'in1':input1,'in2':input2, 'out1':output1, 'out2':output2 }).run() -if result.isSuccessful(): +if results[0].isSuccessful(): print "Pig job PASSED" else: @@ -977,13 +811,16 @@ else: ,'expected_err_regex' => "ERROR 1121" }, { - # 11.10 iter.next for an alias that is undefined + # iter.next for an alias that is undefined 'num' => 7 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig #create pig script +out1= ":OUTPATH:.1" +out2= ":OUTPATH:.2" + P = Pig.compile("""A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); B= filter A by age < 50; store B into '$out1'; @@ -992,9 +829,10 @@ D = filter C by name matches '^fred*'; store D into '$out2'; """) -result = P.bind().run() +results = P.bind().run() +iter = results[0].result("E").iterator() -if result.isSuccessful(): +if results[0].isSuccessful(): print "Pig job PASSED" else: @@ -1010,30 +848,6 @@ else: 'tests' => [ { # sql command - 'num' => 1 - ,'java_params' => ['-Dhcat.bin=:HCATBIN:'] - ,'pig' => q\#!/usr/bin/python -from org.apache.pig.scripting import Pig - -#create pig script - -Pig.sql("""sql drop table if exists pig_script_hcat_ddl_1;""") -ret = Pig.sql("""sql create table pig_script_hcat_ddl_1(name string, -age int, -gpa double) -stored as textfile; -""") - -if ret==0: - print "SQL command PASSED" - -else: - raise "SQL command FAILED" -\ - ,'rc' => 0 - }, - { - # sql command 'num' => 2 ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig Modified: pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl (original) +++ pig/branches/spark/test/e2e/pig/tools/generate/generate_data.pl Wed Feb 22 09:43:41 2017 @@ -41,7 +41,6 @@ our @lastName = ("allen", "brown", "cars # rankaacd: RANK BY a ASC , c DESC # rankaaba: RANK BY a ASC , b ASC # a,b,c: values -# tail: long value in order to create multiple mappers ############################################################################ our @rankedTuples = ( "1,21,5,7,1,1,0,8,8","2,26,2,3,2,5,1,9,10","3,30,24,21,2,3,1,3,10","4,6,10,8,3,4,1,7,2", @@ -501,22 +500,10 @@ sub getBulkCopyCmd(){ my $randf = rand(10); printf HDFS "%d:%d:%d:%d:%d:%dL:%.2ff:%.2f\n", $tid, $i, $rand5, $rand100, $rand1000, $rand1000, $randf, $randf; } - } elsif ($filetype eq "ranking") { + } elsif ($filetype eq "ranking") { for (my $i = 0; $i < $numRows; $i++) { my $tuple = $rankedTuples[int($i)]; - printf HDFS "$tuple,"; - for my $j ( 0 .. 1000000) { - printf HDFS "%d",$j; - } - printf HDFS "\n"; - } - } elsif ($filetype eq "biggish") { - for (my $i = 1; $i < $numRows; $i++) { - printf HDFS "$i,$i,"; - for my $j ( 0 .. 1000) { - printf HDFS "%d",$j; - } - printf HDFS "\n"; + printf HDFS "$tuple\n"; } } elsif ($filetype eq "utf8Student") { srand(3.14159 + $numRows); Modified: pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original) +++ pig/branches/spark/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Wed Feb 22 09:43:41 2017 @@ -360,7 +360,7 @@ public class TestLoadStoreFuncLifeCycle // result, the number of StoreFunc instances is greater by 1 in // Hadoop-2.0.x. assertTrue("storer instanciation count increasing: " + Storer.count, - Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4)); + Storer.count <= 5); } } Modified: pig/branches/spark/test/org/apache/pig/TestMain.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestMain.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/TestMain.java (original) +++ pig/branches/spark/test/org/apache/pig/TestMain.java Wed Feb 22 09:43:41 2017 @@ -24,8 +24,10 @@ import static org.junit.Assert.assertTru import static org.junit.Assert.fail; import java.io.BufferedWriter; +import java.io.BufferedReader; import java.io.File; import java.io.FileWriter; +import java.io.FileReader; import java.io.IOException; import java.util.Properties; @@ -35,6 +37,7 @@ import org.apache.pig.impl.logicalLayer. import org.apache.pig.parser.ParserException; import org.apache.pig.parser.SourceLocation; import org.apache.pig.test.TestPigRunner.TestNotificationListener; +import org.apache.pig.test.Util; import org.apache.pig.tools.parameters.ParameterSubstitutionException; import org.apache.pig.tools.pigstats.PigStats; import org.junit.Test; @@ -152,6 +155,35 @@ public class TestMain { } + @Test + public void testParseInputScript() throws Exception { + File input = Util.createInputFile("tmp", "", + new String[]{"{(1,1.0)}\ttestinputstring1", + "{(2,2.0)}\ttestinputstring1", + "{(3,3.0)}\ttestinputstring1", + "{(4,4.0)}\ttestinputstring1"} + ); + File out = new File(System.getProperty("java.io.tmpdir")+"/testParseInputScriptOut"); + File scriptFile = Util.createInputFile("pigScript", "", + new String[]{"A = load '"+input.getAbsolutePath()+"' as (a:{(x:chararray, y:float)}, b:chararray);", + "B = foreach A generate\n" + + " b,\n" + + " (bag{tuple(long)}) a.x as ax:{(x:long)};", + "store B into '"+out.getAbsolutePath()+"';"} + ); + + Main.run(new String[]{"-x", "local", scriptFile.getAbsolutePath()}, null); + BufferedReader file = new BufferedReader(new FileReader(new File(out.getAbsolutePath()+"/part-m-00000"))); + String line; + int count = 0; + while(( line = file.readLine()) != null) { + count++; + } + assertEquals(4,count); + Util.deleteDirectory(new File(out.getAbsolutePath())); + assertTrue(!new File(out.getAbsolutePath()).exists()); + } + public static class TestNotificationListener2 extends TestNotificationListener { protected boolean hadArgs = false; public TestNotificationListener2() {} Modified: pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java Wed Feb 22 09:43:41 2017 @@ -709,6 +709,19 @@ public class TestAvroStorage { } @Test + public void testGroupWithRepeatedSubRecords() throws Exception { + final String input = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro"; + final String check = basedir + "data/avro/uncompressed/recordWithRepeatedSubRecords.avro"; + testAvroStorage(true, basedir + "code/pig/group_test.pig", + ImmutableMap.of( + "INFILE", input, + "AVROSTORAGE_OUT_2", "-f " + basedir + "schema/recordWithRepeatedSubRecords.avsc", + "OUTFILE", createOutputName()) + ); + verifyResults(createOutputName(),check); + } + + @Test public void testLoadDirectory() throws Exception { final String input = basedir + "data/avro/uncompressed/testdirectory"; final String check = basedir + "data/avro/uncompressed/testDirectoryCounts.avro"; Modified: pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/builtin/TestOrcStorage.java Wed Feb 22 09:43:41 2017 @@ -195,7 +195,7 @@ public class TestOrcStorage { Reader reader = OrcFile.createReader(fs, Util.getFirstPartFile(new Path(OUTPUT1))); assertEquals(reader.getNumberOfRows(), 2); - RecordReader rows = reader.rows(null); + RecordReader rows = reader.rows(); Object row = rows.next(null); StructObjectInspector soi = (StructObjectInspector)reader.getObjectInspector(); IntWritable intWritable = (IntWritable)soi.getStructFieldData(row, @@ -291,7 +291,7 @@ public class TestOrcStorage { ObjectInspector oi = orcReader.getObjectInspector(); StructObjectInspector soi = (StructObjectInspector) oi; - RecordReader reader = orcReader.rows(null); + RecordReader reader = orcReader.rows(); Object row = null; while (reader.hasNext()) { @@ -326,9 +326,9 @@ public class TestOrcStorage { Reader orcReaderActual = OrcFile.createReader(fs, orcFile); StructObjectInspector soiActual = (StructObjectInspector) orcReaderActual.getObjectInspector(); - RecordReader readerExpected = orcReaderExpected.rows(null); + RecordReader readerExpected = orcReaderExpected.rows(); Object expectedRow = null; - RecordReader readerActual = orcReaderActual.rows(null); + RecordReader readerActual = orcReaderActual.rows(); Object actualRow = null; while (readerExpected.hasNext()) { Added: pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig (added) +++ pig/branches/spark/test/org/apache/pig/builtin/avro/code/pig/group_test.pig Wed Feb 22 09:43:41 2017 @@ -0,0 +1,5 @@ +in = LOAD '$INFILE' USING AvroStorage(); +grouped = GROUP in BY (value1.thing); +flattened = FOREACH grouped GENERATE flatten(in) as (key: chararray,value1: (thing: chararray,count: int),value2: (thing: chararray,count: int)); +RMF $OUTFILE; +STORE flattened INTO '$OUTFILE' USING AvroStorage(); Modified: pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java (original) +++ pig/branches/spark/test/org/apache/pig/data/TestSchemaTuple.java Wed Feb 22 09:43:41 2017 @@ -17,9 +17,9 @@ */ package org.apache.pig.data; -import static junit.framework.Assert.assertEquals; import static org.apache.pig.builtin.mock.Storage.resetData; import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -599,33 +599,33 @@ public class TestSchemaTuple { Data data = resetData(pigServer); data.set("foo1", - tuple(0), - tuple(1), - tuple(2), - tuple(3), - tuple(4), - tuple(5), - tuple(6), - tuple(7), - tuple(8), - tuple(9) + tuple(0, 0), + tuple(1, 1), + tuple(2, 2), + tuple(3, 3), + tuple(4, 4), + tuple(5, 5), + tuple(6, 6), + tuple(7, 7), + tuple(8, 8), + tuple(9, 9) ); data.set("foo2", - tuple(0), - tuple(1), - tuple(2), - tuple(3), - tuple(4), - tuple(5), - tuple(6), - tuple(7), - tuple(8), - tuple(9) + tuple(0, 0), + tuple(1, 1), + tuple(2, 2), + tuple(3, 3), + tuple(4, 4), + tuple(5, 5), + tuple(6, 6), + tuple(7, 7), + tuple(8, 8), + tuple(9, 9) ); - pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int);"); - pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int);"); + pigServer.registerQuery("A = LOAD 'foo1' USING mock.Storage() as (x:int, y:int);"); + pigServer.registerQuery("B = LOAD 'foo2' USING mock.Storage() as (x:int, y:int);"); if (preSort) { pigServer.registerQuery("A = ORDER A BY x ASC;"); pigServer.registerQuery("B = ORDER B BY x ASC;"); @@ -638,20 +638,24 @@ public class TestSchemaTuple { if (!out.hasNext()) { throw new Exception("Output should have had more elements! Failed on element: " + i); } - assertEquals(tuple(i, i), out.next()); + assertEquals(tuple(i, i, i, i), out.next()); } assertFalse(out.hasNext()); - pigServer.registerQuery("STORE D INTO 'bar' USING mock.Storage();"); + pigServer.registerQuery("STORE D INTO 'bar1' USING mock.Storage();"); + pigServer.registerQuery("E = JOIN A by (x, y), B by (x, y) using '"+joinType+"';"); + pigServer.registerQuery("F = ORDER E BY $0 ASC;"); + pigServer.registerQuery("STORE F INTO 'bar2' USING mock.Storage();"); - List<Tuple> tuples = data.get("bar"); + List<Tuple> bar1 = data.get("bar1"); + List<Tuple> bar2 = data.get("bar2"); - if (tuples.size() != 10) { - throw new Exception("Output does not have enough elements! List: " + tuples); - } + assertEquals("Output does not have enough elements! List: " + bar1, 10, bar1.size()); + assertEquals("Output does not have enough elements! List: " + bar2, 10, bar2.size()); for (int i = 0; i < 10; i++) { - assertEquals(tuple(i, i), tuples.get(i)); + assertEquals(tuple(i, i, i, i), bar1.get(i)); + assertEquals(tuple(i, i, i, i), bar2.get(i)); } } Added: pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java (added) +++ pig/branches/spark/test/org/apache/pig/impl/builtin/TestHiveUDTF.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.builtin; + +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.Tuple; +import org.apache.pig.test.MiniGenericCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.Lists; + +import static org.apache.pig.builtin.mock.Storage.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestHiveUDTF { + private static PigServer pigServer = null; + private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); + + @BeforeClass + public static void oneTimeSetup() throws ExecException { + pigServer = new PigServer(ExecType.LOCAL); + } + + @AfterClass + public static void oneTimeTearDown() throws Exception { + cluster.shutDown(); + } + + @Test + public void testHiveUDTFOnBagInput() throws IOException { + Data data = resetData(pigServer); + + Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c"))); + + data.set("TestHiveUDTF", tuple); + + pigServer.registerQuery("define posexplode HiveUDTF('posexplode');"); + pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});"); + pigServer.registerQuery("B = foreach A generate posexplode(a0);"); + + Iterator<Tuple> result = pigServer.openIterator("B"); + List<Tuple> out = Lists.newArrayList(result); + + assertEquals(2, out.size()); + assertTrue("Result doesn't contain the HiveUDTF output", + out.contains(tuple(bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c"))))); + assertTrue("Result doesn't contain an empty bag", + out.contains(tuple(bag()))); + } + + @Test + public void testHiveUDTFOnBagInputWithTwoProjection() throws IOException { + Data data = resetData(pigServer); + + Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c"))); + + data.set("TestHiveUDTF", tuple); + + pigServer.registerQuery("define posexplode HiveUDTF('posexplode');"); + pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});"); + pigServer.registerQuery("B = foreach A generate a0, posexplode(a0);"); + + Iterator<Tuple> result = pigServer.openIterator("B"); + List<Tuple> out = Lists.newArrayList(result); + + assertEquals(2, out.size()); + assertTrue("Result doesn't contain the HiveUDTF output", + out.contains(tuple(bag(tuple("a"), tuple("b"), tuple("c")), bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c"))))); + assertTrue("Result doesn't contain an empty bag", + out.contains(tuple(null, bag()))); + } + + @Test + public void testHiveUDTFOnClose() throws IOException { + Data data = resetData(pigServer); + + List<Tuple> tuples = Arrays.asList(tuple("a", 1), tuple("a", 2), tuple("a", 3)); + + data.set("TestHiveUDTF", tuples); + + pigServer.registerQuery("define COUNT2 HiveUDTF('org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount2');"); + pigServer.registerQuery("a = load 'TestHiveUDTF' USING mock.Storage() as (name:chararray, id:int);"); + pigServer.registerQuery("b = foreach a generate flatten(COUNT2(name));"); + + Iterator<Tuple> result = pigServer.openIterator("b"); + List<Tuple> out = Lists.newArrayList(result); + + assertEquals(2, out.size()); + assertEquals(tuple(3), out.get(0)); + assertEquals(tuple(3), out.get(1)); + } + +} Modified: pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java (original) +++ pig/branches/spark/test/org/apache/pig/parser/TestQueryParser.java Wed Feb 22 09:43:41 2017 @@ -652,4 +652,14 @@ public class TestQueryParser { public void testSplit2() throws Exception { shouldPass("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';"); } + + @Test + public void testBigDecimalParsing() throws Exception { + shouldPass("B = FILTER A BY $1 < 1234567890.123456789BD;"); + } + + @Test + public void testBigIntegerParsing() throws Exception { + shouldPass("B = FILTER A BY $1 < 1234567890123456789BI;"); + } } Modified: pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java (original) +++ pig/branches/spark/test/org/apache/pig/parser/TestQueryParserUtils.java Wed Feb 22 09:43:41 2017 @@ -19,10 +19,20 @@ package org.apache.pig.parser; import static org.junit.Assert.assertEquals; +import java.io.IOException; import java.util.Properties; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; import org.apache.pig.ExecType; +import org.apache.pig.LoadFunc; +import org.apache.pig.NonFSLoadFunc; +import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.test.Util; import org.junit.Test; @@ -72,43 +82,76 @@ public class TestQueryParserUtils { QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc); assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) { - // webhdfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc); - assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - // har with webhfs - QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc); - assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - //viewfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("viewfs:/tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("viewfs:///tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc); - assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - //har with viewfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc); - assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + // webhdfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc); + assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + + // har with webhfs + QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc); + assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + //viewfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("viewfs:/tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("viewfs:///tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc); + assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - } + //har with viewfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc); + assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + + } + @Test + public void testNonFSLoadFunc() throws Exception { + PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); + pigServer.registerQuery("A = load 'hbase://query/SELECT ID, NAME, DATE FROM HIRES WHERE DATE > TO_DATE(\"1990-12-21 05:55:00.000\")' using org.apache.pig.parser.TestQueryParserUtils$DummyNonFSLoader();"); + pigServer.shutdown(); } + /** + * Test class for testNonFSLoadFuncNoSetHdfsServersCall test case + */ + public static class DummyNonFSLoader extends LoadFunc implements NonFSLoadFunc { + + @Override + public void setLocation(String location, Job job) throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public InputFormat getInputFormat() throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public void prepareToRead(RecordReader reader, PigSplit split) throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public Tuple getNext() throws IOException { + throw new RuntimeException("Should not be called"); + } + + @Override + public String relativeToAbsolutePath(String location, Path curDir) throws IOException { + return location; + } + } } Added: pig/branches/spark/test/org/apache/pig/test/MiniCluster.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/MiniCluster.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/MiniCluster.java (added) +++ pig/branches/spark/test/org/apache/pig/test/MiniCluster.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.filecache.DistributedCache; +import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster; +import org.apache.pig.ExecType; +import org.apache.pig.backend.hadoop.executionengine.Launcher; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; + +/** + * This class builds a single instance of itself with the Singleton + * design pattern. While building the single instance, it sets up a + * mini cluster that actually consists of a mini DFS cluster and a + * mini MapReduce cluster on the local machine and also sets up the + * environment for Pig to run on top of the mini cluster. + */ +public class MiniCluster extends MiniGenericCluster { + private static final File CONF_DIR = new File("build/classes"); + private static final File CONF_FILE = new File(CONF_DIR, "hadoop-site.xml"); + + protected MiniMRYarnCluster m_mr = null; + private Configuration m_dfs_conf = null; + private Configuration m_mr_conf = null; + + /** + * @deprecated use {@link org.apache.pig.test.MiniGenericCluster.buildCluster() instead. + */ + @Deprecated + public static MiniCluster buildCluster() { + System.setProperty("test.exec.type", "mr"); + return (MiniCluster)MiniGenericCluster.buildCluster("mr"); + } + + @Override + public ExecType getExecType() { + return ExecType.MAPREDUCE; + } + + @Override + protected void setupMiniDfsAndMrClusters() { + try { + final int dataNodes = 4; // There will be 4 data nodes + final int taskTrackers = 4; // There will be 4 task tracker nodes + + System.setProperty("hadoop.log.dir", "build/test/logs"); + // Create the dir that holds hadoop-site.xml file + // Delete if hadoop-site.xml exists already + CONF_DIR.mkdirs(); + if(CONF_FILE.exists()) { + CONF_FILE.delete(); + } + + // Builds and starts the mini dfs and mapreduce clusters + Configuration config = new Configuration(); + config.set("yarn.scheduler.capacity.root.queues", "default"); + config.set("yarn.scheduler.capacity.root.default.capacity", "100"); + m_dfs = new MiniDFSCluster(config, dataNodes, true, null); + m_fileSys = m_dfs.getFileSystem(); + m_dfs_conf = m_dfs.getConfiguration(0); + + //Create user home directory + m_fileSys.mkdirs(m_fileSys.getWorkingDirectory()); + + m_mr = new MiniMRYarnCluster("PigMiniCluster", taskTrackers); + m_mr.init(m_dfs_conf); + m_mr.start(); + + // Write the necessary config info to hadoop-site.xml + m_mr_conf = new Configuration(m_mr.getConfig()); + + m_conf = m_mr_conf; + m_conf.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + m_conf.unset(MRConfiguration.JOB_CACHE_FILES); + + m_conf.setInt(MRConfiguration.IO_SORT_MB, 200); + m_conf.set(MRConfiguration.CHILD_JAVA_OPTS, "-Xmx512m"); + + m_conf.setInt(MRConfiguration.SUMIT_REPLICATION, 2); + m_conf.setInt(MRConfiguration.MAP_MAX_ATTEMPTS, 2); + m_conf.setInt(MRConfiguration.REDUCE_MAX_ATTEMPTS, 2); + m_conf.set("dfs.datanode.address", "0.0.0.0:0"); + m_conf.set("dfs.datanode.http.address", "0.0.0.0:0"); + m_conf.set("pig.jobcontrol.sleep", "100"); + m_conf.writeXml(new FileOutputStream(CONF_FILE)); + m_fileSys.copyFromLocalFile(new Path(CONF_FILE.getAbsoluteFile().toString()), + new Path("/pigtest/conf/hadoop-site.xml")); + DistributedCache.addFileToClassPath(new Path("/pigtest/conf/hadoop-site.xml"), m_conf); + + System.err.println("XXX: Setting " + FileSystem.FS_DEFAULT_NAME_KEY + " to: " + m_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + // Set the system properties needed by Pig + System.setProperty("cluster", m_conf.get(MRConfiguration.JOB_TRACKER)); + System.setProperty("namenode", m_conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + System.setProperty("junit.hadoop.conf", CONF_DIR.getPath()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void shutdownMiniMrClusters() { + // Delete hadoop-site.xml on shutDown + if(CONF_FILE.exists()) { + CONF_FILE.delete(); + } + if (m_mr != null) { m_mr.stop(); } + m_mr = null; + } + + static public Launcher getLauncher() { + return new MapReduceLauncher(); + } +}
