[
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655692#comment-16655692
]
Fabian Hueske commented on FLINK-10566:
---------------------------------------
This issue only affects DataSet programs. The DataStream API does not optimize
the plans.
To be honest, this might be a major issue. The optimizer has not been developed
for quite some time (besides some bugfixes) and changing the plan enumeration
logic is not really easy.
One thing that might work is to run the optimizer for subplans and connect the
optimal subplans. However, we would need to identify the edges at which we want
to split the plan. One challenge here is that the DataSet API allows for
branching and merging plans (in contrast to most relational optimizers).
What you could also try is to pass optimizer hints yourself. That would
reduce the search space because the execution strategies would be fixed.
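To give a feeling for the two ideas above, here is a minimal back-of-the-envelope
sketch. It is a toy model, not the actual enumeration logic of the Flink
optimizer: it assumes a linear chain of operators (no branching or merging),
the same number of candidate strategies per operator, and no pruning. The class
and method names (SearchSpaceSketch, exhaustive, withHints, split) are made up
for this illustration.

```java
import java.math.BigInteger;

// Toy model only: counts strategy combinations for a linear chain of operators.
// It does not reflect how Flink's optimizer enumerates or prunes plans.
class SearchSpaceSketch {

    // Combinations when each of `operators` operators may pick any of `strategies` strategies.
    static BigInteger exhaustive(int operators, int strategies) {
        return BigInteger.valueOf(strategies).pow(operators);
    }

    // Combinations left when `hinted` operators have their strategy fixed by a hint.
    static BigInteger withHints(int operators, int strategies, int hinted) {
        return exhaustive(operators - hinted, strategies);
    }

    // Combinations examined when the chain is cut into subplans of at most
    // `chunk` operators and each subplan is optimized on its own.
    static BigInteger split(int operators, int strategies, int chunk) {
        BigInteger total = BigInteger.ZERO;
        for (int start = 0; start < operators; start += chunk) {
            int size = Math.min(chunk, operators - start);
            total = total.add(exhaustive(size, strategies));
        }
        return total;
    }

    public static void main(String[] args) {
        int operators = 20;
        int strategies = 3;
        System.out.println("exhaustive:  " + exhaustive(operators, strategies));
        System.out.println("15 hinted:   " + withHints(operators, strategies, 15));
        System.out.println("chunks of 5: " + split(operators, strategies, 5));
    }
}
```

In this model, fixing strategies shrinks the exponent, while splitting replaces
one large product by a sum of small ones. The price of splitting is that the
connected subplans are not guaranteed to form the globally optimal plan. In the
DataSet API, a hint can be passed for joins, for example via
join(other, JoinHint.BROADCAST_HASH_FIRST).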
> Flink Planning is exponential in the number of stages
> -----------------------------------------------------
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 1.5.4
> Reporter: Robert Bradshaw
> Priority: Major
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The
> execution itself is still sub-second, but the job submission takes
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my
> real-world workloads (with depth up to 10 and width up to, and past, 50). On
> Flink, getting beyond width 10 seems problematic (it times out after
> hours). Note the log scale on the chart for time.
>
> {code:java}
> public static void runPipeline(int depth, int width) throws Exception {
>     final ExecutionEnvironment env =
>         ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<String> input = env.fromElements("a", "b", "c");
>     DataSet<String> stats = null;
>     for (int i = 0; i < depth; i++) {
>         stats = analyze(input, stats, width / (i + 1) + 1);
>     }
>     stats.writeAsText("out.txt");
>     env.execute("depth " + depth + " width " + width);
> }
>
> public static DataSet<String> analyze(DataSet<String> input,
>         DataSet<String> stats, int branches) {
>     System.out.println("analyze " + branches);
>     for (int i = 0; i < branches; i++) {
>         final int ii = i;
>         if (stats != null) {
>             input = input.map(new RichMapFunction<String, String>() {
>                 @Override
>                 public void open(Configuration parameters) throws Exception {
>                     Collection<String> broadcastSet =
>                         getRuntimeContext().getBroadcastVariable("stats");
>                 }
>                 @Override
>                 public String map(String value) throws Exception {
>                     return value;
>                 }
>             }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>         }
>         DataSet<String> branch = input
>             .map(s -> new Tuple2<Integer, String>(0, s + ii))
>             .groupBy(0)
>             .minBy(1)
>             .map(kv -> kv.f1);
>         if (stats == null) {
>             stats = branch;
>         } else {
>             stats = stats.union(branch);
>         }
>     }
>     return stats.map(s -> "(" + s + ").stats");
> }
> {code}