azagrebin commented on a change in pull request #13749:
URL: https://github.com/apache/flink/pull/13749#discussion_r512003909
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategyTest.java
##########
@@ -369,35 +252,85 @@ public void
testRegionFailoverForMultipleVerticesRegions() throws Exception {
RestartPipelinedRegionFailoverStrategy strategy = new
RestartPipelinedRegionFailoverStrategy(topology);
- // when v3 fails due to internal error, {v3,v4,v5,v6} should be
restarted
- HashSet<ExecutionVertexID> expectedResult = new HashSet<>();
- expectedResult.add(v3.getId());
- expectedResult.add(v4.getId());
- expectedResult.add(v5.getId());
- expectedResult.add(v6.getId());
- assertEquals(expectedResult,
- strategy.getTasksNeedingRestart(v3.getId(), new
Exception("Test failure")));
-
- // when v3 fails to consume from v2, {v1,v2,v3,v4,v5,v6} should
be restarted
- expectedResult.clear();
- expectedResult.add(v1.getId());
- expectedResult.add(v2.getId());
- expectedResult.add(v3.getId());
- expectedResult.add(v4.getId());
- expectedResult.add(v5.getId());
- expectedResult.add(v6.getId());
- assertEquals(expectedResult,
- strategy.getTasksNeedingRestart(v3.getId(),
- new PartitionConnectionException(
- new ResultPartitionID(
-
v3.getConsumedResults().iterator().next().getId(),
- new ExecutionAttemptID()),
- new Exception("Test failure"))));
+ verifyThatFailedExecution(strategy, v3).restarts(v3, v4, v5,
v6);
+
+ TestingSchedulingResultPartition v2out =
v3.getConsumedResults().iterator().next();
+ verifyThatFailedExecution(strategy, v3)
+ .partitionConnectionCause(v2out)
+ .restarts(v1, v2, v3, v4, v5, v6);
}
- //
------------------------------------------------------------------------
- // utilities
- //
------------------------------------------------------------------------
+ /**
+ * Tests region failover does not restart vertexes which are already in
initial CREATED state.
+ * <pre>
+ * (v1) ---> (v2) --|--> (v3) ---> (v4) --|--> (v5) ---> (v6)
+ *
+ * ^ ^ ^ ^ ^
+ * | | | | |
+ * (pipelined) (blocking) (pipelined) (blocking) (pipelined)
+ * </pre>
+ * Component 1: 1,2; component 2: 3,4; component 3: 5,6
+ */
+ @Test
+ public void testRegionFailoverDoesNotRestartCreatedExecutions() {
+ TestingSchedulingTopology topology = new
TestingSchedulingTopology();
+
+ TestingSchedulingExecutionVertex v1 =
topology.newExecutionVertex(ExecutionState.CREATED);
+ TestingSchedulingExecutionVertex v2 =
topology.newExecutionVertex(ExecutionState.CREATED);
+ TestingSchedulingExecutionVertex v3 =
topology.newExecutionVertex(ExecutionState.CREATED);
+ TestingSchedulingExecutionVertex v4 =
topology.newExecutionVertex(ExecutionState.CREATED);
+ TestingSchedulingExecutionVertex v5 =
topology.newExecutionVertex(ExecutionState.CREATED);
+ TestingSchedulingExecutionVertex v6 =
topology.newExecutionVertex(ExecutionState.CREATED);
+
+ topology.connect(v1, v2, ResultPartitionType.PIPELINED);
+ topology.connect(v2, v3, ResultPartitionType.BLOCKING);
+ topology.connect(v3, v4, ResultPartitionType.PIPELINED);
+ topology.connect(v4, v5, ResultPartitionType.BLOCKING);
+ topology.connect(v5, v6, ResultPartitionType.PIPELINED);
+
+ RestartPipelinedRegionFailoverStrategy strategy = new
RestartPipelinedRegionFailoverStrategy(topology);
+
+ verifyThatFailedExecution(strategy, v3).restarts();
+ TestingSchedulingResultPartition v2out =
v3.getConsumedResults().iterator().next();
+ verifyThatFailedExecution(strategy,
v3).partitionConnectionCause(v2out).restarts();
+ }
+
+ private static VerificationContext verifyThatFailedExecution(
+ FailoverStrategy strategy,
+ SchedulingExecutionVertex executionVertex) {
+ return new VerificationContext(strategy, executionVertex);
+ }
+
+ private static class VerificationContext {
+ private final FailoverStrategy strategy;
+ private final SchedulingExecutionVertex executionVertex;
+ private Throwable cause = new Exception("Test failure");
+
+ private VerificationContext(
+ FailoverStrategy strategy,
+ SchedulingExecutionVertex executionVertex) {
+ this.strategy = strategy;
+ this.executionVertex = executionVertex;
+ }
+
+ private VerificationContext partitionConnectionCause(
Review comment:
True, but I think this improves readability.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]