zentol commented on a change in pull request #16229:
URL: https://github.com/apache/flink/pull/16229#discussion_r655956199
##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -219,6 +223,30 @@ public Long map(Long value) throws Exception {
if (++num > failAt) {
throw new Exception("Artificial intermittent failure.");
}
+ // This test depends on checkpoints persisting progress from
+ // the source before above exception gets triggered.
+ // Otherwise, the job will run for a long time (or forever)
+ // because above exception will be thrown before any
+ // checkpoint successfully completes.
+ //
+ // Checkpoints are triggered once the checkpoint scheduler
+ // gets started + a random initial delay.
+ // For DefaultScheduler, this mechanism is fine, because DS
+ // starts the checkpoint coordinator, then requests the
+ // required slots and then deploys the tasks. These
+ // operations take enough time to have a checkpoint
+ // triggered by the time the task starts running.
+ // AdaptiveScheduler starts the CheckpointCoordinator right
+ // before deploying tasks (when slots are available
+ // already), hence tasks will start running almost
+ // immediately, and the checkpoint gets triggered too late
+ // (it won't be able to complete before the artificial
+ // failure from this test)
+ // When we detect AdaptiveScheduler, we artificially slow
+ // down
+ if (adaptiveSchedulerEnabled) {
+ Thread.sleep(1);
Review comment:
Surely there is a saner way to do this, like waiting for notifyCheckpointComplete to arrive before throwing the exception.
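A minimal sketch of that suggestion, assuming the mapper from the diff can record that a checkpoint has completed and check that flag alongside its `failAt` counter instead of sleeping. So that it compiles without Flink, `CheckpointCompleteListener` is a hypothetical local interface standing in for Flink's `CheckpointListener` (same `notifyCheckpointComplete(long)` shape), and `CheckpointGatedFailingMapper` is an illustrative name, not code from the PR.

```java
// Hypothetical stand-in for Flink's CheckpointListener, declared locally so
// this sketch compiles without Flink on the classpath.
interface CheckpointCompleteListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Sketch of the mapper from the diff, gated on a completed checkpoint instead
// of Thread.sleep: it only throws once the failure threshold has been passed
// AND at least one checkpoint has been confirmed complete.
class CheckpointGatedFailingMapper implements CheckpointCompleteListener {
    private final long failAt;
    private long num;
    // Set by notifyCheckpointComplete, read by map; volatile in case the two
    // methods are invoked from different threads.
    private volatile boolean checkpointCompleted;

    CheckpointGatedFailingMapper(long failAt) {
        this.failAt = failAt;
    }

    public Long map(Long value) throws Exception {
        // Past the threshold, but still pass records through until a
        // checkpoint has persisted progress we can restore from.
        if (++num > failAt && checkpointCompleted) {
            throw new Exception("Artificial intermittent failure.");
        }
        return value;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        checkpointCompleted = true;
    }
}
```

With this gate the artificial failure can only fire after restorable progress exists, regardless of how quickly the scheduler deploys the tasks, so the behavior no longer depends on timing.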
Review comment:
(I suppose the _source_ should wait until at least one checkpoint has been taken before emitting the last value?)
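A minimal sketch of the source-side variant, assuming a source that emits `0..numElements-1` and holds back only the final element until the first checkpoint-complete notification arrives. `CheckpointDoneListener` and `CheckpointAwareSequenceSource` are hypothetical names, and the blocking `CountDownLatch` only illustrates the ordering: a real Flink source has to respect its own threading contract (a FLIP-27 `SourceReader`, for example, is expected not to block in `pollNext`).

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.LongConsumer;

// Hypothetical stand-in for Flink's CheckpointListener, declared locally so
// this sketch compiles without Flink on the classpath.
interface CheckpointDoneListener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}

// Emits 0..numElements-1, but withholds the last element until at least one
// checkpoint has completed, so progress is persisted before the job can finish
// (or hit the test's artificial failure).
class CheckpointAwareSequenceSource implements CheckpointDoneListener {
    private final long numElements;
    // Released by the first checkpoint-complete notification.
    private final CountDownLatch firstCheckpoint = new CountDownLatch(1);

    CheckpointAwareSequenceSource(long numElements) {
        this.numElements = numElements;
    }

    void run(LongConsumer out) throws InterruptedException {
        for (long i = 0; i < numElements; i++) {
            if (i == numElements - 1) {
                // Block before the final element until a checkpoint completes.
                firstCheckpoint.await();
            }
            out.accept(i);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        firstCheckpoint.countDown();
    }
}
```

Compared with gating the failing mapper, this keeps the failure logic untouched and instead guarantees that a checkpoint completes before the input is exhausted.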
--
This is an automated message from the Apache Git Service.