Repository: flink
Updated Branches:
  refs/heads/master f9eac3afd -> 83061ad0f


[FLINK-5623] [runtime] Fix TempBarrier "The dam has been closed." exception

Properly reset the "pipeline breaker" upon closing.

This closes #3747


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9746846
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9746846
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9746846

Branch: refs/heads/master
Commit: c9746846b357d8ce538ff872cea60c52b1904b43
Parents: f9eac3a
Author: Greg Hogan <[email protected]>
Authored: Thu Apr 20 08:46:01 2017 -0400
Committer: Stephan Ewen <[email protected]>
Committed: Fri Apr 21 10:15:26 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/graph/library/link_analysis/PageRank.java  | 5 -----
 .../main/java/org/apache/flink/runtime/operators/BatchTask.java | 1 +
 2 files changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
index c5c4178..747735e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -45,7 +45,6 @@ import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
 import org.apache.flink.graph.library.link_analysis.PageRank.Result;
 import org.apache.flink.graph.utils.GraphUtils;
-import org.apache.flink.graph.utils.GraphUtils.IdentityMapper;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.DoubleValue;
@@ -176,10 +175,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
                        .run(new VertexDegrees<K, VV, EV>()
                                .setParallelism(parallelism));
 
-               // prevent Exception "The dam has been closed." in TempBarrier
-               // for a simplified Graph as in PageRankITCase (see FLINK-5623)
-               vertexDegree = vertexDegree.map(new IdentityMapper<Vertex<K, Degrees>>());
-
                // vertex count
                DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9746846/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index f748079..87b0a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -884,6 +884,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
                                        // close the async barrier if there is one
                                        if (this.tempBarriers[i] != null) {
                                                this.tempBarriers[i].close();
+                                               this.tempBarriers[i] = null;
                                        }
 
                                        // recreate the local strategy

Reply via email to