zentol commented on a change in pull request #16229:
URL: https://github.com/apache/flink/pull/16229#discussion_r655956199



##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -219,6 +223,30 @@ public Long map(Long value) throws Exception {
                                         if (++num > failAt) {
                                             throw new Exception("Artificial intermittent failure.");
                                         }
+                                        // This test depends on checkpoints persisting progress from
+                                        // the source before above exception gets triggered.
+                                        // Otherwise, the job will run for a long time (or forever)
+                                        // because above exception will be thrown before any
+                                        // checkpoint successfully completes.
+                                        //
+                                        // Checkpoints are triggered once the checkpoint scheduler
+                                        // gets started + a random initial delay.
+                                        // For DefaultScheduler, this mechanism is fine, because DS
+                                        // starts the checkpoint coordinator, then requests the
+                                        // required slots and then deploys the tasks. These
+                                        // operations take enough time to have a checkpoint
+                                        // triggered by the time the task starts running.
+                                        // AdaptiveScheduler starts the CheckpointCoordinator right
+                                        // before deploying tasks (when slots are available
+                                        // already), hence tasks will start running almost
+                                        // immediately, and the checkpoint gets triggered too late
+                                        // (it won't be able to complete before the artificial
+                                        // failure from this test)
+                                        // When we detect AdaptiveScheduler, we artificially slow
+                                        // down
+                                        if (adaptiveSchedulerEnabled) {
+                                            Thread.sleep(1);

Review comment:
       Surely there is a saner way to do this, such as waiting for `notifyCheckpointComplete` to arrive before throwing the exception.
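
       A minimal sketch of what that could look like, not the actual test code. It is written without Flink on the classpath: `CheckpointListenerSketch` is a stand-in for the one method of Flink's `CheckpointListener` used here, and `FailAfterCheckpointMapper` is a hypothetical name. The failure is deferred rather than awaited by blocking, on the assumption that the notification is delivered on the same task thread that runs `map()`, so blocking there would stall it.

```java
/** Stand-in (assumption) for the single method of Flink's CheckpointListener used below. */
interface CheckpointListenerSketch {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

/**
 * Hypothetical mapper: passes records through and throws the artificial
 * failure only once (a) more than failAt records were seen and
 * (b) at least one checkpoint has completed.
 */
class FailAfterCheckpointMapper implements CheckpointListenerSketch {
    private final int failAt;
    private int num;
    // Set from the checkpoint notification; volatile in case it arrives on another thread.
    private volatile boolean checkpointCompleted;

    FailAfterCheckpointMapper(int failAt) {
        this.failAt = failAt;
    }

    public Long map(Long value) throws Exception {
        // Defer the failure until progress has been persisted by a checkpoint.
        if (++num > failAt && checkpointCompleted) {
            throw new Exception("Artificial intermittent failure.");
        }
        return value;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        checkpointCompleted = true;
    }
}
```

       With this shape the scheduler-specific `Thread.sleep(1)` would not be needed, because the failure no longer races against the first checkpoint. The sketch does not cover what happens if the input ends before any checkpoint completes.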

Review comment:
       (I suppose the _source_ should wait until at least one checkpoint has been taken before emitting the last value?)
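
       A rough sketch of that source-side gating, again without Flink on the classpath. `CheckpointGatedSequence` and its methods are hypothetical names, not part of the test or of Flink's source API; a real source would call `poll()` from its emit loop and forward the checkpoint notification to `notifyCheckpointComplete`.

```java
import java.util.OptionalLong;

/**
 * Hypothetical emitter for the values 1..last that withholds the final
 * value until at least one checkpoint has completed.
 */
class CheckpointGatedSequence {
    private final long last;
    private long next = 1;
    private boolean checkpointCompleted;

    CheckpointGatedSequence(long last) {
        this.last = last;
    }

    /** Returns the next value, or empty if finished or the last value is still gated. */
    OptionalLong poll() {
        if (next > last) {
            return OptionalLong.empty();
        }
        if (next == last && !checkpointCompleted) {
            // Hold back the final value so the job cannot finish before a checkpoint.
            return OptionalLong.empty();
        }
        return OptionalLong.of(next++);
    }

    /** True once every value, including the gated last one, has been handed out. */
    boolean isFinished() {
        return next > last;
    }

    void notifyCheckpointComplete(long checkpointId) {
        checkpointCompleted = true;
    }
}
```

       Gating only the last value keeps the source emitting at full speed while still guaranteeing that the job stays alive until a checkpoint has completed.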





