Hi Daniel:
  Thanks for your comment. I have figured out why org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#isAccumulative() is false in the second case.
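A simplified sketch of that rule follows. It assumes, based on Daniel's explanation in this thread, that AccumulatorOptimizer enables accumulative mode for a foreach only when every UDF in its generate list implements the Accumulator interface. The classes Udf and AccumulatorUdf and the method isAccumulative are hypothetical stand-ins, not Pig's real API, and the real optimizer inspects more expression types than just UDFs.

```java
import java.util.List;

public class AccumulativeCheckSketch {
    // Hypothetical stand-in for a UDF in a foreach's generate list.
    static class Udf {
        final String name;
        Udf(String name) { this.name = name; }
    }

    // Hypothetical marker subclass: a UDF that also implements the
    // accumulator contract (accumulate / getValue / cleanup).
    static class AccumulatorUdf extends Udf {
        AccumulatorUdf(String name) { super(name); }
    }

    // Simplified rule (an assumption, not Pig's actual code): the foreach
    // can run in accumulative mode only if every UDF supports it. A single
    // non-accumulator UDF such as BagCount disables it for the whole foreach.
    static boolean isAccumulative(List<Udf> udfs) {
        for (Udf udf : udfs) {
            if (!(udf instanceof AccumulatorUdf)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Udf> firstCase = List.of(new AccumulatorUdf("AccumulatorBagCount"));
        List<Udf> secondCase = List.of(
                new AccumulatorUdf("AccumulatorBagCount"),
                new Udf("BagCount"));
        System.out.println(isAccumulative(firstCase));  // true
        System.out.println(isAccumulative(secondCase)); // false
    }
}
```

The two lists mirror the two scripts: the first foreach calls only AccumulatorBagCount, while the second also calls BagCount, which is enough to turn accumulative mode off.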
Best regards
Zhang,Liyun

-----Original Message-----
From: Daniel Dai [mailto:[email protected]]
Sent: Wednesday, March 04, 2015 2:39 AM
To: [email protected]
Subject: Re: a question about TestAccumulator#testAccumBasic

In the first case the Accumulator is used and in the second case it is not (I believe this is due to the second UDF, BagCount, in the plan). There are certain conditions that determine whether the Accumulator can be used in the plan; that logic is in AccumulatorOptimizer.

Daniel

On 3/3/15, 12:40 AM, "Zhang, Liyun" <[email protected]> wrote:

>Hi Daniel:
>  Thanks for your reply!
>According to your comment: the first case will use the Accumulator, so
>accumulate -> cleanup will be called, but not exec. The second case will
>not use the Accumulator, so exec will be called instead of accumulate -> cleanup.
>
>I guess what you mean is that if
>org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#isAccumulative()
>is true, it will execute ((Accumulator)func).accumulate((Tuple)result.result);
>while if
>org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#isAccumulative()
>is false, it will execute func.exec((Tuple) result.result);
>
>But I think the second case also uses AccumulatorBagCount. The
>difference between the first and second case is that the second case uses
>org.apache.pig.test.utils.BagCount while the first one does not.
>>The first pig script (TestAccumulator lines 151~154):
>>  A = load '" + INPUT_FILE1 + "' as (id:int, fruit);
>>  B = group A by id;
>>  C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);
>
>The second script (TestAccumulator lines 169~171):
>>  A = load '" + INPUT_FILE1 + "' as (id:int, fruit);
>>  B = group A by id;
>>  C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);
>
>Best regards
>Zhang,Liyun
>
>-----Original Message-----
>From: Daniel Dai [mailto:[email protected]]
>Sent: Tuesday, March 03, 2015 3:45 AM
>To: [email protected]
>Subject: Re: a question about TestAccumulator#testAccumBasic
>
>The first case will use the Accumulator, so accumulate -> cleanup will be
>called, but not exec. The second case will not use the Accumulator, so exec
>will be called instead of accumulate -> cleanup.
>
>Daniel
>
>On 3/1/15, 7:21 PM, "Zhang, Liyun" <[email protected]> wrote:
>
>>Hi all:
>>  I have a question about TestAccumulator#testAccumBasic.
>>  The first pig script (TestAccumulator lines 151~154):
>>    A = load '" + INPUT_FILE1 + "' as (id:int, fruit);
>>    B = group A by id;
>>    C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);
>>
>>  It uses org.apache.pig.test.utils.AccumulatorBagCount, whose exec method is:
>>
>>    // org.apache.pig.test.utils.AccumulatorBagCount#exec
>>    public Integer exec(Tuple tuple) throws IOException {
>>        throw new IOException("exec() should not be called.");
>>    }
>>
>>  My question: it should throw an exception when the script is executed, so why is no exception thrown?
>>
>>The second script (TestAccumulator lines 169~171):
>>    A = load '" + INPUT_FILE1 + "' as (id:int, fruit);
>>    B = group A by id;
>>    C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);
>>
>>  It uses org.apache.pig.test.utils.AccumulatorBagCount and org.apache.pig.test.utils.BagCount.
>>  The test checks whether an exception is thrown; if no exception is thrown, the unit test fails.
>>
>>TestAccumulator#testAccumBasic
>>
>>    @Test
>>    public void testAccumBasic() throws IOException {
>>151     // test group by
>>152     pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
>>153     pigServer.registerQuery("B = group A by id;");
>>154     pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);");
>>
>>        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
>>        expected.put(100, 2);
>>        expected.put(200, 1);
>>        expected.put(300, 3);
>>        expected.put(400, 1);
>>
>>        Iterator<Tuple> iter = pigServer.openIterator("C");
>>
>>        while (iter.hasNext()) {
>>            Tuple t = iter.next();
>>            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
>>        }
>>
>>169     pigServer.registerQuery("B = group A by id;");
>>170     pigServer.registerQuery("C = foreach B generate group, " +
>>            "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);");
>>
>>        try {
>>            iter = pigServer.openIterator("C");
>>
>>            while (iter.hasNext()) {
>>                Tuple t = iter.next();
>>                assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
>>            }
>>            fail("accumulator should not be called.");
>>        } catch (IOException e) {
>>            // should throw exception from AccumulatorBagCount.
>>        }
>>
>>        // test cogroup
>>        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
>>        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (id:int, fruit);");
>>        pigServer.registerQuery("C = cogroup A by id, B by id;");
>>        pigServer.registerQuery("D = foreach C generate group, " +
>>            "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");
>>
>>        HashMap<Integer, String> expected2 = new HashMap<Integer, String>();
>>        expected2.put(100, "2,2");
>>        expected2.put(200, "1,1");
>>        expected2.put(300, "3,3");
>>        expected2.put(400, "1,1");
>>
>>        iter = pigServer.openIterator("D");
>>
>>        while (iter.hasNext()) {
>>            Tuple t = iter.next();
>>            assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString() + "," + t.get(2).toString());
>>        }
>>    }
>>
>>  Can anyone help me with this question?
>>
>>Best regards
>>Zhang,Liyun
>>
>
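The runtime behavior Daniel describes can be sketched under a simplified model: when isAccumulative() is true, the grouped bag is fed in chunks through accumulate -> getValue -> cleanup; otherwise exec() receives the whole bag at once. Udf, AccumulatorUdf, CountingAccumulator and evaluate are hypothetical stand-ins loosely modeled on Pig's Accumulator interface, not Pig code, and a bag is modeled as a plain List. The sketch shows why the second case reaches exec() and throws, which is what the try/catch in testAccumBasic relies on.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class AccumulatorDispatchSketch {
    // Hypothetical stand-ins loosely modeled on Pig's EvalFunc and
    // Accumulator; a "bag" is just a List of values here.
    interface Udf {
        Integer exec(List<Object> bag) throws IOException;
    }

    interface AccumulatorUdf extends Udf {
        void accumulate(List<Object> partialBag) throws IOException;
        Integer getValue();
        void cleanup();
    }

    // Behaves like AccumulatorBagCount as described in the thread:
    // counting happens in accumulate(), and exec() must never be called.
    static class CountingAccumulator implements AccumulatorUdf {
        private int count = 0;

        @Override
        public Integer exec(List<Object> bag) throws IOException {
            throw new IOException("exec() should not be called.");
        }

        @Override
        public void accumulate(List<Object> partialBag) {
            count += partialBag.size();
        }

        @Override
        public Integer getValue() {
            return count;
        }

        @Override
        public void cleanup() {
            count = 0;
        }
    }

    // Simplified dispatch: accumulative mode feeds the bag chunk by chunk
    // through accumulate -> getValue -> cleanup; otherwise exec() gets the
    // whole bag at once.
    static Integer evaluate(Udf udf, List<List<Object>> chunks, boolean accumulative)
            throws IOException {
        if (accumulative && udf instanceof AccumulatorUdf) {
            AccumulatorUdf acc = (AccumulatorUdf) udf;
            try {
                for (List<Object> chunk : chunks) {
                    acc.accumulate(chunk);
                }
                return acc.getValue();
            } finally {
                acc.cleanup();
            }
        }
        List<Object> whole = new ArrayList<>();
        for (List<Object> chunk : chunks) {
            whole.addAll(chunk);
        }
        return udf.exec(whole);
    }

    public static void main(String[] args) throws IOException {
        // Three hypothetical values for one group, delivered in two chunks.
        List<Object> part1 = List.of("a", "b");
        List<Object> part2 = List.of("c");
        List<List<Object>> chunks = List.of(part1, part2);

        // First case: accumulative mode, exec() is never reached.
        System.out.println(evaluate(new CountingAccumulator(), chunks, true)); // 3

        // Second case: not accumulative, so exec() runs and throws.
        try {
            evaluate(new CountingAccumulator(), chunks, false);
        } catch (IOException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Putting cleanup() in a finally block is a design choice of this sketch so that per-group state is reset even if accumulate() fails; whether Pig does the same is not stated in the thread.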
