[
https://issues.apache.org/jira/browse/FLINK-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16901363#comment-16901363
]
Yun Tang commented on FLINK-13601:
----------------------------------
[~aljoscha] After looking into the [detailed
log|https://transfer.sh/bNhvk/1452.8.tar.gz] of the
[flink-ci-failure|https://travis-ci.com/flink-ci/flink/jobs/222711830], I am
pretty sure that this is a test instability.
Currently, I use the {{notifyCheckpointComplete}} interface to record
{{snapshotIndicesOfSubTask}} and {{lastCompletedCheckpointId}} on the task side.
However, checkpoint 15 completed just after this task failed, and
{{notifyCheckpointComplete}} is no longer invoked on the task side once the
task has failed. In other words, {{lastCompletedCheckpointId}} was only
recorded as 14, the last checkpoint the task was notified about. After the
failover, the source restores its state from checkpoint 15 but compares it
against the index recorded for checkpoint 14, which leads to the failure.
Below is the related code:
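{code:java}
@Override
public void notifyCheckpointComplete(long checkpointId) {
    // Only the subtask with index NUM_OF_REGIONS - 1 does the test bookkeeping.
    if (getRuntimeContext().getIndexOfThisSubtask() == NUM_OF_REGIONS - 1) {
        // Never reached if the task fails before the completion notification arrives.
        lastCompletedCheckpointId.set(checkpointId);
        snapshotIndicesOfSubTask.put(checkpointId, lastRegionIndex);
        numCompletedCheckpoints.incrementAndGet();
    }
}
{code}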
And below are the logs that record this situation:
{code:java}
10:44:38,755 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 15 @ 1565088278754 for job 9e0fbeaa580123e05cfce5554f443d23.
10:44:38,762 INFO org.apache.flink.runtime.taskmanager.Task - Map (3/3) (fb073c45e552de69f06e8037f581491c) switched from RUNNING to FAILED.
org.apache.flink.test.checkpointing.RegionFailoverITCase$TestException
	at org.apache.flink.test.checkpointing.RegionFailoverITCase$FailingMapperFunction.map(RegionFailoverITCase.java:344)
	at org.apache.flink.test.checkpointing.RegionFailoverITCase$FailingMapperFunction.map(RegionFailoverITCase.java:315)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:269)
	at org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:140)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:688)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
	at java.lang.Thread.run(Thread.java:748)
10:44:38,776 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - Calculating tasks to restart to recover the failed task 5c51e52cde5a1c4df827ddb38fbc8da9_2.
10:44:38,776 INFO org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy - 3 tasks should be restarted to recover the failed task 5c51e52cde5a1c4df827ddb38fbc8da9_2.
10:44:38,776 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (3/3) (1a7ad9441ead7561b1a92bef8d46e421) switched from RUNNING to CANCELING.
10:44:38,776 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Unnamed (3/3) (6cdcfb1522f95345e0afbd32e82aaa68) switched from RUNNING to CANCELING.
10:44:38,777 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution fb073c45e552de69f06e8037f581491c.
10:44:38,777 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source (3/3) (1a7ad9441ead7561b1a92bef8d46e421).
10:44:38,780 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (3/3) (1a7ad9441ead7561b1a92bef8d46e421) switched from RUNNING to CANCELING.
10:44:38,780 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source (3/3) (1a7ad9441ead7561b1a92bef8d46e421).
10:44:38,780 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 15 for job 9e0fbeaa580123e05cfce5554f443d23 (35251 bytes in 26 ms).
{code}
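The timeline above can be reduced to a small model. The sketch below is a hypothetical, heavily simplified simulation, not Flink code: {{ModelTask}}, {{ModelCoordinator}} and {{RaceDemo}} are illustrative names only. The coordinator always records a completed checkpoint, but the completion notification only has an effect on a task that is still running, so the task-side record falls behind as soon as the task fails between triggering and completing a checkpoint.
{code:java}
// Hypothetical model, not Flink code: a subtask only learns about completed
// checkpoints while it is still running.
final class ModelTask {
    private boolean running = true;
    private long lastNotifiedCheckpointId = -1L;

    // The task (or its region) is failed/cancelled.
    void fail() {
        running = false;
    }

    // Stands in for the test's notifyCheckpointComplete(): a failed task never
    // receives the call, which is modeled here by ignoring it.
    void notifyCheckpointComplete(long checkpointId) {
        if (running) {
            lastNotifiedCheckpointId = checkpointId;
        }
    }

    long lastNotifiedCheckpointId() {
        return lastNotifiedCheckpointId;
    }
}

final class ModelCoordinator {
    private long lastCompletedCheckpointId = -1L;

    // The coordinator marks the checkpoint complete, then notifies the task.
    void completeCheckpoint(long checkpointId, ModelTask task) {
        lastCompletedCheckpointId = checkpointId;
        task.notifyCheckpointComplete(checkpointId);
    }

    long lastCompletedCheckpointId() {
        return lastCompletedCheckpointId;
    }
}

final class RaceDemo {
    public static void main(String[] args) {
        ModelTask task = new ModelTask();
        ModelCoordinator coordinator = new ModelCoordinator();

        coordinator.completeCheckpoint(14, task); // task still RUNNING
        task.fail();                              // region fails (10:44:38,762)
        coordinator.completeCheckpoint(15, task); // checkpoint 15 completes (10:44:38,780)

        // prints "coordinator: 15"
        System.out.println("coordinator: " + coordinator.lastCompletedCheckpointId());
        // prints "task side: 14"
        System.out.println("task side: " + task.lastNotifiedCheckpointId());
    }
}
{code}
In this model the coordinator's view (15) and the task-side record (14) diverge exactly as in the logs, which is why the recovered state no longer matches the recorded index.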
To fix this problem, I plan to use a customized {{CompletedCheckpointStore}} to
record this information on the coordinator side, instead of relying on
{{notifyCheckpointComplete}} being invoked on the task side. Please assign this
ticket to me.
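A minimal sketch of the idea behind recording on the coordinator side, under stated assumptions: {{SimpleCheckpointStore}}, {{InMemoryCheckpointStore}} and {{RecordingCheckpointStore}} are hypothetical, simplified stand-ins, not Flink's {{CompletedCheckpointStore}} API, whose real interface differs. The decorator updates the test bookkeeping at the moment a completed checkpoint is added to the store. For simplicity the region index is passed in directly; a real store would have to read it from the completed checkpoint's state.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified store interface. NOT Flink's CompletedCheckpointStore.
interface SimpleCheckpointStore {
    void addCheckpoint(long checkpointId, int lastRegionIndex);
}

// Plain in-memory store that only remembers which checkpoint ids it received.
final class InMemoryCheckpointStore implements SimpleCheckpointStore {
    final List<Long> checkpointIds = new ArrayList<>();

    @Override
    public void addCheckpoint(long checkpointId, int lastRegionIndex) {
        checkpointIds.add(checkpointId);
    }
}

// Decorator that records the test bookkeeping when the coordinator adds a
// completed checkpoint, so it no longer depends on a running task being notified.
final class RecordingCheckpointStore implements SimpleCheckpointStore {
    static final AtomicLong LAST_COMPLETED_CHECKPOINT_ID = new AtomicLong(-1L);
    static final ConcurrentMap<Long, Integer> SNAPSHOT_INDICES = new ConcurrentHashMap<>();

    private final SimpleCheckpointStore delegate;

    RecordingCheckpointStore(SimpleCheckpointStore delegate) {
        this.delegate = delegate;
    }

    @Override
    public void addCheckpoint(long checkpointId, int lastRegionIndex) {
        delegate.addCheckpoint(checkpointId, lastRegionIndex);
        // Hypothetical: the index is handed in here instead of being extracted
        // from the checkpoint's operator state.
        SNAPSHOT_INDICES.put(checkpointId, lastRegionIndex);
        LAST_COMPLETED_CHECKPOINT_ID.set(checkpointId);
    }
}
{code}
Because the record is written when the checkpoint is added to the store, it reflects every checkpoint the coordinator considers complete, including one that completes after the recording task has already failed.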
> RegionFailoverITCase is unstable
> --------------------------------
>
> Key: FLINK-13601
> URL: https://issues.apache.org/jira/browse/FLINK-13601
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Coordination
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Aljoscha Krettek
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.9.0
>
>
> Excerpt from https://travis-ci.com/flink-ci/flink/jobs/222711830:
> {code}
> 10:44:31.222 [INFO] Running org.apache.flink.test.checkpointing.RegionFailoverITCase
> org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9e0fbeaa580123e05cfce5554f443d23)
> 	at org.apache.flink.client.program.MiniClusterClient.submitJob(MiniClusterClient.java:92)
> 	at org.apache.flink.test.checkpointing.RegionFailoverITCase.testMultiRegionFailover(RegionFailoverITCase.java:132)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> 	at org.apache.flink.client.program.MiniClusterClient.submitJob(MiniClusterClient.java:90)
> 	... 13 more
> Caused by: java.lang.RuntimeException: Test failed due to unexpected recovered index: 2000, while last completed checkpoint record index: 1837
> 	at org.apache.flink.test.checkpointing.RegionFailoverITCase$StringGeneratingSourceFunction.initializeState(RegionFailoverITCase.java:300)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:862)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:367)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:688)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:518)
> 	... 1 more
> 10:44:39.210 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 7.983 s <<< FAILURE! - in org.apache.flink.test.checkpointing.RegionFailoverITCase
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)