akalash commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r648501659



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -107,17 +111,27 @@ public void testIgnoreInFlightDataDuringRecovery() throws Exception {
     }
 
     private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
-        public static long result;
-        public static long resultBeforeFail;
+        private static long result;
+        private static long resultBeforeFail;
+
+        public SumFailSink() {
+            resultBeforeFail = result = 0;
+        }
 
         @Override
-        public void invoke(Integer value) throws Exception {
+        public void invoke(Integer value, Context context) throws Exception {
             result += value;
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            resultBeforeFail = result;
+            if (resultBeforeFail == 0) {
+                resultBeforeFail = result;
+            }
+
+            // Allow to send data from the awaited map in this case we can be sure that in-flight
+            // data exists.
+            SlowMap.checkpointReachSinkLatch.trigger();

Review comment:
        The snapshot cannot complete before the data is emitted: at least 
two subtasks are waiting in the middle of processing, and they cannot handle the 
checkpoint barrier until they have emitted their current value. Those values, in 
turn, cannot be emitted until the sink starts the checkpoint, which guarantees 
that at least these two records will be in-flight data.
   
   But it is a good question how to be sure that the checkpoint starts only when 
somebody is already waiting on this latch. Ideally, the checkpoint should be 
triggered manually once the latch is taken, but I am not sure that this is 
possible in an ITCase (or am I wrong?). 
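
   To make the ordering argument concrete, here is a minimal sketch in plain Java, without Flink. All names are hypothetical: `checkpointReachedSink` stands in for `SlowMap.checkpointReachSinkLatch`, and `allMapsBlocked` is an extra latch that models the ideal case where the checkpoint starts only after every map subtask is already waiting. The real test has no such latch, which is exactly the open question above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the SlowMap / SumFailSink interaction; not Flink code.
class InFlightLatchSketch {

    /** Returns how many records were still in flight when the "sink snapshot" started. */
    static int run(int mapSubtasks) throws Exception {
        // Opened by the sink when its snapshot starts (assumed analogue of trigger()).
        CountDownLatch checkpointReachedSink = new CountDownLatch(1);
        // Assumption: lets us wait until every map subtask is mid-record.
        CountDownLatch allMapsBlocked = new CountDownLatch(mapSubtasks);
        AtomicInteger emitted = new AtomicInteger();

        ExecutorService pool = Executors.newFixedThreadPool(mapSubtasks);
        for (int i = 0; i < mapSubtasks; i++) {
            pool.submit(() -> {
                allMapsBlocked.countDown();     // subtask is in the middle of a record
                checkpointReachedSink.await();  // cannot emit before the sink's snapshot
                emitted.incrementAndGet();      // the record finally leaves the map
                return null;
            });
        }

        allMapsBlocked.await();                 // "checkpoint" starts only now
        int emittedAtSnapshot = emitted.get();  // what the sink has seen at snapshot time
        checkpointReachedSink.countDown();      // snapshotState(): release the maps

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return mapSubtasks - emittedAtSnapshot; // records that were in flight
    }
}
```

   With two map subtasks, `InFlightLatchSketch.run(2)` returns 2: both records are held back until the snapshot starts, so neither is included in the value captured at snapshot time. Without something like `allMapsBlocked`, nothing in this sketch would prevent the snapshot from starting before any subtask reaches the latch.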



