rkhachatryan commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r648621232



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -107,17 +111,27 @@ public void testIgnoreInFlightDataDuringRecovery() throws Exception {
     }
 
     private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
-        public static long result;
-        public static long resultBeforeFail;
+        private static long result;
+        private static long resultBeforeFail;
+
+        public SumFailSink() {
+            resultBeforeFail = result = 0;
+        }
 
         @Override
-        public void invoke(Integer value) throws Exception {
+        public void invoke(Integer value, Context context) throws Exception {
             result += value;
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            resultBeforeFail = result;
+            if (resultBeforeFail == 0) {
+                resultBeforeFail = result;
+            }
+
+            // Allow to send data from the awaited map in this case we can be sure that in-flight
+            // data exists.
+            SlowMap.checkpointReachSinkLatch.trigger();

Review comment:
      So the state is expected in the channel between the active Map-3 and the Sink, right?
   But isn't it possible that, right after receiving the barrier, the sink "unblocks" Map-1 and Map-2 and then receives the remaining barriers immediately, before processing any elements from the active Map-3, thereby completing the checkpoint without any channel state?
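To make the race concrete, here is a toy, single-threaded model of what an unaligned-checkpointing sink captures as input channel state. It is a simplification under stated assumptions, not Flink's implementation: the checkpoint starts at the first barrier, every record that arrives afterwards on a channel whose barrier has not been seen yet counts as channel state, and upstream output buffers are ignored. `Arrival` and `ChannelStateModel` are hypothetical names; channel 2 stands in for the channel from the active Map-3.

```java
import java.util.ArrayList;
import java.util.List;

// One event arriving at the sink, in arrival order (hypothetical toy type).
final class Arrival {
    final int channel;
    final boolean isBarrier;
    final int value;

    private Arrival(int channel, boolean isBarrier, int value) {
        this.channel = channel;
        this.isBarrier = isBarrier;
        this.value = value;
    }

    static Arrival data(int channel, int value) {
        return new Arrival(channel, false, value);
    }

    static Arrival barrier(int channel) {
        return new Arrival(channel, true, 0);
    }
}

final class ChannelStateModel {
    /**
     * Replays arrivals in order. The checkpoint starts at the first barrier; after
     * that, a record on a channel whose barrier has not arrived yet is counted as
     * channel state. Simplified model: upstream output buffers are ignored.
     */
    static List<Integer> capturedChannelState(List<Arrival> arrivals, int numChannels) {
        boolean checkpointStarted = false;
        boolean[] barrierSeen = new boolean[numChannels];
        List<Integer> captured = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.isBarrier) {
                checkpointStarted = true;
                barrierSeen[a.channel] = true;
            } else if (checkpointStarted && !barrierSeen[a.channel]) {
                captured.add(a.value);
            }
        }
        return captured;
    }
}
```

With the order barrier(0), data(2, 5), barrier(1), barrier(2), the record `5` arrives before channel 2's barrier and is captured. If all barriers arrive before that record, the captured list is empty, which is the outcome the question above describes.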
   
   > But it is a good question how to be sure that the checkpoint starts only when somebody is already waiting on this latch. Ideally, the checkpoint should be triggered manually once the latch is taken, but I'm not sure that is possible in an ITCase (or am I wrong?).
   
   
   It's possible to trigger a savepoint from the outside, but apparently it doesn't capture in-flight data.
   One option is to wait in `StreamOperator.prepareSnapshotPreBarrier` (but we would need to replace the Map with an operator).
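A stdlib-only sketch of that idea, assuming the sink's `snapshotState` signals a latch (as `SlowMap.checkpointReachSinkLatch.trigger()` does in the diff) and the replacement operator blocks in its pre-barrier hook until then. `BlockingPreBarrierOperator` is hypothetical and only mirrors the name of `prepareSnapshotPreBarrier(long checkpointId)`; a real version would be a Flink operator (e.g. extending `AbstractStreamOperator` and implementing `OneInputStreamOperator`), which is not shown. The timeout is an added assumption so that a missed signal fails loudly instead of hanging the test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an operator replacing the blocking Map. Only the
// hook name mirrors Flink's StreamOperator#prepareSnapshotPreBarrier(long).
final class BlockingPreBarrierOperator {
    private final CountDownLatch sinkCheckpointStarted;
    private final long timeoutMillis;

    BlockingPreBarrierOperator(CountDownLatch sinkCheckpointStarted, long timeoutMillis) {
        this.sinkCheckpointStarted = sinkCheckpointStarted;
        this.timeoutMillis = timeoutMillis;
    }

    // In Flink this hook runs before the operator emits its own barrier. Here it
    // blocks until the sink has signalled, or throws after the timeout.
    void prepareSnapshotPreBarrier(long checkpointId)
            throws InterruptedException, TimeoutException {
        if (!sinkCheckpointStarted.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(
                    "no signal from the sink before checkpoint " + checkpointId);
        }
    }
}
```

Holding the barrier in the hook, rather than in `map()`, means the wait is tied to a specific checkpoint instead of to record processing.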
   Another option is to decline checkpoints in the Map until it has waited long enough, e.g. by throwing an exception and causing a restart (I think throwing `CheckpointException` may not work from a `Map`, and we'll probably need to handle multiple checkpoints instead of just one).
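A sketch of the decline-until-waited-long-enough idea, assuming the decline is expressed as an unchecked exception from the snapshot hook (per the note above that `CheckpointException` may not work from a `Map`). `DecliningCheckpointGate` is hypothetical; the clock is injected so the example is deterministic, and declined checkpoint IDs are recorded because several checkpoints may be declined before one is accepted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical gate: declines every checkpoint until minWaitMillis have elapsed.
final class DecliningCheckpointGate {
    private final LongSupplier clockMillis;
    private final long startMillis;
    private final long minWaitMillis;
    private final List<Long> declined = new ArrayList<>();

    DecliningCheckpointGate(LongSupplier clockMillis, long minWaitMillis) {
        this.clockMillis = clockMillis;
        this.startMillis = clockMillis.getAsLong();
        this.minWaitMillis = minWaitMillis;
    }

    // Throws (declines) while the wait is too short; returns normally afterwards.
    void snapshotState(long checkpointId) {
        long waited = clockMillis.getAsLong() - startMillis;
        if (waited < minWaitMillis) {
            declined.add(checkpointId);
            throw new IllegalStateException(
                    "declining checkpoint " + checkpointId + " after only " + waited + " ms");
        }
    }

    List<Long> declinedCheckpoints() {
        return List.copyOf(declined);
    }
}
```

If a declined checkpoint restarts the job, the gate is recreated and its start time resets, so in a real test the wait bookkeeping would have to survive restarts, for example in a static field like the ones `SumFailSink` already uses.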
   Or maybe just re-run the test? :) As long as we can clearly distinguish between missing channel state (a test issue) and other failures, this should be fine, I think.
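If re-running is acceptable, the retry can be limited to the one failure mode that indicates a test-setup issue. A minimal sketch, assuming the test body signals missing channel state with a dedicated exception; `MissingChannelStateException` and `RetryOnMissingChannelState` are hypothetical names. Any other failure propagates on the first attempt, so real bugs are not masked by the retry.

```java
import java.util.concurrent.Callable;

// Hypothetical marker for the only retryable failure: no channel state was captured.
final class MissingChannelStateException extends RuntimeException {
    MissingChannelStateException(String message) {
        super(message);
    }
}

final class RetryOnMissingChannelState {
    // Runs body up to maxAttempts times. Only MissingChannelStateException causes
    // another attempt; any other exception propagates immediately.
    static <T> T run(Callable<T> body, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        MissingChannelStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return body.call();
            } catch (MissingChannelStateException e) {
                last = e; // test-setup issue, not a product failure: try again
            }
        }
        throw last; // every attempt lacked channel state
    }
}
```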
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
