rkhachatryan commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r649166506
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -107,17 +111,27 @@ public void testIgnoreInFlightDataDuringRecovery() throws Exception {
}
private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
- public static long result;
- public static long resultBeforeFail;
+ private static long result;
+ private static long resultBeforeFail;
+
+ public SumFailSink() {
+ resultBeforeFail = result = 0;
+ }
@Override
- public void invoke(Integer value) throws Exception {
+ public void invoke(Integer value, Context context) throws Exception {
result += value;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
- resultBeforeFail = result;
+ if (resultBeforeFail == 0) {
+ resultBeforeFail = result;
+ }
+
+ // Allow to send data from the awaited map in this case we can be sure that in-flight
+ // data exists.
+ SlowMap.checkpointReachSinkLatch.trigger();
Review comment:
I think you are right about which channel state will be captured and about
spanning records.
Maybe leave that explanation as a comment in the code?
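
For readers following the diff, here is a rough, Flink-free sketch of the handshake the new comment describes: the upstream map holds its record until the sink has taken a snapshot, so data should still be in flight when the checkpoint completes. The class and method names below are hypothetical and are not part of the PR; `CountDownLatch` merely stands in for `SlowMap.checkpointReachSinkLatch`, whose actual type is not shown in this hunk.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from the PR.
final class LatchHandshakeSketch {
    // Plays the role of SlowMap.checkpointReachSinkLatch in the diff.
    private final CountDownLatch checkpointReachedSink = new CountDownLatch(1);
    private volatile long emittedAfterCheckpoint = 0;

    // "Map" side: holds its record until the sink reports a checkpoint.
    private void mapEmit(long value) throws InterruptedException {
        if (!checkpointReachedSink.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("sink never saw a checkpoint");
        }
        emittedAfterCheckpoint += value;
    }

    // "Sink" side: called from its snapshot hook; releases the waiting map.
    private void sinkSnapshot() {
        checkpointReachedSink.countDown();
    }

    // Starts the map thread, simulates one snapshot on the sink, and
    // returns what the map emitted once it was released.
    long run(long value) {
        Thread map = new Thread(() -> {
            try {
                mapEmit(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        map.start();
        sinkSnapshot();
        try {
            map.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return emittedAfterCheckpoint;
    }
}
```

In the real test the latch is shared through a static field and triggered from `snapshotState`; the sketch only shows the ordering guarantee, not which channel state gets captured.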