[ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716864#comment-16716864
 ] 

Maximilian Michels commented on FLINK-10566:
--------------------------------------------

Indeed. It seems to be the Kryo type registration on the plan nodes, the plan 
is already there. For the pipeline which we attempt to run, I believe we could 
skip the Kryo type registration entirely. All types should have their own Beam 
Coders.

I disabled auto Kryo registration via 
{{env.getConfig().disableAutoTypeRegistration()}}. Turns out, the bottle neck 
is _not_ the optimizer. It now gets stuck after optimization when determining 
the maximum parallelism. Seems to be the same symptom as the type registration:

{noformat}
....
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:201)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:282)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:281)
          at 
org.apache.flink.api.common.operators.SingleInputOperator.accept(SingleInputOperator.java:199)
          at 
org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:220)
          at org.apache.flink.api.common.Plan.accept(Plan.java:333)
          at 
org.apache.flink.api.common.Plan.getMaximumParallelism(Plan.java:374)
          at 
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:208)
          - locked <0x4a3> (a java.lang.Object)
          at 
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:91)
          at SlowPlanning.runPipeline(SlowPlanning.java:48)
          at SlowPlanning.main(SlowPlanning.java:34)
{noformat}



> Flink Planning is exponential in the number of stages
> -----------------------------------------------------
>
>                 Key: FLINK-10566
>                 URL: https://issues.apache.org/jira/browse/FLINK-10566
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Robert Bradshaw
>            Priority: Major
>         Attachments: chart.png
>
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up, and past, 50). On 
> Flink it seems getting width beyond width 10 is problematic (times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
>     final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>     DataSet<String> input = env.fromElements("a", "b", "c");
>     DataSet<String> stats = null;
>     for (int i = 0; i < depth; i++) {
>       stats = analyze(input, stats, width / (i + 1) + 1);
>     }
>     stats.writeAsText("out.txt");
>     env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, 
> DataSet<String> stats, int branches) {
>     System.out.println("analyze " + branches);
>     for (int i = 0; i < branches; i++) {
>       final int ii = i;
>       if (stats != null) {
>         input = input.map(new RichMapFunction<String, String>() {
>             @Override
>             public void open(Configuration parameters) throws Exception {
>               Collection<String> broadcastSet = 
> getRuntimeContext().getBroadcastVariable("stats");
>             }
>             @Override
>             public String map(String value) throws Exception {
>               return value;
>             }
>           }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>       }
>       DataSet<String> branch = input
>                                .map(s -> new Tuple2<Integer, String>(0, s + 
> ii))
>                                .groupBy(0)
>                                .minBy(1)
>                                .map(kv -> kv.f1);
>       if (stats == null) {
>         stats = branch;
>       } else {
>         stats = stats.union(branch);
>       }
>     }
>     return stats.map(s -> "(" + s + ").stats");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to