[ 
https://issues.apache.org/jira/browse/FLINK-1018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-1018.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9

Fixed via 9c77f0785e43326521da5e535f9ab1f05a9c6280

> Logistic Regression deadlocks
> -----------------------------
>
>                 Key: FLINK-1018
>                 URL: https://issues.apache.org/jira/browse/FLINK-1018
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Markus Holzemer
>             Fix For: 0.9
>
>         Attachments: LogisticRegression.java
>
>
> We are currently running our implementation of logistic regression with batch 
> gradient descent on the cluster.
> Unfortunately, for datasets > 1GB it seems to deadlock inside the 
> iteration: the first iteration never finishes.
> The iteration maps over all points; the map receives the iteration input as a 
> broadcast variable. The result of the map is reduced, and the result of the 
> reducer (1 tuple) is crossed with the iteration input.
> There should be no reason for the deadlock, since the data is still quite 
> small compared to the cluster size (4 nodes with 32GB each). Also, the data size stays 
> constant throughout the algorithm.
> Here is the generated plan. I will also attach the full algorithm.
> {code}
> {
>       "nodes": [
>       {
>               "id": 2,
>               "type": "source",
>               "pact": "Data Source",
>               "contents": "[([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 
> 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.",
>               "parallelism": "1",
>               "subtasks_per_instance": "1",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "0.0 B" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0 B" },
>                       { "name": "Cumulative CPU", "value": "0.0 " }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "step_function": [
>       {
>               "id": 8,
>               "type": "source",
>               "pact": "Data Source",
>               "contents": "TextInputFormat 
> (hdfs://cloud-7:45010/tmp/input/higgs.M.txt) - UTF-8",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "8.0.31 GB" },
>                       { "name": "Est. Cardinality", "value": "109.90 M" }     
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "8.0.31 GB" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "0.0 B" },
>                       { "name": "Cumulative Disk I/O", "value": "8.0.31 GB" },
>                       { "name": "Cumulative CPU", "value": "0.0 " }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 7,
>               "type": "pact",
>               "pact": "Map",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$6",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 8, "ship_strategy": "Forward"}
>               ],
>               "driver_strategy": "Map",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "109.90 M" }     
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "0.0 B" },
>                       { "name": "Cumulative Disk I/O", "value": "8.0.31 GB" },
>                       { "name": "Cumulative CPU", "value": "0.0 " }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 11,
>               "type": "pact",
>               "pact": "Map",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$1",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 7, "ship_strategy": "Forward"}
>               ],
>               "driver_strategy": "Map",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "109.90 M" }     
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "0.0 B" },
>                       { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" },
>                       { "name": "Cumulative CPU", "value": "0.0 " }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 10,
>               "type": "pact",
>               "pact": "Reduce",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 11, "ship_strategy": "Forward"}
>               ],
>               "driver_strategy": "Reduce All",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "109.90 M" }     
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "0.0 B" },
>                       { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" },
>                       { "name": "Cumulative CPU", "value": "0.0 " }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 9,
>               "type": "pact",
>               "pact": "Reduce",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2",
>               "parallelism": "1",
>               "subtasks_per_instance": "1",
>               "predecessors": [
>                       {"id": 10, "ship_strategy": "Redistribute"}
>               ],
>               "driver_strategy": "Reduce All",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" },
>                       { "name": "Cumulative CPU", "value": "0.0 " }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 12,
>               "type": "pact",
>               "pact": "Bulk Partial Solution",
>               "contents": "Partial Solution",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "0.0 B" },
>                       { "name": "Cumulative Disk I/O", "value": "0.0 B" },
>                       { "name": "Cumulative CPU", "value": "0.0 " }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 6,
>               "type": "pact",
>               "pact": "Map",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$3",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 7, "side": "first", "ship_strategy": "Forward", 
> "temp_mode": "CACHED"},
>                       {"id": 9, "side": "second", "ship_strategy": 
> "Broadcast"},
>                       {"id": 12, "side": "second", "ship_strategy": 
> "Broadcast"}
>               ],
>               "driver_strategy": "Map",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "109.90 M" }     
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 5,
>               "type": "pact",
>               "pact": "Reduce",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 6, "ship_strategy": "Forward"}
>               ],
>               "driver_strategy": "Reduce All",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "109.90 M" }     
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 4,
>               "type": "pact",
>               "pact": "Reduce",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4",
>               "parallelism": "1",
>               "subtasks_per_instance": "1",
>               "predecessors": [
>                       {"id": 5, "ship_strategy": "Redistribute"}
>               ],
>               "driver_strategy": "Reduce All",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 3,
>               "type": "pact",
>               "pact": "Cross",
>               "contents": 
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$5",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 4, "side": "first", "ship_strategy": 
> "Broadcast"},
>                       {"id": 12, "side": "second", "ship_strategy": 
> "Forward", "temp_mode": "PIPELINE_BREAKER"}
>               ],
>               "driver_strategy": "Nested Loops (Blocked Outer: 
> de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4)",
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       }
>               ],
>               "partial_solution": 12,
>               "next_partial_solution": 3,
>               "id": 1,
>               "type": "bulk_iteration",
>               "pact": "Bulk Iteration",
>               "contents": "Bulk Iteration",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 2, "ship_strategy": "Redistribute"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "(unknown)" },
>                       { "name": "Disk I/O", "value": "(unknown)" },
>                       { "name": "CPU", "value": "(unknown)" },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       },
>       {
>               "id": 0,
>               "type": "sink",
>               "pact": "Data Sink",
>               "contents": "TextOutputFormat 
> (hdfs://cloud-7:45010/tmp/output/logreg) - UTF-8",
>               "parallelism": "64",
>               "subtasks_per_instance": "16",
>               "predecessors": [
>                       {"id": 1, "ship_strategy": "Forward"}
>               ],
>               "global_properties": [
>                       { "name": "Partitioning", "value": "RANDOM" },
>                       { "name": "Partitioning Order", "value": "(none)" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "local_properties": [
>                       { "name": "Order", "value": "(none)" },
>                       { "name": "Grouping", "value": "not grouped" },
>                       { "name": "Uniqueness", "value": "not unique" }
>               ],
>               "estimates": [
>                       { "name": "Est. Output Size", "value": "(unknown)" },
>                       { "name": "Est. Cardinality", "value": "(unknown)" }    
>         ],
>               "costs": [
>                       { "name": "Network", "value": "0.0 B" },
>                       { "name": "Disk I/O", "value": "0.0 B" },
>                       { "name": "CPU", "value": "0.0 " },
>                       { "name": "Cumulative Network", "value": "(unknown)" },
>                       { "name": "Cumulative Disk I/O", "value": "(unknown)" },
>                       { "name": "Cumulative CPU", "value": "(unknown)" }
>               ],
>               "compiler_hints": [
>                       { "name": "Output Size (bytes)", "value": "(none)" },
>                       { "name": "Output Cardinality", "value": "(none)" },
>                       { "name": "Avg. Output Record Size (bytes)", "value": 
> "(none)" },
>                       { "name": "Filter Factor", "value": "(none)" }          
> ]
>       }
>       ]
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to