[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717239#comment-16717239 ]
ASF GitHub Bot commented on FLINK-10566:
----------------------------------------
mxm opened a new pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
URL: https://github.com/apache/flink/pull/7276
## What is the purpose of the change
The DAG traversal is inefficient in some places, which can lead to very long plan creation times.
This change introduces caching during the traversal so that nodes are not visited multiple times.
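To illustrate the underlying problem (this sketch is not part of the PR and uses a hypothetical `Node` type, not a Flink class): when operators share inputs, an uncached recursive traversal visits a shared node once per path leading to it, so the number of visits grows exponentially with the depth of the DAG.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an optimizer DAG node; not a Flink class.
final class Node {
    final List<Node> inputs;

    Node(Node... in) {
        this.inputs = new ArrayList<>(Arrays.asList(in));
    }
}

public class UncachedTraversal {

    // Counts how many node visits an uncached depth-first traversal performs.
    static long countVisits(Node node) {
        long visits = 1;
        for (Node input : node.inputs) {
            visits += countVisits(input);
        }
        return visits;
    }

    public static void main(String[] args) {
        // A chain of "diamonds": two nodes per level, both reading the previous level.
        Node prev = new Node();
        for (int i = 0; i < 20; i++) {
            prev = new Node(new Node(prev), new Node(prev));
        }
        // Each level doubles the number of paths back to the source,
        // so visits grow as 2^depth even though the DAG itself is small.
        System.out.println(countVisits(prev)); // prints 4194301 (= 2^22 - 3)
    }
}
```

The DAG here has only 41 nodes, yet the uncached traversal performs over four million visits; caching the per-node result reduces this to one visit per node.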
## Brief change log
Caching is performed at two places:
- when registering Kryo types
- when determining the maximum parallelism
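Both cache sites follow the same memoization pattern. A hedged sketch of the max-parallelism case, again with a hypothetical `PlanNode` type (the actual change lives in Flink's optimizer and is not reproduced here):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an optimizer DAG node; not a Flink class.
final class PlanNode {
    final int parallelism;
    final List<PlanNode> inputs;

    PlanNode(int parallelism, PlanNode... in) {
        this.parallelism = parallelism;
        this.inputs = new ArrayList<>(Arrays.asList(in));
    }
}

public class MemoizedTraversal {

    // With the cache, every node is computed exactly once; without it,
    // shared inputs are recomputed once per path, which is exponential
    // in the depth of the DAG.
    static int maxParallelism(PlanNode node, Map<PlanNode, Integer> cache) {
        Integer cached = cache.get(node);
        if (cached != null) {
            return cached;
        }
        int max = node.parallelism;
        for (PlanNode input : node.inputs) {
            max = Math.max(max, maxParallelism(input, cache));
        }
        cache.put(node, max);
        return max;
    }

    public static void main(String[] args) {
        // 100 diamond levels; an uncached traversal would need ~2^100 visits.
        PlanNode prev = new PlanNode(1);
        for (int i = 0; i < 100; i++) {
            prev = new PlanNode(0, new PlanNode(i + 1, prev), new PlanNode(i + 2, prev));
        }
        System.out.println(maxParallelism(prev, new HashMap<>())); // prints 101
    }
}
```

Since the nodes are distinct objects, the default identity-based `hashCode` is sufficient for the cache key.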
## Verifying this change
This change adds tests and can be verified as follows:
Run the `LargePlanTest` test.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? no
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Flink Planning is exponential in the number of stages
> -----------------------------------------------------
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 1.5.4, 1.6.1, 1.7.0
> Reporter: Robert Bradshaw
> Assignee: Maximilian Michels
> Priority: Major
> Labels: pull-request-available
> Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The
> execution itself is still sub-second, but the job submission takes
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my
> real-world workloads (with depth up to 10 and width of 50 or more). On
> Flink, going beyond width 10 seems problematic (the submission times out
> after hours). Note the log scale for time on the attached chart.
>
> {code:java}
> import java.util.Collection;
>
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
>
> public static void runPipeline(int depth, int width) throws Exception {
>   final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   DataSet<String> input = env.fromElements("a", "b", "c");
>   DataSet<String> stats = null;
>   for (int i = 0; i < depth; i++) {
>     stats = analyze(input, stats, width / (i + 1) + 1);
>   }
>   stats.writeAsText("out.txt");
>   env.execute("depth " + depth + " width " + width);
> }
>
> public static DataSet<String> analyze(DataSet<String> input,
>     DataSet<String> stats, int branches) {
>   System.out.println("analyze " + branches);
>   for (int i = 0; i < branches; i++) {
>     final int ii = i;
>     if (stats != null) {
>       input = input.map(new RichMapFunction<String, String>() {
>         @Override
>         public void open(Configuration parameters) throws Exception {
>           Collection<String> broadcastSet =
>               getRuntimeContext().getBroadcastVariable("stats");
>         }
>
>         @Override
>         public String map(String value) throws Exception {
>           return value;
>         }
>       }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>     }
>     DataSet<String> branch = input
>         .map(s -> new Tuple2<Integer, String>(0, s + ii))
>         .groupBy(0)
>         .minBy(1)
>         .map(kv -> kv.f1);
>     if (stats == null) {
>       stats = branch;
>     } else {
>       stats = stats.union(branch);
>     }
>   }
>   return stats.map(s -> "(" + s + ").stats");
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)