[ 
https://issues.apache.org/jira/browse/FLINK-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steffen Dienst updated FLINK-7593:
----------------------------------
    Description: 
Under specific circumstances Flink seems to generate an execution plan that is 
incorrect. I was using `groupBy(0).sum(1)`, but the resulting CSV files 
contained multiple entries per group; the grouping did not take effect. After some 
work I managed to reduce the relevant part of our code to the minimal test case 
below. Be careful: all parts need to be present, even the irrelevant secondary 
output. If I remove anything else, Flink generates a correct plan (either by 
introducing a combiner node before the reducer or by using "Sum (combine)" 
on the edge before the reducer).

{code:java}
import java.util.ArrayList;
import java.util.Collection;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.LongValueSequenceIterator;

public class FlinkOptimizerBug {

  public static void main(String[] args) throws Exception {

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    DataSet<Tuple2<Long, Long>> x = 
        env.fromParallelCollection(new LongValueSequenceIterator(0,1000), 
LongValue.class)
        .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))
        .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), 
LongValue.class)
            .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
        .where(0).equalTo(0).with((t1,t2) -> t1)
        .union(env.fromParallelCollection(new 
LongValueSequenceIterator(0,1000), LongValue.class)
            .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L)))
        .map(l->l)
        .withForwardedFields("f0;f1");
    
    Collection  out = new ArrayList();
    x.output(new LocalCollectionOutputFormat<>(out ));
    
    x.groupBy(0)
    .sum(1) //BUG: this will not be grouped correctly, so there will be 
multiple outputs per group!
    .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
    .setParallelism(1);
    env.setParallelism(4);
    
    System.out.println(env.getExecutionPlan());
    env.execute();
  }
}
{code}

Invalid execution plan generated:

