akalash commented on a change in pull request #17055:
URL: https://github.com/apache/flink/pull/17055#discussion_r700961981
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1264,6 +1265,42 @@ public void testCheckpointDeclinedOnClosedOperator()
throws Throwable {
assertEquals(1,
harness.getCheckpointResponder().getDeclineReports().size());
}
+ @Test
+ public void testAbortPreviousCheckpointBeforeCompleteTerminateSavepoint()
throws Throwable {
+ // given: Marker that the task is finished.
+ AtomicBoolean finishTask = new AtomicBoolean();
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(
+ (env) ->
+ new OneInputStreamTask<Integer,
Integer>(env) {
+ @Override
+ protected void finishTask() throws
Exception {
+ super.finishTask();
+ finishTask.set(true);
+ }
+ },
+ BasicTypeInfo.INT_TYPE_INFO)
+ .addInput(BasicTypeInfo.INT_TYPE_INFO);
+ StreamTaskMailboxTestHarness<Integer> harness =
+ builder.setupOutputForSingletonOperatorChain(
+ new TestBoundedOneInputStreamOperator())
+ .build();
+
+ // when: Receiving the abort notification of the previous checkpoint
before the complete
+ // notification of the savepoint terminate.
+ harness.streamTask.notifyCheckpointAbortAsync(1);
+ harness.streamTask.notifyCheckpointCompleteAsync(2);
Review comment:
Do I understand correctly, that you are talking about order in the test?
Do you mean that we should explicitly call `triggerCheckpointOnBarrier` first?
If so please, check the my last commit I a little rewrote the test
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]