[ https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Steffen Dienst updated FLINK-7593: ---------------------------------- Description: Under specific circumstances Flink seems to generate an execution plan that is incorrect. I was using `groupBy(0).sum(1)` but the resulting csv files contained multiple entries per group; the grouping did not occur. After some work I managed to reduce the relevant part of our code to the minimal test case below. Be careful: All parts need to be present, even the irrelevant secondary output. If I remove anything else, Flink generates correct code (either by introducing a combiner node prior to the reducer or by using "Sum (combine)" on the edge before the reducer). {code:java} import java.util.ArrayList; import java.util.Collection; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.types.LongValue; import org.apache.flink.util.LongValueSequenceIterator; public class FlinkOptimizerBug { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple2<Long, Long>> x = env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class) .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)) .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class) .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))) .where(0).equalTo(0).with((t1,t2) -> t1) .union(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class) .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L))) .map(l->l) .withForwardedFields("f0;f1"); Collection out = new ArrayList(); x.output(new LocalCollectionOutputFormat<>(out )); x.groupBy(0) .sum(1) //BUG: this will not be grouped correctly, so there will be multiple outputs per 
group! .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE) .setParallelism(1); env.setParallelism(4); System.out.println(env.getExecutionPlan()); env.execute(); } } {code} Invalid execution plan generated: {code:javascript} { "nodes": [ { "id": 5, "type": "source", "pact": "Data Source", "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", "parallelism": "4", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 4, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:24)", "parallelism": "4", "predecessors": [ {"id": 5, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 7, "type": "source", "pact": "Data Source", "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", "parallelism": "4", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. 
Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 6, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:26)", "parallelism": "4", "predecessors": [ {"id": 7, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 3, "type": "pact", "pact": "Join", "contents": "Join at main(FlinkOptimizerBug.java:27)", "parallelism": "4", "predecessors": [ {"id": 4, "side": "first", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"}, {"id": 6, "side": "second", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Hybrid Hash (build: Map at main(FlinkOptimizerBug.java:24) (id: 4))", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 9, "type": "source", "pact": "Data Source", "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", "parallelism": "4", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 8, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:29)", "parallelism": "4", "predecessors": [ {"id": 9, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. 
Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 2, "type": "pact", "pact": "Union", "contents": "", "parallelism": "4", "predecessors": [ {"id": 3, "side": "first", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}, {"id": 8, "side": "second", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 1, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:30)", "parallelism": "4", "predecessors": [ {"id": 2, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 0, "type": "sink", "pact": "Data Sink", "contents": "org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982", "parallelism": "4", "predecessors": [ {"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 11, "type": "pact", "pact": "GroupReduce", "contents": "SUM(1), at main(FlinkOptimizerBug.java:35", "parallelism": "4", "predecessors": [ {"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Sorted Group Reduce", "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 10, "type": "sink", "pact": "Data Sink", "contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)", "parallelism": "1", "predecessors": [ {"id": 11, "ship_strategy": "Redistribute", "exchange_mode": "PIPELINED"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] } ] } {code} was: Under specific circumstances Flink seems to generate an execution plan that is incorrect. I was using `groupBy(0).sum(1)` but the resulting csv files contained multiple entries per group, the grouping did not occur. After some work I managed to reduce the relevant part of our code to the minimal test case below. Be careful: All parts need to be present, even the irrelevant secondary output. If I remove anything else Flink generates correct code (either by introducing a combiner node prior to the reducer or by using "Sum (combine))" an the edge before the reducer. 
{code:java} import java.util.ArrayList; import java.util.Collection; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.types.LongValue; import org.apache.flink.util.LongValueSequenceIterator; public class FlinkOptimizerBug { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Tuple2<Long, Long>> x = env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class) .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)) .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class) .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))) .where(0).equalTo(0).with((t1,t2) -> t1) .union(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), LongValue.class) .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L))) .map(l->l) .withForwardedFields("f0;f1"); Collection out = new ArrayList(); x.output(new LocalCollectionOutputFormat<>(out )); x.groupBy(0) .sum(1) //BUG: this will not be grouped correctly, so there will be multiple outputs per group! 
.writeAsCsv("/tmp/foo", WriteMode.OVERWRITE) .setParallelism(1); env.setParallelism(4); System.out.println(env.getExecutionPlan()); env.execute(); } } {code} Invalid execution plan generated: {code:JSON} { "nodes": [ { "id": 5, "type": "source", "pact": "Data Source", "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", "parallelism": "4", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 4, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:24)", "parallelism": "4", "predecessors": [ {"id": 5, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 7, "type": "source", "pact": "Data Source", "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", "parallelism": "4", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. 
Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 6, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:26)", "parallelism": "4", "predecessors": [ {"id": 7, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 3, "type": "pact", "pact": "Join", "contents": "Join at main(FlinkOptimizerBug.java:27)", "parallelism": "4", "predecessors": [ {"id": 4, "side": "first", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"}, {"id": 6, "side": "second", "ship_strategy": "Hash Partition on [0]", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Hybrid Hash (build: Map at main(FlinkOptimizerBug.java:24) (id: 4))", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 9, "type": "source", "pact": "Data Source", "contents": "at fromParallelCollection(ExecutionEnvironment.java:870) (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", "parallelism": "4", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 8, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:29)", "parallelism": "4", "predecessors": [ {"id": 9, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. 
Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "0.0" }, { "name": "Cumulative Disk I/O", "value": "0.0" }, { "name": "Cumulative CPU", "value": "0.0" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 2, "type": "pact", "pact": "Union", "contents": "", "parallelism": "4", "predecessors": [ {"id": 3, "side": "first", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}, {"id": 8, "side": "second", "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 1, "type": "pact", "pact": "Map", "contents": "Map at main(FlinkOptimizerBug.java:30)", "parallelism": "4", "predecessors": [ {"id": 2, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 0, "type": "sink", "pact": "Data Sink", "contents": "org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982", "parallelism": "4", "predecessors": [ {"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 11, "type": "pact", "pact": "GroupReduce", "contents": "SUM(1), at main(FlinkOptimizerBug.java:35", "parallelism": "4", "predecessors": [ {"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} ], "driver_strategy": "Sorted Group Reduce", "global_properties": [ { "name": "Partitioning", "value": "HASH_PARTITIONED" }, { "name": "Partitioned on", "value": "[0]" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "[0:ASC]" }, { "name": "Grouped on", "value": "[0]" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 10, "type": "sink", "pact": "Data Sink", "contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)", "parallelism": "1", "predecessors": [ {"id": 11, "ship_strategy": "Redistribute", "exchange_mode": "PIPELINED"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "0.0" }, { "name": "CPU", "value": "0.0" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] } ] } {code} > Generated plan does not create correct groups > --------------------------------------------- > > Key: FLINK-7593 > URL: https://issues.apache.org/jira/browse/FLINK-7593 > Project: Flink > Issue Type: Bug > Components: Optimizer > Affects Versions: 1.3.2 > Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2 > Reporter: Steffen Dienst > > Under specific circumstances Flink seems to generate an execution plan that > is incorrect. I was using `groupBy(0).sum(1)` but the resulting csv files > contained multiple entries per group, the grouping did not occur. 
After some > work I managed to reduce the relevant part of our code to the minimal test > case below. Be careful: All parts need to be present, even the irrelevant > secondary output. If I remove anything else Flink generates correct code > (either by introducing a combiner node prior to the reducer or by using "Sum > (combine)" on the edge before the reducer). > {code:java} > import java.util.ArrayList; > import java.util.Collection; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.io.LocalCollectionOutputFormat; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.core.fs.FileSystem.WriteMode; > import org.apache.flink.types.LongValue; > import org.apache.flink.util.LongValueSequenceIterator; > public class FlinkOptimizerBug { > public static void main(String[] args) throws Exception { > ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); > > DataSet<Tuple2<Long, Long>> x = > env.fromParallelCollection(new LongValueSequenceIterator(0,1000), > LongValue.class) > .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)) > .join(env.fromParallelCollection(new > LongValueSequenceIterator(0,1000), LongValue.class) > .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))) > .where(0).equalTo(0).with((t1,t2) -> t1) > .union(env.fromParallelCollection(new > LongValueSequenceIterator(0,1000), LongValue.class) > .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L))) > .map(l->l) > .withForwardedFields("f0;f1"); > > Collection out = new ArrayList(); > x.output(new LocalCollectionOutputFormat<>(out )); > > x.groupBy(0) > .sum(1) //BUG: this will not be grouped correctly, so there will be > multiple outputs per group! 
> .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE) > .setParallelism(1); > env.setParallelism(4); > > System.out.println(env.getExecutionPlan()); > env.execute(); > } > } > {code} > Invalid execution plan generated: > {code:javascript} > { > "nodes": [ > { > "id": 5, > "type": "source", > "pact": "Data Source", > "contents": "at > fromParallelCollection(ExecutionEnvironment.java:870) > (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", > "parallelism": "4", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 4, > "type": "pact", > "pact": "Map", > "contents": "Map at main(FlinkOptimizerBug.java:24)", > "parallelism": "4", > "predecessors": [ > {"id": 5, "ship_strategy": "Forward", "exchange_mode": > "PIPELINED"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 7, > "type": "source", > "pact": "Data Source", > "contents": "at > fromParallelCollection(ExecutionEnvironment.java:870) > (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", > "parallelism": "4", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 6, > "type": "pact", > "pact": "Map", > "contents": "Map at main(FlinkOptimizerBug.java:26)", > "parallelism": "4", > "predecessors": [ > {"id": 7, "ship_strategy": "Forward", "exchange_mode": > "PIPELINED"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 3, > "type": "pact", > "pact": "Join", > "contents": "Join at main(FlinkOptimizerBug.java:27)", > "parallelism": "4", > "predecessors": [ > {"id": 4, "side": "first", "ship_strategy": "Hash > Partition on [0]", "exchange_mode": "PIPELINED"}, > {"id": 6, "side": "second", "ship_strategy": "Hash > Partition on [0]", "exchange_mode": "PIPELINED"} > ], > "driver_strategy": "Hybrid Hash (build: Map at > main(FlinkOptimizerBug.java:24) (id: 4))", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 9, > "type": "source", > "pact": "Data Source", > "contents": "at > fromParallelCollection(ExecutionEnvironment.java:870) > (org.apache.flink.api.java.io.ParallelIteratorInputFormat)", > "parallelism": "4", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 8, > "type": "pact", > "pact": "Map", > "contents": "Map at main(FlinkOptimizerBug.java:29)", > "parallelism": "4", > "predecessors": [ > {"id": 9, "ship_strategy": "Forward", "exchange_mode": > "PIPELINED"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 2, > "type": "pact", > "pact": "Union", > "contents": "", > "parallelism": "4", > "predecessors": [ > {"id": 3, "side": "first", "ship_strategy": "Forward", > "exchange_mode": "PIPELINED"}, > {"id": 8, "side": "second", "ship_strategy": "Forward", > "exchange_mode": "PIPELINED"} > ], > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 1, > "type": "pact", > "pact": "Map", > "contents": "Map at main(FlinkOptimizerBug.java:30)", > "parallelism": "4", > "predecessors": [ > {"id": 2, "ship_strategy": "Hash Partition on [0]", > "local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "HASH_PARTITIONED" }, > { "name": "Partitioned on", "value": "[0]" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "[0:ASC]" }, > { "name": "Grouped on", "value": "[0]" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 0, > "type": "sink", > "pact": "Data Sink", > "contents": > "org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982", > "parallelism": "4", > "predecessors": [ > {"id": 1, "ship_strategy": "Forward", "exchange_mode": > "PIPELINED"} > ], > "global_properties": [ > { "name": "Partitioning", "value": "HASH_PARTITIONED" }, > { "name": "Partitioned on", "value": "[0]" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "[0:ASC]" }, > { "name": "Grouped on", "value": "[0]" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 11, > "type": "pact", > "pact": "GroupReduce", > "contents": "SUM(1), at main(FlinkOptimizerBug.java:35", > "parallelism": "4", > "predecessors": [ > {"id": 1, "ship_strategy": "Forward", "exchange_mode": > "PIPELINED"} > ], > "driver_strategy": "Sorted Group Reduce", > "global_properties": [ > { "name": "Partitioning", "value": "HASH_PARTITIONED" }, > { "name": "Partitioned on", "value": "[0]" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "[0:ASC]" }, > { "name": "Grouped on", "value": "[0]" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 10, > "type": "sink", > "pact": "Data Sink", > "contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)", > "parallelism": "1", > "predecessors": [ > {"id": 11, "ship_strategy": "Redistribute", > "exchange_mode": "PIPELINED"} > ], > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM_PARTITIONED" > }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > } > ] > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)