Miguel E. Coimbra created FLINK-10867:
-----------------------------------------

             Summary: Add a DataSet-based CacheOperator to reuse results 
between jobs
                 Key: FLINK-10867
                 URL: https://issues.apache.org/jira/browse/FLINK-10867
             Project: Flink
          Issue Type: New Feature
          Components: Cluster Management, DataSet API, Local Runtime
    Affects Versions: 1.8.0
            Reporter: Miguel E. Coimbra
             Fix For: 1.8.0


*Motivation.*

There are job scenarios where Flink batch processing users process a large 
amount of data, write the results to disk, and then reuse those results for 
another type of computation in Flink.

This feature suggestion emerged from my work as a PhD researcher working on 
graph stream processing.

https://arxiv.org/abs/1810.02781

More specifically, in our use case this would be very useful to maintain an 
evolving graph while allowing for specific logic on challenges such as _when_ 
and _how_ to integrate updates in the graph and also how to represent it.

Furthermore, it would also be an enabler for rich use cases that have synergy 
with this existing Jira issue pertaining to graph partitioning:

https://issues.apache.org/jira/browse/FLINK-1536

*Problem.* 

While writing the results to disk and reading them back in a new job would be 
negligible if they are small, this becomes prohibitive when there are several 
gigabytes of data to write/read and using a distributed storage (e.g. HDFS) is 
not an option.

Even if distributed storage is available, its benefits diminish as the number 
of sequential jobs increases.

*Existing alternatives.*

I also considered composing the sequence of jobs into a single large job to 
submit to the JobManager, thus allowing reuse of results through the natural 
forwarding of results to subsequent operators in dataflow programming.

However, this becomes difficult due to two reasons:
 * The logic to connect the sequence of jobs may depend on factors external to 
Flink and not known at the start of the job composition.
This also excludes the limited iterative behavior provided by 
{{BulkIteration}}/{{DeltaIteration}};
 * Composing a job with "too many" operators and inter-dependencies may lead 
the Optimizer into an exponential optimization search space.
 This is particularly true for operators with multiple valid execution 
strategies, which turns plan enumeration into a combinatorics problem: the 
Flink compiler can _take forever_ just to create a plan.
 I believe this is still the current situation, based on a reply I received 
from Fabian Hueske on the 7th of December 2017:
 Link: 
[http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/raw/%3CCAAdrtT0FprAbTEFmv3qsgy%2BBFEXtxBBxM_0VS%3DfDsMrzAqY9ew%40mail.gmail.com%3E]
 Mailing list thread title: "Re: How to perform efficient DataSet reuse between 
iterations"
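To make the plan-space growth concrete, here is a toy calculation (my own 
illustration, not Flink's actual optimizer cost model): if each of _k_ 
operators had _s_ valid execution strategies, a naive enumeration would have 
to consider _s^k_ candidate plans.

```java
public class PlanSpace {
    // Toy model: if each of `operators` operators has `strategies` valid
    // execution strategies (e.g. broadcast vs. repartition joins), the naive
    // number of candidate execution plans is strategies^operators.
    static long candidatePlans(int strategies, int operators) {
        long plans = 1;
        for (int i = 0; i < operators; i++) {
            plans *= strategies;
        }
        return plans;
    }

    public static void main(String[] args) {
        System.out.println(candidatePlans(3, 5));  // 243
        System.out.println(candidatePlans(3, 20)); // 3486784401
    }
}
```

Chaining several jobs into one plan multiplies these factors together, which is 
why a composed mega-job can stall the compiler even when each individual job 
optimizes quickly.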

 

*Idea.*

Perhaps the best way to describe this *CacheOperator* feature is the concept 
of "_job chaining_", where a new type of DataSink would receive data that 
will:
 - Be available to a subsequent job which somehow makes a reference to the 
DataSink of the previous job;
 - Remain available (from the previous job execution) in the exact same 
TaskManagers in the cluster.

The optimal memory distribution is likely to be similar between chained jobs: 
if the data were read from disk again between jobs, it would probably be 
distributed with the same (automatic or not) strategies, so the same 
distribution would remain useful to subsequent jobs.



*Design.*

Potential conflicts with the current Flink cluster execution model:
 - The {{FlinkMiniCluster}} used with {{LocalEnvironment}} is destroyed when a 
job finishes in local mode, so local mode would need to be changed to keep a 
{{FlinkMiniCluster}} alive - what was the reasoning behind destroying it? 
Simplifying the implementation?

 - What would this look like in the API?
 I envision an example like this:

{code:java}
DataSet<Tuple2<Long, Long>> previousResult = callSomeFlinkDataflowOperator(); // The result of some previous computation.
CacheOperator<DataSet<Tuple2<Long, Long>>> op = previousResult.cache();
... // Other operations...
environment.execute();
... // The previous job has finished.
DataSet<Tuple2<Long, Long>> sorted = op.sort(0); // The previous DataSet, which resulted from callSomeFlinkDataflowOperator() in the previous Flink job, remained in memory.
environment.execute(); // Trigger a different job whose data depends on the previous one.
{code}
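As a purely illustrative sketch, the handle returned by such a {{cache()}} 
call could look roughly like this (class and method names are my invention, 
not an existing Flink API): a handle that outlives a single {{execute()}} call 
and re-exposes the producing job's result to the next job.

```java
// Hypothetical sketch - not an existing Flink class. In a real implementation
// the payload would live in TaskManager-managed memory segments, not on the heap.
public class CacheHandle<T> {
    private T cached;
    private boolean populated;

    /** Called by the runtime once the producing job completes. */
    public void populate(T result) {
        this.cached = result;
        this.populated = true;
    }

    /** True once the producing job has finished and the result is reusable. */
    public boolean isPopulated() {
        return populated;
    }

    /** Returns the cached result for use by a subsequent job. */
    public T get() {
        if (!populated) {
            throw new IllegalStateException("The producing job has not completed yet.");
        }
        return cached;
    }
}
```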

Besides adding appropriate classes to the Flink Java API, implementing this 
feature would require changing things so that:
 * JobManagers are aware that a completed job had cached operators - likely a 
new {{COMPLETED_AND_REUSABLE}} job state?
 * TaskManagers must keep references to the Flink memory management segments 
associated with the CacheOperator data;
 * CacheOperator must have a default number of usages and/or amount of time to 
be kept alive (I think both options should exist, but the user may choose 
whether to use one or both);
 * Cluster coordination: should the JobManager be the entity that ultimately 
triggers the memory eviction order on the TaskManagers associated with a job 
in the {{COMPLETED_AND_REUSABLE}} state?
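To sketch the usage-count/time-to-live point above (again with hypothetical 
names, not an existing Flink API): a TaskManager could keep per-cached-result 
bookkeeping like the following, and the JobManager would order eviction once 
an entry reports itself evictable.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical bookkeeping for one cached result on a TaskManager.
// An entry becomes evictable once its allowed uses are exhausted or its
// time-to-live has elapsed, whichever limit(s) the user configured.
public class CachedResultEntry {
    private int remainingUses;        // negative = unlimited uses
    private final Instant expiresAt;  // null = no time limit

    public CachedResultEntry(int maxUses, Duration timeToLive) {
        this.remainingUses = maxUses;
        this.expiresAt = (timeToLive == null) ? null : Instant.now().plus(timeToLive);
    }

    /** Called each time a subsequent job consumes the cached DataSet. */
    public synchronized void recordUse() {
        if (remainingUses > 0) {
            remainingUses--;
        }
    }

    /** True once the JobManager may order the memory segments released. */
    public synchronized boolean isEvictable(Instant now) {
        boolean usesExhausted = remainingUses == 0;
        boolean expired = expiresAt != null && now.isAfter(expiresAt);
        return usesExhausted || expired;
    }
}
```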

 

*Related work.*

I believe Spark's existing {{cache()}} operator does something similar to what 
I propose:

[https://spark.apache.org/docs/latest/graphx-programming-guide.html#caching-and-uncaching]

[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED]]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
