dawidwys commented on a change in pull request #17055:
URL: https://github.com/apache/flink/pull/17055#discussion_r700305457



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1264,6 +1265,42 @@ public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
         assertEquals(1, harness.getCheckpointResponder().getDeclineReports().size());
     }
 
+    @Test
+    public void testAbortPreviousCheckpointBeforeCompleteTerminateSavepoint() throws Throwable {
+        // given: Marker that the task is finished.
+        AtomicBoolean finishTask = new AtomicBoolean();
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                (env) ->
+                                        new OneInputStreamTask<Integer, Integer>(env) {
+                                            @Override
+                                            protected void finishTask() throws Exception {
+                                                super.finishTask();
+                                                finishTask.set(true);
+                                            }
+                                        },
+                                BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        StreamTaskMailboxTestHarness<Integer> harness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new TestBoundedOneInputStreamOperator())
+                        .build();
+
+        // when: Receiving the abort notification of the previous checkpoint before the complete
+        // notification of the savepoint terminate.
+        harness.streamTask.notifyCheckpointAbortAsync(1);
+        harness.streamTask.notifyCheckpointCompleteAsync(2);

Review comment:
   I meant the order of `notifyCheckpointCompleteAsync` and `triggerCheckpointOnBarrier`.
   
   The barrier must arrive before we can say the checkpoint has completed. It is not possible to receive a `notifyCheckpointCompleteAsync` before we have seen a barrier for that checkpoint.
   
   In your test case you first send `notifyCheckpointCompleteAsync(2)` and only afterwards send the barrier: `triggerCheckpointOnBarrier(...(2)...)`.
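   
   To make the ordering concrete, here is a minimal toy model, not Flink code. The class `CheckpointOrderingSketch` and its methods `onBarrier` / `onComplete` are hypothetical stand-ins for `triggerCheckpointOnBarrier` and `notifyCheckpointCompleteAsync`, and the exception is only there to make the invariant visible; I am not claiming `StreamTask` throws in this situation.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model (hypothetical, not Flink code) of the per-checkpoint event order a task can observe. */
class CheckpointOrderingSketch {
    private final Set<Long> barriersSeen = new HashSet<>();
    private final Set<Long> completed = new HashSet<>();

    /** Stand-in for triggerCheckpointOnBarrier(...): the barrier for this checkpoint reaches the task. */
    void onBarrier(long checkpointId) {
        barriersSeen.add(checkpointId);
    }

    /** Stand-in for notifyCheckpointCompleteAsync(id): only meaningful after the barrier was seen. */
    void onComplete(long checkpointId) {
        if (!barriersSeen.contains(checkpointId)) {
            // Assumption of this sketch: surface the impossible ordering as an error.
            throw new IllegalStateException(
                    "complete notification for checkpoint "
                            + checkpointId
                            + " arrived before its barrier");
        }
        completed.add(checkpointId);
    }

    boolean isCompleted(long checkpointId) {
        return completed.contains(checkpointId);
    }

    public static void main(String[] args) {
        CheckpointOrderingSketch task = new CheckpointOrderingSketch();
        // Realistic order: barrier for checkpoint 2 first, then its completion notification.
        task.onBarrier(2L);
        task.onComplete(2L);
        System.out.println("checkpoint 2 completed: " + task.isCompleted(2L));
    }
}
```

   In the test, this corresponds to sending the barrier for checkpoint 2 before calling `notifyCheckpointCompleteAsync(2)`.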



