akalash commented on a change in pull request #15728:
URL: https://github.com/apache/flink/pull/15728#discussion_r625177400
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java
##########
@@ -149,18 +150,80 @@ public void testComputeWithMultipleLevels() throws
Exception {
}
@Test
- public void testWithTriggeredTasksNotRunning() throws Exception {
+ public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws
Exception {
+ // given: Execution graph builder with one RUNNING source and NOT
RUNNING source.
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception>
twoSourcesBuilder =
+ (id) ->
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(id)
+ .addJobVertex(new JobVertexID())
+ .setTransitToRunning(vertex ->
!vertex.getJobvertexId().equals(id))
+ .build();
+
+ // when: Creating the checkpoint plan.
+ runWithNotRunTask(twoSourcesBuilder);
+
+ // then: The plan failed because one task didn't have RUNNING state.
+ }
+
+ @Test
+ public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws
Exception {
+ // given: Execution graph builder with one NOT RUNNING source and
RUNNING not source task.
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception>
sourceAndNotSourceBuilder =
+ (id) ->
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(id)
+ .addJobVertex(new JobVertexID(), false)
+ .setTransitToRunning(vertex ->
!vertex.getJobvertexId().equals(id))
+ .build();
+
+ // when: Creating the checkpoint plan.
+ runWithNotRunTask(sourceAndNotSourceBuilder);
+
+ // then: The plan failed because one task didn't have RUNNING state.
+ }
+
+ @Test
+ public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws
Exception {
+ // given: Execution graph builder with NOT RUNNING not source and
RUNNING not source task.
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception>
twoNotSourceBuilder =
+ (id) ->
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(id, false)
+ .addJobVertex(new JobVertexID(), false)
+ .setTransitToRunning(vertex ->
!vertex.getJobvertexId().equals(id))
+ .build();
+
+ // when: Creating the checkpoint plan.
+ runWithNotRunTask(twoNotSourceBuilder);
+
+ // then: The plan failed because one task didn't have RUNNING state.
+ }
+
+ @Test
+ public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws
Exception {
+ // given: Execution graph builder with NOT RUNNING not source and
RUNNING source tasks.
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception>
sourceAndNotSourceBuilder =
+ (id) ->
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(id, false)
+ .addJobVertex(new JobVertexID())
+ .setTransitToRunning(vertex ->
!vertex.getJobvertexId().equals(id))
+ .build();
+
+ // when: Creating the checkpoint plan.
+ runWithNotRunTask(sourceAndNotSourceBuilder);
+
+ // then: The plan failed because one task didn't have RUNNING state.
+ }
+
+ private void runWithNotRunTask(
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception>
graphBuilder)
Review comment:
I've rewritten tests mostly as you suggested and I also collapsed them
into one test. Please, take a look.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]