Miguel E. Coimbra created FLINK-10867:
-----------------------------------------
Summary: Add a DataSet-based CacheOperator to reuse results
between jobs
Key: FLINK-10867
URL: https://issues.apache.org/jira/browse/FLINK-10867
Project: Flink
Issue Type: New Feature
Components: Cluster Management, DataSet API, Local Runtime
Affects Versions: 1.8.0
Reporter: Miguel E. Coimbra
Fix For: 1.8.0
*Motivation.*
There are job scenarios where Flink batch processing users may be interested in
processing a large amount of data, outputting results to disk and then reusing
the results for another type of computation in Flink again.
This feature suggestion emerged from my work as a PhD researcher working on
graph stream processing.
https://arxiv.org/abs/1810.02781
More specifically, in our use case this would be very useful to maintain an
evolving graph while allowing for specific logic on challenges such as _when_
and _how_ to integrate updates in the graph and also how to represent it.
Furthermore, it would also be an enabler for rich use-cases that have synergy
with this existing Jira issue pertaining to graph partitioning:
https://issues.apache.org/jira/browse/FLINK-1536?jql=project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20AND%20description%20~%20gelly%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC
*Problem.*
Writing the results to disk and reading them back in a new job submitted to the
JobManager has negligible cost when the results are small, but it becomes
prohibitive when there are several gigabytes of data to write and read and
distributed storage (e.g. HDFS) is not an option.
Even when distributed storage is available, its benefits diminish as the
number of sequential jobs increases.
*Existing alternatives.*
I also considered composing the sequence of jobs into a single big job to
submit to the JobManager, thus allowing reuse of results through the natural
forwarding of results to subsequent operators in dataflow programming.
However, this is difficult for two reasons:
* The logic to connect the sequence of jobs may depend on factors external to
Flink and not known at the start of the job composition.
This also excludes limited iterative behavior like what is provided in
{{BulkIteration}}/{{DeltaIteration}};
* Composing a job with "too many" operators and inter-dependencies may lead
the Optimizer to explore an exponentially large search space.
This is particularly true for operators with multiple valid execution
strategies, which turns plan selection into a combinatorial problem.
As a result, the Flink compiler ends up _taking forever_ to even create a plan.
I believe this is the current situation based on a reply I received from
Fabian Hueske last year.
His reply was on the 7th of December 2017:
Link:
[http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
Mailing list thread title: "Re: How to perform efficient DataSet reuse between
iterations"
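To give a feel for the growth described in the second point, here is a minimal sketch under a deliberately simplified assumption: each of n operators independently picks one of k execution strategies, giving k^n candidate combinations. The class and method names are hypothetical and this is not how the Flink Optimizer counts or prunes plans; it only illustrates why the number of combinations grows so quickly.

```java
import java.math.BigInteger;

/** Hypothetical illustration only; not part of Flink. */
final class PlanSpaceEstimate {

    /**
     * Upper bound on candidate plans if every operator independently chooses
     * one of the given number of strategies: strategies^operators.
     */
    static BigInteger candidatePlans(int strategiesPerOperator, int operators) {
        return BigInteger.valueOf(strategiesPerOperator).pow(operators);
    }

    public static void main(String[] args) {
        // Assumed value: 3 valid strategies per operator.
        for (int operators : new int[] {5, 10, 20, 40}) {
            System.out.println(operators + " operators -> "
                    + candidatePlans(3, operators) + " combinations");
        }
    }
}
```

A real optimizer prunes most of these combinations, so the figure is only an upper bound under the stated assumption.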
*Idea.*
Perhaps the best way to describe this *CacheOperator* feature is the
concept of "_job chaining_", where a new type of DataSink would receive data
that will:
- Be available to a subsequent job which somehow makes a reference to the
DataSink of the previous job;
- Have remained available (from the previous job execution) in the exact same
TaskManagers in the cluster.
The optimal memory distribution will probably be similar between chained
jobs: if the data were read from disk again between jobs, it would likely be
distributed with the same (automatic or not) strategies, so the same
distribution should also suit sequential jobs.
*Design.*
Potential conflicts with the current Flink cluster execution model:
- The FlinkMiniCluster used with LocalEnvironment is destroyed when a job
finishes in local mode, so it would be necessary to change local mode to keep a
FlinkMiniCluster alive - what was the reasoning behind destroying it?
Simplifying the implementation?
- What would this look like in the API?
I envisioned an example like this:
{{// The result of some previous computation.}}
{{DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator();}}
{{CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();}}
{{... // Other operations...}}
{{environment.execute();}}
{{... // The previous job has finished.}}
{{// The previous DataSet, which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, remained in memory.}}
{{DataSet<Tuple2<Long, Long>> sorted = op.sort(0);}}
{{// Trigger a different job whose data depends on the previous one.}}
{{environment.execute();}}
Besides adding appropriate classes to the Flink Java API, implementing this
feature would require changing things so that:
* JobManagers are aware that a completed job had cached operators - likely a
new COMPLETED_AND_REUSABLE job state?
* TaskManagers must keep references to the Flink memory management segments
associated with the CacheOperator data;
* CacheOperator must have a default number of usages and/or amount of time to
be kept alive (I think both options should exist but the user may choose
whether to use one or both);
* Cluster coordination: should the JobManager be the entity that ultimately
triggers the memory eviction order on the TaskManagers associated with a job in
COMPLETED_AND_REUSABLE status?
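The keep-alive rule from the list above (a number of usages and/or an amount of time) could be tracked with bookkeeping along these lines. This is only a sketch in plain Java: {{CacheLease}} and its methods are hypothetical names that do not exist in Flink, and it says nothing about where the check would actually run (JobManager or TaskManager).

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical keep-alive bookkeeping for one cached result; not part of Flink. */
final class CacheLease {
    private final int maxUsages;       // assumption: <= 0 means "no usage limit"
    private final Duration timeToLive; // assumption: null means "no time limit"
    private final Instant createdAt;
    private int usages = 0;

    CacheLease(int maxUsages, Duration timeToLive, Instant createdAt) {
        this.maxUsages = maxUsages;
        this.timeToLive = timeToLive;
        this.createdAt = createdAt;
    }

    /** Records one read by a subsequent job; returns false if the lease has expired. */
    boolean tryUse(Instant now) {
        if (isExpired(now)) {
            return false;
        }
        usages++;
        return true;
    }

    /** True once either configured limit is reached; this is when eviction could be triggered. */
    boolean isExpired(Instant now) {
        boolean usagesExhausted = maxUsages > 0 && usages >= maxUsages;
        boolean tooOld = timeToLive != null && !now.isBefore(createdAt.plus(timeToLive));
        return usagesExhausted || tooOld;
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        // Assumed settings: at most 2 reuses, kept for at most 10 minutes.
        CacheLease lease = new CacheLease(2, Duration.ofMinutes(10), start);
        System.out.println("first reuse allowed:  " + lease.tryUse(start.plusSeconds(1)));
        System.out.println("second reuse allowed: " + lease.tryUse(start.plusSeconds(2)));
        System.out.println("third reuse allowed:  " + lease.tryUse(start.plusSeconds(3)));
    }
}
```

Passing the current time explicitly keeps the rule deterministic. Whichever component owns eviction could call {{isExpired}} before ordering the memory segments to be released.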
*Related work.*
In Spark I believe the existing cache() operator does something similar to what
I propose:
[https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]
[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]