akalash commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r648958362
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -107,17 +111,27 @@ public void testIgnoreInFlightDataDuringRecovery() throws Exception {
}
private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
- public static long result;
- public static long resultBeforeFail;
+ private static long result;
+ private static long resultBeforeFail;
+
+ public SumFailSink() {
+ resultBeforeFail = result = 0;
+ }
@Override
- public void invoke(Integer value) throws Exception {
+ public void invoke(Integer value, Context context) throws Exception {
result += value;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
- resultBeforeFail = result;
+ if (resultBeforeFail == 0) {
+ resultBeforeFail = result;
+ }
+
+ // Allow to send data from the awaited map in this case we can be sure that in-flight
+ // data exists.
+ SlowMap.checkpointReachSinkLatch.trigger();
Review comment:
I actually expect the in-flight state to sit between Source and Map-1/Map-2,
or between Map-1/Map-2 and Sink. So I know that some data was emitted by Source
but has not reached the Sink, because Map-1 and Map-2 are waiting. When the
Sink receives the barrier from Map-3, any other data that was sent to or from
Map-1 or Map-2 before the checkpoint barrier is in-flight data. As I
understand it, if Source sent at least one record to Map-1 or Map-2 before the
checkpoint was triggered, it should work, because this record should become
in-flight data in some gate, Source-Map or Map-Sink. But maybe I am wrong
somewhere.
I implemented restarting the task if there is not enough data to guarantee
in-flight data. I don't know whether it will help, but we will see.
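
To make the ordering argument concrete, here is a minimal sketch in plain Java.
It is not the test's code and does not use Flink: `InFlightLatchSketch`, `run`
and `sourceToMap` are hypothetical names, and a `java.util.concurrent.CountDownLatch`
stands in for `SlowMap.checkpointReachSinkLatch`. It only shows that records
handed to a blocked map before the "snapshot" cannot be part of the sum the sink
sees at snapshot time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone model of the Source -> SlowMap -> Sink ordering.
class InFlightLatchSketch {

    /** Returns {sum seen by the sink at snapshot time, sum after the map is released}. */
    static long[] run(int records) {
        BlockingQueue<Integer> sourceToMap = new LinkedBlockingQueue<>();
        AtomicLong sinkSum = new AtomicLong();
        // Stand-in for SlowMap.checkpointReachSinkLatch (assumption).
        CountDownLatch checkpointReachedSink = new CountDownLatch(1);

        Thread slowMap = new Thread(() -> {
            try {
                // The map forwards nothing until the sink has taken its snapshot.
                checkpointReachedSink.await();
                for (int i = 0; i < records; i++) {
                    sinkSum.addAndGet(sourceToMap.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slowMap.start();

        // The source emits before the snapshot; these records are now "in flight".
        for (int i = 1; i <= records; i++) {
            sourceToMap.add(i);
        }

        // "snapshotState": record what the sink has seen, then release the map.
        long atSnapshot = sinkSum.get();
        checkpointReachedSink.countDown();

        try {
            slowMap.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {atSnapshot, sinkSum.get()};
    }

    public static void main(String[] args) {
        long[] sums = run(5);
        System.out.println("at snapshot: " + sums[0] + ", after release: " + sums[1]);
    }
}
```

The difference between the two sums is what would be lost if the in-flight
records were dropped on recovery. The sketch says nothing about how the real
gates buffer data.
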
> How do we prevent/handle records spanning multiple buffers in this test?
> If they are possible and we drop them, then failures are inevitable. So we'll
> have to re-run the test in that case.
This problem is ignored right now. As I understand it, it is hardly possible
here because the value is an integer. In general, though, the problem can
occur, and right now I don't have a good idea how to handle it correctly.
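
For reference, a rough way to reason about spanning records, as a sketch under
stated assumptions: records have a fixed serialized size and are packed back to
back into fixed-size buffers. `SpanningRecordsSketch`, `countSpanning` and the
sizes passed in `main` are hypothetical placeholders, not Flink's actual record
or buffer sizes.

```java
// Hypothetical arithmetic model: fixed-size records packed contiguously
// into fixed-size buffers. Not based on Flink's real serialization layout.
class SpanningRecordsSketch {

    /** Counts records whose first and last byte fall into different buffers. */
    static long countSpanning(long recordBytes, long bufferBytes, long records) {
        long spanning = 0;
        for (long k = 0; k < records; k++) {
            long firstByte = k * recordBytes;
            long lastByte = firstByte + recordBytes - 1;
            if (firstByte / bufferBytes != lastByte / bufferBytes) {
                spanning++;
            }
        }
        return spanning;
    }

    public static void main(String[] args) {
        // Placeholder sizes, chosen only to show both outcomes.
        System.out.println(countSpanning(8, 32, 100));
        System.out.println(countSpanning(9, 32, 32));
    }
}
```

Under this model no record spans a boundary when the buffer size is a multiple
of the record size. Whether that holds in the test depends on the real
serialized size of a record, which this sketch does not know.
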