[ 
https://issues.apache.org/jira/browse/FLINK-20816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312831#comment-17312831
 ] 

Yun Tang commented on FLINK-20816:
----------------------------------

From the comparison of the success and failure logs, I think the root cause is 
that {{DeclineSink}} did not execute the sync phase of the snapshot for 
checkpoint-2. However, we expect checkpoint-2 to fail during the async phase, 
and that is why the test timed out: it kept waiting for an expected checkpoint 
failure that never came.

If we extract all logs related to {{DeclineSink}} from the failure log:
{code:java}
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - found barrier 2, lastSeenBarrier = 1 (COMPLETED) @ InputChannelInfo{gateIdx=0, inputChannelIdx=0}
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler [] - DeclineSink (1/1)#0 (7457bf515844f409738c9929fffc54f7): Received barrier from channel InputChannelInfo{gateIdx=0, inputChannelIdx=0} @ 2.
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl [] - DeclineSink (1/1)#0 starting checkpoint 2 (CheckpointOptions {checkpointType = CHECKPOINT, targetLocation = (default), isExactlyOnceMode = true, isUnalignedCheckpoint = true, alignmentTimeout = 9223372036854775807})
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - startPersisting 2, lastSeenBarrier = 2 (BARRIER_RECEIVED) @ InputChannelInfo{gateIdx=0, inputChannelIdx=0}
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler [] - DeclineSink (1/1)#0 (7457bf515844f409738c9929fffc54f7): Triggering checkpoint 2 on the barrier announcement at 1617138259258.
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Starting checkpoint (2) CHECKPOINT on task DeclineSink (1/1)#0
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl [] - DeclineSink (1/1)#0 finishing output data, checkpoint 2
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl [] - DeclineSink (1/1)#0 requested write result, checkpoint 2
21:04:19,273 [Channel state writer DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter [] - complete output, input completed: false
21:05:58,298 [flink-akka.actor.default-dispatcher-12] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - DeclineSink (1/1) (7457bf515844f409738c9929fffc54f7) switched from RUNNING to CANCELING.
{code}
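
For anyone who wants to reproduce the extraction above, here is a minimal sketch. It assumes a local copy of the attached {{flink-20816-failure.log}}; the class {{TaskLogFilter}} and its method are hypothetical helpers, not part of Flink. It keeps every line that mentions the task name, which also covers the channel state writer thread and the {{ExecutionGraph}} state transitions.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: keeps only the log lines that mention a given task.
class TaskLogFilter {

    // Returns the lines containing the marker, in their original order.
    static List<String> linesMentioning(List<String> lines, String marker) {
        return lines.stream()
                .filter(line -> line.contains(marker))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Assumed location of the attachment; pass another path as the first argument.
        Path log = Paths.get(args.length > 0 ? args[0] : "flink-20816-failure.log");
        linesMentioning(Files.readAllLines(log), "DeclineSink").forEach(System.out::println);
    }
}
```

A plain substring match is enough here because the task name appears both in the thread name and in the message text of the relevant entries.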

I am not familiar with unaligned checkpoints, but I think {{Channel state 
writer DeclineSink}} should wait until input completed is true, and only then 
could the [code 
below|https://github.com/apache/flink/blob/04bbf03a0cdb2f455c1b06569dea95ace6fa7e7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L305-L317]
 execute:

{code:java}
        // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
        // progress of the
        // streaming topology

        Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
                new HashMap<>(operatorChain.getNumberOfOperators());
        try {
            if (takeSnapshotSync(
                    snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
                finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
            } else {
                cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
            }
            }
        } catch (Exception ex) {
            cleanup(snapshotFutures, metadata, metrics, ex);
            throw ex;
        }
{code}
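
To make the hypothesis concrete, here is a toy model in plain Java. It is not Flink code and every name in it is hypothetical. It only illustrates the suspected chain of events: if the sync phase is gated on the input side completing and that never happens, the async phase never runs, its intentional failure never fires, and whoever waits for the abort notification runs into the timeout. Whether Flink really gates the sync phase this way is an assumption taken from my reading of the logs, not something I have confirmed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model of the hypothesis, not Flink code: all names here are hypothetical.
class SyncPhaseGateSketch {

    // Runs one modelled "checkpoint" and reports whether the abort notification
    // arrived within timeoutMillis.
    static boolean abortObserved(boolean inputCompleted, long timeoutMillis) {
        CountDownLatch abortNotified = new CountDownLatch(1);
        CompletableFuture<Void> inputDone = new CompletableFuture<>();

        inputDone
                // Sync phase: stands in for takeSnapshotSync(...) in the excerpt above.
                .thenRun(() -> { })
                // Async phase: fails on purpose, as the test expects for checkpoint 2.
                .thenRun(() -> {
                    throw new IllegalStateException("async phase fails on purpose");
                })
                // Only a failed async phase produces the abort notification.
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        abortNotified.countDown();
                    }
                });

        if (inputCompleted) {
            inputDone.complete(null);
        }
        try {
            return abortNotified.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("input completed:     " + abortObserved(true, 200));
        System.out.println("input not completed: " + abortObserved(false, 200));
    }
}
```

With the input completed, the notification arrives and the method returns true; without it, the wait expires and the method returns false, which corresponds to the test timeout in the stack trace quoted below.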




> NotifyCheckpointAbortedITCase failed due to timeout
> ---------------------------------------------------
>
>                 Key: FLINK-20816
>                 URL: https://issues.apache.org/jira/browse/FLINK-20816
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.2, 1.13.0
>            Reporter: Matthias
>            Assignee: Matthias
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.13.0
>
>         Attachments: flink-20816-failure.log, flink-20816-success.log
>
>
> [This 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=152&view=logs&j=0a15d512-44ac-5ba5-97ab-13a5d066c22c&t=634cd701-c189-5dff-24cb-606ed884db87&l=4245]
>  failed because {{NotifyCheckpointAbortedITCase}} timed out.
> {code}
> 2020-12-29T21:48:40.9430511Z [INFO] Running 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase
> 2020-12-29T21:50:28.0087043Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, 
> Skipped: 0, Time elapsed: 107.062 s <<< FAILURE! - in 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase
> 2020-12-29T21:50:28.0087961Z [ERROR] 
> testNotifyCheckpointAborted[unalignedCheckpointEnabled 
> =true](org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase)  
> Time elapsed: 104.044 s  <<< ERROR!
> 2020-12-29T21:50:28.0088619Z org.junit.runners.model.TestTimedOutException: 
> test timed out after 100000 milliseconds
> 2020-12-29T21:50:28.0088972Z  at java.lang.Object.wait(Native Method)
> 2020-12-29T21:50:28.0089267Z  at java.lang.Object.wait(Object.java:502)
> 2020-12-29T21:50:28.0089633Z  at 
> org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:61)
> 2020-12-29T21:50:28.0090458Z  at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAborted(NotifyCheckpointAbortedITCase.java:200)
> 2020-12-29T21:50:28.0091313Z  at 
> org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:183)
> 2020-12-29T21:50:28.0091819Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-12-29T21:50:28.0092199Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-12-29T21:50:28.0092675Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-12-29T21:50:28.0093095Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-12-29T21:50:28.0093495Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-12-29T21:50:28.0093980Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-12-29T21:50:28.0094444Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-12-29T21:50:28.0094917Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-12-29T21:50:28.0095663Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-12-29T21:50:28.0096221Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-12-29T21:50:28.0096675Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-12-29T21:50:28.0097022Z  at java.lang.Thread.run(Thread.java:748)
> {code}
> The branch contained changes from FLINK-20594 and FLINK-20595. These issues 
> remove code that is no longer used and should have affected only unit 
> tests. [The previous 
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=151&view=results]
>  containing all the changes except for 
> [9c57c37|https://github.com/XComp/flink/commit/9c57c37c50733a1f592a4fc5e492b22be80d8279]
>  passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
