[ https://issues.apache.org/jira/browse/FLINK-20816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312831#comment-17312831 ]
Yun Tang commented on FLINK-20816:
----------------------------------
From comparing the success and failure logs, I think the root cause is that
{{DeclineSink}} did not execute the sync phase of the snapshot for checkpoint 2.
However, the test expects checkpoint 2 to fail during the async phase, which is
why it timed out: it kept waiting for a checkpoint failure that never happened.
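To illustrate why a missing failure shows up as a timeout rather than as an error, here is a minimal sketch. It uses {{java.util.concurrent.CountDownLatch}} as a stand-in for Flink's {{OneShotLatch}}, and the names {{AbortLatchSketch}}, {{onCheckpointFailed}} and {{awaitAbort}} are hypothetical, not taken from the test. Unlike the test, which calls {{await()}} without a timeout and relies on the JUnit timeout, the sketch uses a bounded wait so the missing notification surfaces as {{false}}:

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AbortLatchSketch {
    // Hypothetical stand-in for the latch the test waits on; it is counted down
    // only once the expected failure of checkpoint 2 has been observed.
    private final CountDownLatch checkpointAborted = new CountDownLatch(1);

    /** Hypothetical hook, reached only if the failing snapshot of checkpoint 2 actually runs. */
    public void onCheckpointFailed() {
        checkpointAborted.countDown();
    }

    /** Waits up to timeoutMillis; returns false if the failure was never signalled. */
    public boolean awaitAbort(long timeoutMillis) {
        try {
            return checkpointAborted.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        AbortLatchSketch sketch = new AbortLatchSketch();
        // The failing path never runs, so the bounded wait gives up.
        System.out.println("abort observed: " + sketch.awaitAbort(100));
        // Once the failure is signalled, the same wait returns immediately.
        sketch.onCheckpointFailed();
        System.out.println("abort observed: " + sketch.awaitAbort(100));
    }
}
{code}

With an unbounded {{await()}}, the first call would simply block until the surrounding JUnit timeout fires, which matches the stack trace in the issue description.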
Extracting all {{DeclineSink}}-related entries from the failure log gives:
{code:java}
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - found barrier 2, lastSeenBarrier = 1 (COMPLETED) @ InputChannelInfo{gateIdx=0, inputChannelIdx=0}
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler [] - DeclineSink (1/1)#0 (7457bf515844f409738c9929fffc54f7): Received barrier from channel InputChannelInfo{gateIdx=0, inputChannelIdx=0} @ 2.
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl [] - DeclineSink (1/1)#0 starting checkpoint 2 (CheckpointOptions {checkpointType = CHECKPOINT, targetLocation = (default), isExactlyOnceMode = true, isUnalignedCheckpoint = true, alignmentTimeout = 9223372036854775807})
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister [] - startPersisting 2, lastSeenBarrier = 2 (BARRIER_RECEIVED) @ InputChannelInfo{gateIdx=0, inputChannelIdx=0}
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler [] - DeclineSink (1/1)#0 (7457bf515844f409738c9929fffc54f7): Triggering checkpoint 2 on the barrier announcement at 1617138259258.
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Starting checkpoint (2) CHECKPOINT on task DeclineSink (1/1)#0
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl [] - DeclineSink (1/1)#0 finishing output data, checkpoint 2
21:04:19,272 [ DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl [] - DeclineSink (1/1)#0 requested write result, checkpoint 2
21:04:19,273 [Channel state writer DeclineSink (1/1)#0] DEBUG org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter [] - complete output, input completed: false
21:05:58,298 [flink-akka.actor.default-dispatcher-12] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DeclineSink (1/1) (7457bf515844f409738c9929fffc54f7) switched from RUNNING to CANCELING.
{code}
I am not familiar with unaligned checkpoints, but I think the {{Channel state
writer DeclineSink}} thread should wait until the input side is completed as
well ({{input completed: true}}); only then could the [code below|https://github.com/apache/flink/blob/04bbf03a0cdb2f455c1b06569dea95ace6fa7e7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java#L305-L317]
execute:
{code:java}
// Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
// progress of the streaming topology
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
        new HashMap<>(operatorChain.getNumberOfOperators());
try {
    if (takeSnapshotSync(
            snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
        finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
    } else {
        cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
    }
} catch (Exception ex) {
    cleanup(snapshotFutures, metadata, metrics, ex);
    throw ex;
}
{code}
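A minimal sketch of the dependency I suspect, using plain {{CompletableFuture}}s rather than Flink's actual channel state writer API. All names here ({{ChannelStateGateSketch}}, {{completeOutput}}, {{completeInput}}, {{scheduleAsyncPhase}}) are hypothetical, and the assumption is that the part of the checkpoint that should decline (stood in for by a flag) can only run once both the output and the input side of the channel state have been written. With only the output completed, as in the last {{DeclineSink}} log line, nothing downstream runs:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChannelStateGateSketch {
    // Hypothetical stand-ins for the input and output halves of the channel state write result.
    private final CompletableFuture<Void> inputCompleted = new CompletableFuture<>();
    private final CompletableFuture<Void> outputCompleted = new CompletableFuture<>();
    // Set when the (hypothetical) async phase that would decline the checkpoint has run.
    private final AtomicBoolean asyncPhaseFlag = new AtomicBoolean(false);

    /** Chains the async phase behind both halves of the write result. */
    public CompletableFuture<Void> scheduleAsyncPhase() {
        return CompletableFuture.allOf(inputCompleted, outputCompleted)
                .thenRun(() -> asyncPhaseFlag.set(true));
    }

    public void completeOutput() {
        outputCompleted.complete(null);
    }

    public void completeInput() {
        inputCompleted.complete(null);
    }

    public boolean asyncPhaseRan() {
        return asyncPhaseFlag.get();
    }

    public static void main(String[] args) {
        ChannelStateGateSketch sketch = new ChannelStateGateSketch();
        CompletableFuture<Void> asyncPhase = sketch.scheduleAsyncPhase();
        // Corresponds to the log line "complete output, input completed: false".
        sketch.completeOutput();
        System.out.println("ran=" + sketch.asyncPhaseRan() + " done=" + asyncPhase.isDone());
        // Only after the input side completes does the chained phase run
        // (synchronously, in the thread that completes the input future).
        sketch.completeInput();
        System.out.println("ran=" + sketch.asyncPhaseRan() + " done=" + asyncPhase.isDone());
    }
}
{code}

If that assumption holds for the real code path, an input side that never reports completion would be enough to keep checkpoint 2 from ever being declined.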
> NotifyCheckpointAbortedITCase failed due to timeout
> ---------------------------------------------------
>
> Key: FLINK-20816
> URL: https://issues.apache.org/jira/browse/FLINK-20816
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.12.2, 1.13.0
> Reporter: Matthias
> Assignee: Matthias
> Priority: Critical
> Labels: test-stability
> Fix For: 1.13.0
>
> Attachments: flink-20816-failure.log, flink-20816-success.log
>
>
> [This
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=152&view=logs&j=0a15d512-44ac-5ba5-97ab-13a5d066c22c&t=634cd701-c189-5dff-24cb-606ed884db87&l=4245]
> failed because {{NotifyCheckpointAbortedITCase}} timed out.
> {code}
> 2020-12-29T21:48:40.9430511Z [INFO] Running org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase
> 2020-12-29T21:50:28.0087043Z [ERROR] Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 107.062 s <<< FAILURE! - in org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase
> 2020-12-29T21:50:28.0087961Z [ERROR] testNotifyCheckpointAborted[unalignedCheckpointEnabled =true](org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase) Time elapsed: 104.044 s <<< ERROR!
> 2020-12-29T21:50:28.0088619Z org.junit.runners.model.TestTimedOutException: test timed out after 100000 milliseconds
> 2020-12-29T21:50:28.0088972Z at java.lang.Object.wait(Native Method)
> 2020-12-29T21:50:28.0089267Z at java.lang.Object.wait(Object.java:502)
> 2020-12-29T21:50:28.0089633Z at org.apache.flink.core.testutils.OneShotLatch.await(OneShotLatch.java:61)
> 2020-12-29T21:50:28.0090458Z at org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.verifyAllOperatorsNotifyAborted(NotifyCheckpointAbortedITCase.java:200)
> 2020-12-29T21:50:28.0091313Z at org.apache.flink.test.checkpointing.NotifyCheckpointAbortedITCase.testNotifyCheckpointAborted(NotifyCheckpointAbortedITCase.java:183)
> 2020-12-29T21:50:28.0091819Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-12-29T21:50:28.0092199Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-12-29T21:50:28.0092675Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-12-29T21:50:28.0093095Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-12-29T21:50:28.0093495Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-12-29T21:50:28.0093980Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-12-29T21:50:28.0094444Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-12-29T21:50:28.0094917Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-12-29T21:50:28.0095663Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-12-29T21:50:28.0096221Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-12-29T21:50:28.0096675Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-12-29T21:50:28.0097022Z at java.lang.Thread.run(Thread.java:748)
> {code}
> The branch contained changes from FLINK-20594 and FLINK-20595. These issues
> remove code that is no longer used and should only have affected unit
> tests. [The previous
> build|https://dev.azure.com/mapohl/flink/_build/results?buildId=151&view=results]
> containing all the changes except for
> [9c57c37|https://github.com/XComp/flink/commit/9c57c37c50733a1f592a4fc5e492b22be80d8279]
> passed.