[ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686214#comment-16686214
 ] 

Miguel E. Coimbra commented on FLINK-10867:
-------------------------------------------

I intend to work on this and have thought about the following first steps:

1 - Add the CacheOperator to the API, as in the example I mentioned above.

2 - Look into how to implement this in the cluster execution model.

3 - See how to enable this in MiniCluster mode, since in local mode the 
cluster gets torn down and rebuilt between jobs.

[~yanghua] do you have a plan for where to begin, so we can collaborate?

> Add a DataSet-based CacheOperator to reuse results between jobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-10867
>                 URL: https://issues.apache.org/jira/browse/FLINK-10867
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management, DataSet API, Local Runtime
>    Affects Versions: 1.8.0
>            Reporter: Miguel E. Coimbra
>            Assignee: vinoyang
>            Priority: Major
>             Fix For: 1.8.0
>
>
> *Motivation.*
> There are job scenarios where Flink batch processing users process a large 
> amount of data, output the results to disk, and then want to reuse those 
> results for another type of computation in Flink.
> This feature suggestion emerged from my work as a PhD researcher working on 
> graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful for maintaining 
> an evolving graph while allowing for specific logic on challenges such as 
> _when_ and _how_ to integrate updates into the graph, and also how to 
> represent it.
> Furthermore, it would also enable rich use cases that have synergy with the 
> existing Jira issue on graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While writing results to disk and reading them back in a new job submitted 
> to the JobManager is negligible when they are small, it becomes prohibitive 
> when there are several gigabytes of data to write/read and distributed 
> storage (e.g. HDFS) is not an option.
> Even when distributed storage is available, its benefits diminish as the 
> number of sequential jobs increases.
> *Existing alternatives.*
> I also considered composing the sequence of jobs into a single big job to 
> submit to the JobManager, thus allowing reuse of results through the natural 
> forwarding of results to subsequent operators in dataflow programming (see 
> the sketch after the list below).
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external 
> to Flink that are not known when the job is composed.
>  This also rules out the limited iterative behavior provided by 
> {{BulkIteration}}/{{DeltaIteration}};
>  * Composing a job with "too many" operators and inter-dependencies may lead 
> to the Optimizer exploring an exponentially large optimization search space.
>  This is particularly true for operators with multiple valid execution 
> strategies, which turns plan selection into a combinatorial problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is still the current situation, based on a reply I received 
> from Fabian Hueske last year.
>  His reply was on the 7th of December 2017:
>  Link: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse 
> between iterations"
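>  
> For concreteness, the single-big-job alternative looks roughly like this; 
> this is a minimal sketch, and the data and operators are placeholders:
> {code:java}
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> 
> public class SingleBigJob {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>         // "Job 1": produce an intermediate result.
>         DataSet<Tuple2<Long, Long>> intermediate = env
>                 .fromElements(Tuple2.of(1L, 10L), Tuple2.of(2L, 20L))
>                 .filter(t -> t.f1 > 5);
> 
>         // "Job 2": consume it directly. The intermediate result is forwarded
>         // to the next operators, but only because both steps belong to one
>         // submitted plan, which is exactly what makes large compositions
>         // explode in the optimizer.
>         intermediate.groupBy(0).sum(1).print(); // print() triggers execution
>     }
> }
> {code}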
>  
> *Idea.*
> Perhaps the best way to describe this *CacheOperator* feature is the concept 
> of "_job chaining_", where a new type of DataSink would receive data that 
> will:
>  - Be available to a subsequent job which somehow makes a reference to the 
> DataSink of the previous job;
>  - Remain available (from the previous job's execution) on the exact same 
> TaskManagers in the cluster.
> The optimal memory distribution is likely to be similar between chained 
> jobs: if the data were re-read from disk between jobs, it would most likely 
> be distributed with the same (automatic or manual) strategies, so the 
> existing distribution should serve the subsequent jobs well.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job 
> finishes in local mode, so it would be necessary to change local mode to keep 
> a FlinkMiniCluster alive - what was the reasoning behind destroying it?
>  Simplifying the implementation?
>  - What would this look like in the API?
>  I envisioned an example like this:
> {{DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
>  {{CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();}}
>  {{... // Other operations...}}
>  {{environment.execute();}}
>  {{... // The previous job has finished.}}
>  {{DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // The previous DataSet, which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, remained in memory.}}
>  {{environment.execute(); // Trigger a different job whose data depends on the previous one.}}
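> A minimal sketch of the class behind such an API might look as follows. Note 
> that {{CacheOperator}} and {{cache()}} are the hypothetical names from the 
> example above, and {{get()}} is an equally hypothetical addition; none of 
> this exists in Flink today:
> {code:java}
> import org.apache.flink.api.java.DataSet;
> 
> // Hypothetical sketch only; no such class exists in the Flink API.
> public class CacheOperator<T> {
>     private final DataSet<T> source; // the DataSet whose result should stay resident
>     private final String cacheId;    // id under which TaskManagers would pin the memory segments
> 
>     public CacheOperator(DataSet<T> source, String cacheId) {
>         this.source = source;
>         this.cacheId = cacheId;
>     }
> 
>     // In a follow-up job, return a DataSet backed by the pinned result
>     // instead of recomputing it or re-reading it from disk.
>     public DataSet<T> get() {
>         // Placeholder: a real implementation would create a source that reads
>         // the memory segments kept alive by the TaskManagers under cacheId.
>         return source;
>     }
> }
> {code}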
> Besides adding appropriate classes to the Flink Java API, implementing this 
> feature would require changing things so that:
>  * JobManagers are aware that a completed job had cached operators - likely a 
> new COMPLETED_AND_REUSABLE job state?
>  * TaskManagers must keep references to the Flink memory management segments 
> associated with the CacheOperator data;
>  * CacheOperator must have a default number of usages and/or amount of time 
> to be kept alive (I think both options should exist, with the user choosing 
> whether to use one or both; see the sketch after this list);
>  * Cluster coordination: should the JobManager be the entity that ultimately 
> triggers the memory eviction order on the TaskManagers associated with a job 
> in the COMPLETED_AND_REUSABLE state?
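> As a concrete illustration of the usage/TTL point above, a hedged sketch of 
> such a retention policy could look like this (all names are illustrative, 
> not proposed API):
> {code:java}
> // Hypothetical retention policy for one cached result; names are illustrative.
> public final class CacheRetentionPolicy {
>     private final int maxUsages;   // evict after this many jobs have read the cache
>     private final long ttlMillis;  // evict after this much wall-clock time
>     private final long createdAt = System.currentTimeMillis();
>     private int usages = 0;
> 
>     public CacheRetentionPolicy(int maxUsages, long ttlMillis) {
>         this.maxUsages = maxUsages;
>         this.ttlMillis = ttlMillis;
>     }
> 
>     // A TaskManager would call this each time a job consumes the cached segments.
>     public synchronized void recordUsage() {
>         usages++;
>     }
> 
>     // The JobManager could poll this to decide when to send the eviction order.
>     public synchronized boolean shouldEvict() {
>         return usages >= maxUsages
>                 || System.currentTimeMillis() - createdAt >= ttlMillis;
>     }
> }
> {code}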
>  
> *Related work.*
> In Spark I believe the existing cache() operator does something similar to 
> what I propose:
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]
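>  
> For comparison, the pattern Spark users rely on looks like this with the 
> existing Java RDD API. Note that Spark's cache only lives within a single 
> application, whereas the feature proposed here would span separate Flink 
> jobs:
> {code:java}
> import java.util.Arrays;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> 
> public class SparkCacheExample {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf().setAppName("cache-demo").setMaster("local[*]");
>         JavaSparkContext sc = new JavaSparkContext(conf);
> 
>         JavaRDD<Integer> squares = sc.parallelize(Arrays.asList(1, 2, 3, 4))
>                 .map(x -> x * x)
>                 .cache(); // pin the materialized RDD in executor memory
> 
>         squares.count(); // first action computes and caches the result
>         squares.count(); // second action is served from the cache
> 
>         sc.stop();
>     }
> }
> {code}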
>  


