zhuzhurk commented on a change in pull request #9904: [FLINK-14232][runtime]
Support global failure handling in NG scheduling
URL: https://github.com/apache/flink/pull/9904#discussion_r338062348
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java
##########
@@ -40,24 +42,38 @@
*/
public class ExecutionFailureHandlerTest extends TestLogger {
+ private static final long restartDelayMs = 1234L;
+
+ private FailoverTopology failoverTopology;
+
+ private TestFailoverStrategy failoverStrategy;
+
+ private TestRestartBackoffTimeStrategy restartStrategy;
+
+ private ExecutionFailureHandler executionFailureHandler;
+
+ @Before
+ public void setUp() {
+ TestFailoverTopology.Builder topologyBuilder = new
TestFailoverTopology.Builder();
+ topologyBuilder.newVertex();
+ failoverTopology = topologyBuilder.build();
+
+ failoverStrategy = new TestFailoverStrategy();
+ restartStrategy = new TestRestartBackoffTimeStrategy(true,
restartDelayMs);
+ executionFailureHandler = new
ExecutionFailureHandler(failoverTopology, failoverStrategy, restartStrategy);
+ }
+
/**
* Tests the case that task restarting is accepted.
*/
@Test
public void testNormalFailureHandling() {
- // failover strategy which always suggests restarting the given
tasks
- Set<ExecutionVertexID> tasksToRestart = new HashSet<>();
- tasksToRestart.add(new ExecutionVertexID(new JobVertexID(), 0));
- FailoverStrategy failoverStrategy = new
TestFailoverStrategy(tasksToRestart);
-
- // restart strategy which accepts restarting
- boolean canRestart = true;
- long restartDelayMs = 1234;
- RestartBackoffTimeStrategy restartStrategy = new
TestRestartBackoffTimeStrategy(canRestart, restartDelayMs);
- ExecutionFailureHandler executionFailureHandler = new
ExecutionFailureHandler(failoverStrategy, restartStrategy);
+ final Set<ExecutionVertexID> tasksToRestart =
Collections.singleton(
+ new ExecutionVertexID(new JobVertexID(), 0));
+ failoverStrategy.setTasksToRestart(tasksToRestart);
// trigger a task failure
- FailureHandlingResult result =
executionFailureHandler.getFailureHandlingResult(
+ final FailureHandlingResult result =
executionFailureHandler.getFailureHandlingResult(
new ExecutionVertexID(new JobVertexID(), 0),
Review comment:
It's by design to be different so that we are sure it's really returning the
result from failoverStrategy `failoverStrategy.setTasksToRestart`.
This case is to verify task failure handling and is using a
`TestFailoverStrategy` so it does not rely on the `FailoverTopology`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services