rkhachatryan commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r647734804



##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -107,17 +111,27 @@ public void testIgnoreInFlightDataDuringRecovery() throws Exception {
     }
 
     private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
-        public static long result;
-        public static long resultBeforeFail;
+        private static long result;
+        private static long resultBeforeFail;
+
+        public SumFailSink() {
+            resultBeforeFail = result = 0;
+        }
 
         @Override
-        public void invoke(Integer value) throws Exception {
+        public void invoke(Integer value, Context context) throws Exception {
             result += value;
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            resultBeforeFail = result;
+            if (resultBeforeFail == 0) {
+                resultBeforeFail = result;
+            }
+
+            // Allow to send data from the awaited map in this case we can be sure that in-flight
+            // data exists.
+            SlowMap.checkpointReachSinkLatch.trigger();

Review comment:
       I guess the test failed because no in-flight data was generated, so 
ignoring it made no difference. Is that correct?
   
   If so, I'm afraid this line still doesn't guarantee in-flight data: 
the snapshot can complete before `SlowMap` actually emits anything. Maybe 
wait here for a signal from `SlowMap`?
   
   nit: I'd encapsulate `latch.trigger()` in a method with a meaningful 
name for readability.
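   
   A minimal sketch of the two-way handshake suggested above, using plain JDK 
classes rather than Flink's `OneShotLatch`. All names here 
(`InFlightHandshakeSketch`, `allowMapToEmit`, `awaitInFlightData`, the 
`channel` queue standing in for the network between the map and the sink) are 
hypothetical and not part of the PR; the point is only that the snapshot side 
waits until the map side confirms it has emitted a record.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class InFlightHandshakeSketch {
    // Stand-in for the channel between the map and the sink (assumption).
    private final BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();
    private final CountDownLatch checkpointReachedSink = new CountDownLatch(1);
    private final CountDownLatch inFlightDataEmitted = new CountDownLatch(1);

    // Sink side: a named wrapper instead of a bare latch.trigger().
    void allowMapToEmit() {
        checkpointReachedSink.countDown();
    }

    // Sink side: block until the map confirms it emitted something.
    boolean awaitInFlightData() throws InterruptedException {
        return inFlightDataEmitted.await(10, TimeUnit.SECONDS);
    }

    // Returns the number of records still in the channel when the simulated
    // snapshot finishes, or -1 if the handshake did not complete.
    int runOnce() {
        Thread map = new Thread(() -> {
            try {
                checkpointReachedSink.await(); // map is held back until the checkpoint reaches the sink
                channel.put(42);               // this record is now "in flight"
                inFlightDataEmitted.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        map.start();
        try {
            allowMapToEmit();
            boolean confirmed = awaitInFlightData();
            int inFlight = channel.size();
            map.join();
            return confirmed ? inFlight : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("in-flight records at snapshot end: "
                + new InFlightHandshakeSketch().runOnce());
    }
}
```

   Without the second latch, the simulated snapshot could return before the 
map thread has put anything into the channel, which is the race described 
above.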

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -179,11 +205,30 @@ public void initializeState(FunctionInitializationContext context) throws Except
 
     private static class SlowMap extends RichMapFunction<Integer, Integer> {
 
+        private static OneShotLatch checkpointReachSinkLatch;
+        private static AtomicInteger idGenerator;
+        public int subtaskMapId;
+
+        public SlowMap() {
+            checkpointReachSinkLatch = new OneShotLatch();
+            idGenerator = new AtomicInteger();

Review comment:
       Don't we currently have 3 subtasks, each overwriting this static field?
   Then, in `open()`, won't each of them get a `subtaskMapId` of zero?
   
   (ditto for `checkpointReachSinkLatch`)
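   
   A plain-Java sketch of the concern, assuming each subtask instance runs its 
constructor and then `open()` before the next instance is constructed. Whether 
that ordering occurs in the real job depends on how the function instances are 
created, which this sketch does not model. The class names `ResettingMap` and 
`SharedCounterMap` are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class StaticResetSketch {

    // Mirrors the pattern in question: the constructor replaces the shared static counter.
    static class ResettingMap {
        private static AtomicInteger idGenerator;
        int subtaskMapId;

        ResettingMap() {
            idGenerator = new AtomicInteger(); // resets the counter for every instance
        }

        void open() {
            subtaskMapId = idGenerator.getAndIncrement();
        }
    }

    // One possible alternative: the counter is created once, when the class is loaded.
    static class SharedCounterMap {
        private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
        int subtaskMapId;

        void open() {
            subtaskMapId = ID_GENERATOR.getAndIncrement();
        }
    }

    static int[] resettingIds(int subtasks) {
        int[] ids = new int[subtasks];
        for (int i = 0; i < subtasks; i++) {
            ResettingMap map = new ResettingMap();
            map.open();
            ids[i] = map.subtaskMapId;
        }
        return ids;
    }

    static int[] sharedCounterIds(int subtasks) {
        int[] ids = new int[subtasks];
        for (int i = 0; i < subtasks; i++) {
            SharedCounterMap map = new SharedCounterMap();
            map.open();
            ids[i] = map.subtaskMapId;
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(resettingIds(3)));     // every id is 0
        System.out.println(Arrays.toString(sharedCounterIds(3))); // ids are distinct
    }
}
```

   Note that a counter initialized only once is not reset between test runs in 
the same JVM, so the test would still need an explicit reset outside the 
constructor if it relies on the ids starting from zero.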

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -69,7 +73,7 @@ private static Configuration getConfiguration() {
     public void testIgnoreInFlightDataDuringRecovery() throws Exception {
         // given: Stream which will fail after first checkpoint.
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(3);
+        env.setParallelism(PARALLELISM);

Review comment:
       We should probably also disable operator chaining here (e.g. `env.disableOperatorChaining()`).





