[
https://issues.apache.org/jira/browse/FLINK-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Greg Hogan closed FLINK-6110.
-----------------------------
Resolution: Duplicate
> Flink unnecessarily repeats shared work triggered by different blocking
> sinks, leading to massive inefficiency
> --------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-6110
> URL: https://issues.apache.org/jira/browse/FLINK-6110
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
>
> After a blocking sink (collect() or count()) is called, all already-computed
> intermediate DataSets are thrown away, and any subsequent code that tries to
> make use of an already-computed DataSet will require the DataSet to be
> computed from scratch. For example, the following code prints the elements a,
> b and c twice in succession, even though the DataSet ds should only have to
> be computed once:
> {code}
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> ds = env.fromElements("a", "b", "c")
>         .map(s -> { System.out.println(s); return s + s; });
> List<String> c1 = ds.map(s -> s).collect();
> List<String> c2 = ds.map(s -> s).collect();
> env.execute();
> {code}
> This is problematic because not every operation is easy to express in Flink
> using joins and filters. Sometimes, for smaller datasets (such as marginal
> counts), it is easier to collect the values into a HashMap and then pass that
> HashMap into subsequent operations so they can look up the values they need
> directly. A more complex example is sorting a set of values and then using
> the sorted array for subsequent binary search operations to determine rank.
> This is only really possible with an array of sorted values that fits easily
> in RAM (there is no way to express binary search as a join type), so you have
> to drop out of the Flink pipeline using collect() to produce the sorted
> binary search lookup array.
> However, any collect() or count() operation causes immediate execution of the
> Flink pipeline, which throws away *all* intermediate values that could be
> reused for future executions. As a result, code can be extremely inefficient,
> recomputing the same values over and over again unnecessarily.
> I believe that intermediate values should not be released or garbage
> collected until after env.execute() is called.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)