Hi Daniel:
  Thanks for your comment and I have figured out why  
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#isAccumulative()
 is false  in second case.


Best regards
Zhang,Liyun

-----Original Message-----
From: Daniel Dai [mailto:[email protected]] 
Sent: Wednesday, March 04, 2015 2:39 AM
To: [email protected]
Subject: Re: a question about TestAccumulator#testAccumBasic

In first case Accumulator is get used and the second case is not (believe due 
to the second UDF BagCount in the plan). There are certain conditions whether 
Accumulator can be used in the plan or not, the logic is in 
AccumulatorOptimizer.

Daniel

On 3/3/15, 12:40 AM, "Zhang, Liyun" <[email protected]> wrote:

>Hi Daniel:
>  Thanks for your reply!
>According to your comment: The first case will use Accumulator, so 
>accumulate -> cleanup will be called, but no exec. The second case will 
>not use Accumulator, exec will be called instead of accumulate -> cleanup.
>
>I guess what you mean is that If
>org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOpe
>rat
>or#isAccumulative() is true, it will execute 
>((Accumulator)func).accumulate((Tuple)result.result); while 
>org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOpe
>rat
>or#isAccumulative() is false, it will execute
>func.exec((Tuple) result.result);
>
>
>but I think the second case also use AccumulatorBagCount.  The 
>difference between the first and second case is second case use 
>org.apache.pig.test.utils.BagCount while the first one are not.
>> The first pig script: (TestAccumulator line 151~154)
>>  A = load '" + INPUT_FILE1 + "' as (id:int, fruit);
>>  B = group A by id;
>>  C = foreach B generate group,
>>org.apache.pig.test.utils.AccumulatorBagCount(A);
>
>
>The second script: (TestAccumulator line 169~171)
>>  A = load '" + INPUT_FILE1 + "' as (id:int, fruit);  B = group A by 
>>id;
>>  C = foreach B generate group,
>>org.apache.pig.test.utils.AccumulatorBagCount(A),
>>org.apache.pig.test.utils.BagCount(A);
>
>
>
>Best regards
>Zhang,Liyun
>
>
>
>-----Original Message-----
>From: Daniel Dai [mailto:[email protected]]
>Sent: Tuesday, March 03, 2015 3:45 AM
>To: [email protected]
>Subject: Re: a question about TestAccumulator#testAccumBasic
>
>The first case will use Accumulator, so accumulate -> cleanup will be 
>called, but no exec. The second case will not use Accumulator, exec 
>will be called instead of accumulate -> cleanup.
>
>Daniel
>
>On 3/1/15, 7:21 PM, "Zhang, Liyun" <[email protected]> wrote:
>
>>Hi all:
>>  I have a question about  TestAccumulator#testAccumBasic.
>> The first pig script: (TestAccumulator line 151~154)
>>  A = load '" + INPUT_FILE1 + "' as (id:int, fruit);
>>  B = group A by id;
>>  C = foreach B generate group,
>>org.apache.pig.test.utils.AccumulatorBagCount(A);
>>
>>  It uses org.apache.pig.test.utils.AccumulatorBagCount, in 
>>org.apache.pig.test.utils.AccumulatorBagCount#exec
>>  org.apache.pig.test.utils.AccumulatorBagCount#exec
>>public Integer exec(Tuple tuple) throws IOException {
>>        throw new IOException("exec() should not be called."); } My 
>>question:It should throw exception when script is excuted but why not 
>>throw exception?
>>
>>The second script: (TestAccumulator line 169~171)
>>  A = load '" + INPUT_FILE1 + "' as (id:int, fruit);  B = group A by 
>>id;
>>  C = foreach B generate group,
>>org.apache.pig.test.utils.AccumulatorBagCount(A),
>>org.apache.pig.test.utils.BagCount(A);
>> It uses , org.apache.pig.test.utils.AccumulatorBagCount and ), 
>>org.apache.pig.test.utils.BagCount.
>>  The code checks whether if it throws exception, if not throw 
>>exception, the unit test fails.
>>
>>
>>TestAccumulator#testAccumBasic
>>@Test
>>    public void testAccumBasic() throws IOException{
>>    151    // test group by
>>    152    pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as
>>(id:int, fruit);");
>>   153     pigServer.registerQuery("B = group A by id;");
>>    154    pigServer.registerQuery("C = foreach B generate group,
>>org.apache.pig.test.utils.AccumulatorBagCount(A);");
>>
>>        HashMap<Integer, Integer> expected = new HashMap<Integer,
>>Integer>();
>>        expected.put(100, 2);
>>        expected.put(200, 1);
>>        expected.put(300, 3);
>>        expected.put(400, 1);
>>
>>        Iterator<Tuple> iter = pigServer.openIterator("C");
>>
>>        while(iter.hasNext()) {
>>            Tuple t = iter.next();
>>            assertEquals(expected.get((Integer)t.get(0)),
>>(Integer)t.get(1));
>>        }
>>
>>    169    pigServer.registerQuery("B = group A by id;");
>>    170   pigServer.registerQuery("C = foreach B generate group,  " +
>>                "org.apache.pig.test.utils.AccumulatorBagCount(A),
>>org.apache.pig.test.utils.BagCount(A);");
>>
>>        try{
>>            iter = pigServer.openIterator("C");
>>
>>            while(iter.hasNext()) {
>>                Tuple t = iter.next();
>>                assertEquals(expected.get((Integer)t.get(0)),
>>(Integer)t.get(1));
>>            }
>>            fail("accumulator should not be called.");
>>        }catch(IOException e) {
>>            // should throw exception from AccumulatorBagCount.
>>        }
>>
>>        // test cogroup
>>        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
>>(id:int, fruit);");
>>        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as 
>>(id:int, fruit);");
>>        pigServer.registerQuery("C = cogroup A by id, B by id;");
>>        pigServer.registerQuery("D = foreach C generate group,  " +
>>                "org.apache.pig.test.utils.AccumulatorBagCount(A),
>>org.apache.pig.test.utils.AccumulatorBagCount(B);");
>>
>>        HashMap<Integer, String> expected2 = new HashMap<Integer,
>>String>();
>>        expected2.put(100, "2,2");
>>        expected2.put(200, "1,1");
>>        expected2.put(300, "3,3");
>>        expected2.put(400, "1,1");
>>
>>        iter = pigServer.openIterator("D");
>>
>>        while(iter.hasNext()) {
>>            Tuple t = iter.next();
>>            assertEquals(expected2.get((Integer)t.get(0)),
>>t.get(1).toString()+","+t.get(2).toString());
>>        }
>>    }
>>
>>   Can anyone help me solving my question?
>>
>>Best regards
>>Zhang,Liyun
>>
>

Reply via email to