[ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699071#comment-13699071 ]
Josh Wills commented on CRUNCH-232:
-----------------------------------

Yes, just updated it.

> DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: CRUNCH-232
> URL: https://issues.apache.org/jira/browse/CRUNCH-232
> Project: Crunch
> Issue Type: Bug
> Components: MapReduce Patterns
> Affects Versions: 0.6.0
> Reporter: Anuj Ojha
> Priority: Critical
> Fix For: 0.7.0
>
> Attachments: CRUNCH-232.patch
>
>
> DoFn's initialize method gets called twice, whereas cleanup gets called only once, when a join is performed on two PTables.
>
> Sample Test:
> {code}
> final Configuration config = HBaseTest.getConf();
> final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
> final PCollection<String> collectionHelper1 =
>     pipeline.readTextFile(HBaseTest.class.getResource("/HbaseTestFile.txt").toString());
>
> final PCollection<String> collectionHelper2 =
>     pipeline.readTextFile(HBaseTest.class.getResource("/HbaseTestFile2.txt").toString());
>
> // Both inputs are keyed by DoFns that print their lifecycle calls.
> final PTable<Integer, String> ptable1 =
>     collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
>         Avros.tableOf(Avros.ints(), Avros.strings()));
>
> final PTable<Integer, String> ptable2 =
>     collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
>         Avros.tableOf(Avros.ints(), Avros.strings()));
>
> final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
>
> final PCollection<String> joinedStrings = joinedTable.parallelDo(
>     new MapFn<Pair<Integer, Pair<String, String>>, String>() {
>       private static final long serialVersionUID = -8796426750247480646L;
>
>       @Override
>       public String map(final Pair<Integer, Pair<String, String>> input) {
>         return input.second().first() + "/" + input.second().second();
>       }
>     }, Avros.strings());
>
> System.out.println(joinedStrings.materialize().iterator().hasNext());
> {code}
>
> The two DoFnCheck classes look something like this:
>
> {code}
> import org.apache.crunch.DoFn;
> import org.apache.crunch.Emitter;
> import org.apache.crunch.Pair;
>
> public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
>   private static final long serialVersionUID = 6780749658216132026L;
>
>   @Override
>   public void initialize() {
>     System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>   }
>
>   @Override
>   public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
>     System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>   }
>
>   @Override
>   public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
>     System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     // Emit every input line under the constant key 1.
>     final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
>     emitter.emit(pair);
>   }
> }
> {code}
>
> The console looks like this:
>
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
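The report boils down to a pairing contract: each `initialize()` call is expected to be followed by exactly one `cleanup()`. The sketch below is a standalone, hypothetical model of that check in plain Java with no Crunch dependency; `LifecycleCheck`, `isBalanced`, and the string event names are illustrative and not part of the Crunch API. It also assumes, for simplicity, that the console lines above form a single sequence for one DoFn instance.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, self-contained model of the DoFn lifecycle pairing discussed in
// CRUNCH-232. This is NOT Crunch code: "initialize", "process" and "cleanup" are
// plain strings standing in for the console messages printed by DoFnCheck.
class LifecycleCheck {

  // Returns true if every "initialize" is closed by exactly one "cleanup"
  // before the next "initialize", and "process" only occurs in between.
  static boolean isBalanced(List<String> events) {
    boolean open = false; // true between an initialize and its matching cleanup
    for (String event : events) {
      switch (event) {
        case "initialize":
          if (open) {
            return false; // re-initialized without an intervening cleanup
          }
          open = true;
          break;
        case "cleanup":
          if (!open) {
            return false; // cleanup with no matching initialize
          }
          open = false;
          break;
        case "process":
          if (!open) {
            return false; // process called outside an initialize/cleanup window
          }
          break;
        default:
          throw new IllegalArgumentException("unknown event: " + event);
      }
    }
    return !open; // an initialize still open at the end has no cleanup
  }

  public static void main(String[] args) {
    // The order of messages in the reported console output.
    List<String> reported = Arrays.asList("initialize", "process", "cleanup", "initialize");
    // The same run if the second initialize were also followed by a cleanup.
    List<String> paired = Arrays.asList("initialize", "process", "cleanup", "initialize", "cleanup");
    System.out.println("reported balanced: " + isBalanced(reported)); // false
    System.out.println("paired balanced: " + isBalanced(paired));     // true
  }
}
```

Treating a `process` call outside an initialize/cleanup window as an error is an extra assumption of this sketch; the report itself only concerns the unmatched second `initialize`.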