Myasuka commented on a change in pull request #8820: [FLINK-12916][tests] Retry
cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase
URL: https://github.com/apache/flink/pull/8820#discussion_r303515559
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
##########
@@ -66,12 +72,16 @@
private static final int NUM_TMS = 1;
private static final int NUM_SLOTS_PER_TM = 4;
private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
- private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS = Pattern
- .compile(
- "(was not running)" +
- "|(Not all required tasks are currently running)" +
- "|(Checkpoint was declined \\(tasks not ready\\))"
- );
+
+ private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS = Pattern.compile(
+ Stream.of(
+ TRIGGER_SAVEPOINT_FAILURE.message(),
+ NOT_ALL_REQUIRED_TASKS_RUNNING.message(),
+ CHECKPOINT_DECLINED_TASK_NOT_READY.message(),
+ // If task already in state RUNNING while stream task not running, stream task would then broadcast barrier.
+ CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER.message())
Review comment:
As this test uses `EXACTLY_ONCE` checkpointing, we can analyze every case
that might create a `CancelCheckpointMarker`, based on the [failed
instance](https://api.travis-ci.org/v3/job/557572323/log.txt) and its
[mvn-1.log](https://transfer.sh/nw1vG/39461.8.tar.gz).
I paste the error log below and use it for the analysis:
~~~
23:53:31,033 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
- Source: Custom Source (1/1) (ea4ff5f207089adad2fb67617ba00a36) switched
from DEPLOYING to RUNNING.
23:53:31,036 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Triggering checkpoint 382 @ 1562889211036 for job
5b085f6a3ad6cb824d085deabaee3baf.
23:53:31,038 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
- Initializing heap keyed state backend with stream factory.
23:53:31,040 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
- Initializing heap keyed state backend with stream factory.
23:53:31,042 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Decline checkpoint 382 by task 88f46280796f75b61dc22d811ebf8911 of job
5b085f6a3ad6cb824d085deabaee3baf at 9ec8cafd-f53e-41f8-b1c0-5531d7ff133f @
localhost (dataPort=-1).
23:53:31,042 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator
- Discarding checkpoint 382 of job 5b085f6a3ad6cb824d085deabaee3baf.
org.apache.flink.runtime.checkpoint.CheckpointException: Task received
cancellation from one of its inputs
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyAbortOnCancellationBarrier(CheckpointBarrierHandler.java:96)
at
org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processCancellationBarrier(CheckpointBarrierAligner.java:223)
at
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:515)
at java.lang.Thread.run(Thread.java:748)
23:53:31,042 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
- Initializing heap keyed state backend with stream factory.
23:53:31,042 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend
- Initializing heap keyed state backend with stream factory.
23:53:31,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
- Job Flink Streaming Job (5b085f6a3ad6cb824d085deabaee3baf) switched from
state RUNNING to CANCELLING.
~~~
`CheckpointBarrierAligner`:
* A new checkpoint is received in `CheckpointBarrierHandler` while an older checkpoint
has not completed (see
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java#L138)).
This is impossible in this case, as `checkpoint 382` is the first checkpoint
to be triggered.
* End of partition is processed. This should not happen in this case, as in the
migration job `IntegerTupleSource` never ends by itself (see
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L138)).
* The data buffered during alignment exceeds the size limit. This should not happen, as the default
`task.checkpoint.alignment.max-size` is `-1`, which means no limit.
Other cases in `CheckpointBarrierAligner` only process an already created
`CheckpointBarrierHandler`; they do not create the first `CheckpointBarrierHandler`.
`StreamTask`:
* This should be the only place that can cause this case. If the task receives
the checkpoint trigger while `isRunning` in `StreamTask` is still false (see
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L773)),
that would also create the first `CheckpointBarrierHandler`. In the error log
above, you can find many `"Initializing heap keyed
state backend with stream factory"` messages before and after the checkpoint failure.
This message is printed in `initializeState()` (see
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L367))
of `StreamTask`. Note that the field `isRunning` is
still false when `initializeState()` is called; in other words, it is
possible for the `StreamTask` to receive a checkpoint trigger request while
`isRunning` is still false.
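
To make the race above concrete, here is a toy model of it. This is only a sketch and not Flink's actual code: `ToySourceTask`, its `running` flag, and the `emitted` list are hypothetical names standing in for `StreamTask`, `isRunning`, and the records sent downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical, not Flink code) of a task that can be asked to
// checkpoint before it has finished starting up.
class ToySourceTask {
    // Stands in for StreamTask's isRunning field; false until startup completes.
    private volatile boolean running = false;

    // Records what the task sends downstream, as plain strings for illustration.
    final List<String> emitted = new ArrayList<>();

    // State initialization happens while `running` is still false.
    void initializeState() {
        // nothing to do in the toy model; the flag is deliberately left untouched
    }

    void startRunning() {
        running = true;
    }

    // Returns true if a regular barrier was emitted, false if the checkpoint
    // had to be cancelled because the task was not running yet.
    boolean triggerCheckpoint(long checkpointId) {
        if (running) {
            emitted.add("barrier-" + checkpointId);
            return true;
        }
        // Not running yet: tell downstream tasks to abort this checkpoint.
        emitted.add("cancel-" + checkpointId);
        return false;
    }
}
```

In this model, a trigger that arrives between `initializeState()` and `startRunning()` yields a cancel marker instead of a barrier, which the downstream task would report as a declined checkpoint.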
In a nutshell, we should tolerate the
`CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER` case and trigger the savepoint again.
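
A minimal sketch of what "tolerate and retry" could look like, independent of the test base class. The class `TolerantRetry` and its methods are hypothetical, and the message strings are copied from the removed pattern and the log above rather than read from the failure-reason constants in the diff.

```java
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: retries an action while it fails with a tolerated message.
class TolerantRetry {
    // Assumed message texts; the PR derives them from failure-reason constants instead.
    static final Pattern TOLERATED = Pattern.compile(
        Stream.of(
                "Not all required tasks are currently running",
                "Checkpoint was declined (tasks not ready)",
                "Task received cancellation from one of its inputs")
            .map(Pattern::quote) // quote so that parentheses are matched literally
            .collect(Collectors.joining("|")));

    // True if the exception or any of its causes carries a tolerated message.
    static boolean isTolerated(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            String msg = c.getMessage();
            if (msg != null && TOLERATED.matcher(msg).find()) {
                return true;
            }
        }
        return false;
    }

    // Runs the action up to maxAttempts times; rethrows non-tolerated failures immediately.
    static <T> T retry(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (!isTolerated(e)) {
                    throw e;
                }
                last = e; // tolerated: remember it and try again
            }
        }
        throw last; // every attempt failed with a tolerated exception
    }
}
```

Quoting each message with `Pattern.quote` avoids hand-escaping parentheses, as the removed pattern had to do with `\\(` and `\\)`.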