[ 
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710007#comment-16710007
 ] 

Miguel E. Coimbra edited comment on FLINK-10867 at 12/5/18 12:29 PM:
---------------------------------------------------------------------

[~StephanEwen], I've been thinking how to go about this on my [own fork of 
Apache Flink|https://github.com/mcoimbra/flink] on GitHub. After having 
invested quite some time in the last two years using this platform and opening 
bug issues now and then, I'm interested in contributing something which I 
believe would be of use to the community

A complex task such as this would likely require a design document or something 
along those lines. However, what I've done so far was to read the Flink source 
code to understand what is necessary to change by tracing what the information 
flow would be, considering key moments:
 * Programmer invoking the {{.cache()}} function;
 * {{JobManager}} receiving a plan with a caching operator (the operator does 
not have a reference to a previous job at this time);
 * {{TaskManager}}(s) receiving indication that the 
{{org.apache.flink.core.memory.MemorySegment }}instances associated to the 
caching operator are to be kept as they are (in the job where caching will 
occur, their data must have originated from a previous operator which produced 
the data in the same job, e.g. {{Filter->Map->Cache}}). The operator would work 
like a regular {{DataSink}}, but instead of writing data, its action is to not 
evict the segments;
 * {{JobManager}} receiving a plan with a caching operator referencing 
previously-cached data.

 

Additional behavior properties:
 * Adding a parameter to decide how long the cached data is to be stored in the 
cluster. Number of jobs that the cached data should be persisted or amount of 
time? What would be desirable?
 * Would this imply that caching may only occur when executing in session mode 
so that the Flink job manager knows that it is caching specifically for a user?
 * The {{org.apache.flink.runtime.executiongraph.ExecutionJobVertex}} instances 
that depend on cached datasets could conceptually read their input from 
{{MemorySegment}} instances with the same logic as if reading  from disk.
 * Create a new COMPLETED_AND_CACHED job status to make this concept explicit 
as far as job management is concerned?

 

Besides the DataSet API (this part is the easier one), I've been thinking and 
perhaps it would be best to define an 
{{org.apache.flink.api.java.operators.MemorySinkOperator }}class to explicitly 
hold a reference to the previous job where caching occurred. The 
{{org.apache.flink.api.java.ExecutionEnvironment}} instance could note the 
references to this {{MemorySinkOperator}} instance and store in them the 
cluster job identification as an attribute. That way, when the client-side 
{{.execute()}} call finishes successfully, it would store the reference there 
so that the {{MemorySinkOperator}} operator reference can be reused in the next 
Flink job triggered by the user-level code.

The major packages for this functionality are:
 * {{org.apache.flink.optimizer}}
 * {{org.apache.flink.runtime}}
 * {{org.apache.flink.java:}}
 * {{org.apache.flink.client}}

 

I want to tackle this task myself, do you have any additional recommendations?

And could I do this as part of a GSoC 2019 contribution?


was (Author: mcoimbra):
[~StephanEwen], I've been thinking how to go about this on my [own fork of 
Apache Flink|https://github.com/mcoimbra/flink] on GitHub. After having 
invested quite some time in the last two years using this platform and opening 
bug issues now and then, I'm interested in contributing something which I 
believe would be of use to the community

A complex task such as this would likely require a design document or something 
along those lines. However, what I've done so far was to read the Flink source 
code to understand what is necessary to change by tracing what the information 
flow would be, considering key moments:
 * Programmer invoking the {{.cache()}} function;
 * {{JobManager}} receiving a plan with a caching operator (the operator does 
not have a reference to a previous job at this time);
 * {{TaskManager}}(s) receiving indication that the 
{{org.apache.flink.core.memory.MemorySegment }}instances associated to the 
caching operator are to be kept as they are (in the job where caching will 
occur, their data must have originated from a previous operator which produced 
the data in the same job, e.g. {{Filter->Map->Cache}}). The operator would work 
like a regular {{DataSink}}, but instead of writing data, its action is to not 
evict the segments;
 * {{JobManager}} receiving a plan with a caching operator referencing 
previously-cached data.

 

Additional behavior properties:
 * Adding a parameter to decide how long the cached data is to be stored in the 
cluster. Number of jobs that the cached data should be persisted or amount of 
time? What would be desirable?
 * Would this imply that caching may only occur when executing in session mode 
so that the Flink job manager knows that it is caching specifically for a user?
 * The {{org.apache.flink.runtime.executiongraph.ExecutionJobVertex}} instances 
that depend on cached datasets could conceptually read their input from 
{{MemorySegment}} instances with the same logic as if reading  from disk.
 * Create a new COMPLETED_AND_CACHED job status to make this concept explicit 
as far as job management is concerned?

 

Besides the DataSet API (this part is the easier one), I've been thinking and 
perhaps it would be best to define an 
{{org.apache.flink.api.java.operators.MemorySinkOperator }}class to explicitly 
hold a reference to the previous job where caching occurred. The 
{{org.apache.flink.api.java.ExecutionEnvironment}} instance could note the 
references to this {{MemorySinkOperator}} instance and store in them the 
cluster job identification as an attribute. That way, when the client-side 
{{.execute()}} call finishes successfully, it would store the reference there 
so that the {{MemorySinkOperator}} operator reference can be reused in the next 
Flink job triggered by the user-level code.

