dawidwys commented on a change in pull request #17055:
URL: https://github.com/apache/flink/pull/17055#discussion_r701123139
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1264,6 +1265,42 @@ public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
 assertEquals(1, harness.getCheckpointResponder().getDeclineReports().size());
}
+ @Test
+ public void testAbortPreviousCheckpointBeforeCompleteTerminateSavepoint() throws Throwable {
+ // given: Marker that the task is finished.
+ AtomicBoolean finishTask = new AtomicBoolean();
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ (env) ->
+ new OneInputStreamTask<Integer, Integer>(env) {
+ @Override
+ protected void finishTask() throws Exception {
+ super.finishTask();
+ finishTask.set(true);
+ }
+ },
+ BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ StreamTaskMailboxTestHarness<Integer> harness =
+ builder.setupOutputForSingletonOperatorChain(
+ new TestBoundedOneInputStreamOperator())
+ .build();
+
+ // when: Receiving the abort notification of the previous checkpoint before the complete
+ // notification of the savepoint terminate.
+ harness.streamTask.notifyCheckpointAbortAsync(1);
+ harness.streamTask.notifyCheckpointCompleteAsync(2);
Review comment:
OK, I checked it myself: if we call `triggerCheckpointOnBarrier` directly from
the test thread, it gets stuck in the synchronous savepoint loop.
So executing `triggerCheckpointOnBarrier` through the `MailboxExecutor` is
fine here. I would not do the same for the `*Async` methods.
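
To illustrate why the call gets stuck, here is a minimal toy model of the situation. It is only a sketch and not Flink code: `ToyMailbox` and all of its methods are hypothetical names, and it assumes that the `*Async` notification only enqueues a mail and that the synchronous savepoint trigger keeps running mails until the completion notification has been seen. Where the real loop would block on an empty mailbox, the toy version returns `false`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy single-threaded mailbox; a hypothetical stand-in, not Flink's implementation. */
public class ToyMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private boolean savepointCompleted;

    /** Assumed analogue of executing through a mailbox executor: enqueue, never run inline. */
    public void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Assumed analogue of an *Async notification: it only enqueues a mail. */
    public void notifyCompleteAsync(long checkpointId) {
        execute(() -> {
            savepointCompleted = true;
            log.add("complete " + checkpointId);
        });
    }

    /**
     * Assumed analogue of a synchronous savepoint trigger: runs mails until the
     * completion notification arrives. Returns false if the mailbox runs dry
     * first; a blocking mailbox would hang at that point instead.
     */
    public boolean triggerSynchronousSavepoint(long checkpointId) {
        log.add("trigger " + checkpointId);
        while (!savepointCompleted) {
            Runnable next = mails.poll();
            if (next == null) {
                return false;
            }
            next.run();
        }
        return true;
    }

    /** Drains the mailbox from the owning thread. */
    public void processAll() {
        Runnable next;
        while ((next = mails.poll()) != null) {
            next.run();
        }
    }

    public List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        // The trigger runs as a mail, so its inner loop can pick up the
        // completion mail that is queued behind it.
        mailbox.execute(() -> mailbox.triggerSynchronousSavepoint(2));
        mailbox.notifyCompleteAsync(2);
        mailbox.processAll();
        System.out.println(mailbox.log());
    }
}
```

In this model, calling `triggerSynchronousSavepoint` directly on an empty mailbox never sees the completion mail, because the only thread that could enqueue it is the one sitting in the loop. Enqueuing the trigger as a mail avoids that: the nested loop finds the completion mail already waiting.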