tillrohrmann commented on a change in pull request #14921:
URL: https://github.com/apache/flink/pull/14921#discussion_r577695496
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -291,12 +480,127 @@ public void testHowToHandleFailureUnrecoverableFailure()
throws Exception {
is(false));
}
- private static JobGraph getJobGraph() {
+ //
---------------------------------------------------------------------------------------------
+ // Illegal state behavior tests
+ //
---------------------------------------------------------------------------------------------
+
+ @Test
+ public void testTriggerSavepointFailsInIllegalState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.triggerSavepoint("some directory", false),
+ futureFailedWith(CheckpointException.class));
+ }
+
+ @Test
+ public void testStopWithSavepointFailsInIllegalState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.stopWithSavepoint("some directory", false),
+ futureFailedWith(CheckpointException.class));
+ }
+
+ @Test
+ public void testDeliverOperatorEventToCoordinatorFailsInIllegalState()
throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ try {
+ scheduler.deliverOperatorEventToCoordinator(
+ new ExecutionAttemptID(), new OperatorID(), new
TestOperatorEvent());
+ fail("Should have failed with an exception.");
+ } catch (TaskNotRunningException expected) {
Review comment:
Maybe use `@Test(expected = TaskNotRunningException.class)` instead of the try/fail/catch block?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -291,12 +480,127 @@ public void testHowToHandleFailureUnrecoverableFailure()
throws Exception {
is(false));
}
- private static JobGraph getJobGraph() {
+ //
---------------------------------------------------------------------------------------------
+ // Illegal state behavior tests
+ //
---------------------------------------------------------------------------------------------
+
+ @Test
+ public void testTriggerSavepointFailsInIllegalState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.triggerSavepoint("some directory", false),
+ futureFailedWith(CheckpointException.class));
+ }
+
+ @Test
+ public void testStopWithSavepointFailsInIllegalState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.stopWithSavepoint("some directory", false),
+ futureFailedWith(CheckpointException.class));
+ }
+
+ @Test
+ public void testDeliverOperatorEventToCoordinatorFailsInIllegalState()
throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ try {
+ scheduler.deliverOperatorEventToCoordinator(
+ new ExecutionAttemptID(), new OperatorID(), new
TestOperatorEvent());
+ fail("Should have failed with an exception.");
+ } catch (TaskNotRunningException expected) {
+ }
+ }
+
+ @Test
+ public void
testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws
Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.deliverCoordinationRequestToCoordinator(
+ new OperatorID(), new CoordinationRequest() {}),
+ futureFailedWith(FlinkException.class));
+ }
+
+ @Test
+ public void testUpdateTaskExecutionStateReturnsFalseInIllegalState()
throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(jobGraph,
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ new ExecutionAttemptID(),
+ ExecutionState.FAILED))),
+ is(false));
+ }
+
+ @Test
+ public void testRequestNextInputSplitFailsInIllegalState() throws
Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ try {
+ scheduler.requestNextInputSplit(JOB_VERTEX.getID(), new
ExecutionAttemptID());
+ fail("Should have failed with an exception.");
+ } catch (IOException expected) {
+ }
+ }
+
+ @Test
+ public void testRequestPartitionStateFailsInIllegalState() throws
Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ try {
+ scheduler.requestPartitionState(new IntermediateDataSetID(), new
ResultPartitionID());
+ fail("Should have failed with an exception.");
+ } catch (PartitionProducerDisposedException expected) {
Review comment:
Maybe use `@Test(expected = PartitionProducerDisposedException.class)` instead of the try/fail/catch block.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -291,12 +480,127 @@ public void testHowToHandleFailureUnrecoverableFailure()
throws Exception {
is(false));
}
- private static JobGraph getJobGraph() {
+ //
---------------------------------------------------------------------------------------------
+ // Illegal state behavior tests
+ //
---------------------------------------------------------------------------------------------
+
+ @Test
+ public void testTriggerSavepointFailsInIllegalState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.triggerSavepoint("some directory", false),
+ futureFailedWith(CheckpointException.class));
+ }
+
+ @Test
+ public void testStopWithSavepointFailsInIllegalState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.stopWithSavepoint("some directory", false),
+ futureFailedWith(CheckpointException.class));
+ }
+
+ @Test
+ public void testDeliverOperatorEventToCoordinatorFailsInIllegalState()
throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ try {
+ scheduler.deliverOperatorEventToCoordinator(
+ new ExecutionAttemptID(), new OperatorID(), new
TestOperatorEvent());
+ fail("Should have failed with an exception.");
+ } catch (TaskNotRunningException expected) {
+ }
+ }
+
+ @Test
+ public void
testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws
Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.deliverCoordinationRequestToCoordinator(
+ new OperatorID(), new CoordinationRequest() {}),
+ futureFailedWith(FlinkException.class));
+ }
+
+ @Test
+ public void testUpdateTaskExecutionStateReturnsFalseInIllegalState()
throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(jobGraph,
mainThreadExecutor).build();
+
+ assertThat(
+ scheduler.updateTaskExecutionState(
+ new TaskExecutionStateTransition(
+ new TaskExecutionState(
+ jobGraph.getJobID(),
+ new ExecutionAttemptID(),
+ ExecutionState.FAILED))),
+ is(false));
+ }
+
+ @Test
+ public void testRequestNextInputSplitFailsInIllegalState() throws
Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(createJobGraph(),
mainThreadExecutor).build();
+
+ try {
+ scheduler.requestNextInputSplit(JOB_VERTEX.getID(), new
ExecutionAttemptID());
+ fail("Should have failed with an exception.");
+ } catch (IOException expected) {
Review comment:
Maybe use `@Test(expected = IOException.class)` instead of the try/fail/catch block.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]