[ 
https://issues.apache.org/jira/browse/CRUNCH-405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068555#comment-14068555
 ] 

Micah Whitacre commented on CRUNCH-405:
---------------------------------------

Thinking through how things should behave once materialized state moves to 
the pipeline, I figured an example might help...

If we have a pipeline that looked like the following:

{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
pipeline.run(); //generates the two outputs and materialized
pipeline.run(); //would do nothing because outputTargets are cleared on the
                //previous run and also nothing gets picked for materialization
{code}

If we changed the pipeline slightly...

{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
pipeline.run(); //generates the two outputs and materialized
pcoll3.write(...);
pipeline.run(); //would only generate one target for pcoll3 but also wouldn't
                //utilize stored pcoll1 and pcoll2.
{code}

With the change to make plan() idempotent, it gets more complex with the 
following:

{code}
pcoll1 = pipeline.read(...);
pcoll2 = pcoll1.map(...);
pcoll3 = pcoll2.map(...);
pipeline.write(pcoll1);
pipeline.write(pcoll2);
pcoll3.materialize();
plan1 = pipeline.plan(); //generates the two outputs and materialized
pcoll3.write(...);
plan2 = pipeline.plan(); //generates the 3 targets + materialize
plan2.execute();
plan1.execute();
{code}

While someone could do this, I'm not sure it is a valid workflow.  Specifically, 
in this case the output targets are not cleared (clearing is done in e.g. 
runAsync()), so the two executions will conflict over who generates the 
targets.  I believe this is the scenario you were concerned about when you 
stated the following:

{quote}
We might need some sync logic in there to make sure two identical plans weren't 
executed simultaneously-- there would need to be a way for the execution of one 
plan to invalidate the execution of any others that were created.
{quote}

So what would be the preferred action?  Should executing plan1 fail?  Or should 
both executions be attempted and left to fail when they conflict over the 
targets being created?

I'm toying with a couple of options...

* The original patch added a method to MRPipeline.  Instead of returning an 
executable MRExecutor, it could return a no-op so that the plan could never be 
executed.  Only a legitimate run/plan would update the state, which prevents 
the duplication.
{code}
public MRExecutor plan(boolean dryRun)
{code}
* Add a "version" to the pipeline which gets modified for each 
write/materialize.  Each execution then marks on the pipeline what version it 
executed for.  We could then add logic to invalidate lower version runs that 
might be duplicates or stale plans.
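
To make the "version" option concrete, here is a rough sketch of the 
bookkeeping it implies.  Everything in it is hypothetical: 
VersionedPipelineSketch, Plan, targetAdded() and tryBegin() are made-up names 
for illustration and are not part of Crunch or of the attached patch.  It 
assumes the pipeline bumps a counter on each write/materialize and that an 
execution is refused when a plan at the same or a higher version has already 
run.

```java
// Hypothetical sketch of the "version" option. None of these types exist in
// Crunch; they only model the bookkeeping described above.
final class VersionedPipelineSketch {
    private long version = 0;        // bumped on every write/materialize
    private long lastExecuted = -1;  // highest version executed so far

    // Would be called wherever the pipeline registers a write or materialize.
    synchronized void targetAdded() {
        version++;
    }

    // Idempotent: snapshots the current version and changes no pipeline state.
    synchronized Plan plan() {
        return new Plan(this, version);
    }

    // Admits an execution only if nothing at this version or newer has run.
    private synchronized boolean tryBegin(long planVersion) {
        if (planVersion <= lastExecuted) {
            return false; // duplicate or stale plan
        }
        lastExecuted = planVersion;
        return true;
    }

    static final class Plan {
        private final VersionedPipelineSketch owner;
        private final long version;

        Plan(VersionedPipelineSketch owner, long version) {
            this.owner = owner;
            this.version = version;
        }

        // Returns true if this plan was allowed to run, false if rejected.
        // A real executor would launch its jobs where this returns true.
        boolean execute() {
            return owner.tryBegin(version);
        }
    }

    public static void main(String[] args) {
        VersionedPipelineSketch pipeline = new VersionedPipelineSketch();
        pipeline.targetAdded(); // write(pcoll1)
        pipeline.targetAdded(); // write(pcoll2)
        pipeline.targetAdded(); // pcoll3.materialize()
        Plan plan1 = pipeline.plan();
        pipeline.targetAdded(); // pcoll3.write(...)
        Plan plan2 = pipeline.plan();
        System.out.println("plan2 ran: " + plan2.execute());
        System.out.println("plan1 ran: " + plan1.execute());
    }
}
```

Replaying the plan1/plan2 scenario from above, plan2.execute() is admitted and 
the later plan1.execute() is rejected as stale.  Two plans created at the same 
version are treated as duplicates, so only the first one runs.  Note that this 
check is deliberately blunt: it rejects any older plan, even one whose targets 
have not actually been generated yet.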

I'm not really sold on either, and I'm sure the last approach has flaws and is 
not fully baked.

Also, just checking, but what are our thread safety guarantees around 
MRPipeline/DistributedPipeline?  I assume they should be marked as not thread 
safe, but I wasn't sure if there were use cases I might have missed where they 
should be considered otherwise.






> Explore adding support for idempotent MRPipeline.plan()
> -------------------------------------------------------
>
>                 Key: CRUNCH-405
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-405
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Micah Whitacre
>            Assignee: Micah Whitacre
>         Attachments: CRUNCH-405_v1.patch
>
>
> Talking through a use case with a consumer, they were interested in having 
> the ability to run the MRPipeline.plan() method one to many times prior to 
> ever calling the Pipeline.run/done methods.  The reason for this was they 
> were looking at pulling information off the MRExecutor to tweak settings 
> inside of their DoFns.
> Currently the MRPipeline implementation however does not have an idempotent 
> plan() method as it alters the state of internal values therefore affecting 
> the full run once done() is called.  
> It would be nice if we added an idempotent plan() method that could gather 
> this information or perhaps a reset option.  



--
This message was sent by Atlassian JIRA
(v6.2#6252)
