Modified: pig/branches/spark/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/nightly.conf?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/nightly.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/nightly.conf Fri Mar 4 18:17:39 2016 @@ -689,7 +689,24 @@ store c into ':OUTPATH:';\, store d into ':OUTPATH:'; #, 'java_params' => ['-Dpig.exec.mapPartAgg=true'] - }, + }, + + { + #PIG-4707 Streaming and empty input + + 'num' => 6, + 'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); + b = group a by name; + c = foreach b generate flatten(a); + d = stream c through `cat` as (name, age, gpa); + e = filter d by name == 'nonexistent'; + SPLIT e into f if gpa > 2, g otherwise; + store f into ':OUTPATH:.1'; + store g into ':OUTPATH:.2'; + #, + 'java_params' => ['-Dpig.exec.mapPartAgg=true'] + + }, ], }, @@ -1380,7 +1397,8 @@ store g into ':OUTPATH:';\, { 'name' => 'Union', 'tests' => [ - { + { + # Simple store 'num' => 1, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); @@ -1389,7 +1407,8 @@ d = foreach b generate name, age; e = union c, d; store e into ':OUTPATH:';\, }, - { + { + # Union + Groupby + Combiner 'num' => 2, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1399,15 +1418,20 @@ e = foreach d generate group, SUM(c.age) store e into ':OUTPATH:';\, }, { + # Union + Groupby + Secondary key partitioner 'num' => 3, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); c = union a, b; d = group c by name; -e = foreach d { f = order c by $1,$2; generate group, f; }; -store e into ':OUTPATH:';\, +d1 = group c by name; -- Two separate groupbys to ensure secondary key partitioner +e = foreach d { f = order c by age, gpa ; g = limit f 1; generate g; }; +h = foreach d1 { i = order c by age asc, gpa desc; j = limit i 1; generate j; }; +store e into ':OUTPATH:.1'; +store h into ':OUTPATH:.2';\, }, { + # Union + Orderby 'num' => 4, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1417,6 +1441,7 @@ store d into ':OUTPATH:';\, 'sortArgs' => ['-t', ' ', '-k', '1,1'], }, { + # Simple split + Union 'num' => 5, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1427,6 +1452,7 @@ store a2 into ':OUTPATH:.1'; store d into ':OUTPATH:.2';\, }, { + # Union + Join 'num' => 6, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1436,6 +1462,7 @@ e = join c by name, d by name PARALLEL 2 store e into ':OUTPATH:';\, }, { + # Union + Replicate Join left 'num' => 7, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1445,6 +1472,7 @@ e = join c by name, d by name using 'rep store e into ':OUTPATH:';\, }, { + # Union + Replicate Join right 'num' => 8, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1454,6 +1482,7 @@ e = join d by name, c by name using 'rep store e into ':OUTPATH:';\, }, { + # Union + Skewed Join left 'num' => 9, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1463,6 +1492,7 @@ e = join c by name, d by name using 'ske store e into ':OUTPATH:';\, }, { + # Union + Skewed Join right 'num' => 10, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1486,6 +1516,7 @@ i = foreach i generate group, SUM(h.age) store i into ':OUTPATH:';\, }, { + # Union + operators 'num' => 12, 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:double); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age:int, gpa:double); @@ -1496,6 +1527,7 @@ e = filter d by (name matches '.*MIKE.*' store e into ':OUTPATH:';\, }, { + # Union + Groupby + Replicate join 'num' => 13, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); @@ -1503,22 +1535,24 @@ c = union a, b; d = group c by name; e = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); f = join d by group, e by name using 'replicated'; -store f into ':OUTPATH:';\, +g = foreach f generate group, flatten(c), name, age, registration, contributions; +store g into ':OUTPATH:';\, }, - { ## Secondary Key + { + # Group by with Secondary Key + Union 'num' => 14, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age, gpa); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age, gpa); c = group a by name; d = foreach c { - sorted = order a by name,age; + sorted = order a by name,age,gpa; lmt = limit sorted 1; generate lmt as c1; }; e = foreach d generate flatten(c1) as (name:chararray, age, gpa); f = group b by name; g = foreach f { - sorted = order b by name,age; + sorted = order b by name,age,gpa; lmt = limit sorted 1; generate lmt as f1; }; @@ -1529,6 +1563,7 @@ store j into ':OUTPATH:';\, 'sortArgs' => ['-t', ' ', '-k', '1,1'], }, { + # Union + Cross 'num' => 15, 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa:float); b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa:float); @@ -1537,6 +1572,30 @@ d = cross a, c; e = union b, d; store e into ':OUTPATH:';\, }, + { + # Union + Distinct + 'num' => 16, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa); +c = union a, b; +d = distinct c; +store c into ':OUTPATH:';\, + }, + { + # Union + Groupby + FILTER + 'num' => 17, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa:float); +b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa:float); +c = group a by name; +d = group b by name; +e = union c, d; +e = foreach e generate $0, $1 as groupbag; +f = foreach e { + g = order $1 by age asc, gpa desc; + h = filter g by (gpa == 0 ? true : false); + generate group, h; }; +store f into ':OUTPATH:';\, + } ] }, { @@ -2189,12 +2248,12 @@ store D into ':OUTPATH:';\, { 'num' => 9, 'pig' =>q\a = load ':INPATH:/singlefile/studentnulltab10k'; -b = order a by $0, $1; +b = order a by $0, $1, $2; c = limit b 1000/10; store c into ':OUTPATH:';\, 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k'; -b = order a by $0, $1; +b = order a by $0, $1, $2; c = limit b 100; store c into ':OUTPATH:';\, @@ -3054,6 +3113,64 @@ e = join a by name full outer, b by name store e into ':OUTPATH:';\, }, + # right outer join with fixed memory + { + 'num' => 11, + 'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'], + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +e = join a by name right outer, b by name using 'skewed' parallel 8; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +e = join a by name right outer, b by name ; +store e into ':OUTPATH:';\, + + }, + # full outer join with empty left relation + { + 'num' => 12, + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = filter a by name=='abc'; +e = join b by name right outer, a by name using 'skewed' parallel 8; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = foreach a generate (null, null, null, name, age, gpa); +c = foreach b generate flatten($0); +store c into ':OUTPATH:';\, + + }, + # left outer join with fixed memory + { + 'num' => 13, + 'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'], + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +b = filter b by name < 'b'; +e = join a by name left outer, b by name using 'skewed' parallel 8; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +b = filter b by name < 'b'; +e = join a by name left outer, b by name ; +store e into ':OUTPATH:';\, + }, + # full outer join with fixed memory + { + 'num' => 14, + 'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'], + 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +b = filter b by name > 'm'; +e = join a by name full outer, b by name using 'skewed' parallel 8; +store e into ':OUTPATH:';\, + 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions); +b = filter b by name > 'm'; +e = join a by name full outer, b by name ; +store e into ':OUTPATH:';\, + + }, ] }, @@ -3440,7 +3557,9 @@ store b into ':OUTPATH:';\, 'tests' => [ { # test reading and writing out files with .bz2 extension + # relying on Hadoop's bzipcodec (for 0.23/2.X and after) 'num' => 1, + 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'], 'pig' => q\ a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); store a into ':OUTPATH:.intermediate.bz2'; @@ -3450,7 +3569,33 @@ store b into ':OUTPATH:';\, }, { # test reading and writing with .bz extension + # relying on Hadoop's bzipcodec (for 0.23/2.X and after) 'num' => 2, + 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'], + 'pig' => q\ +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +store a into ':OUTPATH:.intermediate.bz'; +b = load ':OUTPATH:.intermediate.bz'; +store b into ':OUTPATH:';\, + 'notmq' => 1, + }, + { + # test reading and writing out files with .bz2 extension + # using Bzip2TextInputFormat. + 'num' => 3, + 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'], + 'pig' => q\ +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +store a into ':OUTPATH:.intermediate.bz2'; +b = load ':OUTPATH:.intermediate.bz2'; +store b into ':OUTPATH:';\, + 'notmq' => 1, + }, + { + # test reading and writing with .bz extension + # using Bzip2TextInputFormat. + 'num' => 4, + 'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'], 'pig' => q\ a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); store a into ':OUTPATH:.intermediate.bz'; @@ -3509,6 +3654,18 @@ f = foreach e generate AVG(d.age) as avg y = foreach a generate age/c.avg, age/f.avg; store y into ':OUTPATH:';\, }, + { + # test scalar with split + 'num' => 5, + 'pig' => q\ +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa); +b = group a all; +c = foreach b generate AVG(a.age) as avg, COUNT(a.age) as cnt; +d = foreach c generate avg; +e = group d by $0; +f = foreach e generate group, c.avg, c.cnt; +store f into ':OUTPATH:';\, + }, ] }, { @@ -3873,6 +4030,42 @@ store b into ':OUTPATH:';\, ] }, { + 'name' => 'JavaScriptUDFs', + 'tests' => [ + { + # test double square + 'num' => 1, + 'pig' => q\ +register ':SCRIPTHOMEPATH:/js/scriptingudf.js' using javascript as myfuncs; +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double); +b = foreach a generate myfuncs.square(gpa); +store b into ':OUTPATH:';\, + 'verify_pig_script' => q\ +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double); +b = foreach a generate gpa * gpa; +store b into ':OUTPATH:';\, + }, + ] + }, + { + 'name' => 'GroovyUDFs', + 'tests' => [ + { + # test integer square + 'num' => 1, + 'pig' => q\ +register ':SCRIPTHOMEPATH:/groovy/scriptingudf.groovy' using groovy as myfuncs; +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double); +b = foreach a generate myfuncs.square(age); +store b into ':OUTPATH:';\, + 'verify_pig_script' => q\ +a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double); +b = foreach a generate age * age; +store b into ':OUTPATH:';\, + }, + ] + }, + { 'name' => 'StreamingPythonUDFs', 'tests' => [ { @@ -4910,34 +5103,39 @@ store C into ':OUTPATH:';\, { # PIG-2286 'num' => 1, + 'floatpostprocess' => 1, + 'delimiter' => ' ', 'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double, gpa:double); B = group A all; - C = foreach B generate group, COR(A.age, A.gpa); + C = foreach B generate group, flatten(COR(A.age, A.gpa)); store C into ':OUTPATH:';?, 'verify_pig_script' => q?set pig.exec.nocombiner true A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double); B = group A all; - C = foreach B generate group, COR(A.age, A.gpa); + C = foreach B generate group, flatten(COR(A.age, A.gpa)); store C into ':OUTPATH:';?, }, { # PIG-2286, with 3 inputs to COR 'num' => 2, + 'floatpostprocess' => 1, + 'delimiter' => ' ', 'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double); B = foreach A generate age, gpa, gpa*gpa as gpa2; C = group B all; - D = foreach C generate group, COR(B.age, B.gpa, B.gpa2); + D = foreach C generate group, flatten(COR(B.age, B.gpa, B.gpa2)); store D into ':OUTPATH:';?, 'verify_pig_script' => q?set pig.exec.nocombiner true A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double); B = foreach A generate age, gpa, gpa*gpa as gpa2; C = group B all; - D = foreach C generate group, COR(B.age, B.gpa, B.gpa2); + D = foreach C generate group, flatten(COR(B.age, B.gpa, B.gpa2)); store D into ':OUTPATH:';?, }, { # PIG-2385 'num' => 3, 'pig_params' => ['-M'], 'floatpostprocess' => 1, + 'delimiter' => ' ', 'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double); Z = group A all; Z1 = foreach Z generate AVG(A.gpa) as avg; @@ -5075,6 +5273,30 @@ store C into ':OUTPATH:';\, C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float); D = join C by name, B by name; store D into ':OUTPATH:';", + },{ + 'num' => 4, + 'pig' => "set pig.optimizer.rules.disabled PushUpFilter; + define bb BuildBloom('Hash.JENKINS_HASH', 'fixed', '128', '3'); + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + B = filter A by name == 'alice allen'; + C = group B all; + D = foreach C generate bb(B.name); + store D into ':HDFSTMP:/mybloom_4'; + exec; + define bloom Bloom(':HDFSTMP:/mybloom_4'); + E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + F = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double); + G = union E, F; + -- PushUpFilter is disabled to avoid filter being pushed before union + H = filter G by bloom(name); + store H into ':OUTPATH:';", + 'notmq' => 1, + 'verify_pig_script' => " + A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double); + B = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double); + C = UNION A,B; + D = filter C by name == 'alice allen'; + store D into ':OUTPATH:';", } ], },{ @@ -5605,6 +5827,119 @@ store a into ':OUTPATH:';\, \, } ] + }, + { + 'name' => 'HiveUDF', + 'tests' => [ + { + # HiveUDF extends UDF + 'num' => 1, + 'pig' => q\ + define sin HiveUDF('sin'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate sin(gpa); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate SIN(gpa); + store B into ':OUTPATH:';\, + }, + { + # HiveUDF extends GenericUDF + 'num' => 2, + 'pig' => q\ + define upper HiveUDF('upper'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate upper(name); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate UPPER(name); + store B into ':OUTPATH:';\, + }, + { + # HiveUDTF + 'num' => 3, + 'pig' => q\ + define explode HiveUDTF('explode'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray); + B = foreach A generate TOBAG(name, age, gpa) as b; + C = foreach B generate flatten(explode(b)); + store C into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray); + B = foreach A generate TOBAG(name, age, gpa) as b; + C = foreach B generate flatten(b); + store C into ':OUTPATH:';\, + }, + { + # HiveUDAF extends GenericUDAF, with null handling + 'num' => 4, + 'pig' => q\ + define avg HiveUDAF('avg'); + A = LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double); + B = group A by name; + C = foreach B generate group, avg(A.age); + store C into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double); + B = group A by name; + C = foreach B generate group, AVG(A.age); + store C into ':OUTPATH:';\, + }, + { + # HiveUDAF extends UDAF + 'num' => 5, + 'pig' => q\ + define percentile HiveUDAF('percentile'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate name, age, 0.5 as perc; + C = group B by name; + D = foreach C generate group, percentile(B.(age, perc)); + store D into ':OUTPATH:';\, + 'verify_pig_script' => q\ + register :FUNCPATH:/datafu.jar + define Quartile datafu.pig.stats.Quantile('0.5'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = group A by name; + C = foreach B { + sorted = order A by age; + generate group, flatten(Quartile(sorted.age)); + } + store C into ':OUTPATH:';\, + }, + { + # Constant folding and ship jars + 'num' => 6, + 'pig' => q# + sh echo -e "zach young\nzach zipper" > names.txt + define in_file HiveUDF('in_file', '(null, "names.txt")'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate in_file(name, 'names.txt'); + store B into ':OUTPATH:';#, + 'verify_pig_script' => q#register :PIGGYBANKJAR: + sh echo -e "zach young\nzach zipper" > names.txt + rmf :INPATH:/singlefile/names.txt + fs -put names.txt :INPATH:/singlefile/names.txt + define LookupInFiles org.apache.pig.piggybank.evaluation.string.LookupInFiles(); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate LookupInFiles(name, ':INPATH:/singlefile/names.txt'); + C = foreach B generate (boolean)$0; + store C into ':OUTPATH:'; + fs -rm :INPATH:/singlefile/names.txt# + }, + { + # Custom Hive UDF and MapredContext + 'num' => 7, + 'pig' => q\set mapred.max.split.size '100000000' + register :FUNCPATH:/testudf.jar; + define DummyContextUDF HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate DummyContextUDF(age); + store B into ':OUTPATH:';\, + 'expected_err_regex' => "Encountered Warning UDF_WARNING_1 4610 time.*", + } + ] } ], },
Modified: pig/branches/spark/test/e2e/pig/tests/streaming.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/streaming.conf?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/streaming.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/streaming.conf Fri Mar 4 18:17:39 2016 @@ -39,13 +39,14 @@ $cfg = { 'num' => 1, 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; -B = foreach A generate $2, $1, $0; -C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $3, $2, $1}'`; +B = foreach A generate $2, $1; +C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $2, $1}'`; store C into ':OUTPATH:';#, 'pig_win' => q# +DEFINE CMD `awk -F "\\\t" "{print $2, $1}"` output(stdout using PigStreaming(' ')); A = load ':INPATH:/singlefile/studenttab10k'; -B = foreach A generate $2, $1, $0; -C = stream B through `awk "BEGIN {FS = \\\\"\t\\\\"; OFS = \\\\"\t\\\\"} {print $3, $2, $1}"`; +B = foreach A generate $2, $1; +C = stream B through CMD; store C into ':OUTPATH:';#, 'sql' => "select name, age, gpa from studenttab10k;", }, @@ -54,16 +55,15 @@ store C into ':OUTPATH:';#, 'num' => 2, 'pig' => q# A = load ':INPATH:/singlefile/studenttab10k'; -B = foreach A generate $2, $1, $0; -C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $3, $2, $1}'` as (name, age, gpa); -D = foreach C generate name, age; -store D into ':OUTPATH:';#, +B = foreach A generate $2, $1; +C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $2, $1}'` as (age, gpa); +store C into ':OUTPATH:';#, 'pig_win' => q# +DEFINE CMD `awk -F "\\\t" "{print $2, $1}"` output(stdout using PigStreaming(' ')); A = load ':INPATH:/singlefile/studenttab10k'; -B = foreach A generate $2, $1, $0; -C = stream B through `awk "BEGIN {FS = \\\\"\t\\\\"; OFS = \\\\"\t\\\\"} {print $3, $2, $1}"` as (name, age, gpa); -D = foreach C generate name, age; -store D into ':OUTPATH:';#, +B = foreach A generate $2, $1; +C = stream B through CMD as (age, gpa); +store C into ':OUTPATH:';#, 'sql' => "select name, age from studenttab10k;", }, { @@ -432,7 +432,7 @@ C = stream B through CMD1; D = stream C through CMD2; store D into ':OUTPATH:';#, 'pig_win' => q# -define CMD1 `perl -ne "print $_;print STDERR "stderr $_";"`; +define CMD1 `perl -ne "print $_;print STDERR 'stderr $_';"`; define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl'); A = load ':INPATH:/singlefile//studenttab10k'; B = stream A through CMD1; Modified: pig/branches/spark/test/e2e/pig/tests/turing_jython.conf URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/turing_jython.conf?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/tests/turing_jython.conf (original) +++ pig/branches/spark/test/e2e/pig/tests/turing_jython.conf Fri Mar 4 18:17:39 2016 @@ -527,6 +527,7 @@ result = P.bind({'in1':input1, 'in2':inp }, { #11.22 1 illustrate() on a complex query 'num' => 2 + ,'execonly' => 'mapred,local' #TODO: PIG-3993: Illustrate is yet to be implemented in Tez ,'pig' => q\#!/usr/bin/python from org.apache.pig.scripting import Pig Modified: pig/branches/spark/test/e2e/pig/udfs/java/build.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/udfs/java/build.xml?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/e2e/pig/udfs/java/build.xml (original) +++ pig/branches/spark/test/e2e/pig/udfs/java/build.xml Fri Mar 4 18:17:39 2016 @@ -31,6 +31,9 @@ <fileset dir="${pig.dir}"> <include name="pig*-core-*.jar"/> </fileset> + <fileset dir="${pig.dir}/lib" erroronmissingdir="false"> + <include name="*.jar"/> + </fileset> <fileset dir="${hadoop.common.dir}" erroronmissingdir="false"> <include name="hadoop-common*.jar"/> </fileset> Modified: pig/branches/spark/test/org/apache/pig/TestMain.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestMain.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/TestMain.java (original) +++ pig/branches/spark/test/org/apache/pig/TestMain.java Fri Mar 4 18:17:39 2016 @@ -39,6 +39,9 @@ import org.apache.pig.tools.parameters.P import org.apache.pig.tools.pigstats.PigStats; import org.junit.Test; +import java.nio.charset.Charset; +import com.google.common.io.Files; + public class TestMain { private Log log = LogFactory.getLog(TestMain.class); @@ -126,6 +129,29 @@ public class TestMain { } } + @Test + public void testlog4jConf() throws Exception { + Properties properties = Main.log4jConfAsProperties(null); + assertTrue(properties.isEmpty()); + properties = Main.log4jConfAsProperties(""); + assertTrue(properties.isEmpty()); + // Test for non-existent file + properties = Main.log4jConfAsProperties("non-existing-" + System.currentTimeMillis()); + assertTrue(properties.isEmpty()); + + // Create tmp file in under build/test/classes + File tmpFile = File.createTempFile("pig-log4jconf", ".properties", new File("build/test/classes")); + tmpFile.deleteOnExit(); + Files.write("A=B", tmpFile, Charset.forName("UTF-8")); + // Read it as a resource + properties = Main.log4jConfAsProperties(tmpFile.getName()); + assertEquals("B", properties.getProperty("A")); + // Read it as a file + properties = Main.log4jConfAsProperties(tmpFile.getAbsolutePath()); + assertEquals("B", properties.getProperty("A")); + } + + public static class TestNotificationListener2 extends TestNotificationListener { protected boolean hadArgs = false; public TestNotificationListener2() {} Modified: pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java (original) +++ pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java Fri Mar 4 18:17:39 2016 @@ -38,30 +38,26 @@ public class TestInputSizeReducerEstimat @Test public void testGetInputSizeFromFs() throws Exception { long size = 2L * 1024 * 1024 * 1024; + POLoad load1 = createPOLoadWithSize(size, new PigStorage()); + POLoad load2 = createPOLoadWithSize(size, new PigStorageWithStatistics()); Assert.assertEquals(size, InputSizeReducerEstimator.getTotalInputFileSize( - CONF, Lists.newArrayList(createPOLoadWithSize(size, new PigStorage())), - new org.apache.hadoop.mapreduce.Job(CONF))); + CONF, Lists.newArrayList(load1), new org.apache.hadoop.mapreduce.Job(CONF))); Assert.assertEquals(size, InputSizeReducerEstimator.getTotalInputFileSize( - CONF, - Lists.newArrayList(createPOLoadWithSize(size, new PigStorageWithStatistics())), - new org.apache.hadoop.mapreduce.Job(CONF))); + CONF, Lists.newArrayList(load2), new org.apache.hadoop.mapreduce.Job(CONF))); Assert.assertEquals(size * 2, InputSizeReducerEstimator.getTotalInputFileSize( - CONF, - Lists.newArrayList( - createPOLoadWithSize(size, new PigStorage()), - createPOLoadWithSize(size, new PigStorageWithStatistics())), - new org.apache.hadoop.mapreduce.Job(CONF))); + CONF, Lists.newArrayList(load1, load2), new org.apache.hadoop.mapreduce.Job(CONF))); // Negative test - PIG-3754 - POLoad poLoad = createPOLoadWithSize(size, new PigStorage()); - poLoad.setLFile(new FileSpec("hbase://users", null)); + load1.setLFile(new FileSpec("hbase://users", null)); - Assert.assertEquals(-1, InputSizeReducerEstimator.getTotalInputFileSize( - CONF, - Collections.singletonList(poLoad), - new org.apache.hadoop.mapreduce.Job(CONF))); + Assert.assertEquals(0, InputSizeReducerEstimator.getTotalInputFileSize( + CONF, Collections.singletonList(load1), new org.apache.hadoop.mapreduce.Job(CONF))); + + // Skip non-hdfs input - PIG-4679 + Assert.assertEquals(size, InputSizeReducerEstimator.getTotalInputFileSize( + CONF, Lists.newArrayList(load1, load2), new org.apache.hadoop.mapreduce.Job(CONF))); } @Test Modified: pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java Fri Mar 4 18:17:39 2016 @@ -34,8 +34,11 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.Iterator; +import com.google.common.io.Closeables; import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; @@ -52,12 +55,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; -import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; import org.apache.pig.builtin.mock.Storage.Data; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.Utils; import org.apache.pig.impl.util.avro.AvroBagWrapper; @@ -131,8 +134,8 @@ public class TestAvroStorage { } @BeforeClass - public static void setup() throws ExecException, IOException { - pigServerLocal = new PigServer(ExecType.LOCAL); + public static void setup() throws Exception { + pigServerLocal = new PigServer(Util.getLocalTestMode()); Util.deleteDirectory(new File(outbasedir)); generateInputFiles(); } @@ -426,6 +429,21 @@ public class TestAvroStorage { } @Test + public void testLoadRecordsSpecifyFullSchemaFromClass() throws Exception { + final String input = basedir + "data/avro/uncompressed/records.avro"; + final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro"; + testAvroStorage(true, basedir + "code/pig/identity.pig", + ImmutableMap.of( + "INFILE", input, + "OUTFILE", createOutputName(), + "AVROSTORAGE_IN_2", "-c org.apache.pig.builtin.avro.code.java.RecordPojo", + "AVROSTORAGE_OUT_1", "''", + "AVROSTORAGE_OUT_2", "-c org.apache.pig.builtin.avro.code.java.RecordPojo") + ); + verifyResults(createOutputName(),check); + } + + @Test public void testLoadRecordsSpecifyFullSchemaFromFile() throws Exception { final String input = basedir + "data/avro/uncompressed/records.avro"; final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro"; @@ -828,7 +846,7 @@ public class TestAvroStorage { @Test public void testRetrieveDataFromMap() throws Exception { - pigServerLocal = new PigServer(ExecType.LOCAL); + pigServerLocal = new PigServer(Util.getLocalTestMode()); Data data = resetData(pigServerLocal); Map<String, String> mapv1 = new HashMap<String, String>(); mapv1.put("key1", "v11"); @@ -906,6 +924,96 @@ public class TestAvroStorage { assertEquals("bar", v); } + @Test + public void testAvroMapWrapper() throws Exception { + final Map<CharSequence, Object> m = new HashMap<CharSequence, Object>(); + for (String fn : avroSchemas) { + final String avro = basedir + "data/avro/uncompressed/" + fn + ".avro"; + int i = 0; + for (GenericContainer r : readAvroData(avro)) { + m.put(new Utf8(fn + i), r); + i += 1; + } + } + final AvroMapWrapper amw = new AvroMapWrapper(m); + // Test out all the interfaces the AvroMapWrapper supports + for (Object o : amw.values()) { + assertTrue(isValidPigObject(o)); + } + for (CharSequence k : amw.keySet()) { + assertTrue(isValidPigObject(k)); + assertTrue(isValidPigObject(amw.get(k))); + } + for (Map.Entry<CharSequence, Object> e : amw.entrySet()) { + assertTrue(isValidPigObject(e.getKey())); + assertTrue(isValidPigObject(e.getValue())); + } + } + + private boolean isValidPigObject(Object o) { + if (o == null) { + return true; + } + switch (DataType.findType(o)) { + case DataType.TUPLE: + for (Object inner : ((Tuple) o).getAll()) { + if (!isValidPigObject(inner)) { + return false; + } + } + return true; + case DataType.BAG: + final Iterator<Tuple> bi = ((DataBag) o).iterator(); + while (bi.hasNext()) { + if (!isValidPigObject(bi.next())) { + return false; + } + } + return true; + case DataType.MAP: + for (Object inner : ((Map) o).values()) { + if (!isValidPigObject(inner)) { + return false; + } + } + return true; + case DataType.BIGDECIMAL: + case DataType.BIGINTEGER: + case DataType.BOOLEAN: + case DataType.BYTE: + case DataType.BYTEARRAY: + case DataType.CHARARRAY: + case DataType.DATETIME: + case DataType.DOUBLE: + case DataType.FLOAT: + case DataType.GENERIC_WRITABLECOMPARABLE: + case DataType.INTEGER: + case DataType.LONG: + return true; + case DataType.ERROR: + default: + return false; + } + } + + private List<GenericContainer> readAvroData(String path) throws IOException { + final FileSystem fs = FileSystem.getLocal(new Configuration()); + final Path filePath = new Path(path); + assertTrue("File path " + filePath + " does not exists!", fs.exists(filePath)); + final GenericDatumReader<GenericContainer> reader = new GenericDatumReader<GenericContainer>(); + final DataFileStream<GenericContainer> in = new DataFileStream<GenericContainer>(fs.open(filePath), reader); + final List<GenericContainer> avroData = new ArrayList<GenericContainer>(); + try { + while (in.hasNext()) { + GenericContainer obj = in.next(); + avroData.add(obj); + } + } finally { + Closeables.closeQuietly(in); + } + return avroData; + } + private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException { pigServerLocal.setBatchOn(); Modified: pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java (original) +++ pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java Fri Mar 4 18:17:39 2016 @@ -44,7 +44,7 @@ public class TestPluckTuple { } @Test - public void testSchema() throws Exception { + public void testStartsWith() throws Exception { String query = "a = load 'a' as (x:int,y:chararray,z:long);" + "b = load 'b' as (x:int,y:chararray,z:long);" + "c = join a by x, b by x;" + @@ -55,6 +55,39 @@ public class TestPluckTuple { } @Test + public void testNegativeStartsWith() throws Exception { + String query = "a = load 'a' as (x:int,y:chararray,z:long);" + + "b = load 'b' as (x:int,y:chararray,z:long);" + + "c = join a by x, b by x;" + + "define pluck PluckTuple('a::','false');" + + "d = foreach c generate flatten(pluck(*));"; + pigServer.registerQuery(query); + assertTrue(Schema.equals(pigServer.dumpSchema("b"), pigServer.dumpSchema("d"), false, true)); + } + + @Test + public void testPatternMatches() throws Exception { + String query = "a1 = load 'a1' as (x:int,y:chararray,z:long);" + + "a2 = load 'a2' as (x:int,y:chararray,z:long);" + + "b = join a1 by x, a2 by x;" + + "define pluck PluckTuple('a[2|3]::.*');" + + "c = foreach b generate flatten(pluck(*));"; + pigServer.registerQuery(query); + assertTrue(Schema.equals(pigServer.dumpSchema("a2"), pigServer.dumpSchema("c"), false, true)); + } + + @Test + public void testNegativePatternMatches() throws Exception { + String query = "a1 = load 'a1' as (x:int,y:chararray,z:long);" + + "a2 = load 'a2' as (x:int,y:chararray,z:long);" + + "b = join a1 by x, a2 by x;" + + "define pluck PluckTuple('a[2|3]::.*','false');" + + "c = foreach b generate flatten(pluck(*));"; + pigServer.registerQuery(query); + assertTrue(Schema.equals(pigServer.dumpSchema("a1"), pigServer.dumpSchema("c"), false, true)); + } + + @Test public void testOutput() throws Exception { Data data = resetData(pigServer); @@ -87,4 +120,62 @@ public class TestPluckTuple { assertEquals(exp2, it.next()); assertFalse(it.hasNext()); } + + @Test + public void testTwoPluckTuples() throws Exception { + Data data = resetData(pigServer); + + data.set("a", + Utils.getSchemaFromString("xa:int,yb:chararray,zc:long"), + tuple(1, "hey", 3L), + tuple(2, "woah", 4L) + ); + + String query = "a = load 'a' using mock.Storage();" + + "define pluck1 PluckTuple('.a');" + + "define pluck2 PluckTuple('.b');" + + "b = foreach a generate flatten(pluck1(*)), flatten(pluck2(*));"; + pigServer.registerQuery(query); + Iterator<Tuple> it = pigServer.openIterator("b"); + assertTrue(it.hasNext()); + assertEquals(tuple(1,"hey"), it.next()); + assertTrue(it.hasNext()); + assertEquals(tuple(2,"woah"), it.next()); + assertFalse(it.hasNext()); + } + + @Test + public void testNegativeOutput() throws Exception { + Data data = resetData(pigServer); + + Tuple exp1 = tuple(1, "sasf", 5L); + Tuple exp2 = tuple(2, "woah", 6L); + + data.set("a", + Utils.getSchemaFromString("x:int,y:chararray,z:long"), + tuple(1, "hey", 2L), + tuple(2, "woah", 3L), + tuple(3, "c", 4L) + ); + data.set("b", + Utils.getSchemaFromString("x:int,y:chararray,z:long"), + exp1, + exp2, + tuple(4, "c", 7L) + ); + + String query = "a = load 'a' using mock.Storage();" + + "b = load 'b' using mock.Storage();" + + "c = join a by x, b by x;" + + "define pluck PluckTuple('a::','false');" + + "d = foreach c generate flatten(pluck(*));"; + pigServer.registerQuery(query); + Iterator<Tuple> it = pigServer.openIterator("d"); + assertTrue(it.hasNext()); + assertEquals(exp1, it.next()); + assertTrue(it.hasNext()); + assertEquals(exp2, it.next()); + assertFalse(it.hasNext()); + } + } \ No newline at end of file Modified: pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java (original) +++ pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java Fri Mar 4 18:17:39 2016 @@ -33,25 +33,34 @@ import org.junit.Test; public class TestTOP { private static TupleFactory tupleFactory_ = TupleFactory.getInstance(); private static BagFactory bagFactory_ = BagFactory.getInstance(); - private static Tuple inputTuple_ = tupleFactory_.newTuple(3); - private static DataBag dBag_ = bagFactory_.newDefaultBag(); + private static Tuple inputTuple_ = null; @BeforeClass public static void setup() throws ExecException { + inputTuple_ = fillTuple(0, 100); + } + + public static Tuple fillTuple(int start, int stop) throws ExecException { + Tuple tuple = tupleFactory_.newTuple(3); + + DataBag dBag = bagFactory_.newDefaultBag(); + // set N = 10 i.e retain top 10 tuples - inputTuple_.set(0, 10); + tuple.set(0, 10); // compare tuples by field number 1 - inputTuple_.set(1, 1); + tuple.set(1, 1); // set the data bag containing the tuples - inputTuple_.set(2, dBag_); + tuple.set(2, dBag); // generate tuples of the form (group-1, 1), (group-2, 2) ... - for (long i = 0; i < 100; i++) { + for (long i = start; i < stop; i++) { Tuple nestedTuple = tupleFactory_.newTuple(2); nestedTuple.set(0, "group-" + i); nestedTuple.set(1, i); - dBag_.add(nestedTuple); + dBag.add(nestedTuple); } + + return tuple; } @Test @@ -66,6 +75,26 @@ public class TestTOP { assertEquals(outBag.size(), 10L); checkItemsLT(outBag, 1, 10); } + + @Test + public void testTOPAccumulator() throws Exception { + Tuple firstTuple = fillTuple(0, 50); + Tuple secondTuple = fillTuple(50, 100); + + TOP top = new TOP("DESC"); + top.accumulate(firstTuple); + top.accumulate(secondTuple); + DataBag outBag = top.getValue(); + assertEquals(outBag.size(), 10L); + checkItemsGT(outBag, 1, 89); + + top = new TOP("ASC"); + top.accumulate(firstTuple); + top.accumulate(secondTuple); + outBag = top.getValue(); + assertEquals(outBag.size(), 10L); + checkItemsLT(outBag, 1, 10); + } @Test public void testTopAlgebraic() throws IOException { Modified: pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java Fri Mar 4 18:17:39 2016 @@ -17,13 +17,15 @@ */ package org.apache.pig.builtin.mock; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.assertFalse; -import static junit.framework.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; import static org.apache.pig.builtin.mock.Storage.resetData; import static org.apache.pig.builtin.mock.Storage.schema; import static org.apache.pig.builtin.mock.Storage.tuple; +import static org.apache.pig.builtin.mock.Storage.bag; +import static org.apache.pig.builtin.mock.Storage.map; import java.util.HashSet; import java.util.List; @@ -47,7 +49,9 @@ public class TestMockStorage { data.set("foo", tuple("a"), tuple("b"), - tuple("c") + tuple("c"), + tuple(map("d","e", "f","g")), + tuple(bag(tuple("h"),tuple("i"))) ); pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();"); @@ -57,8 +61,10 @@ public class TestMockStorage { assertEquals(tuple("a"), out.get(0)); assertEquals(tuple("b"), out.get(1)); assertEquals(tuple("c"), out.get(2)); + assertEquals(tuple(map("f", "g", "d", "e" )), out.get(3)); + assertEquals(tuple(bag(tuple("h"),tuple("i"))), out.get(4)); } - + @Test public void testMockSchema() throws Exception { PigServer pigServer = new PigServer(Util.getLocalTestMode()); Modified: pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java (original) +++ pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java Fri Mar 4 18:17:39 2016 @@ -29,6 +29,7 @@ import java.util.List; import org.apache.pig.PigServer; import org.apache.pig.builtin.mock.Storage.Data; import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.test.MiniGenericCluster; @@ -46,6 +47,7 @@ import org.junit.runner.RunWith; @RunWith(OrderedJUnit4Runner.class) @TestOrder({ "testPythonUDF_onCluster", + "testPythonUDF_withBytearrayAndBytes_onCluster", "testPythonUDF__allTypes", "testPythonUDF__withBigDecimal", "testPythonUDF", @@ -111,6 +113,41 @@ public class TestStreamingUDF { assertEquals(expected0, actual0); assertEquals(expected1, actual1); } + + @Test + public void testPythonUDF_withBytearrayAndBytes_onCluster() throws Exception { + pigServerMapReduce = new PigServer(cluster.getExecType(), cluster.getProperties()); + + + String[] pythonScript = { + "from pig_util import outputSchema", + "import os", + "@outputSchema('f:bytearray')", + "def foo(bar):", + " return bytearray(os.urandom(1000))" + }; + + Util.createLocalInputFile( "pyfilewBaB.py", pythonScript); + + String[] input = { + "field1" + }; + Util.createLocalInputFile("testTupleBaB", input); + Util.copyFromLocalToCluster(cluster, "testTupleBaB", "testTupleBaB"); + + pigServerMapReduce.registerQuery("REGISTER 'pyfilewBaB.py' USING streaming_python AS pf;"); + pigServerMapReduce.registerQuery("A = LOAD 'testTupleBaB' as (b:chararray);"); + pigServerMapReduce.registerQuery("B = FOREACH A generate pf.foo(b);"); + + Iterator<Tuple> iter = pigServerMapReduce.openIterator("B"); + assertTrue(iter.hasNext()); + Object result = iter.next().get(0); + + //Mostly we're happy we got a result w/o throwing an exception, but we'll + //do a basic check. + assertTrue(result instanceof DataByteArray); + assertEquals(1000, ((DataByteArray)result).size()); + } @Test public void testPythonUDF() throws Exception { @@ -159,7 +196,6 @@ public class TestStreamingUDF { }; Util.createLocalInputFile( "pyfileNL.py", pythonScript); - Data data = resetData(pigServerLocal); Tuple t0 = tf.newTuple(2); t0.set(0, "field10"); Modified: pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java (original) +++ pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java Fri Mar 4 18:17:39 2016 @@ -369,6 +369,18 @@ public class TestPigStreamingUDF { } @Test + public void testDeserialize__emptyMap() throws IOException { + byte[] input = "|[_|]_|_".getBytes(); + FieldSchema fs = new FieldSchema("", DataType.MAP); + PigStreamingUDF sp = new PigStreamingUDF(fs); + + Map<String, String> expectedOutput = new TreeMap<String, String>(); + + Object out = sp.deserialize(input, 0, input.length); + Assert.assertEquals(tf.newTuple(expectedOutput), out); + } + + @Test public void testDeserialize__bug() throws Exception { byte[] input = "|(_|-_|,_32|,_987654321098765432|,_987654321098765432|)_|_".getBytes(); Modified: pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java (original) +++ pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java Fri Mar 4 18:17:39 2016 @@ -65,6 +65,20 @@ public class TestStreamingUDFOutputHandl Assert.assertEquals(tf.newTuple("abc\ndef\nghi\njkl"), t); } + @Test + public void testGetValue__earlyNewLine() throws Exception{ + FieldSchema fs = new FieldSchema("", DataType.CHARARRAY); + String data = "\na|_\n"; + + PigStreamingUDF deserializer = new PigStreamingUDF(fs); + OutputHandler outty = new StreamingUDFOutputHandler(deserializer); + outty.bindTo(null, getIn(data), 0, 0); + + Tuple t = outty.getNext(); + + Assert.assertEquals(tf.newTuple("\na"), t); + } + private BufferedPositionedInputStream getIn(String input) throws UnsupportedEncodingException { InputStream stream = new ByteArrayInputStream(input.getBytes("UTF-8")); BufferedPositionedInputStream result = new BufferedPositionedInputStream(stream); Modified: pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java (original) +++ pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java Fri Mar 4 18:17:39 2016 @@ -56,7 +56,7 @@ public class TestImplicitSplitOnTuple { "D2 = FOREACH tuplified GENERATE tuplify.memberId as memberId, tuplify.shopId as shopId, score AS score;"+ "J = JOIN D1 By shopId, D2 by shopId;"+ "K = FOREACH J GENERATE D1::memberId AS member_id1, D2::memberId AS member_id2, D1::shopId as shop;"+ - "L = ORDER K by shop;"+ + "L = ORDER K by shop, member_id1, member_id2;"+ "STORE L into 'output' using mock.Storage;"); List<Tuple> list = data.get("output"); assertEquals("list: "+list, 20, list.size()); Modified: pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java Fri Mar 4 18:17:39 2016 @@ -49,7 +49,7 @@ public class TestAccumulator { private static final String INPUT_FILE2 = "AccumulatorInput2.txt"; private static final String INPUT_FILE3 = "AccumulatorInput3.txt"; private static final String INPUT_FILE4 = "AccumulatorInput4.txt"; - private static final String INPUT_DIR = "build/test/data"; + private static final String INPUT_DIR = Util.getTestDirectory(TestAccumulator.class); private static PigServer pigServer; private static Properties properties; @@ -88,7 +88,7 @@ public class TestAccumulator { } private static void createFiles() throws IOException { - new File(INPUT_DIR).mkdir(); + new File(INPUT_DIR).mkdirs(); PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1)); @@ -558,6 +558,21 @@ public class TestAccumulator { } } + // Pig 4365 + @Test + public void testAccumWithTOP() throws IOException{ + pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);"); + pigServer.registerQuery("B = group A all;"); + pigServer.registerQuery("D = foreach B { C = TOP(5, 0, A); generate flatten(C); }"); + + Iterator<Tuple> iter = pigServer.openIterator("D"); + + List<Tuple> expected = Util.getTuplesFromConstantTupleStrings( + new String[] {"(200,1.1)", "(200,2.1)", "(300,3.3)", "(400,null)", "(400,null)" }); + + Util.checkQueryOutputsAfterSort(iter, expected); + } + @Test public void testAccumWithMultiBuiltin() throws IOException{ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);"); Modified: pig/branches/spark/test/org/apache/pig/test/TestAssert.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestAssert.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestAssert.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestAssert.java Fri Mar 4 18:17:39 2016 @@ -116,13 +116,13 @@ public class TestAssert { try { pigServer.openIterator("A"); } catch (FrontendException fe) { - if (!Util.isSparkExecType(Util.getLocalTestMode())) { + if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ") + || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) { Assert.assertTrue(fe.getCause().getMessage().contains( - "Job terminated with anomalous status FAILED")); - } - else { + "Assertion violated: i should be greater than 1")); + } else { Assert.assertTrue(fe.getCause().getMessage().contains( - "i should be greater than 1")); + "Job terminated with anomalous status FAILED")); } } } @@ -148,13 +148,13 @@ public class TestAssert { try { pigServer.openIterator("A"); } catch (FrontendException fe) { - if (!Util.isSparkExecType(Util.getLocalTestMode())) { + if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ") + || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) { Assert.assertTrue(fe.getCause().getMessage().contains( - "Job terminated with anomalous status FAILED")); - } - else { + "Assertion violated: i should be greater than 1")); + } else { Assert.assertTrue(fe.getCause().getMessage().contains( - "i should be greater than 1")); + "Job terminated with anomalous status FAILED")); } } } Modified: pig/branches/spark/test/org/apache/pig/test/TestBZip.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBZip.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestBZip.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestBZip.java Fri Mar 4 18:17:39 2016 @@ -28,6 +28,7 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Properties; @@ -42,19 +43,50 @@ import org.apache.hadoop.mapreduce.Input import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.test.utils.CloseAwareFSDataInputStream; +import org.apache.pig.test.utils.CloseAwareOutputStream; import org.apache.tools.bzip2r.CBZip2InputStream; import org.apache.tools.bzip2r.CBZip2OutputStream; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) public class TestBZip { private static Properties properties; private static MiniGenericCluster cluster; + @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.") + public static Iterable<Object[]> data() { + if ( HadoopShims.isHadoopYARN() ) { + return Arrays.asList(new Object[][] { + { false }, + { true } + }); + } else { + return Arrays.asList(new Object[][] { + { false } + }); + } + } + + public TestBZip (Boolean useBzipFromHadoop) { + properties = cluster.getProperties(); + properties.setProperty("pig.bzip.use.hadoop.inputformat", useBzipFromHadoop.toString()); + } + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + @BeforeClass public static void oneTimeSetUp() throws Exception { cluster = MiniGenericCluster.buildCluster(); @@ -73,10 +105,9 @@ public class TestBZip { public void testBzipInPig() throws Exception { PigServer pig = new PigServer(cluster.getExecType(), properties); - File in = File.createTempFile("junit", ".bz2"); - in.deleteOnExit(); + File in = folder.newFile("junit-in.bz2"); - File out = File.createTempFile("junit", ".bz2"); + File out = folder.newFile("junit-out.bz2"); out.delete(); String clusterOutput = Util.removeColon(out.getAbsolutePath()); @@ -121,9 +152,6 @@ public class TestBZip { for (int j = 1; j < 100; j++) { assertEquals(new Integer(j), map.get(j)); } - - in.delete(); - Util.deleteFile(cluster, clusterOutput); } /** @@ -133,10 +161,9 @@ public class TestBZip { public void testBzipInPig2() throws Exception { PigServer pig = new PigServer(cluster.getExecType(), properties); - File in = File.createTempFile("junit", ".bz2"); - in.deleteOnExit(); + File in = folder.newFile("junit-in.bz2"); - File out = File.createTempFile("junit", ".bz2"); + File out = folder.newFile("junit-out.bz2"); out.delete(); String clusterOutput = Util.removeColon(out.getAbsolutePath()); @@ -181,9 +208,6 @@ public class TestBZip { for (int j = 1; j < 100; j++) { assertEquals(new Integer(j), map.get(j)); } - - in.delete(); - out.delete(); } //see PIG-2391 @@ -197,10 +221,9 @@ public class TestBZip { }; // bzip compressed input - File in = File.createTempFile("junit", ".bz2"); + File in = folder.newFile("junit-in.bz2"); String compressedInputFileName = in.getAbsolutePath(); String clusterCompressedFilePath = Util.removeColon(compressedInputFileName); - in.deleteOnExit(); try { CBZip2OutputStream cos = @@ -230,7 +253,6 @@ public class TestBZip { it2.next(); } } finally { - in.delete(); Util.deleteFile(cluster, "intermediate.bz"); Util.deleteFile(cluster, "final.bz"); } @@ -249,9 +271,8 @@ public class TestBZip { }; // bzip compressed input - File in = File.createTempFile("junit", ".bz2"); + File in = folder.newFile("junit-in.bz2"); String compressedInputFileName = in.getAbsolutePath(); - in.deleteOnExit(); String clusterCompressedFilePath = Util.removeColon(compressedInputFileName); String unCompressedInputFileName = "testRecordDelims-uncomp.txt"; @@ -291,7 +312,6 @@ public class TestBZip { assertFalse(it2.hasNext()); } finally { - in.delete(); Util.deleteFile(cluster, unCompressedInputFileName); Util.deleteFile(cluster, clusterCompressedFilePath); } @@ -305,10 +325,9 @@ public class TestBZip { public void testEmptyBzipInPig() throws Exception { PigServer pig = new PigServer(cluster.getExecType(), properties); - File in = File.createTempFile("junit", ".tmp"); - in.deleteOnExit(); + File in = folder.newFile("junit-in.tmp"); - File out = File.createTempFile("junit", ".bz2"); + File out = folder.newFile("junit-out.bz2"); out.delete(); String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath()); @@ -336,10 +355,6 @@ public class TestBZip { pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';"); pig.openIterator("B"); - - in.delete(); - Util.deleteFile(cluster, clusterOutputFilePath); - } /** @@ -347,8 +362,7 @@ public class TestBZip { */ @Test public void testEmptyBzip() throws Exception { - File tmp = File.createTempFile("junit", ".tmp"); - tmp.deleteOnExit(); + File tmp = folder.newFile("junit.tmp"); CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream( tmp)); cos.close(); @@ -358,7 +372,25 @@ public class TestBZip { fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length()); assertEquals(-1, cis.read(new byte[100])); cis.close(); - tmp.delete(); + } + + @Test + public void testInnerStreamGetsClosed() throws Exception { + File tmp = folder.newFile("junit.tmp"); + + CloseAwareOutputStream out = new CloseAwareOutputStream(new FileOutputStream(tmp)); + CBZip2OutputStream cos = new CBZip2OutputStream(out); + assertFalse(out.isClosed()); + cos.close(); + assertTrue(out.isClosed()); + + FileSystem fs = FileSystem.getLocal(new Configuration(false)); + Path path = new Path(tmp.getAbsolutePath()); + CloseAwareFSDataInputStream in = new CloseAwareFSDataInputStream(fs.open(path)); + CBZip2InputStream cis = new CBZip2InputStream(in, -1, tmp.length()); + assertFalse(in.isClosed()); + cis.close(); + assertTrue(in.isClosed()); } /** @@ -451,6 +483,7 @@ public class TestBZip { props.put(entry.getKey(), entry.getValue()); } props.setProperty(MRConfiguration.MAX_SPLIT_SIZE, Integer.toString(splitSize)); + props.setProperty("pig.noSplitCombination", "true"); PigServer pig = new PigServer(cluster.getExecType(), props); FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props)); fs.delete(new Path(outputFile), true); @@ -464,7 +497,7 @@ public class TestBZip { numPartFiles++; } } - assertEquals(true, numPartFiles > 0); + assertEquals(true, numPartFiles > 1); // verify record count to verify we read bzip data correctly Util.registerMultiLineQuery(pig, script); @@ -480,26 +513,32 @@ public class TestBZip { "1\t2\r3\t4" }; - String inputFileName = "input.txt"; - Util.createInputFile(cluster, inputFileName, inputData); - - PigServer pig = new PigServer(cluster.getExecType(), properties); - - pig.setBatchOn(); - pig.registerQuery("a = load '" + inputFileName + "';"); - pig.registerQuery("store a into 'output.bz2';"); - pig.registerQuery("store a into 'output';"); - pig.executeBatch(); + try { + String inputFileName = "input.txt"; + Util.createInputFile(cluster, inputFileName, inputData); - FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration( - pig.getPigContext().getProperties())); - FileStatus[] outputFiles = fs.listStatus(new Path("output"), - Util.getSuccessMarkerPathFilter()); - assertTrue(outputFiles[0].getLen() > 0); + PigServer pig = new PigServer(cluster.getExecType(), properties); - outputFiles = fs.listStatus(new Path("output.bz2"), - Util.getSuccessMarkerPathFilter()); - assertTrue(outputFiles[0].getLen() > 0); + pig.setBatchOn(); + pig.registerQuery("a = load '" + inputFileName + "';"); + pig.registerQuery("store a into 'output.bz2';"); + pig.registerQuery("store a into 'output';"); + pig.executeBatch(); + + FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration( + pig.getPigContext().getProperties())); + FileStatus[] outputFiles = fs.listStatus(new Path("output"), + Util.getSuccessMarkerPathFilter()); + assertTrue(outputFiles[0].getLen() > 0); + + outputFiles = fs.listStatus(new Path("output.bz2"), + Util.getSuccessMarkerPathFilter()); + assertTrue(outputFiles[0].getLen() > 0); + } finally { + Util.deleteFile(cluster, "input.txt"); + Util.deleteFile(cluster, "output.bz2"); + Util.deleteFile(cluster, "output"); + } } @Test @@ -511,34 +550,41 @@ public class TestBZip { String inputFileName = "input2.txt"; Util.createInputFile(cluster, inputFileName, inputData); - PigServer pig = new PigServer(cluster.getExecType(), properties); - PigContext pigContext = pig.getPigContext(); - pigContext.getProperties().setProperty( "output.compression.enabled", "true" ); - pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" ); - - pig.setBatchOn(); - pig.registerQuery("a = load '" + inputFileName + "';"); - pig.registerQuery("store a into 'output2.bz2';"); - pig.registerQuery("store a into 'output2';"); - pig.executeBatch(); - - FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration( - pig.getPigContext().getProperties())); - FileStatus[] outputFiles = fs.listStatus(new Path("output2"), - Util.getSuccessMarkerPathFilter()); - assertTrue(outputFiles[0].getLen() > 0); - - outputFiles = fs.listStatus(new Path("output2.bz2"), - Util.getSuccessMarkerPathFilter()); - assertTrue(outputFiles[0].getLen() > 0); + try { + PigServer pig = new PigServer(cluster.getExecType(), properties); + PigContext pigContext = pig.getPigContext(); + pigContext.getProperties().setProperty( "output.compression.enabled", "true" ); + pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" ); + + pig.setBatchOn(); + pig.registerQuery("a = load '" + inputFileName + "';"); + pig.registerQuery("store a into 'output2.bz2';"); + pig.registerQuery("store a into 'output2';"); + pig.executeBatch(); + + FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration( + pig.getPigContext().getProperties())); + FileStatus[] outputFiles = fs.listStatus(new Path("output2"), + Util.getSuccessMarkerPathFilter()); + assertTrue(outputFiles[0].getLen() > 0); + + outputFiles = fs.listStatus(new Path("output2.bz2"), + Util.getSuccessMarkerPathFilter()); + assertTrue(outputFiles[0].getLen() > 0); + } finally { + Util.deleteFile(cluster,"input2.txt"); + Util.deleteFile(cluster,"output2.bz2"); + Util.deleteFile(cluster,"output2"); + } } /** - * Tests that Pig throws an Exception when the input files to be loaded are actually - * a result of concatenating 2 or more bz2 files. Pig should not silently ignore part - * of the input data. + * Tests that Pig's Bzip2TextInputFormat throws an IOException when the input files to be loaded are actually + * a result of concatenating 2 or more bz2 files. It should not silently ignore part + * of the input data. When, hadoop's TextInpuFormat is used(PIG-3251), it should + * successfully read this concatenated bzip file to the end. */ - @Test (expected=IOException.class) + @Test public void testBZ2Concatenation() throws Exception { String[] inputData1 = new String[] { "1\ta", @@ -556,14 +602,12 @@ public class TestBZip { }; // bzip compressed input file1 - File in1 = File.createTempFile("junit", ".bz2"); + File in1 = folder.newFile("junit-in1.bz2"); String compressedInputFileName1 = in1.getAbsolutePath(); - in1.deleteOnExit(); // file2 - File in2 = File.createTempFile("junit", ".bz2"); + File in2 = folder.newFile("junit-in2.bz2"); String compressedInputFileName2 = in2.getAbsolutePath(); - in1.deleteOnExit(); String unCompressedInputFileName = "testRecordDelims-uncomp.txt"; Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged); @@ -603,19 +647,29 @@ public class TestBZip { // pig script to read compressed concatenated input script = "a = load '" + Util.encodeEscape(compressedInputFileName1) +"';"; pig.registerQuery(script); - Iterator<Tuple> it2 = pig.openIterator("a"); - while(it1.hasNext()) { - Tuple t1 = it1.next(); - Tuple t2 = it2.next(); - assertEquals(t1, t2); + try { + Iterator<Tuple> it2 = pig.openIterator("a"); + while(it1.hasNext()) { + Tuple t1 = it1.next(); + Tuple t2 = it2.next(); + assertEquals(t1, t2); + } + + assertFalse(it2.hasNext()); + + // When pig.bzip.use.hadoop.inputformat=true, it should successfully read the concatenated bzip file + assertEquals("IOException should be thrown when pig's own Bzip2TextInputFormat is used", + properties.getProperty("pig.bzip.use.hadoop.inputformat"), + "true"); + + } catch (IOException e) { + assertEquals("IOException should only be thrown when pig's own Bzip2TextInputFormat is used", + properties.getProperty("pig.bzip.use.hadoop.inputformat"), + "false"); } - assertFalse(it2.hasNext()); - } finally { - in1.delete(); - in2.delete(); Util.deleteFile(cluster, unCompressedInputFileName); } @@ -625,11 +679,12 @@ public class TestBZip { * Concatenate the contents of src file to the contents of dest file */ private void catInto(String src, String dest) throws IOException { - BufferedWriter out = new BufferedWriter(new FileWriter(dest, true)); - BufferedReader in = new BufferedReader(new FileReader(src)); - String str; - while ((str = in.readLine()) != null) { - out.write(str); + FileOutputStream out = new FileOutputStream(new File(dest) , true); + FileInputStream in = new FileInputStream(new File(src)); + byte[] buffer = new byte[4096]; + int bytesread; + while ((bytesread = in.read(buffer)) != -1) { + out.write(buffer,0, bytesread); } in.close(); out.close(); @@ -658,20 +713,26 @@ public class TestBZip { pw.println(inputScript); pw.close(); - PigServer pig = new PigServer(cluster.getExecType(), properties); - - FileInputStream fis = new FileInputStream(inputScriptName); - pig.registerScript(fis); + try { + PigServer pig = new PigServer(cluster.getExecType(), properties); - FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration( - pig.getPigContext().getProperties())); - FileStatus[] outputFiles = fs.listStatus(new Path("output3"), - Util.getSuccessMarkerPathFilter()); - assertTrue(outputFiles[0].getLen() > 0); + FileInputStream fis = new FileInputStream(inputScriptName); + pig.registerScript(fis); - outputFiles = fs.listStatus(new Path("output3.bz2"), - Util.getSuccessMarkerPathFilter()); - assertTrue(outputFiles[0].getLen() > 0); + FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration( + pig.getPigContext().getProperties())); + FileStatus[] outputFiles = fs.listStatus(new Path("output3"), + Util.getSuccessMarkerPathFilter()); + assertTrue(outputFiles[0].getLen() > 0); + + outputFiles = fs.listStatus(new Path("output3.bz2"), + Util.getSuccessMarkerPathFilter()); + assertTrue(outputFiles[0].getLen() > 0); + } finally { + Util.deleteFile(cluster, "input3.txt"); + Util.deleteFile(cluster, "output3.bz2"); + Util.deleteFile(cluster, "output3"); + } } }
