[ https://issues.apache.org/jira/browse/CRUNCH-405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068555#comment-14068555 ]
Micah Whitacre commented on CRUNCH-405:
---------------------------------------

While thinking through how things should behave once materialized state moves to the pipeline, I figured an example might help. Suppose we have a pipeline like the following:
{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
pipeline.run(); // generates the two outputs and the materialized pcoll3
pipeline.run(); // would do nothing: outputTargets were cleared by the previous run, and nothing gets picked for materialization
{code}

If we change the pipeline slightly:
{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
pipeline.run(); // generates the two outputs and the materialized pcoll3
pcoll3.write(...);
pipeline.run(); // would generate only the one target for pcoll3, but also wouldn't reuse the stored pcoll1 and pcoll2
{code}

With the change to make plan() idempotent, it gets more complex:
{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
plan1 = pipeline.plan(); // plans the two outputs and the materialization
pcoll3.write(...);
plan2 = pipeline.plan(); // plans the 3 targets + the materialization
plan2.execute();
plan1.execute();
{code}

While someone could do this, I'm not sure it is a valid workflow. Specifically, in this case the output targets are not cleared (that clearing happens in runAsync()), so the two executions will conflict over which one generates the targets. I believe this is the scenario you were concerned about when you wrote:
{quote}
We might need some sync logic in there to make sure two identical plans weren't executed simultaneously-- there would need to be a way for the execution of one plan to invalidate the execution of any others that were created.
{quote}

So what would be the preferred behavior? Should executing plan1 fail? Or should it run anyway, with both executions failing when they conflict over the targets being created? I'm toying with a couple of options:
* The original patch added a method to MRPipeline. For a dry run, instead of returning an executable MRExecutor, it could return a no-op so that the plan could never be executed. Only a legitimate run/plan would update the state, which prevents the duplication.
{code}
public MRExecutor plan(boolean dryRun)
{code}
* Add a "version" to the pipeline that gets modified on each write/materialize. Each execution then records on the pipeline which version it executed for. We could then add logic to invalidate lower-version runs that might be duplicates or stale plans.

I'm not really sold on either, and I'm sure the last approach has flaws; it isn't fully baked.

Also, just checking: what are our thread-safety guarantees around MRPipeline/DistributedPipeline? I assume they should be marked as not thread safe, but I wasn't sure if there were use cases I might have missed where they should be considered otherwise.

> Explore adding support for idempotent MRPipeline.plan()
> -------------------------------------------------------
>
>                 Key: CRUNCH-405
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-405
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Micah Whitacre
>            Assignee: Micah Whitacre
>         Attachments: CRUNCH-405_v1.patch
>
>
> Talking through a use case with a consumer, they were interested in having
> the ability to run the MRPipeline.plan() method one or more times prior to
> ever calling the Pipeline.run/done methods. The reason for this was they
> were looking at pulling information off the MRExecutor to tweak settings
> inside of their DoFns.
> Currently, however, the MRPipeline implementation does not have an idempotent
> plan() method: it alters the state of internal values, which affects
> the full run once done() is called.
> It would be nice if we added an idempotent plan() method that could gather
> this information, or perhaps a reset option.
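A minimal sketch of the "version" option, with output targets modeled as plain strings. Everything here is hypothetical: `VersionedPipeline`, `Plan`, and their methods are illustrative names, not Crunch's MRPipeline/MRExecutor API, and the class is deliberately not thread safe. plan() only snapshots state, so it can be called any number of times. execute() rejects a plan whose version is not newer than the last executed one, which is how plan1 would be invalidated in the third scenario above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model of the "version" idea; NOT Crunch's MRPipeline API.
 * Not thread safe.
 */
class VersionedPipeline {

  private final List<String> pendingTargets = new ArrayList<>();
  private long version = 0;              // bumped on every write/materialize
  private long lastExecutedVersion = -1; // version of the most recently executed plan

  /** Records an output target and bumps the pipeline version. */
  public void write(String target) {
    pendingTargets.add(target);
    version++;
  }

  /** Materialization is modeled as just another pending target. */
  public void materialize(String collection) {
    write("materialized:" + collection);
  }

  /** Idempotent: snapshots the current state without clearing anything. */
  public Plan plan() {
    return new Plan(version, new ArrayList<>(pendingTargets));
  }

  /** Equivalent of run(): plan and execute immediately. */
  public List<String> run() {
    return plan().execute();
  }

  public final class Plan {
    private final long planVersion;
    private final List<String> targets;

    private Plan(long planVersion, List<String> targets) {
      this.planVersion = planVersion;
      this.targets = targets;
    }

    /** Returns the targets "generated"; throws if this plan is stale. */
    public List<String> execute() {
      if (targets.isEmpty()) {
        return Collections.emptyList(); // nothing to generate: a no-op
      }
      if (planVersion <= lastExecutedVersion) {
        throw new IllegalStateException("Stale plan: version " + planVersion
            + " <= last executed version " + lastExecutedVersion);
      }
      lastExecutedVersion = planVersion;
      pendingTargets.removeAll(targets); // only execution modifies pipeline state
      return Collections.unmodifiableList(targets);
    }
  }
}
```

With this rule, executing plan2 and then plan1 throws for plan1, and calling run() twice makes the second call a no-op because the first execution clears the pending targets. One gap the sketch leaves open: executing plan1 before plan2 still lets plan2 regenerate the pcoll1 and pcoll2 outputs, since plan2's version is newer.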