Repository: flink Updated Branches: refs/heads/master b01d737ae -> e515b9bb2
[FLINK-5867] [flip-1] Add tests for pipelined failover region construction Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/166a3f87 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/166a3f87 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/166a3f87 Branch: refs/heads/master Commit: 166a3f877b8875dd4a3c2138f802241de7d9d2f8 Parents: 4eb9e46 Author: shuai.xus <[email protected]> Authored: Thu Apr 20 23:56:53 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Sat May 6 21:40:04 2017 +0200 ---------------------------------------------------------------------- .../PipelinedFailoverRegionBuildingTest.java | 644 +++++++++++++++++++ 1 file changed, 644 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/166a3f87/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java new file mode 100644 index 0000000..55bf711 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java @@ -0,0 +1,644 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Iterator; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +/** + * Tests that make sure that the building of pipelined connected failover regions works + * correctly. + */ +public class PipelinedFailoverRegionBuildingTest extends TestLogger { + + /** + * Tests that validates that a graph with single unconnected vertices works correctly. + * + * <pre> + * (v1) + * + * (v2) + * + * (v3) + * + * ... + * </pre> + */ + @Test + public void testIndividualVertices() throws Exception { + final JobVertex source1 = new JobVertex("source1"); + source1.setInvokableClass(NoOpInvokable.class); + source1.setParallelism(2); + + final JobVertex source2 = new JobVertex("source2"); + source2.setInvokableClass(NoOpInvokable.class); + source2.setParallelism(2); + + final JobGraph jobGraph = new JobGraph("test job", source1, source2); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + FailoverRegion sourceRegion11 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source1.getID()).getTaskVertices()[0]); + FailoverRegion sourceRegion12 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source1.getID()).getTaskVertices()[1]); + FailoverRegion targetRegion21 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source2.getID()).getTaskVertices()[0]); + FailoverRegion targetRegion22 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source2.getID()).getTaskVertices()[1]); + + assertTrue(sourceRegion11 != sourceRegion12); + assertTrue(sourceRegion12 != targetRegion21); + assertTrue(targetRegion21 != targetRegion22); + } + + /** + * Tests that validates that embarrassingly parallel chains of vertices work correctly. + * + * <pre> + * (a1) --> (b1) + * + * (a2) --> (b2) + * + * (a3) --> (b3) + * + * ... + * </pre> + */ + @Test + public void testEmbarrassinglyParallelCase() throws Exception { + int parallelism = 10000; + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(parallelism); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(parallelism); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(parallelism); + + vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + FailoverRegion preRegion1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex1.getID()).getTaskVertices()[0]); + FailoverRegion preRegion2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex2.getID()).getTaskVertices()[0]); + FailoverRegion preRegion3 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[0]); + + assertTrue(preRegion1 == preRegion2); + assertTrue(preRegion2 == preRegion3); + + for (int i = 1; i < parallelism; ++i) { + FailoverRegion region1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex1.getID()).getTaskVertices()[i]); + FailoverRegion region2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex2.getID()).getTaskVertices()[i]); + FailoverRegion region3 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[i]); + + assertTrue(region1 == region2); + assertTrue(region2 == region3); + + assertTrue(preRegion1 != region1); + } + } + + /** + * Tests that validates that a single pipelined component via a sequence of all-to-all + * connections works correctly. + * + * <pre> + * (a1) -+-> (b1) -+-> (c1) + * X X + * (a2) -+-> (b2) -+-> (c2) + * X X + * (a3) -+-> (b3) -+-> (c3) + * + * ... + * </pre> + */ + @Test + public void testOneComponentViaTwoExchanges() throws Exception { + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(3); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(5); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(2); + + vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + FailoverRegion region1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex1.getID()).getTaskVertices()[1]); + FailoverRegion region2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex2.getID()).getTaskVertices()[4]); + FailoverRegion region3 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[0]); + + assertTrue(region1 == region2); + assertTrue(region2 == region3); + } + + /** + * Tests that validates that a single pipelined component via a cascade of joins + * works correctly. + * + * <p>Non-parallelized view: + * <pre> + * (1)--+ + * +--(5)-+ + * (2)--+ | + * +--(7) + * (3)--+ | + * +--(6)-+ + * (4)--+ + * ... + * </pre> + */ + @Test + public void testOneComponentViaCascadeOfJoins() throws Exception { + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(8); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(8); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(8); + + final JobVertex vertex4 = new JobVertex("vertex4"); + vertex4.setInvokableClass(NoOpInvokable.class); + vertex4.setParallelism(8); + + final JobVertex vertex5 = new JobVertex("vertex5"); + vertex5.setInvokableClass(NoOpInvokable.class); + vertex5.setParallelism(4); + + final JobVertex vertex6 = new JobVertex("vertex6"); + vertex6.setInvokableClass(NoOpInvokable.class); + vertex6.setParallelism(4); + + final JobVertex vertex7 = new JobVertex("vertex7"); + vertex7.setInvokableClass(NoOpInvokable.class); + vertex7.setParallelism(2); + + vertex5.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex5.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + vertex6.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex6.connectNewDataSetAsInput(vertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + vertex7.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex7.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + + Iterator<ExecutionVertex> evs = eg.getAllExecutionVertices().iterator(); + + FailoverRegion preRegion = failoverStrategy.getFailoverRegion(evs.next()); + + while (evs.hasNext()) { + FailoverRegion region = failoverStrategy.getFailoverRegion(evs.next()); + assertTrue(preRegion == region); + } + } + + /** + * Tests that validates that a single pipelined component instance from one source + * works correctly. + * + * <p>Non-parallelized view: + * <pre> + * +--(1) + * +--(5)-+ + * | +--(2) + * (7)--+ + * | +--(3) + * +--(6)-+ + * +--(4) + * ... + * </pre> + */ + @Test + public void testOneComponentInstanceFromOneSource() throws Exception { + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(8); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(8); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(8); + + final JobVertex vertex4 = new JobVertex("vertex4"); + vertex4.setInvokableClass(NoOpInvokable.class); + vertex4.setParallelism(8); + + final JobVertex vertex5 = new JobVertex("vertex5"); + vertex5.setInvokableClass(NoOpInvokable.class); + vertex5.setParallelism(4); + + final JobVertex vertex6 = new JobVertex("vertex6"); + vertex6.setInvokableClass(NoOpInvokable.class); + vertex6.setParallelism(4); + + final JobVertex vertex7 = new JobVertex("vertex7"); + vertex7.setInvokableClass(NoOpInvokable.class); + vertex7.setParallelism(2); + + vertex1.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex2.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + vertex3.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex4.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + vertex5.connectNewDataSetAsInput(vertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex6.connectNewDataSetAsInput(vertex7, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = new JobGraph("test job", vertex7, vertex5, vertex6, vertex1, vertex2, vertex3, vertex4); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + + Iterator<ExecutionVertex> evs = eg.getAllExecutionVertices().iterator(); + + FailoverRegion preRegion = failoverStrategy.getFailoverRegion(evs.next()); + + while (evs.hasNext()) { + FailoverRegion region = failoverStrategy.getFailoverRegion(evs.next()); + assertTrue(preRegion == region); + } + } + + /** + * <pre> + * (a1) -+-> (b1) -+-> (c1) + * X + * (a2) -+-> (b2) -+-> (c2) + * X + * (a3) -+-> (b3) -+-> (c3) + * + * ^ ^ + * | | + * (pipelined) (blocking) + * + * </pre> + */ + @Test + public void testTwoComponentsViaBlockingExchange() throws Exception { + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(3); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(2); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(2); + + vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + FailoverRegion region1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex1.getID()).getTaskVertices()[1]); + FailoverRegion region2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex2.getID()).getTaskVertices()[0]); + FailoverRegion region31 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[0]); + FailoverRegion region32 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[1]); + + assertTrue(region1 == region2); + assertTrue(region2 != region31); + assertTrue(region32 != region31); + } + + /** + * <pre> + * (a1) -+-> (b1) -+-> (c1) + * X X + * (a2) -+-> (b2) -+-> (c2) + * X X + * (a3) -+-> (b3) -+-> (c3) + * + * ^ ^ + * | | + * (pipelined) (blocking) + * </pre> + */ + @Test + public void testTwoComponentsViaBlockingExchange2() throws Exception { + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(3); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(2); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(2); + + vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex3.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + FailoverRegion region1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex1.getID()).getTaskVertices()[1]); + FailoverRegion region2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex2.getID()).getTaskVertices()[0]); + FailoverRegion region31 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[0]); + FailoverRegion region32 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[1]); + + assertTrue(region1 == region2); + assertTrue(region2 != region31); + assertTrue(region32 != region31); + } + + /** + * Cascades of joins with partially blocking, partially pipelined exchanges: + * <pre> + * (1)--+ + * +--(5)-+ + * (2)--+ | + * (block) + * | + * +--(7) + * | + * (block) + * (3)--+ | + * +--(6)-+ + * (4)--+ + * ... + * </pre> + * + * Component 1: 1, 2, 5; component 2: 3,4,6; component 3: 7 + */ + @Test + public void testMultipleComponentsViaCascadeOfJoins() throws Exception { + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(8); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(8); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(8); + + final JobVertex vertex4 = new JobVertex("vertex4"); + vertex4.setInvokableClass(NoOpInvokable.class); + vertex4.setParallelism(8); + + final JobVertex vertex5 = new JobVertex("vertex5"); + vertex5.setInvokableClass(NoOpInvokable.class); + vertex5.setParallelism(4); + + final JobVertex vertex6 = new JobVertex("vertex6"); + vertex6.setInvokableClass(NoOpInvokable.class); + vertex6.setParallelism(4); + + final JobVertex vertex7 = new JobVertex("vertex7"); + vertex7.setInvokableClass(NoOpInvokable.class); + vertex7.setParallelism(2); + + vertex5.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex5.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + vertex6.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex6.connectNewDataSetAsInput(vertex4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + vertex7.connectNewDataSetAsInput(vertex5, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + vertex7.connectNewDataSetAsInput(vertex6, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3, vertex4, vertex5, vertex6, vertex7); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + + FailoverRegion region1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex1.getID()).getTaskVertices()[0]); + FailoverRegion region2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex2.getID()).getTaskVertices()[5]); + FailoverRegion region5 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex5.getID()).getTaskVertices()[2]); + + assertTrue(region1 == region2); + assertTrue(region1 == region5); + + FailoverRegion region3 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex3.getID()).getTaskVertices()[0]); + FailoverRegion region4 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex4.getID()).getTaskVertices()[5]); + FailoverRegion region6 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex6.getID()).getTaskVertices()[2]); + + assertTrue(region3 == region4); + assertTrue(region3 == region6); + + FailoverRegion region71 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex7.getID()).getTaskVertices()[0]); + FailoverRegion region72 = failoverStrategy.getFailoverRegion(eg.getJobVertex(vertex7.getID()).getTaskVertices()[1]); + + assertTrue(region71 != region72); + assertTrue(region1 != region71); + assertTrue(region1 != region72); + assertTrue(region3 != region71); + assertTrue(region3 != region72); + } + + @Test + public void testDiamondWithMixedPipelinedAndBlockingExchanges() throws Exception { + final JobVertex vertex1 = new JobVertex("vertex1"); + vertex1.setInvokableClass(NoOpInvokable.class); + vertex1.setParallelism(8); + + final JobVertex vertex2 = new JobVertex("vertex2"); + vertex2.setInvokableClass(NoOpInvokable.class); + vertex2.setParallelism(8); + + final JobVertex vertex3 = new JobVertex("vertex3"); + vertex3.setInvokableClass(NoOpInvokable.class); + vertex3.setParallelism(8); + + final JobVertex vertex4 = new JobVertex("vertex4"); + vertex4.setInvokableClass(NoOpInvokable.class); + vertex4.setParallelism(8); + + vertex2.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + vertex3.connectNewDataSetAsInput(vertex1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + vertex4.connectNewDataSetAsInput(vertex2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + vertex4.connectNewDataSetAsInput(vertex3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final JobGraph jobGraph = new JobGraph("test job", vertex1, vertex2, vertex3, vertex4); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + + Iterator<ExecutionVertex> evs = eg.getAllExecutionVertices().iterator(); + + FailoverRegion preRegion = failoverStrategy.getFailoverRegion(evs.next()); + + while (evs.hasNext()) { + FailoverRegion region = failoverStrategy.getFailoverRegion(evs.next()); + assertTrue(preRegion == region); + } + } + + /** + * This test checks that are strictly co-located vertices are in the same failover region, + * even through they are connected via a blocking pattern. + * This is currently an assumption / limitation of the scheduler. + */ + @Test + public void testBlockingAllToAllTopologyWithCoLocation() throws Exception { + final JobVertex source = new JobVertex("source"); + source.setInvokableClass(NoOpInvokable.class); + source.setParallelism(10); + + final JobVertex target = new JobVertex("target"); + target.setInvokableClass(NoOpInvokable.class); + target.setParallelism(13); + + target.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + final SlotSharingGroup sharingGroup = new SlotSharingGroup(); + source.setSlotSharingGroup(sharingGroup); + target.setSlotSharingGroup(sharingGroup); + + source.setStrictlyCoLocatedWith(target); + + final JobGraph jobGraph = new JobGraph("test job", source, target); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + FailoverRegion region1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source.getID()).getTaskVertices()[0]); + FailoverRegion region2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(target.getID()).getTaskVertices()[0]); + + // we use 'assertTrue' here rather than 'assertEquals' because we want to test + // for referential equality, to be on the safe side + assertTrue(region1 == region2); + } + + /** + * This test checks that are strictly co-located vertices are in the same failover region, + * even through they are connected via a blocking pattern. + * This is currently an assumption / limitation of the scheduler. + */ + @Test + public void testPipelinedOneToOneTopologyWithCoLocation() throws Exception { + final JobVertex source = new JobVertex("source"); + source.setInvokableClass(NoOpInvokable.class); + source.setParallelism(10); + + final JobVertex target = new JobVertex("target"); + target.setInvokableClass(NoOpInvokable.class); + target.setParallelism(10); + + target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + + final SlotSharingGroup sharingGroup = new SlotSharingGroup(); + source.setSlotSharingGroup(sharingGroup); + target.setSlotSharingGroup(sharingGroup); + + source.setStrictlyCoLocatedWith(target); + + final JobGraph jobGraph = new JobGraph("test job", source, target); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + + RestartPipelinedRegionStrategy failoverStrategy = (RestartPipelinedRegionStrategy) eg.getFailoverStrategy(); + FailoverRegion sourceRegion1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source.getID()).getTaskVertices()[0]); + FailoverRegion sourceRegion2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(source.getID()).getTaskVertices()[1]); + FailoverRegion targetRegion1 = failoverStrategy.getFailoverRegion(eg.getJobVertex(target.getID()).getTaskVertices()[0]); + FailoverRegion targetRegion2 = failoverStrategy.getFailoverRegion(eg.getJobVertex(target.getID()).getTaskVertices()[1]); + + // we use 'assertTrue' here rather than 'assertEquals' because we want to test + // for referential equality, to be on the safe side + assertTrue(sourceRegion1 == sourceRegion2); + assertTrue(sourceRegion2 == targetRegion1); + assertTrue(targetRegion1 == targetRegion2); + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws JobException, JobExecutionException { + // configure the pipelined failover strategy + final Configuration jobManagerConfig = new Configuration(); + jobManagerConfig.setString( + JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, + FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME); + + return ExecutionGraphBuilder.buildGraph( + null, + jobGraph, + jobManagerConfig, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + mock(SlotProvider.class), + PipelinedFailoverRegionBuildingTest.class.getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + Time.seconds(10), + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + 1000, + log); + } +}
