rkhachatryan commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r649138609



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -98,75 +123,122 @@ public void testIgnoreInFlightDataDuringRecovery() throws 
Exception {
 
         // then: Actual result should be less than the ideal result because 
some of data was
         // ignored.
-        assertThat(SumFailSink.result, lessThan(resultWithoutIgnoringData));
+        assertThat(result.get().longValue(), 
lessThan(resultWithoutIgnoringData));
 
         // and: Actual result should be equal to sum of result before fail + 
source value after
         // recovery.
-        long expectedResult = SumFailSink.resultBeforeFail + 
sourceValueAfterRestore;
-        assertEquals(expectedResult, SumFailSink.result);
+        long expectedResult = resultBeforeFail.get().longValue() + 
sourceValueAfterRestore;
+        assertEquals(expectedResult, result.get().longValue());
     }
 
     private static class SumFailSink implements SinkFunction<Integer>, 
CheckpointedFunction {
-        public static long result;
-        public static long resultBeforeFail;
+        private final SharedReference<OneShotLatch> checkpointReachSinkLatch;
+        private final SharedReference<AtomicLong> resultBeforeFail;
+        private final SharedReference<AtomicLong> result;
+
+        public SumFailSink(
+                SharedReference<OneShotLatch> checkpointReachSinkLatch,
+                SharedReference<AtomicLong> resultBeforeFail,
+                SharedReference<AtomicLong> result) {
+            this.checkpointReachSinkLatch = checkpointReachSinkLatch;
+            this.resultBeforeFail = resultBeforeFail;
+            this.result = result;
+        }
 
         @Override
-        public void invoke(Integer value) throws Exception {
-            result += value;
+        public void invoke(Integer value, Context context) throws Exception {
+            result.get().addAndGet(value);
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-            resultBeforeFail = result;
+            if (resultBeforeFail.get().longValue() == 0) {
+                resultBeforeFail.get().set(result.get().longValue());
+                sinkCheckpointStarted();
+            }
         }
 
         @Override
         public void initializeState(FunctionInitializationContext context) 
throws Exception {
-            result = resultBeforeFail;
+            result.get().set(resultBeforeFail.get().longValue());
+        }
+
+        /**
+         * Allow to send data from the awaited map in this case if number of 
waiters more than 0, we
+         * can be sure that in-flight data exists(at least the data which is 
processing by waiters
+         * during the waiting will be sent to the sink before the checkpoint 
barrier would be
+         * handled).
+         */
+        public void sinkCheckpointStarted() {
+            checkpointReachSinkLatch.get().trigger();
         }
     }
 
     private static class NumberSource implements SourceFunction<Integer>, 
CheckpointedFunction {
 
         private static final long serialVersionUID = 1L;
+        private final SharedReference<AtomicInteger> lastCheckpointValue;
         private ListState<Integer> valueState;
-        public static int lastCheckpointedValue;
+        private boolean isRunning = true;
+
+        public NumberSource(SharedReference<AtomicInteger> 
lastCheckpointValue) {
+            this.lastCheckpointValue = lastCheckpointValue;
+        }
 
         @Override
         public void run(SourceContext<Integer> ctx) throws Exception {
             Iterator<Integer> stateIt = valueState.get().iterator();
             boolean isRecovered = stateIt.hasNext();
 
             if (isRecovered) {
-                Integer lastValue = stateIt.next();
+                synchronized (ctx.getCheckpointLock()) {
+                    Integer lastValue = stateIt.next();
 
-                // Checking that ListState is recovered correctly.
-                assertEquals(lastCheckpointedValue, lastValue.intValue());
+                    // Checking that ListState is recovered correctly.
+                    assertEquals(lastCheckpointValue.get().intValue(), 
lastValue.intValue());
 
-                // if it is started after recovery, just send one more value 
and finish.
-                ctx.collect(lastValue + 1);
+                    // if it is started after recovery, just send one more 
value and finish.
+                    ctx.collect(lastValue + 1);
+                }
             } else {
                 int next = 0;
-                while (true) {
+                while (isRunning) {
                     synchronized (ctx.getCheckpointLock()) {
-                        next++;
-                        valueState.update(singletonList(next));
-                        ctx.collect(next);
+                        // Emit data by batches to reduce the probability that 
before the first
+                        // checkpoint will be generated not enough data.
+                        do {
+                            next++;
+                            valueState.update(singletonList(next));
+                            ctx.collect(next);
+                        } while (next % 50 != 0 && isRunning);
                     }
+
+                    // Avoid the huge backpressure.
+                    LockSupport.parkNanos(100000);
                 }
             }
         }
 
         @Override
-        public void cancel() {}
+        public void cancel() {
+            isRunning = false;
+        }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-            if (lastCheckpointedValue > 0) {
+            if (lastCheckpointValue.get().get() > 0) {
                 throw new RuntimeException("Error during snapshot");
             }
 
-            lastCheckpointedValue = valueState.get().iterator().next();
+            Integer state = valueState.get().iterator().next();
+
+            if (state < PARALLELISM) {
+                // Try to restart task.
+                throw new RuntimeException(
+                        "Not enough data to guarantee the in-flight data were 
generated before the first checkpoint");

Review comment:
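       This will fail with a `NoSuchElementException` first (the effect is the same, but
   less obvious): if the source has not emitted any record yet, the list state is empty and
   `valueState.get().iterator().next()` throws before the `state < PARALLELISM` check is reached.
   I also think the allowed number of restarts should be increased (possibly to unlimited);
   how many are needed depends on how soon the source starts producing records relative to
   when the checkpoint is triggered.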




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to