XComp commented on code in PR #19968:
URL: https://github.com/apache/flink/pull/19968#discussion_r901097551
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java:
##########
@@ -57,11 +79,12 @@ class StopWithSavepoint extends StateWithExecutionGraph {
private final CheckpointScheduling checkpointScheduling;
- private boolean hasFullyFinished = false;
-
- @Nullable private String savepoint = null;
-
@Nullable private Throwable operationFailureCause;
+ private boolean hasPendingStateTransition = false;
+
+ // be careful when applying operations on this future that can trigger state transitions,
+ // as several other methods do the same and we mustn't trigger multiple transitions!
+ private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();
Review Comment:
Could we add a comment to `operationFuture` as well, to distinguish the two
from each other? As far as I understand, the purpose of `operationFuture` is
to have a future that completes, depending on the internal state, as soon as
the subsequent state transition happens.
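To make the distinction concrete, here is a simplified, hypothetical model (not Flink's actual `StopWithSavepoint`): `internalSavepointFuture` tracks only the savepoint, while `operationFuture` completes once the follow-up state transition happens. The class name, the `transitionOnce` helper, the target-state strings and the assumption that all callbacks run on a single thread are illustrative only. The point is that several callbacks hang off the same internal future, so a guard like `hasPendingStateTransition` must ensure only one of them transitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, simplified model of the two futures; not Flink's class.
 * Assumes all callbacks run on one thread, so a plain boolean guard suffices.
 */
class TwoFuturesSketch {
    // Completes when the savepoint itself is done (with its path) or has failed.
    // Several callbacks hang off it, and each may want to trigger a state transition.
    private final CompletableFuture<String> internalSavepointFuture = new CompletableFuture<>();

    // Handed out to the caller. Completes only once the subsequent state transition
    // happens, so it reports the outcome of the whole stop-with-savepoint operation.
    private final CompletableFuture<String> operationFuture = new CompletableFuture<>();

    private boolean hasPendingStateTransition = false;
    private final List<String> transitions = new ArrayList<>();

    /** All tasks finished: leave the state once the savepoint is done. */
    void onJobFinished() {
        internalSavepointFuture.whenComplete(
                (path, error) -> transitionOnce("Finished", path, error));
    }

    /** A task failed: also wait for the savepoint before leaving the state. */
    void onFailure(Throwable cause) {
        internalSavepointFuture.whenComplete(
                (path, error) -> transitionOnce("Failing", path, error != null ? error : cause));
    }

    void completeSavepoint(String path) {
        internalSavepointFuture.complete(path);
    }

    // Every callback goes through this guard, so only the first one to run
    // actually transitions and completes operationFuture.
    private void transitionOnce(String targetState, String path, Throwable error) {
        if (hasPendingStateTransition) {
            return;
        }
        hasPendingStateTransition = true;
        transitions.add(targetState);
        if (error == null) {
            operationFuture.complete(path);
        } else {
            operationFuture.completeExceptionally(error);
        }
    }

    CompletableFuture<String> operationFuture() {
        return operationFuture;
    }

    List<String> transitions() {
        return transitions;
    }
}
```

`CompletableFuture` does not document the order in which dependent actions run, so which callback wins is not something to rely on; the guard, not registration order, is what prevents a second transition.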
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -337,6 +343,70 @@ void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception {
}
}
+ @Test
+ void testOnFailureWaitsForSavepointCompletion() throws Exception {
+ try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
+ CheckpointScheduling mockStopWithSavepointOperations = new MockCheckpointScheduling();
+ CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+ StateTrackingMockExecutionGraph executionGraph = new StateTrackingMockExecutionGraph();
+ StopWithSavepoint sws =
+ createStopWithSavepoint(
+ ctx, mockStopWithSavepointOperations, executionGraph, savepointFuture);
+ ctx.setStopWithSavepoint(sws);
+
+ ctx.setHowToHandleFailure(failure -> FailureResult.canRestart(failure, Duration.ZERO));
+
+ sws.onFailure(new Exception("task failure"));
+ ctx.triggerExecutors();
Review Comment:
Should we add a comment here (and in the other two test cases) stating that
we don't expect anything to happen at this point, because the `onFailure` call
should have triggered a state change? That would add a bit of context, since
the test would succeed without this line as well.
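A self-contained illustration of the kind of comment meant here, using stand-ins rather than the real test classes: `ManualExecutor` is a hypothetical substitute for the context's `triggerExecutors()`, and the state strings are made up. In this sketch the failure handling defers its transition until the savepoint completes, so the first `triggerAll()` is expected to be a no-op, and the inline comment says so explicitly.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the test context's manually triggered executor. */
class ManualExecutor implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        pending.add(command);
    }

    /** Runs everything scheduled so far, similar in spirit to ctx.triggerExecutors(). */
    void triggerAll() {
        Runnable next;
        while ((next = pending.poll()) != null) {
            next.run();
        }
    }
}

class OnFailureWaitsForSavepointSketch {
    /** Returns the observed states: [after first trigger, after savepoint completion]. */
    static String[] run() {
        ManualExecutor executor = new ManualExecutor();
        CompletableFuture<String> savepointFuture = new CompletableFuture<>();
        AtomicReference<String> state = new AtomicReference<>("StopWithSavepoint");

        // Hypothetical failure handling: only leave the state once the savepoint is done.
        savepointFuture.whenCompleteAsync((path, error) -> state.set("Restarting"), executor);

        // We don't expect anything to happen here: no transition has been queued yet.
        // The call only documents that nothing was scheduled prematurely; the test
        // would pass without it, hence this comment.
        executor.triggerAll();
        String afterFirstTrigger = state.get();

        savepointFuture.complete("savepoint-path");
        executor.triggerAll(); // now the deferred transition actually runs
        return new String[] {afterFirstTrigger, state.get()};
    }
}
```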
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java:
##########
@@ -82,7 +82,9 @@ void testJobFailed() throws Exception {
try (MockStopWithSavepointContext ctx = new MockStopWithSavepointContext()) {
StateTrackingMockExecutionGraph mockExecutionGraph =
new StateTrackingMockExecutionGraph();
- StopWithSavepoint sws = createStopWithSavepoint(ctx, mockExecutionGraph);
+ final CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("");
Review Comment:
I guess we should rename this test to something like
`testJobFailedWithSavepointCreationSuccessful` to better distinguish it from
the other test cases.
Additionally, shouldn't we verify that a `StopWithSavepointStopException` is
part of the cause chain for this specific test case?
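A minimal sketch of such a check, kept self-contained with plain JDK classes: the `StopWithSavepointStopException` below is a hypothetical stand-in for the class named above, and `CauseChain.findCause` is an illustrative helper. In the real test, Flink's `ExceptionUtils.findThrowable(Throwable, Class)` could likely serve the same purpose.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for the exception type mentioned in the review comment. */
class StopWithSavepointStopException extends Exception {
    StopWithSavepointStopException(String message, Throwable cause) {
        super(message, cause);
    }
}

class CauseChain {
    /** Walks the getCause() links and returns the first throwable of the given type. */
    static <T extends Throwable> Optional<T> findCause(Throwable throwable, Class<T> type) {
        // Track visited throwables by identity to stop on cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = throwable;
        while (current != null && seen.add(current)) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }
}
```

Searching the whole chain, rather than only the direct cause, keeps the assertion stable if the failure gets wrapped one more time on its way to the operation future.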
Review Comment:
> needs another explicit test for job fails + successful savepoint
Is that the test you're referring to in your [comment
above](https://github.com/apache/flink/pull/19968#pullrequestreview-1010663103)?
`testJobFailed` pretty much covers that scenario, doesn't it?