[ https://issues.apache.org/jira/browse/FLINK-1018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Ewen resolved FLINK-1018. --------------------------------- Resolution: Fixed Fix Version/s: 0.9 Fixed via 9c77f0785e43326521da5e535f9ab1f05a9c6280 > Logistic Regression deadlocks > ----------------------------- > > Key: FLINK-1018 > URL: https://issues.apache.org/jira/browse/FLINK-1018 > Project: Flink > Issue Type: Bug > Reporter: Markus Holzemer > Fix For: 0.9 > > Attachments: LogisticRegression.java > > > We are currently running our implementation of logistic regression with batch > gradient descent on the cluster. > Unfortunately, for datasets larger than 1 GB it seems to deadlock inside of the > iteration. This means the first iteration is never finished. > The iteration does a map over all points; the map gets the iteration input as > a broadcast variable. The result of the map is reduced and the result of the > reducer (1 tuple) is crossed with the iteration input. > There should be no reason for the deadlock, since the data is still quite > small compared to the cluster size (4 nodes with 32 GB each). Also, the data size stays > constant throughout the algorithm. > Here is the generated plan. I will also attach the full algorithm. > {code} > { > "nodes": [ > { > "id": 2, > "type": "source", > "pact": "Data Source", > "contents": "[([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, > 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.", > "parallelism": "1", > "subtasks_per_instance": "1", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. 
Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "0.0 B" }, > { "name": "Cumulative Disk I/O", "value": "0.0 B" }, > { "name": "Cumulative CPU", "value": "0.0 " } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "step_function": [ > { > "id": 8, > "type": "source", > "pact": "Data Source", > "contents": "TextInputFormat > (hdfs://cloud-7:45010/tmp/input/higgs.M.txt) - UTF-8", > "parallelism": "64", > "subtasks_per_instance": "16", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "8.0.31 GB" }, > { "name": "Est. Cardinality", "value": "109.90 M" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "8.0.31 GB" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "0.0 B" }, > { "name": "Cumulative Disk I/O", "value": "8.0.31 GB" }, > { "name": "Cumulative CPU", "value": "0.0 " } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 7, > "type": "pact", > "pact": "Map", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$6", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 8, "ship_strategy": "Forward"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "109.90 M" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "0.0 B" }, > { "name": "Cumulative Disk I/O", "value": "8.0.31 GB" }, > { "name": "Cumulative CPU", "value": "0.0 " } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 11, > "type": "pact", > "pact": "Map", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$1", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 7, "ship_strategy": "Forward"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "109.90 M" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "0.0 B" }, > { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" }, > { "name": "Cumulative CPU", "value": "0.0 " } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 10, > "type": "pact", > "pact": "Reduce", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 11, "ship_strategy": "Forward"} > ], > "driver_strategy": "Reduce All", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "109.90 M" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "0.0 B" }, > { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" }, > { "name": "Cumulative CPU", "value": "0.0 " } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 9, > "type": "pact", > "pact": "Reduce", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2", > "parallelism": "1", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 10, "ship_strategy": "Redistribute"} > ], > "driver_strategy": "Reduce All", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" }, > { "name": "Cumulative CPU", "value": "0.0 " } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 12, > "type": "pact", > "pact": "Bulk Partial Solution", > "contents": "Partial Solution", > "parallelism": "64", > "subtasks_per_instance": "16", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "0.0 B" }, > { "name": "Cumulative Disk I/O", "value": "0.0 B" }, > { "name": "Cumulative CPU", "value": "0.0 " } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 6, > "type": "pact", > "pact": "Map", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$3", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 7, "side": "first", "ship_strategy": "Forward", > "temp_mode": "CACHED"}, > {"id": 9, "side": "second", "ship_strategy": > "Broadcast"}, > {"id": 12, "side": "second", "ship_strategy": > "Broadcast"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "109.90 M" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 5, > "type": "pact", > "pact": "Reduce", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 6, "ship_strategy": "Forward"} > ], > "driver_strategy": "Reduce All", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "109.90 M" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 4, > "type": "pact", > "pact": "Reduce", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4", > "parallelism": "1", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 5, "ship_strategy": "Redistribute"} > ], > "driver_strategy": "Reduce All", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 3, > "type": "pact", > "pact": "Cross", > "contents": > "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$5", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 4, "side": "first", "ship_strategy": > "Broadcast"}, > {"id": 12, "side": "second", "ship_strategy": > "Forward", "temp_mode": "PIPELINE_BREAKER"} > ], > "driver_strategy": "Nested Loops (Blocked Outer: > de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4)", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > } > ], > "partial_solution": 12, > "next_partial_solution": 3, > "id": 1, > "type": "bulk_iteration", > "pact": "Bulk Iteration", > "contents": "Bulk Iteration", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 2, "ship_strategy": "Redistribute"} > ], > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. 
Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > }, > { > "id": 0, > "type": "sink", > "pact": "Data Sink", > "contents": "TextOutputFormat > (hdfs://cloud-7:45010/tmp/output/logreg) - UTF-8", > "parallelism": "64", > "subtasks_per_instance": "16", > "predecessors": [ > {"id": 1, "ship_strategy": "Forward"} > ], > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } > ], > "costs": [ > { "name": "Network", "value": "0.0 B" }, > { "name": "Disk I/O", "value": "0.0 B" }, > { "name": "CPU", "value": "0.0 " }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": > "(none)" }, > { "name": "Filter Factor", "value": "(none)" } > ] > } > ] > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)