The major packages for this functionality are:
 * {{org.apache.flink.optimizer}}
 * {{org.apache.flink.runtime}}
 * {{org.apache.flink.java:}}
 * {{org.apache.flink.client}}

 

I want to tackle this task myself, do you have any additional recommendations?

> Add a DataSet-based CacheOperator to reuse results between jobs
> ---------------------------------------------------------------
>
>                 Key: FLINK-10867
>                 URL: https://issues.apache.org/jira/browse/FLINK-10867
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management, DataSet API, Local Runtime
>    Affects Versions: 1.8.0
>            Reporter: Miguel E. Coimbra
>            Assignee: vinoyang
>            Priority: Major
>             Fix For: 1.8.0
>
>
> *Motivation.*
> There are job scenarios where Flink batch processing users may be interested 
> in processing a large amount of data, outputting results to disk and then 
> reusing the results for another type of computation in Flink again.
> This feature suggestion emerged from my work as a PhD researcher working on 
> graph stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an 
> evolving graph while allowing for specific logic on challenges such as _when_ 
> and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use-cases that have synergy 
> with this existing Jira issue pertaining graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While it would be negligible to write the results to disk and then read them 
> back in a new job to be sent to the JobManager if they are small, this 
> becomes prohibitive if there are several gigabytes of data to write/read and 
> using a distributed storage (e.g. HDFS) is not an option.
> Even if there is a distributed storage available, as the number of sequential 
> jobs increases, even the benefits of the secondary storage being distributed 
> will diminish.
> *Existing alternatives.*
> I also considered, as a possibility, to compose the sequence of jobs in a 
> single big job to submit to the JobManager, thus allowing reuse of results 
> due to the natural forwarding of results to subsequent operators in dataflow 
> programing.
> However, this becomes difficult due to two reasons:
>  * The logic to connect the sequence of jobs may depend on factors external 
> to Flink and not known at the start of the job composition.
>  This also excludes limited iterative behavior like what is provided in 
> {{BulkIteration/DeltaIteration;}}
>  ** Composing a job with "too many" operators and inter-dependencies may lead 
> to the Optimizer engaging an exponential optimization search space.
>  This is particularly true for operators with multiple valid execution 
> strategies, leading to a combinatorics problem.
>  This leads to the Flink compiler _taking forever_ to even create a plan.
>  I believe this is the current situation based on a reply I received from 
> [~fhueske] last year.
>  His reply was on the 7th of December 2017:
>  Link: 
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
>  Mailing list thread title: "Re: How to perform efficient DataSet reuse 
> between iterations"
>  
> *Idea.*
> Perhaps the better way to describe this *CacheOperator* feature is the 
> concept of "_job chaining_", where a new type of DataSink would receive data 
> that will:
>  - Be available to a subsequent job which somehow makes a reference to the 
> DataSink of the previous job;
>  - Have remained available (from the previous job execution) in the exact 
> same TaskManagers in the cluster.
> Likely, the optimal memory distribution will be pretty similar between 
> chained jobs - if the data was read from disk again between jobs, it would 
> likely be distributed with the same (automatic or not) strategies, hence the 
> same distribution would likely be of use to sequential jobs.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
>  - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job 
> finishes in local mode, so it would be necessary to change local mode to keep 
> a FlinkMiniCluster alive - what was the reasoning behind destroying it?
>  Simplifying the implementation?
>  - How would this look like in the API?
>  I envisioned an example like this:
> {{DataSet<Tuple2<Long, Long>> previousResult = 
> callSomeFlinkDataflowOperator(); // The result of some previous computation.}}
>  {{CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();}}
>  {{... // Other operations...}}
>  {{environment.execute();}}
>  {{... // The previous job has finished.}}
>  {{DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // the previous DataSet, 
> which resulted from callSomeFlinkDataflowOperator() int the previous Flink 
> job, remained in memory.}}
>  {{environment.execute(); // Trigger a different job whose data depends on 
> the previous one.}}
> Besides adding appropriate classes to the Flink Java API, implementing this 
> feature would require changing things so that:
>  * JobManagers are aware that a completed job had cached operators - likely a 
> new COMPLETED_AND_REUSABLE job state?
>  * TaskManagers must keep references to the Flink memory management segments 
> associated to the CacheOperator data;
>  * CacheOperator must have a default number of usages and/or amount of time 
> to be kept alive (I think both options should exist but the user may choose 
> whether to use one or both);
>  * Cluster coordination: should the JobManager be the entity that ultimately 
> triggers the memory eviction order on the TaskManagers associated to a job 
> with COMPLETED_AND_REUSABLE status?
>  
> *Related work.*
> In Spark I believe the existing cache() operator does something similar to 
> what I propose:
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]
> [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to