rmetzger commented on a change in pull request #14948:
URL: https://github.com/apache/flink/pull/14948#discussion_r586279970



##########
File path: flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveSchedulerITCase.java
##########
@@ -88,6 +119,177 @@ public void testGlobalFailoverCanRecoverState() throws Exception {
         env.execute();
     }
 
+    private enum StopWithSavepointTestBehavior {
+        NO_FAILURE,
+        FAIL_ON_CHECKPOINT,
+        FAIL_ON_STOP,
+        FAIL_ON_FIRST_CHECKPOINT_ONLY
+    }
+
+    @Test
+    public void testStopWithSavepointNoError() throws Exception {
+        StreamExecutionEnvironment env = getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        final String savepoint =
+                client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+        assertThat(client.getJobStatus().get(), is(JobStatus.FINISHED));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnStop() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_STOP);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
+
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        try {
+            client.stopWithSavepoint(false, tempFolder.newFolder("savepoint").getAbsolutePath())
+                    .get();
+            fail("Expect exception");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+        }
+        // expect job to run again (maybe restart)
+        CommonTestUtils.waitUntilCondition(
+                () -> client.getJobStatus().get() == JobStatus.RUNNING,
+                Deadline.fromNow(Duration.of(1, ChronoUnit.MINUTES)));
+    }
+
+    @Test
+    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
+        assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
+
+        env.setParallelism(PARALLELISM);
+
+        env.addSource(new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY))
+                .addSink(new DiscardingSink<>());
+        DummySource.resetForParallelism(PARALLELISM);
+
+        JobClient client = env.executeAsync();
+
+        DummySource.awaitRunning();
+        DummySource.resetForParallelism(PARALLELISM);
+        final File savepointDirectory = tempFolder.newFolder("savepoint");
+        try {
+            client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+            fail("Expect failure of operation");
+        } catch (ExecutionException e) {
+            assertThat(e.getMessage(), containsString("Failure while stopping with savepoint"));
+        }
+
+        // trigger second savepoint
+        DummySource.awaitRunning();
+        final String savepoint =
+                client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath()).get();
+        assertThat(savepoint, containsString(savepointDirectory.getAbsolutePath()));
+    }
+
+    private static StreamExecutionEnvironment getEnvWithSource(
+            StopWithSavepointTestBehavior behavior) {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(PARALLELISM);
+        env.addSource(new DummySource(behavior)).addSink(new DiscardingSink<>());
+        return env;
+    }
+
+    private static final class DummySource extends RichParallelSourceFunction<Integer>

Review comment:
       There are 200+ implementations of the `SourceFunction` interface in Flink, most of them in tests, and it doesn't seem that much sharing of test sources is going on.
   It seems a bit arbitrary to share the source between these two tests (feel free to point me to specific examples in `SavepointITCase` where sharing this class makes sense).
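
   For illustration only, a minimal, self-contained source dedicated to a single ITCase could look roughly like the sketch below. The class name `PerTestDummySource` and its internals are hypothetical and not part of this PR; only the `resetForParallelism`/`awaitRunning` helpers mirror what the tests above already call on `DummySource`.

```java
import java.util.concurrent.CountDownLatch;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/** Hypothetical per-test source sketch; not the DummySource from this PR. */
public class PerTestDummySource extends RichParallelSourceFunction<Integer> {

    // Counted down once per subtask entering run(); reset before each job submission.
    private static CountDownLatch runningLatch;

    private volatile boolean running = true;

    public static void resetForParallelism(int parallelism) {
        runningLatch = new CountDownLatch(parallelism);
    }

    public static void awaitRunning() throws InterruptedException {
        runningLatch.await();
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        runningLatch.countDown();
        int next = 0;
        while (running) {
            // Emit under the checkpoint lock so records don't interleave with checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next++);
            }
            Thread.sleep(10L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```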




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
