Robert Bradshaw created FLINK-10566:
---------------------------------------
Summary: Flink Planning is exponential in the number of stages
Key: FLINK-10566
URL: https://issues.apache.org/jira/browse/FLINK-10566
Project: Flink
Issue Type: Bug
Components: Optimizer
Affects Versions: 1.5.4
Reporter: Robert Bradshaw
Attachments: chart.png
This makes it nearly impossible to run graphs with 100 or more stages. (The
execution itself is still sub-second, but the job submission takes increasingly
long.)
I can reproduce this with the following pipeline, which resembles my real-world
workloads (with depth up to 10 and width up to, and beyond, 50). On Flink,
going beyond width 10 seems to be problematic (it times out after hours). Note
the log scale for time on the attached chart.
{code:java}
public static void runPipeline(int depth, int width) throws Exception {
  final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  DataSet<String> input = env.fromElements("a", "b", "c");
  DataSet<String> stats = null;

  for (int i = 0; i < depth; i++) {
    stats = analyze(input, stats, width / (i + 1) + 1);
  }

  stats.writeAsText("out.txt");
  env.execute("depth " + depth + " width " + width);
}

public static DataSet<String> analyze(
    DataSet<String> input, DataSet<String> stats, int branches) {
  System.out.println("analyze " + branches);
  for (int i = 0; i < branches; i++) {
    final int ii = i;

    if (stats != null) {
      input = input.map(new RichMapFunction<String, String>() {
        @Override
        public void open(Configuration parameters) throws Exception {
          Collection<String> broadcastSet =
              getRuntimeContext().getBroadcastVariable("stats");
        }

        @Override
        public String map(String value) throws Exception {
          return value;
        }
      }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
    }

    DataSet<String> branch = input
        .map(s -> new Tuple2<Integer, String>(0, s + ii))
        .groupBy(0)
        .minBy(1)
        .map(kv -> kv.f1);

    if (stats == null) {
      stats = branch;
    } else {
      stats = stats.union(branch);
    }
  }
  return stats.map(s -> "(" + s + ").stats");
}
{code}
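To give a rough sense of how large the graph gets, here is a small Flink-free sketch that only tallies how many branches the pipeline above creates for a given depth and width. It mirrors the `width / (i + 1) + 1` argument passed to `analyze`. The class name `BranchCount` and its helper methods are made up for illustration and are not part of the reproduction. It counts branches only, not planner stages or operators.

```java
public class BranchCount {
    // Same expression that runPipeline passes to analyze() as `branches`
    // (integer division, as in the original).
    static int branchesAtLevel(int level, int width) {
        return width / (level + 1) + 1;
    }

    // Sum of the per-level branch counts over all `depth` calls to analyze().
    static int totalBranches(int depth, int width) {
        int total = 0;
        for (int i = 0; i < depth; i++) {
            total += branchesAtLevel(i, width);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalBranches(10, 10)); // prints 37
        System.out.println(totalBranches(10, 50)); // prints 154
    }
}
```

Each branch adds several operators of its own (the map/groupBy/minBy/map chain, plus a broadcast map and a union once `stats` is non-null), so the operator count is a small multiple of these totals.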
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)