[ 
https://issues.apache.org/jira/browse/FLINK-1100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-1100.
-----------------------------------
    Resolution: Won't Fix

Closing ticket. The optimizer is not maintained as such.

> Optimization opportunity missed
> -------------------------------
>
>                 Key: FLINK-1100
>                 URL: https://issues.apache.org/jira/browse/FLINK-1100
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataSet
>    Affects Versions: 0.6-incubating, 0.7.0-incubating
>            Reporter: Asterios Katsifodimos
>            Priority: Minor
>              Labels: performance
>
> The optimizer misses an optimization opportunity.
> The program I used is the transitive closure example from v0.7-incubating, in
> which I replaced the groupBy.reduce with a simple distinct.
> In the resulting plan (JSON here:
> https://gist.github.com/asteriosk/7a04cfd19537395eb401; also included at the
> end of this issue), the sorted groupReduce could receive an input partitioned
> on field 0 and sort on field 1 in order to apply the distinct function. The
> partitioning on field 0 could then be reused to forward the results to the
> input of the next iteration instead of repartitioning.
> {code:javascript}
> {
>       "nodes": [
>  
>       {
>               "id": 2,
>               "type": "source",
>               "pact": "Data Source",
>               "contents": "[(1, 2), (2, 3), (2, 4), (3, 5), (6, 7), (8, 9), 
> (8, 10), (5, 11), (11, 12), (10, 13), (9, 14), (13,",
>               "parallelism": "1",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "step_function": [
>       {
>               "id": 6,
>               "type": "pact",
>               "pact": "Bulk Partial Solution",
>               "contents": "Partial Solution",
>               "parallelism": "4",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "[1:ASC]" },
>                       { "name": "Grouped on", "value": "[1]" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 5,
>               "type": "pact",
>               "pact": "Join",
>               "contents": 
> "org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 6, "side": "first", "ship_strategy": "Hash 
> Partition on [1]"},
>                       {"id": 2, "side": "second", "ship_strategy": "Hash 
> Partition on [0]"}
>               ],
>               "driver_strategy": "Hybrid Hash (CACHED) (build: [(1, 2), (2, 
> 3), (2, 4), (3, 5), (6, 7), (8, 9), (8, 10), (5, 11), (11, 12), (10, 13), (9, 
> 14), (13,)",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 4,
>               "type": "pact",
>               "pact": "Union",
>               "contents": "",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 5, "side": "first", "ship_strategy": "Hash 
> Partition on [0, 1]"},
>                       {"id": 6, "side": "second", "ship_strategy": "Hash 
> Partition on [0, 1]"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>                       { "name": "Partitioned on", "value": "[0, 1]" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 3,
>               "type": "pact",
>               "pact": "GroupReduce",
>               "contents": 
> "org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 5, "ship_strategy": "Hash Partition on [0, 1]"},
>                       {"id": 6, "ship_strategy": "Hash Partition on [0, 1]"}
>               ],
>               "driver_strategy": "Sorted Group Reduce",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       }
>               ],
>               "partial_solution": 6,
>               "next_partial_solution": 3,
>               "id": 1,
>               "type": "bulk_iteration",
>               "pact": "Bulk Iteration",
>               "contents": "Bulk Iteration",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 2, "ship_strategy": "Redistribute", 
> "local_strategy": "Sort on [1:ASC]"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 0,
>               "type": "sink",
>               "pact": "Data Sink",
>               "contents": "Print to System.out",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 1, "ship_strategy": "Forward"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       }
>       ]
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to