Myasuka commented on a change in pull request #8820: [FLINK-12916][tests] Retry 
cancelWithSavepoint on cancellation barrier in AbstractOperatorRestoreTestBase
URL: https://github.com/apache/flink/pull/8820#discussion_r303515559
 
 

 ##########
 File path: 
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 ##########
 @@ -66,12 +72,16 @@
        private static final int NUM_TMS = 1;
        private static final int NUM_SLOTS_PER_TM = 4;
        private static final Duration TEST_TIMEOUT = Duration.ofSeconds(10000L);
-       private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS = Pattern
-               .compile(
-                       "(was not running)" +
-                               "|(Not all required tasks are currently running)" +
-                               "|(Checkpoint was declined \\(tasks not ready\\))"
-               );
+
+       private static final Pattern PATTERN_CANCEL_WITH_SAVEPOINT_TOLERATED_EXCEPTIONS = Pattern.compile(
+               Stream.of(
+                       TRIGGER_SAVEPOINT_FAILURE.message(),
+                       NOT_ALL_REQUIRED_TASKS_RUNNING.message(),
+                       CHECKPOINT_DECLINED_TASK_NOT_READY.message(),
+                       // If task already in state RUNNING while stream task not running, stream task would then broadcast barrier.
+                       CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER.message())
 
 Review comment:
   As this test uses `EXACTLY_ONCE` mode for checkpoints, we can analyze all 
cases that might create a `CancelCheckpointMarker`, using the 
[mvn-1.log](https://transfer.sh/nw1vG/39461.8.tar.gz) of this [failed 
instance](https://api.travis-ci.org/v3/job/557572323/log.txt).
   
   The relevant error log is pasted below and is the basis for the analysis:
   ~~~
   23:53:31,033 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (ea4ff5f207089adad2fb67617ba00a36) switched from DEPLOYING to RUNNING.
   23:53:31,036 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 382 @ 1562889211036 for job 5b085f6a3ad6cb824d085deabaee3baf.
   23:53:31,038 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.
   23:53:31,040 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.
   23:53:31,042 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Decline checkpoint 382 by task 88f46280796f75b61dc22d811ebf8911 of job 5b085f6a3ad6cb824d085deabaee3baf at 9ec8cafd-f53e-41f8-b1c0-5531d7ff133f @ localhost (dataPort=-1).
   23:53:31,042 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding checkpoint 382 of job 5b085f6a3ad6cb824d085deabaee3baf.
   org.apache.flink.runtime.checkpoint.CheckpointException: Task received cancellation from one of its inputs
        at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyAbortOnCancellationBarrier(CheckpointBarrierHandler.java:96)
        at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processCancellationBarrier(CheckpointBarrierAligner.java:223)
        at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:161)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:102)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.pollNextNullable(StreamTaskNetworkInput.java:47)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:135)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:515)
        at java.lang.Thread.run(Thread.java:748)
   23:53:31,042 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.
   23:53:31,042 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Initializing heap keyed state backend with stream factory.
   23:53:31,049 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (5b085f6a3ad6cb824d085deabaee3baf) switched from state RUNNING to CANCELLING.
   ~~~
   
   `CheckpointBarrierAligner`:
   * A new checkpoint is received in `CheckpointBarrierHandler` while older 
checkpoints have not yet completed (see 
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java#L138)).
 This is impossible in this case, as `checkpoint 382` is the first checkpoint 
to be triggered.
   * End of partition is processed. This should not happen in this case, as the 
`IntegerTupleSource` of the migration job never ends by itself (see 
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L138)).
   * The buffered checkpoint size exceeds the limit. This should not happen, as 
the default of `task.checkpoint.alignment.max-size` is `-1`, which means no 
limit.
   
   The other cases in `CheckpointBarrierAligner` only process an already 
created `CheckpointBarrierHandler`; they do not create the first 
`CheckpointBarrierHandler`.
   
   `StreamTask`:
   * This should be the only place that can cause this case. If the task 
receives the checkpoint action while `isRunning` in `StreamTask` is still false 
(see 
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L773)),
 that would also create the first `CheckpointBarrierHandler`. In the error log 
above, there are many `"Initializing heap keyed state backend with stream 
factory"` messages before and after the checkpoint failure. This message is 
logged in `initializeState()` (see 
[code](https://github.com/apache/flink/blob/0fe634ccbc9e6caf82b40af7ae900a94221b9a8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L367))
 of `StreamTask`. Note that the field `isRunning` is still false while 
`initializeState()` is called; in other words, it is possible for the 
`StreamTask` to receive a checkpoint action request while `isRunning` is still 
false.
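
   To make the suspected race concrete, here is a deliberately simplified 
model of it. `ToyStreamTask` and its methods are hypothetical names for 
illustration only; this is not Flink's `StreamTask`, just a sketch of the 
ordering described above: a checkpoint trigger that arrives before `isRunning` 
is set results in a cancellation marker instead of a barrier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the race; NOT Flink's StreamTask.
final class ToyStreamTask {
    // Stays false during state initialization, as described in the comment above.
    private volatile boolean isRunning = false;

    // Events this task would send downstream, recorded as plain strings.
    final List<String> emitted = new ArrayList<>();

    void initializeState() {
        // State backends are initialized here; isRunning is still false.
    }

    void markRunning() {
        isRunning = true;
    }

    // Returns true if the checkpoint was performed, false if it was cancelled.
    boolean triggerCheckpoint(long checkpointId) {
        if (isRunning) {
            emitted.add("CheckpointBarrier(" + checkpointId + ")");
            return true;
        }
        // Not running yet: tell downstream tasks not to wait for this checkpoint.
        emitted.add("CancelCheckpointMarker(" + checkpointId + ")");
        return false;
    }
}
```

   In this model, calling `triggerCheckpoint(382)` between `initializeState()` 
and `markRunning()` records a `CancelCheckpointMarker`, which is the situation 
the log above suggests.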
   
   In a nutshell, we should tolerate the 
`CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER` failure and trigger the savepoint 
again. 
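
   A minimal sketch of what "tolerate and retry" could look like is below. 
`TolerantRetry`, `isTolerated` and `retry` are hypothetical helpers, not the 
code in this PR; the message strings are copied from the old pattern in the 
diff and from the exception in the log above, whereas the PR takes them from 
the `message()` of the failure-reason constants. Quoting each message with 
`Pattern.quote` is an assumption of this sketch, so that parentheses match 
literally.

```java
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration; not part of Flink or of this PR.
final class TolerantRetry {

    // Failure messages considered transient, joined into one alternation.
    static final Pattern TOLERATED = Pattern.compile(
        Stream.of(
                "was not running",
                "Not all required tasks are currently running",
                "Checkpoint was declined (tasks not ready)",
                "Task received cancellation from one of its inputs")
            .map(Pattern::quote) // match "(" and ")" literally
            .collect(Collectors.joining("|")));

    // True if the exception or any of its causes carries a tolerated message.
    static boolean isTolerated(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            String message = c.getMessage();
            if (message != null && TOLERATED.matcher(message).find()) {
                return true;
            }
        }
        return false;
    }

    // Runs the action up to maxAttempts times, retrying only on tolerated failures.
    static <T> T retry(Callable<T> action, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                if (!isTolerated(e)) {
                    throw e; // unexpected failure: fail immediately
                }
                last = e; // tolerated failure: try again
            }
        }
        throw last; // every attempt failed with a tolerated exception
    }
}
```

   A real test would also bound the retries by a deadline such as 
`TEST_TIMEOUT` rather than by an attempt count alone.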
   
   