{code:javascript}
{
        "nodes": [

        {
                "id": 5,
                "type": "source",
                "pact": "Data Source",
                "contents": "at 
fromParallelCollection(ExecutionEnvironment.java:870) 
(org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
                "parallelism": "4",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 4,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:24)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 5, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 7,
                "type": "source",
                "pact": "Data Source",
                "contents": "at 
fromParallelCollection(ExecutionEnvironment.java:870) 
(org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
                "parallelism": "4",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 6,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:26)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 7, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 3,
                "type": "pact",
                "pact": "Join",
                "contents": "Join at main(FlinkOptimizerBug.java:27)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 4, "side": "first", "ship_strategy": "Hash 
Partition on [0]", "exchange_mode": "PIPELINED"},
                        {"id": 6, "side": "second", "ship_strategy": "Hash 
Partition on [0]", "exchange_mode": "PIPELINED"}
                ],
                "driver_strategy": "Hybrid Hash (build: Map at 
main(FlinkOptimizerBug.java:24) (id: 4))",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 9,
                "type": "source",
                "pact": "Data Source",
                "contents": "at 
fromParallelCollection(ExecutionEnvironment.java:870) 
(org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
                "parallelism": "4",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 8,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:29)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 9, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 2,
                "type": "pact",
                "pact": "Union",
                "contents": "",
                "parallelism": "4",
                "predecessors": [
                        {"id": 3, "side": "first", "ship_strategy": "Forward", 
"exchange_mode": "PIPELINED"},
                        {"id": 8, "side": "second", "ship_strategy": "Forward", 
"exchange_mode": "PIPELINED"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 1,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:30)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 2, "ship_strategy": "Hash Partition on [0]", 
"local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 0,
                "type": "sink",
                "pact": "Data Sink",
                "contents": 
"org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982",
                "parallelism": "4",
                "predecessors": [
                        {"id": 1, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 11,
                "type": "pact",
                "pact": "GroupReduce",
                "contents": "SUM(1), at main(FlinkOptimizerBug.java:35",
                "parallelism": "4",
                "predecessors": [
                        {"id": 1, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Sorted Group Reduce",
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 10,
                "type": "sink",
                "pact": "Data Sink",
                "contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)",
                "parallelism": "1",
                "predecessors": [
                        {"id": 11, "ship_strategy": "Redistribute", 
"exchange_mode": "PIPELINED"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        }
        ]
}
{code}


  was:
Under specific circumstances Flink seems to generate an execution plan that is 
incorrect. I was using `groupBy(0).sum(1)`, but the resulting CSV files 
contained multiple entries per group; the grouping did not take effect. After some 
work I managed to reduce the relevant part of our code to the minimal test case 
below. Be careful: all parts need to be present, even the irrelevant secondary 
output. If I remove anything else, Flink generates a correct plan (either by 
introducing a combiner node before the reducer or by using "Sum (combine)" 
on the edge before the reducer).

{code:java}
import java.util.ArrayList;
import java.util.Collection;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.LongValueSequenceIterator;

public class FlinkOptimizerBug {

  public static void main(String[] args) throws Exception {

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    DataSet<Tuple2<Long, Long>> x = 
        env.fromParallelCollection(new LongValueSequenceIterator(0,1000), 
LongValue.class)
        .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))
        .join(env.fromParallelCollection(new LongValueSequenceIterator(0,1000), 
LongValue.class)
            .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
        .where(0).equalTo(0).with((t1,t2) -> t1)
        .union(env.fromParallelCollection(new 
LongValueSequenceIterator(0,1000), LongValue.class)
            .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L)))
        .map(l->l)
        .withForwardedFields("f0;f1");
    
    Collection  out = new ArrayList();
    x.output(new LocalCollectionOutputFormat<>(out ));
    
    x.groupBy(0)
    .sum(1) //BUG: this will not be grouped correctly, so there will be 
multiple outputs per group!
    .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
    .setParallelism(1);
    env.setParallelism(4);
    
    System.out.println(env.getExecutionPlan());
    env.execute();
  }
}
{code}

Invalid execution plan generated:

{code:JSON}
{
        "nodes": [

        {
                "id": 5,
                "type": "source",
                "pact": "Data Source",
                "contents": "at 
fromParallelCollection(ExecutionEnvironment.java:870) 
(org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
                "parallelism": "4",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 4,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:24)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 5, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 7,
                "type": "source",
                "pact": "Data Source",
                "contents": "at 
fromParallelCollection(ExecutionEnvironment.java:870) 
(org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
                "parallelism": "4",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 6,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:26)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 7, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 3,
                "type": "pact",
                "pact": "Join",
                "contents": "Join at main(FlinkOptimizerBug.java:27)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 4, "side": "first", "ship_strategy": "Hash 
Partition on [0]", "exchange_mode": "PIPELINED"},
                        {"id": 6, "side": "second", "ship_strategy": "Hash 
Partition on [0]", "exchange_mode": "PIPELINED"}
                ],
                "driver_strategy": "Hybrid Hash (build: Map at 
main(FlinkOptimizerBug.java:24) (id: 4))",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 9,
                "type": "source",
                "pact": "Data Source",
                "contents": "at 
fromParallelCollection(ExecutionEnvironment.java:870) 
(org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
                "parallelism": "4",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 8,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:29)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 9, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 2,
                "type": "pact",
                "pact": "Union",
                "contents": "",
                "parallelism": "4",
                "predecessors": [
                        {"id": 3, "side": "first", "ship_strategy": "Forward", 
"exchange_mode": "PIPELINED"},
                        {"id": 8, "side": "second", "ship_strategy": "Forward", 
"exchange_mode": "PIPELINED"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 1,
                "type": "pact",
                "pact": "Map",
                "contents": "Map at main(FlinkOptimizerBug.java:30)",
                "parallelism": "4",
                "predecessors": [
                        {"id": 2, "ship_strategy": "Hash Partition on [0]", 
"local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 0,
                "type": "sink",
                "pact": "Data Sink",
                "contents": 
"org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982",
                "parallelism": "4",
                "predecessors": [
                        {"id": 1, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 11,
                "type": "pact",
                "pact": "GroupReduce",
                "contents": "SUM(1), at main(FlinkOptimizerBug.java:35",
                "parallelism": "4",
                "predecessors": [
                        {"id": 1, "ship_strategy": "Forward", "exchange_mode": 
"PIPELINED"}
                ],
                "driver_strategy": "Sorted Group Reduce",
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        },
        {
                "id": 10,
                "type": "sink",
                "pact": "Data Sink",
                "contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)",
                "parallelism": "1",
                "predecessors": [
                        {"id": 11, "ship_strategy": "Redistribute", 
"exchange_mode": "PIPELINED"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
},
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" }    
        ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": 
"(none)" },
                        { "name": "Filter Factor", "value": "(none)" }          
]
        }
        ]
}
{code}



> Generated plan does not create correct groups
> ---------------------------------------------
>
>                 Key: FLINK-7593
>                 URL: https://issues.apache.org/jira/browse/FLINK-7593
>             Project: Flink
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 1.3.2
>         Environment: Windows 7, Ubuntu 16.04, Flink 1.3.2
>            Reporter: Steffen Dienst
>
> Under specific circumstances Flink seems to generate an execution plan that
> is incorrect. I was using `groupBy(0).sum(1)`, but the resulting CSV files
> contained multiple entries per group; the grouping did not occur. After some
> work I managed to reduce the relevant part of our code to the minimal test
> case below. Be careful: all parts need to be present, even the irrelevant
> secondary output. If I remove anything, Flink generates a correct plan
> (either by introducing a combiner node before the reducer or by using
> "Sum (combine)" on the edge before the reducer).
> {code:java}
> import java.util.ArrayList;
> import java.util.Collection;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.types.LongValue;
> import org.apache.flink.util.LongValueSequenceIterator;
> public class FlinkOptimizerBug {
>   public static void main(String[] args) throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     
>     DataSet<Tuple2<Long, Long>> x = 
>         env.fromParallelCollection(new LongValueSequenceIterator(0,1000), 
> LongValue.class)
>         .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L))
>         .join(env.fromParallelCollection(new 
> LongValueSequenceIterator(0,1000), LongValue.class)
>             .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4, 1L)))
>         .where(0).equalTo(0).with((t1,t2) -> t1)
>         .union(env.fromParallelCollection(new 
> LongValueSequenceIterator(0,1000), LongValue.class)
>             .map(l -> Tuple2.of(Math.round(Math.random()*1000) % 4,1L)))
>         .map(l->l)
>         .withForwardedFields("f0;f1");
>     
>     Collection  out = new ArrayList();
>     x.output(new LocalCollectionOutputFormat<>(out ));
>     
>     x.groupBy(0)
>     .sum(1) //BUG: this will not be grouped correctly, so there will be 
> multiple outputs per group!
>     .writeAsCsv("/tmp/foo", WriteMode.OVERWRITE)
>     .setParallelism(1);
>     env.setParallelism(4);
>     
>     System.out.println(env.getExecutionPlan());
>     env.execute();
>   }
> }
> {code}
> Invalid execution plan generated:
> {code:javascript}
> {
>       "nodes": [
>       {
>               "id": 5,
>               "type": "source",
>               "pact": "Data Source",
>               "contents": "at 
> fromParallelCollection(ExecutionEnvironment.java:870) 
> (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
>               "parallelism": "4",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 4,
>               "type": "pact",
>               "pact": "Map",
>               "contents": "Map at main(FlinkOptimizerBug.java:24)",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 5, "ship_strategy": "Forward", "exchange_mode": 
> "PIPELINED"}
>               ],
>               "driver_strategy": "Map",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 7,
>               "type": "source",
>               "pact": "Data Source",
>               "contents": "at 
> fromParallelCollection(ExecutionEnvironment.java:870) 
> (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
>               "parallelism": "4",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 6,
>               "type": "pact",
>               "pact": "Map",
>               "contents": "Map at main(FlinkOptimizerBug.java:26)",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 7, "ship_strategy": "Forward", "exchange_mode": 
> "PIPELINED"}
>               ],
>               "driver_strategy": "Map",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 3,
>               "type": "pact",
>               "pact": "Join",
>               "contents": "Join at main(FlinkOptimizerBug.java:27)",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 4, "side": "first", "ship_strategy": "Hash 
> Partition on [0]", "exchange_mode": "PIPELINED"},
>                       {"id": 6, "side": "second", "ship_strategy": "Hash 
> Partition on [0]", "exchange_mode": "PIPELINED"}
>               ],
>               "driver_strategy": "Hybrid Hash (build: Map at 
> main(FlinkOptimizerBug.java:24) (id: 4))",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 9,
>               "type": "source",
>               "pact": "Data Source",
>               "contents": "at 
> fromParallelCollection(ExecutionEnvironment.java:870) 
> (org.apache.flink.api.java.io.ParallelIteratorInputFormat)",
>               "parallelism": "4",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 8,
>               "type": "pact",
>               "pact": "Map",
>               "contents": "Map at main(FlinkOptimizerBug.java:29)",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 9, "ship_strategy": "Forward", "exchange_mode": 
> "PIPELINED"}
>               ],
>               "driver_strategy": "Map",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "0.0" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0" },
>                       { "name": "Cumulative CPU", "value": "0.0" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 2,
>               "type": "pact",
>               "pact": "Union",
>               "contents": "",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 3, "side": "first", "ship_strategy": "Forward", 
> "exchange_mode": "PIPELINED"},
>                       {"id": 8, "side": "second", "ship_strategy": "Forward", 
> "exchange_mode": "PIPELINED"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 1,
>               "type": "pact",
>               "pact": "Map",
>               "contents": "Map at main(FlinkOptimizerBug.java:30)",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 2, "ship_strategy": "Hash Partition on [0]", 
> "local_strategy": "Sort on [0:ASC]", "exchange_mode": "PIPELINED"}
>               ],
>               "driver_strategy": "Map",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>                       { "name": "Partitioned on", "value": "[0]" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "[0:ASC]" },
>                       { "name": "Grouped on", "value": "[0]" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 0,
>               "type": "sink",
>               "pact": "Data Sink",
>               "contents": 
> "org.apache.flink.api.java.io.LocalCollectionOutputFormat@52feb982",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 1, "ship_strategy": "Forward", "exchange_mode": 
> "PIPELINED"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>                       { "name": "Partitioned on", "value": "[0]" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "[0:ASC]" },
>                       { "name": "Grouped on", "value": "[0]" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 11,
>               "type": "pact",
>               "pact": "GroupReduce",
>               "contents": "SUM(1), at main(FlinkOptimizerBug.java:35",
>               "parallelism": "4",
>               "predecessors": [
>                       {"id": 1, "ship_strategy": "Forward", "exchange_mode": 
> "PIPELINED"}
>               ],
>               "driver_strategy": "Sorted Group Reduce",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "HASH_PARTITIONED" },
>                       { "name": "Partitioned on", "value": "[0]" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "[0:ASC]" },
>                       { "name": "Grouped on", "value": "[0]" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 10,
>               "type": "sink",
>               "pact": "Data Sink",
>               "contents": "CsvOutputFormat (path: /tmp/foo, delimiter: ,)",
>               "parallelism": "1",
>               "predecessors": [
>                       {"id": 11, "ship_strategy": "Redistribute", 
> "exchange_mode": "PIPELINED"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM_PARTITIONED" 
> },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "0.0" },
>                       { "name": "CPU", "value": "0.0" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       }
>       ]
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to