akalash commented on a change in pull request #17055:
URL: https://github.com/apache/flink/pull/17055#discussion_r700881323
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1264,6 +1265,42 @@ public void testCheckpointDeclinedOnClosedOperator()
throws Throwable {
assertEquals(1,
harness.getCheckpointResponder().getDeclineReports().size());
}
+ @Test
+ public void testAbortPreviousCheckpointBeforeCompleteTerminateSavepoint()
throws Throwable {
+ // given: Marker that the task is finished.
+ AtomicBoolean finishTask = new AtomicBoolean();
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ (env) ->
+ new OneInputStreamTask<Integer,
Integer>(env) {
+ @Override
+ protected void finishTask() throws
Exception {
+ super.finishTask();
+ finishTask.set(true);
+ }
+ },
+ BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ StreamTaskMailboxTestHarness<Integer> harness =
+ builder.setupOutputForSingletonOperatorChain(
+ new TestBoundedOneInputStreamOperator())
+ .build();
+
+ // when: Receiving the abort notification of the previous checkpoint
before the complete
+ // notification of the savepoint terminate.
+ harness.streamTask.notifyCheckpointAbortAsync(1);
+ harness.streamTask.notifyCheckpointCompleteAsync(2);
Review comment:
`notifyCheckpointCompleteAsync` and `notifyCheckpointAbortAsync`
actually do nothing but add the task to mailbox. When
`triggerCheckpointOnBarrier` is called it first of all does its own job and
only then it directly execute all waiting mails until the checkpoint is
complete. So in fact the `triggerCheckpointOnBarrier` is always first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]