rkhachatryan commented on a change in pull request #16102:
URL: https://github.com/apache/flink/pull/16102#discussion_r647734804
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -107,17 +111,27 @@ public void testIgnoreInFlightDataDuringRecovery() throws Exception {
}
private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction {
- public static long result;
- public static long resultBeforeFail;
+ private static long result;
+ private static long resultBeforeFail;
+
+ public SumFailSink() {
+ resultBeforeFail = result = 0;
+ }
@Override
- public void invoke(Integer value) throws Exception {
+ public void invoke(Integer value, Context context) throws Exception {
result += value;
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
- resultBeforeFail = result;
+ if (resultBeforeFail == 0) {
+ resultBeforeFail = result;
+ }
+
+ // Allow to send data from the awaited map in this case we can be sure that in-flight
+ // data exists.
+ SlowMap.checkpointReachSinkLatch.trigger();
Review comment:
I guess the failure reason was that no in-flight data was generated, and
ignoring it didn't make any difference. Is that correct?
If yes, I'm afraid that this line still doesn't guarantee in-flight data:
the snapshot can complete before `SlowMap` actually injects anything. Maybe
wait here for some signal from `SlowMap`?
nit: I'd encapsulate `latch.trigger()` in a method with a meaningful name, for readability.
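A minimal sketch of the two-way handshake suggested above, which also gives the latch operations meaningful names. It assumes plain `java.util.concurrent.CountDownLatch` as a stand-in for Flink's `OneShotLatch`, and every name here (`InFlightHandshake`, `allowSlowMapToEmit()`, `awaitInFlightData()`, etc.) is hypothetical. A queue stands in for the network channel between `SlowMap` and the sink; the sink side only completes its "snapshot" after `SlowMap` reports that records were actually emitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical two-way handshake between SumFailSink.snapshotState() and SlowMap.
 * CountDownLatch stands in for Flink's OneShotLatch; all names are illustrative.
 */
final class InFlightHandshake {
    // Opened by the sink once the checkpoint has reached it.
    private final CountDownLatch checkpointReachedSink = new CountDownLatch(1);
    // Opened by the map once it has really emitted records after that point.
    private final CountDownLatch inFlightDataEmitted = new CountDownLatch(1);

    /** Sink side: let SlowMap start emitting. */
    void allowSlowMapToEmit() {
        checkpointReachedSink.countDown();
    }

    /** Sink side: block until SlowMap reports that records are in flight. */
    void awaitInFlightData() throws InterruptedException {
        inFlightDataEmitted.await();
    }

    /** Map side: block until the checkpoint has reached the sink. */
    void awaitCheckpointAtSink() throws InterruptedException {
        checkpointReachedSink.await();
    }

    /** Map side: report that records were emitted. */
    void signalInFlightDataEmitted() {
        inFlightDataEmitted.countDown();
    }

    /**
     * Simulates one checkpoint: a "SlowMap" thread pushes records into a queue that
     * stands in for the channel, and the "sink" finishes its snapshot only after it
     * is told the records are there. Returns the queue size at that moment.
     */
    static int simulate(int recordsToEmit) {
        InFlightHandshake handshake = new InFlightHandshake();
        BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();

        Thread slowMap = new Thread(() -> {
            try {
                handshake.awaitCheckpointAtSink();
                for (int i = 0; i < recordsToEmit; i++) {
                    channel.add(i);
                }
                handshake.signalInFlightDataEmitted();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        slowMap.start();

        try {
            // What the sink's snapshotState() would do under this sketch.
            handshake.allowSlowMapToEmit();
            handshake.awaitInFlightData();
            int inFlight = channel.size();
            slowMap.join();
            return inFlight;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for SlowMap", e);
        }
    }
}
```

For example, `InFlightHandshake.simulate(5)` returns 5, because nothing drains the queue before the sink side reads its size. One thing to keep in mind with this approach: `SlowMap` has to be able to emit while the sink is blocked in `snapshotState()`, which presumably only holds while the emitted records fit into the available buffers.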
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -179,11 +205,30 @@ public void initializeState(FunctionInitializationContext context) throws Except
private static class SlowMap extends RichMapFunction<Integer, Integer> {
+ private static OneShotLatch checkpointReachSinkLatch;
+ private static AtomicInteger idGenerator;
+ public int subtaskMapId;
+
+ public SlowMap() {
+ checkpointReachSinkLatch = new OneShotLatch();
+ idGenerator = new AtomicInteger();
Review comment:
Don't we currently have 3 subtasks, each overwriting this static field?
Then in `open()`, won't each of them get a zero `subtaskMapId`?
(ditto: `checkpointReachSinkLatch`)
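A small plain-Java model of the concern, assuming (as the question implies) that each subtask runs the constructor right before its own `open()`. `StaticCounterMap` mirrors the pattern in the diff, while `IndexedMap` takes the id from the subtask index instead; in Flink that index could come from `getRuntimeContext().getIndexOfThisSubtask()` in `open()`. The class names are hypothetical and this is not the actual test code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative model of per-subtask ids; not the actual test code. */
final class SubtaskIdDemo {

    /** Mirrors the pattern under review: the constructor resets shared static state. */
    static final class StaticCounterMap {
        private static AtomicInteger idGenerator;
        int subtaskMapId;

        StaticCounterMap() {
            idGenerator = new AtomicInteger(); // every new instance wipes the shared counter
        }

        void open() {
            subtaskMapId = idGenerator.getAndIncrement();
        }
    }

    /** Alternative: the id is the subtask index handed over by the runtime. */
    static final class IndexedMap {
        int subtaskMapId;

        void open(int indexOfThisSubtask) {
            subtaskMapId = indexOfThisSubtask;
        }
    }

    /** Assumes each subtask runs the constructor right before its own open(). */
    static List<Integer> staticCounterIds(int parallelism) {
        List<Integer> ids = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            StaticCounterMap map = new StaticCounterMap();
            map.open();
            ids.add(map.subtaskMapId);
        }
        return ids;
    }

    static List<Integer> indexedIds(int parallelism) {
        List<Integer> ids = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            IndexedMap map = new IndexedMap();
            map.open(subtask); // in Flink: getRuntimeContext().getIndexOfThisSubtask()
            ids.add(map.subtaskMapId);
        }
        return ids;
    }
}
```

Under that assumption, `staticCounterIds(3)` yields `[0, 0, 0]`, while `indexedIds(3)` yields `[0, 1, 2]`. The same reasoning applies to the static `checkpointReachSinkLatch`.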
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java
##########
@@ -69,7 +73,7 @@ private static Configuration getConfiguration() {
public void testIgnoreInFlightDataDuringRecovery() throws Exception {
// given: Stream which will fail after first checkpoint.
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(3);
+ env.setParallelism(PARALLELISM);
Review comment:
We probably should also disable chaining here.
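A toy model of why chaining matters here. It assumes that `SlowMap` and the sink are connected one-to-one with the same parallelism, in which case Flink may chain them into a single task. Records then pass by direct method call, so no channel exists whose contents could be captured as in-flight data. This is a simplification, not Flink code. In the test itself, the change would presumably be a single `env.disableOperatorChaining()` call, or `disableChaining()` on the specific operator.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.IntConsumer;

/**
 * Toy model, not Flink code: a chained map/sink pair hands each record straight
 * to the sink, while an unchained pair goes through a buffer that a checkpoint
 * could capture as in-flight data.
 */
final class ChainingModel {

    /** Returns how many records sit between "map" and "sink" when the checkpoint is taken. */
    static int inFlightAtCheckpoint(boolean chained, int records) {
        Queue<Integer> channel = new ArrayDeque<>();
        long[] sinkSum = {0};
        IntConsumer sink = value -> sinkSum[0] += value;

        for (int i = 1; i <= records; i++) {
            if (chained) {
                sink.accept(i); // chained: a direct method call, nothing is buffered
            } else {
                channel.add(i); // unchained: the record waits in the channel
            }
        }

        // The checkpoint is taken here, before the sink drains the channel.
        int inFlight = channel.size();

        while (!channel.isEmpty()) {
            sink.accept(channel.poll());
        }
        return inFlight;
    }
}
```

With chaining, `inFlightAtCheckpoint(true, 4)` returns 0, so ignoring in-flight data on recovery would make no observable difference. Without chaining, `inFlightAtCheckpoint(false, 4)` returns 4.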
--
This is an automated message from the Apache Git Service.