[
https://issues.apache.org/jira/browse/FLINK-10867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777121#comment-16777121
]
Stephan Ewen commented on FLINK-10867:
--------------------------------------
There is another proposal for similar functionality in FLINK-111999.
I think that proposal is a bit simpler (it does not need to change the runtime
parts) and ultimately even more powerful (because it is extensible).
> Add a DataSet-based CacheOperator to reuse results between jobs
> ---------------------------------------------------------------
>
> Key: FLINK-10867
> URL: https://issues.apache.org/jira/browse/FLINK-10867
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management, DataSet API, Local Runtime
> Affects Versions: 1.8.0
> Reporter: Miguel E. Coimbra
> Assignee: vinoyang
> Priority: Major
> Fix For: 1.8.0
>
>
> *Motivation.*
> There are scenarios where Flink batch processing users process a large amount
> of data, write the results to disk, and then want to reuse those results for
> another type of computation in a subsequent Flink job.
> This feature suggestion emerged from my work as a PhD researcher on graph
> stream processing.
> [https://arxiv.org/abs/1810.02781]
> More specifically, in our use case this would be very useful to maintain an
> evolving graph while allowing for specific logic on challenges such as _when_
> and _how_ to integrate updates in the graph and also how to represent it.
> Furthermore, it would also be an enabler for rich use cases that have synergy
> with this existing Jira issue pertaining to graph partitioning:
> FLINK-1536 - Graph partitioning operators for Gelly
> *Problem.*
> While the cost of writing results to disk and reading them back in a new job
> sent to the JobManager is negligible when the results are small, it becomes
> prohibitive when there are several gigabytes of data to write/read and
> distributed storage (e.g. HDFS) is not an option.
> Even when distributed storage is available, the benefit of it being
> distributed diminishes as the number of sequential jobs increases.
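> For illustration, the disk round-trip described above looks roughly like the
> following with the current DataSet API (the paths and operators are just
> placeholders):
> {code:java}
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
>
> public class DiskRoundTrip {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         // Job 1: compute a large intermediate result and spill it to storage.
>         DataSet<Tuple2<Long, Long>> intermediate = env
>                 .readCsvFile("hdfs:///tmp/input")          // placeholder input
>                 .types(Long.class, Long.class)
>                 .groupBy(0)
>                 .sum(1);
>         intermediate.writeAsCsv("hdfs:///tmp/intermediate");
>         env.execute("job 1");
>
>         // Job 2: read the full intermediate result back just to keep computing on it.
>         DataSet<Tuple2<Long, Long>> reloaded = env
>                 .readCsvFile("hdfs:///tmp/intermediate")
>                 .types(Long.class, Long.class);
>         reloaded.sortPartition(0, Order.ASCENDING).writeAsCsv("hdfs:///tmp/result");
>         env.execute("job 2");
>     }
> }
> {code}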
> *Existing alternatives.*
> I also considered composing the sequence of jobs into a single big job
> submitted to the JobManager, which would allow results to be reused through
> the natural forwarding of results to downstream operators in dataflow
> programming (sketched after the list below).
> However, this is difficult for two reasons:
> * The logic to connect the sequence of jobs may depend on factors external
> to Flink that are not known when the job is composed.
> This also excludes limited iterative behavior like what is provided by
> {{BulkIteration}}/{{DeltaIteration}};
> * Composing a job with "too many" operators and inter-dependencies may push
> the optimizer into an exponentially large optimization search space.
> This is particularly true for operators with multiple valid execution
> strategies, leading to a combinatorics problem and to the Flink compiler
> _taking forever_ to even create a plan.
> I believe this is the current situation based on a reply I received from
> [~fhueske] last year.
> His reply was on the 7th of December 2017:
> Link:
> [http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
> Mailing list thread title: "Re: How to perform efficient DataSet reuse
> between iterations"
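> For comparison, here is a minimal sketch of the single-big-job alternative
> (the number of phases and the per-phase operators are placeholders);
> everything has to be composed and optimized as one plan before submission:
> {code:java}
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
>
> public class SingleComposedJob {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>         DataSet<Tuple2<Long, Long>> current = env
>                 .readCsvFile("hdfs:///tmp/input")          // placeholder input
>                 .types(Long.class, Long.class);
>
>         // Every "job" becomes one more phase of the same plan. The number of
>         // phases must be known up front, and the optimizer has to enumerate
>         // execution strategies for the fully composed graph at once.
>         final int numPhases = 10;                          // placeholder
>         for (int i = 0; i < numPhases; i++) {
>             current = current
>                     .groupBy(0)
>                     .sum(1)
>                     .filter(t -> t.f1 > 0);
>         }
>
>         current.writeAsCsv("hdfs:///tmp/result");
>         env.execute("single composed job");
>     }
> }
> {code}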
>
> *Idea.*
> Perhaps a better way to describe this *CacheOperator* feature is the
> concept of "_job chaining_", where a new type of DataSink receives data
> that will:
> - Be available to a subsequent job which somehow makes a reference to the
> DataSink of the previous job;
> - Have remained available (from the previous job execution) in the exact
> same TaskManagers in the cluster.
> The optimal memory distribution will likely be very similar between chained
> jobs: if the data were read from disk again between jobs, it would likely be
> distributed with the same (automatic or not) strategies, so the same
> distribution should also serve the subsequent jobs well.
> *Design.*
> Potential conflicts with the current Flink cluster execution model:
> - The FlinkMiniCluster used with LocalEnvironment is destroyed when a job
> finishes in local mode, so local mode would need to be changed to keep the
> FlinkMiniCluster alive. What was the reasoning behind destroying it?
> Simplifying the implementation?
> - What would this look like in the API?
> I envision an example like this:
> {code:java}
> // The result of some previous computation.
> DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator();
> CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();
> ... // Other operations...
> environment.execute();
> ... // The previous job has finished.
> // The previous DataSet, which resulted from callSomeFlinkDataflowOperator()
> // in the previous Flink job, remained in memory.
> DataSet<Tuple2<Long, Long>> sorted = op.sort(0);
> environment.execute(); // Trigger a different job whose data depends on the previous one.
> {code}
> Besides adding appropriate classes to the Flink Java API, implementing this
> feature would require changes so that:
> * JobManagers are aware that a completed job had cached operators - likely a
> new COMPLETED_AND_REUSABLE job state?
> * TaskManagers must keep references to the Flink memory management segments
> associated with the CacheOperator data;
> * CacheOperator must have a default number of usages and/or amount of time
> to be kept alive (I think both options should exist, but the user may choose
> whether to use one or both; see the sketch after this list);
> * Cluster coordination: should the JobManager be the entity that ultimately
> triggers the memory eviction order on the TaskManagers associated with a job
> with COMPLETED_AND_REUSABLE status?
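> A purely hypothetical sketch of the user-facing side of the retention point
> above (number of usages and/or time to live); none of these classes or
> methods exist in Flink, and the delegating operator methods (e.g. the
> {{sort(...)}} used in the example above) are omitted:
> {code:java}
> import java.time.Duration;
> import org.apache.flink.api.java.DataSet;
>
> // Hypothetical handle returned by DataSet#cache(); it only illustrates the
> // retention options discussed above and does not implement any caching itself.
> public final class CacheOperator<T> {
>
>     private final DataSet<T> cached;        // the result kept on the TaskManagers
>     private int remainingUsages = 1;        // default: evict after one reuse
>     private Duration timeToLive = Duration.ofHours(1); // default: evict after one hour
>
>     CacheOperator(DataSet<T> cached) {
>         this.cached = cached;
>     }
>
>     /** Keep the cached result for at most the given number of subsequent jobs. */
>     public CacheOperator<T> retainForUsages(int usages) {
>         this.remainingUsages = usages;
>         return this;
>     }
>
>     /** Keep the cached result for at most the given duration. */
>     public CacheOperator<T> retainFor(Duration ttl) {
>         this.timeToLive = ttl;
>         return this;
>     }
>
>     /** Obtain the cached result in a later job of the same session. */
>     public DataSet<T> get() {
>         return cached;
>     }
> }
> {code}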
>
> *Related work.*
> In Spark, I believe the existing {{cache()}} operator does something similar
> to what I propose (a short example follows the links below):
> [https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]
> [https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]
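> For comparison, this is roughly how the RDD-level {{cache()}} is used from
> Spark's Java API (the GraphX {{Graph#cache()}} linked above behaves
> analogously for graphs):
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public class SparkCacheExample {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf().setAppName("cache-example").setMaster("local[*]");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>         JavaRDD<String> lines = sc.textFile("hdfs:///tmp/input"); // placeholder input
>         JavaRDD<String> cached = lines.cache(); // materialized on the first action
>         long first = cached.count();   // triggers the computation and caches the partitions
>         long second = cached.count();  // served from the cached partitions, no recomputation
>         sc.stop();
>     }
> }
> {code}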
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)