tillrohrmann commented on a change in pull request #14921:
URL: https://github.com/apache/flink/pull/14921#discussion_r577695496



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -291,12 +480,127 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
                 is(false));
     }
 
-    private static JobGraph getJobGraph() {
+    // 
---------------------------------------------------------------------------------------------
+    // Illegal state behavior tests
+    // 
---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testTriggerSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.triggerSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testStopWithSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.stopWithSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() 
throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        try {
+            scheduler.deliverOperatorEventToCoordinator(
+                    new ExecutionAttemptID(), new OperatorID(), new 
TestOperatorEvent());
+            fail("Should have failed with an exception.");
+        } catch (TaskNotRunningException expected) {

Review comment:
       Maybe use `@Test(expected = TaskNotRunningException.class)` here instead of the try/fail/catch block?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -291,12 +480,127 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
                 is(false));
     }
 
-    private static JobGraph getJobGraph() {
+    // 
---------------------------------------------------------------------------------------------
+    // Illegal state behavior tests
+    // 
---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testTriggerSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.triggerSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testStopWithSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.stopWithSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() 
throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        try {
+            scheduler.deliverOperatorEventToCoordinator(
+                    new ExecutionAttemptID(), new OperatorID(), new 
TestOperatorEvent());
+            fail("Should have failed with an exception.");
+        } catch (TaskNotRunningException expected) {
+        }
+    }
+
+    @Test
+    public void 
testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws 
Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.deliverCoordinationRequestToCoordinator(
+                        new OperatorID(), new CoordinationRequest() {}),
+                futureFailedWith(FlinkException.class));
+    }
+
+    @Test
+    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() 
throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.updateTaskExecutionState(
+                        new TaskExecutionStateTransition(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(),
+                                        new ExecutionAttemptID(),
+                                        ExecutionState.FAILED))),
+                is(false));
+    }
+
+    @Test
+    public void testRequestNextInputSplitFailsInIllegalState() throws 
Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        try {
+            scheduler.requestNextInputSplit(JOB_VERTEX.getID(), new 
ExecutionAttemptID());
+            fail("Should have failed with an exception.");
+        } catch (IOException expected) {
+        }
+    }
+
+    @Test
+    public void testRequestPartitionStateFailsInIllegalState() throws 
Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        try {
+            scheduler.requestPartitionState(new IntermediateDataSetID(), new 
ResultPartitionID());
+            fail("Should have failed with an exception.");
+        } catch (PartitionProducerDisposedException expected) {

Review comment:
       Maybe use `@Test(expected = PartitionProducerDisposedException.class)` here instead of the try/fail/catch block?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -291,12 +480,127 @@ public void testHowToHandleFailureUnrecoverableFailure() 
throws Exception {
                 is(false));
     }
 
-    private static JobGraph getJobGraph() {
+    // 
---------------------------------------------------------------------------------------------
+    // Illegal state behavior tests
+    // 
---------------------------------------------------------------------------------------------
+
+    @Test
+    public void testTriggerSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.triggerSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testStopWithSavepointFailsInIllegalState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.stopWithSavepoint("some directory", false),
+                futureFailedWith(CheckpointException.class));
+    }
+
+    @Test
+    public void testDeliverOperatorEventToCoordinatorFailsInIllegalState() 
throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        try {
+            scheduler.deliverOperatorEventToCoordinator(
+                    new ExecutionAttemptID(), new OperatorID(), new 
TestOperatorEvent());
+            fail("Should have failed with an exception.");
+        } catch (TaskNotRunningException expected) {
+        }
+    }
+
+    @Test
+    public void 
testDeliverCoordinationRequestToCoordinatorFailsInIllegalState() throws 
Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.deliverCoordinationRequestToCoordinator(
+                        new OperatorID(), new CoordinationRequest() {}),
+                futureFailedWith(FlinkException.class));
+    }
+
+    @Test
+    public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() 
throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, 
mainThreadExecutor).build();
+
+        assertThat(
+                scheduler.updateTaskExecutionState(
+                        new TaskExecutionStateTransition(
+                                new TaskExecutionState(
+                                        jobGraph.getJobID(),
+                                        new ExecutionAttemptID(),
+                                        ExecutionState.FAILED))),
+                is(false));
+    }
+
+    @Test
+    public void testRequestNextInputSplitFailsInIllegalState() throws 
Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(createJobGraph(), 
mainThreadExecutor).build();
+
+        try {
+            scheduler.requestNextInputSplit(JOB_VERTEX.getID(), new 
ExecutionAttemptID());
+            fail("Should have failed with an exception.");
+        } catch (IOException expected) {

Review comment:
       Maybe use `@Test(expected = IOException.class)` here instead of the try/fail/catch block?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to