[
https://issues.apache.org/jira/browse/FLINK-1018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stephan Ewen resolved FLINK-1018.
---------------------------------
Resolution: Fixed
Fix Version/s: 0.9
Fixed via 9c77f0785e43326521da5e535f9ab1f05a9c6280
> Logistic Regression deadlocks
> -----------------------------
>
> Key: FLINK-1018
> URL: https://issues.apache.org/jira/browse/FLINK-1018
> Project: Flink
> Issue Type: Bug
> Reporter: Markus Holzemer
> Fix For: 0.9
>
> Attachments: LogisticRegression.java
>
>
> We are currently running our implementation of logistic regression with batch
> gradient descent on the cluster.
> Unfortunately, for datasets > 1GB it seems to deadlock inside of the
> iteration. This means the first iteration is never finished.
> The iteration does a map over all points; the map gets the iteration input as
> a broadcast variable. The result of the map is reduced and the result of the
> reducer (1 tuple) is crossed with the iteration input.
> There should be no reason for the deadlock, since the data is still quite
> small compared to the cluster size (4 nodes with 32GB each). Also, the data size stays
> constant throughout the algorithm.
> Here is the generated plan. I will also attach the full algorithm.
> {code}
> {
> "nodes": [
> {
> "id": 2,
> "type": "source",
> "pact": "Data Source",
> "contents": "[([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
> 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.",
> "parallelism": "1",
> "subtasks_per_instance": "1",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "0.0 B" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "step_function": [
> {
> "id": 8,
> "type": "source",
> "pact": "Data Source",
> "contents": "TextInputFormat
> (hdfs://cloud-7:45010/tmp/input/higgs.M.txt) - UTF-8",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "8.0.31 GB" },
> { "name": "Est. Cardinality", "value": "109.90 M" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "8.0.31 GB" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "8.0.31 GB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 7,
> "type": "pact",
> "pact": "Map",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$6",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 8, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "109.90 M" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "8.0.31 GB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 11,
> "type": "pact",
> "pact": "Map",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$1",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 7, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "109.90 M" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 10,
> "type": "pact",
> "pact": "Reduce",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 11, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Reduce All",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "109.90 M" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 9,
> "type": "pact",
> "pact": "Reduce",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2",
> "parallelism": "1",
> "subtasks_per_instance": "1",
> "predecessors": [
> {"id": 10, "ship_strategy": "Redistribute"}
> ],
> "driver_strategy": "Reduce All",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "4.0.15 GB" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 12,
> "type": "pact",
> "pact": "Bulk Partial Solution",
> "contents": "Partial Solution",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "0.0 B" },
> { "name": "Cumulative Disk I/O", "value": "0.0 B" },
> { "name": "Cumulative CPU", "value": "0.0 " }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 6,
> "type": "pact",
> "pact": "Map",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$3",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 7, "side": "first", "ship_strategy": "Forward",
> "temp_mode": "CACHED"},
> {"id": 9, "side": "second", "ship_strategy":
> "Broadcast"},
> {"id": 12, "side": "second", "ship_strategy":
> "Broadcast"}
> ],
> "driver_strategy": "Map",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "109.90 M" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 5,
> "type": "pact",
> "pact": "Reduce",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 6, "ship_strategy": "Forward"}
> ],
> "driver_strategy": "Reduce All",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "109.90 M" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 4,
> "type": "pact",
> "pact": "Reduce",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4",
> "parallelism": "1",
> "subtasks_per_instance": "1",
> "predecessors": [
> {"id": 5, "ship_strategy": "Redistribute"}
> ],
> "driver_strategy": "Reduce All",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 3,
> "type": "pact",
> "pact": "Cross",
> "contents":
> "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$5",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 4, "side": "first", "ship_strategy":
> "Broadcast"},
> {"id": 12, "side": "second", "ship_strategy":
> "Forward", "temp_mode": "PIPELINE_BREAKER"}
> ],
> "driver_strategy": "Nested Loops (Blocked Outer:
> de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4)",
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> }
> ],
> "partial_solution": 12,
> "next_partial_solution": 3,
> "id": 1,
> "type": "bulk_iteration",
> "pact": "Bulk Iteration",
> "contents": "Bulk Iteration",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 2, "ship_strategy": "Redistribute"}
> ],
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "(unknown)" },
> { "name": "Disk I/O", "value": "(unknown)" },
> { "name": "CPU", "value": "(unknown)" },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> },
> {
> "id": 0,
> "type": "sink",
> "pact": "Data Sink",
> "contents": "TextOutputFormat
> (hdfs://cloud-7:45010/tmp/output/logreg) - UTF-8",
> "parallelism": "64",
> "subtasks_per_instance": "16",
> "predecessors": [
> {"id": 1, "ship_strategy": "Forward"}
> ],
> "global_properties": [
> { "name": "Partitioning", "value": "RANDOM" },
> { "name": "Partitioning Order", "value": "(none)" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "local_properties": [
> { "name": "Order", "value": "(none)" },
> { "name": "Grouping", "value": "not grouped" },
> { "name": "Uniqueness", "value": "not unique" }
> ],
> "estimates": [
> { "name": "Est. Output Size", "value": "(unknown)" },
> { "name": "Est. Cardinality", "value": "(unknown)" }
> ],
> "costs": [
> { "name": "Network", "value": "0.0 B" },
> { "name": "Disk I/O", "value": "0.0 B" },
> { "name": "CPU", "value": "0.0 " },
> { "name": "Cumulative Network", "value": "(unknown)" },
> { "name": "Cumulative Disk I/O", "value": "(unknown)" },
> { "name": "Cumulative CPU", "value": "(unknown)" }
> ],
> "compiler_hints": [
> { "name": "Output Size (bytes)", "value": "(none)" },
> { "name": "Output Cardinality", "value": "(none)" },
> { "name": "Avg. Output Record Size (bytes)", "value":
> "(none)" },
> { "name": "Filter Factor", "value": "(none)" }
> ]
> }
> ]
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)