[hotfix] Make failover region topological sorted
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ff91be1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ff91be1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ff91be1 Branch: refs/heads/master Commit: 3ff91be1d49cf9f972ad5f1c556af173d97d102e Parents: 3b0fb26 Author: Till Rohrmann <[email protected]> Authored: Thu Oct 26 18:22:43 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Nov 2 17:04:45 2017 +0100 ---------------------------------------------------------------------- .../executiongraph/failover/RestartPipelinedRegionStrategy.java | 5 +++-- .../apache/flink/runtime/executiongraph/FailoverRegionTest.java | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3ff91be1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java index 1884d1c..b8f6964 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionStrategy.java @@ -166,8 +166,9 @@ public class RestartPipelinedRegionStrategy extends FailoverStrategy { if (predecessorRegion != thisRegion) { // we need to merge our region and the predecessor's region - thisRegion.addAll(predecessorRegion); - distinctRegions.remove(predecessorRegion); + predecessorRegion.addAll(thisRegion); + distinctRegions.remove(thisRegion); + thisRegion = predecessorRegion; // remap the vertices from that merged region for (ExecutionVertex inPredRegion: predecessorRegion) { http://git-wip-us.apache.org/repos/asf/flink/blob/3ff91be1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java index f1e0f7c..4d53e67 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -422,6 +422,7 @@ public class FailoverRegionTest extends TestLogger { v2.setInvokableClass(AbstractInvokable.class); v3.setInvokableClass(AbstractInvokable.class); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); v3.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
