[ 
https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699009#comment-13699009
 ] 

Josh Wills commented on CRUNCH-232:
-----------------------------------

For joins, the CrunchMapper has two different types of input splits that it 
needs to process, and there are two separate sequences of DoFn calls (called 
RTNodes in the runtime system), one for each type of split. It turns out that 
during setup we always initialized _all_ of the RTNodes (and thus their child 
DoFns), regardless of which input split the task was running on, but we only 
called cleanup on the RTNode (and DoFns) corresponding to the split actually 
being processed in the current task. The fix was primarily to ensure that 
setup only calls initialize on one of the RTNode sequences, the one we are 
about to process, rather than on all of them.
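
To make the mismatch concrete, here is a minimal sketch in plain Java. It is 
not Crunch's actual code: Node, runTask, and joinNodes are hypothetical 
stand-ins for an RTNode sequence and the mapper's setup/cleanup path, and the 
names "left-input" and "right-input" are made up for illustration. It only 
models the counting of initialize and cleanup calls described above.

```java
import java.util.ArrayList;
import java.util.List;

class RTNodeLifecycleSketch {
    /** Hypothetical stand-in for one RTNode sequence; not Crunch's class. */
    static final class Node {
        final String name;
        int initCalls = 0;
        int cleanupCalls = 0;

        Node(String name) {
            this.name = name;
        }

        void initialize() {
            initCalls++;
        }

        void cleanup() {
            cleanupCalls++;
        }
    }

    /** One node per join input, mirroring the two kinds of input split. */
    static List<Node> joinNodes() {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("left-input"));
        nodes.add(new Node("right-input"));
        return nodes;
    }

    /**
     * Simulates a single map task that processes the split at activeIndex.
     * Returns the names of nodes whose initialize and cleanup counts differ.
     */
    static List<String> runTask(List<Node> nodes, int activeIndex, boolean initializeAll) {
        // Setup: either initialize every node, or only the active one.
        for (int i = 0; i < nodes.size(); i++) {
            if (initializeAll || i == activeIndex) {
                nodes.get(i).initialize();
            }
        }
        // Cleanup: only the node for the split being processed is cleaned up.
        nodes.get(activeIndex).cleanup();

        List<String> unbalanced = new ArrayList<>();
        for (Node n : nodes) {
            if (n.initCalls != n.cleanupCalls) {
                unbalanced.add(n.name);
            }
        }
        return unbalanced;
    }

    public static void main(String[] args) {
        // Initializing everything leaves the inactive node without a cleanup.
        System.out.println("initialize all: " + runTask(joinNodes(), 0, true));
        // Initializing only the active node keeps the calls paired.
        System.out.println("initialize active only: " + runTask(joinNodes(), 0, false));
    }
}
```

With initializeAll set to true, the node for the split that is not being 
processed ends up initialized but never cleaned up; with it set to false, 
every initialize call has a matching cleanup.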
                
> DoFn initialize method gets called twice whereas cleanup gets called only 
> once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-232
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-232
>             Project: Crunch
>          Issue Type: Bug
>          Components: MapReduce Patterns
>    Affects Versions: 0.6.0
>            Reporter: Anuj Ojha
>            Priority: Critical
>         Attachments: CRUNCH-232.patch
>
>
> DoFn's initialize method gets called twice, whereas cleanup gets called only 
> once, when a join is performed on two PTables.
>  
> Sample Test:
> {code} 
>         final Configuration config = HBaseTest.getConf();
>         final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
>         final PCollection<String> collectionHelper1 = pipeline.readTextFile(
>                 HBaseTest.class.getResource("/HbaseTestFile.txt").toString());
>  
>         final PCollection<String> collectionHelper2 = pipeline.readTextFile(
>                 HBaseTest.class.getResource("/HbaseTestFile2.txt").toString());
>  
>         final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo(
>                 "Creating table", new DoFnCheck(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>  
>         final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo(
>                 "Creating table2", new DoFnCheck2(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>  
>         final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
>  
>         final PCollection<String> joinedStrings = joinedTable.parallelDo(
>                 new MapFn<Pair<Integer, Pair<String, String>>, String>() {
>                     private static final long serialVersionUID = -8796426750247480646L;
>  
>                     @Override
>                     public String map(final Pair<Integer, Pair<String, String>> input) {
>                         return input.second().first() + "/" + input.second().second();
>                     }
>                 }, Avros.strings());
>  
>         System.out.println(joinedStrings.materialize().iterator().hasNext());
>  {code}
>  
> The two DoFnCheck classes look something like this:
>  
> {code} 
> import org.apache.crunch.DoFn;
> import org.apache.crunch.Emitter;
> import org.apache.crunch.Pair;
>  
> public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
>     private static final long serialVersionUID = 6780749658216132026L;
>  
>     // Called during task setup; the banner makes each call visible on the console.
>     @Override
>     public void initialize() {
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>  
>     // Called once the task has finished processing its input.
>     @Override
>     public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>  
>     // Emits every input line under the constant key 1 so that both tables join on it.
>     @Override
>     public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>  
>         final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
>  
>         emitter.emit(pair);
>     }
> }
> {code} 
>  
> The console looks like this:
>  
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
