[ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13698402#comment-13698402 ]

Josh Wills commented on CRUNCH-232:
-----------------------------------

Thanks for reporting this. It looks to me like we don't call cleanup when
there's no processing to do for an input, which probably only happens when
we are doing a join against an empty input. Can you confirm that one of the
inputs here is empty?

I think that the right thing to do is to ensure that cleanup is always called, 
regardless of whether or not the input actually has any records in it.
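The rule proposed above, that cleanup runs whether or not an input has any records, amounts to pairing every initialize() with a cleanup() in a finally block. The sketch below is a minimal, self-contained model of that rule under stated assumptions: LifecycleFn, AlwaysCleanupRunner, RecordingFn and traceFor are hypothetical names, and this is not Crunch's actual DoFn class or runtime code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a DoFn's lifecycle hooks; NOT org.apache.crunch.DoFn.
interface LifecycleFn<S> {
    void initialize();
    void process(S input);
    void cleanup();
}

class AlwaysCleanupRunner {
    // Runs one input through the lifecycle. cleanup() sits in a finally block, so it
    // is called even when the input is empty (or when process() throws).
    static <S> void run(LifecycleFn<S> fn, Iterable<S> input) {
        fn.initialize();
        try {
            for (S record : input) {
                fn.process(record);
            }
        } finally {
            fn.cleanup();
        }
    }

    // Hypothetical helper: records each lifecycle call so the pairing can be inspected.
    static final class RecordingFn implements LifecycleFn<String> {
        final List<String> events = new ArrayList<>();
        public void initialize() { events.add("initialize"); }
        public void process(String input) { events.add("process"); }
        public void cleanup() { events.add("cleanup"); }
    }

    // Runs a full initialize/process/cleanup cycle for each input on one RecordingFn.
    static List<String> traceFor(List<List<String>> inputs) {
        RecordingFn fn = new RecordingFn();
        for (List<String> input : inputs) {
            run(fn, input);
        }
        return fn.events;
    }

    public static void main(String[] args) {
        // One non-empty input and one empty input, as in a join against an empty side.
        List<String> events = traceFor(Arrays.asList(
                Arrays.asList("line1"), Collections.<String>emptyList()));
        System.out.println(events); // [initialize, process, cleanup, initialize, cleanup]
    }
}
```

With the finally block, the empty input still produces a matching cleanup, so initialize and cleanup counts stay equal.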
                
> DoFn initialize method gets called twice whereas cleanup gets called only
> once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-232
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-232
>             Project: Crunch
>          Issue Type: Bug
>          Components: MapReduce Patterns
>    Affects Versions: 0.6.0
>            Reporter: Anuj Ojha
>            Priority: Critical
>
> DoFn's initialize method gets called twice, whereas cleanup gets called only
> once, when a join is performed on two PTables.
>  
> Sample Test:
> {code}
>         final Configuration config = HBaseTest.getConf();
>         final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
>         final PCollection<String> collectionHelper1 = pipeline.readTextFile(
>                 HBaseTest.class.getResource("/HbaseTestFile.txt").toString());
>
>         final PCollection<String> collectionHelper2 = pipeline.readTextFile(
>                 HBaseTest.class.getResource("/HbaseTestFile2.txt").toString());
>
>         final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo(
>                 "Creating table", new DoFnCheck(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>
>         final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo(
>                 "Creating table2", new DoFnCheck2(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>
>         final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
>
>         final PCollection<String> joinedStrings = joinedTable.parallelDo(
>                 new MapFn<Pair<Integer, Pair<String, String>>, String>() {
>                     private static final long serialVersionUID = -8796426750247480646L;
>
>                     @Override
>                     public String map(final Pair<Integer, Pair<String, String>> input) {
>                         return input.second().first() + "/" + input.second().second();
>                     }
>                 }, Avros.strings());
>
>         System.out.println(joinedStrings.materialize().iterator().hasNext());
> {code}
>  
> The two DoFnCheck classes look something like this:
>  
> {code}
> import org.apache.crunch.DoFn;
> import org.apache.crunch.Emitter;
> import org.apache.crunch.Pair;
>
> public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
>     private static final long serialVersionUID = 6780749658216132026L;
>
>     // Runs before any records are processed.
>     @Override
>     public void initialize() {
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>
>     // Expected to run after the last record, pairing with initialize().
>     @Override
>     public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>
>     // Emits each input line keyed by the constant 1.
>     @Override
>     public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>         final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
>         emitter.emit(pair);
>     }
> }
> {code}
>  
> The console looks like this:
>  
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
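The reported trace (initialize, process, cleanup, then a second initialize with no matching cleanup) is consistent with the explanation in the comment: an input with no records gets initialized but never cleaned up. The sketch below is a hypothetical, self-contained model of that hypothesis, not Crunch code; SkipCleanupOnEmptyModel, trace and balanced are illustrative names. The balanced() check is one way a test could assert the lifecycle pairing instead of reading println output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the suspected behavior: initialize() runs for every input,
// but cleanup() runs only if at least one record was processed. Not Crunch's real code.
class SkipCleanupOnEmptyModel {
    static List<String> trace(List<List<String>> inputs) {
        List<String> events = new ArrayList<>();
        for (List<String> input : inputs) {
            events.add("initialize");
            boolean processedAny = false;
            for (String record : input) {
                events.add("process");
                processedAny = true;
            }
            if (processedAny) { // the suspected bug: empty inputs skip cleanup
                events.add("cleanup");
            }
        }
        return events;
    }

    // True when every initialize() was matched by a cleanup().
    static boolean balanced(List<String> events) {
        return Collections.frequency(events, "initialize")
                == Collections.frequency(events, "cleanup");
    }

    public static void main(String[] args) {
        // One input with a single record, followed by an empty input.
        List<String> events = trace(Arrays.asList(
                Arrays.asList("line1"), Collections.<String>emptyList()));
        System.out.println(events);           // [initialize, process, cleanup, initialize]
        System.out.println(balanced(events)); // false
    }
}
```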

