Repository: flink Updated Branches: refs/heads/master f9eac3afd -> 83061ad0f
[FLINK-5623] [runtime] Fix TempBarrier dam has been closed Properly reset the "pipeline breaker" upon closing. This closes #3747 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9746846 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9746846 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9746846 Branch: refs/heads/master Commit: c9746846b357d8ce538ff872cea60c52b1904b43 Parents: f9eac3a Author: Greg Hogan <[email protected]> Authored: Thu Apr 20 08:46:01 2017 -0400 Committer: Stephan Ewen <[email protected]> Committed: Fri Apr 21 10:15:26 2017 +0200 ---------------------------------------------------------------------- .../org/apache/flink/graph/library/link_analysis/PageRank.java | 5 ----- .../main/java/org/apache/flink/runtime/operators/BatchTask.java | 1 + 2 files changed, 1 insertion(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java index c5c4178..747735e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java @@ -45,7 +45,6 @@ import org.apache.flink.graph.asm.result.UnaryResult; import org.apache.flink.graph.library.link_analysis.Functions.SumScore; import org.apache.flink.graph.library.link_analysis.PageRank.Result; import org.apache.flink.graph.utils.GraphUtils; -import org.apache.flink.graph.utils.GraphUtils.IdentityMapper; import org.apache.flink.graph.utils.Murmur3_32; import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet; import org.apache.flink.types.DoubleValue; @@ -176,10 +175,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> { .run(new VertexDegrees<K, VV, EV>() .setParallelism(parallelism)); - // prevent Exception "The dam has been closed." in TempBarrier - // for a simplified Graph as in PageRankITCase (see FLINK-5623) - vertexDegree = vertexDegree.map(new IdentityMapper<Vertex<K, Degrees>>()); - // vertex count DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree); http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java index f748079..87b0a76 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java @@ -884,6 +884,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme // close the async barrier if there is one if (this.tempBarriers[i] != null) { this.tempBarriers[i].close(); + this.tempBarriers[i] = null; } // recreate the local strategy
