[ https://issues.apache.org/jira/browse/FLINK-13527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann resolved FLINK-13527.
-----------------------------------
    Resolution: Duplicate

This test instability should be fixed via FLINK-13593.

> Instable KafkaProducerExactlyOnceITCase due to CheckpointFailureManager
> -----------------------------------------------------------------------
>
>                 Key: FLINK-13527
>                 URL: https://issues.apache.org/jira/browse/FLINK-13527
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.9.0
>            Reporter: Yun Tang
>            Assignee: vinoyang
>            Priority: Blocker
>
> [~banmoy] and I ran into the following unstable test:
> [https://api.travis-ci.org/v3/job/565270958/log.txt]
> [https://api.travis-ci.com/v3/job/221237628/log.txt]
> The root cause is that the task {{Source: Custom Source -> Map -> Sink: Unnamed (1/1)}} failed due to the expected artificial test failure and then freed its resources, including closing the registry. However, the async checkpoint thread in {{SourceStreamTask}} then failed and sent a decline-checkpoint message to the JM.
> The key logs are:
> {code:java}
> 03:36:46,639 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Sink: Unnamed (1/1) (f45ff068d2c80da22c2a958739ec0c87) switched from RUNNING to FAILED.
> java.lang.Exception: Artificial Test Failure
> 	at org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper.map(FailingIdentityMapper.java:79)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> 	at org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource.run(IntegerSource.java:75)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:172)
> 03:36:46,637 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 12 by task f45ff068d2c80da22c2a958739ec0c87 of job d5b629623731c66f1bac89dec3e87b89 at 03cbfd77-0727-4366-83c4-9aa4923fc817 @ localhost (dataPort=-1).
> 03:36:46,640 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 12 of job d5b629623731c66f1bac89dec3e87b89.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 12 for operator Source: Custom Source -> Map -> Sink: Unnamed (1/1). Failure reason: Checkpoint was declined.
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:431)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1248)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1182)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:853)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:758)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:667)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:147)
> 	at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1138)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot register Closeable, registry is already closed. Closing argument.
> 	at org.apache.flink.util.AbstractCloseableRegistry.registerCloseable(AbstractCloseableRegistry.java:85)
> 	at org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.<init>(AsyncSnapshotCallable.java:122)
> 	at org.apache.flink.runtime.state.AsyncSnapshotCallable$AsyncSnapshotTask.<init>(AsyncSnapshotCallable.java:110)
> 	at org.apache.flink.runtime.state.AsyncSnapshotCallable.toAsyncSnapshotFutureTask(AsyncSnapshotCallable.java:104)
> 	at org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture(StateSnapshotContextSynchronousImpl.java:127)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:401)
> 	... 12 more
> 03:36:46,642 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state RUNNING to FAILING.
> java.lang.Exception: Artificial Test Failure
> 	at org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper.map(FailingIdentityMapper.java:79)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> 	at org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource.run(IntegerSource.java:75)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:172)
> 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job Exactly once test (d5b629623731c66f1bac89dec3e87b89) if no longer possible.
> 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state FAILING to RESTARTING.
> 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job Exactly once test (d5b629623731c66f1bac89dec3e87b89).
> 03:36:46,643 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Discarding the results produced by task execution f45ff068d2c80da22c2a958739ec0c87.
> 03:36:46,644 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Exactly once test (d5b629623731c66f1bac89dec3e87b89) switched from state RESTARTING to FAILING.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$null$1(ExecutionGraph.java:586)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
> 	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> The failure of {{Source: Custom Source -> Map -> Sink: Unnamed}} failed the job the first time. However, because the checkpoint was also declined, {{CheckpointFailureManager}} failed the job a second time.
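To make the sequence concrete, the following is a minimal, self-contained Java sketch of the two job failures described above. It does not use Flink: {{ClosingRegistry}}, {{Job}} and {{simulate}} are hypothetical stand-ins. The registry mimics the behavior seen in the log (once closed, it closes the argument and throws "Cannot register Closeable, registry is already closed. Closing argument."). The failure accounting is a simplified assumption: every job failure consumes one restart attempt, and a declined checkpoint fails the job once the number of declines exceeds a tolerable threshold, here assumed to be 0 because the log fails the job on the first declined checkpoint.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of FLINK-13527; none of these classes are Flink APIs.
public class DoubleFailureSketch {

    // Stand-in for a closeable registry: once closed, it rejects new registrations.
    static final class ClosingRegistry {
        private final List<Closeable> registered = new ArrayList<>();
        private boolean closed = false;

        synchronized void register(Closeable resource) throws IOException {
            if (closed) {
                resource.close(); // close the argument, mirroring the logged message
                throw new IOException(
                    "Cannot register Closeable, registry is already closed. Closing argument.");
            }
            registered.add(resource);
        }

        synchronized void close() {
            closed = true;
            for (Closeable resource : registered) {
                try {
                    resource.close();
                } catch (IOException ignored) {
                    // best effort while freeing task resources
                }
            }
            registered.clear();
        }
    }

    // Stand-in for job-level failure accounting (assumed semantics, see lead-in).
    static final class Job {
        private final int restartAttempts;
        private final int tolerableCheckpointFailures;
        private int failures = 0;
        private int declinedCheckpoints = 0;

        Job(int restartAttempts, int tolerableCheckpointFailures) {
            this.restartAttempts = restartAttempts;
            this.tolerableCheckpointFailures = tolerableCheckpointFailures;
        }

        // Records a job failure; returns true while restart attempts remain.
        boolean fail() {
            failures++;
            return failures <= restartAttempts;
        }

        // Records a declined checkpoint; returns true once the threshold is exceeded.
        boolean declineCheckpoint() {
            declinedCheckpoints++;
            return declinedCheckpoints > tolerableCheckpointFailures;
        }
    }

    // Replays the order of events from the issue; returns true if the job ends up failed for good.
    static boolean simulate(int restartAttempts, int tolerableCheckpointFailures) {
        Job job = new Job(restartAttempts, tolerableCheckpointFailures);
        ClosingRegistry registry = new ClosingRegistry();

        // 1. The artificial test failure fails the task, which frees its resources.
        boolean canRestart = job.fail();
        registry.close();

        // 2. The async checkpoint thread then tries to register its snapshot resources.
        try {
            registry.register(() -> { });
        } catch (IOException e) {
            // 3. The checkpoint is declined; past the threshold, the job fails a second time.
            if (job.declineCheckpoint()) {
                canRestart = job.fail();
            }
        }
        return !canRestart;
    }

    public static void main(String[] args) {
        System.out.println("restartAttempts=1, tolerable=0 -> failed: " + simulate(1, 0));
        System.out.println("restartAttempts=2, tolerable=0 -> failed: " + simulate(2, 0));
        System.out.println("restartAttempts=1, tolerable=1 -> failed: " + simulate(1, 1));
    }
}
```

With a single restart attempt, the artificial failure uses up the budget and the checkpoint-triggered failure ends the job; in this model, either a second restart attempt or tolerating one declined checkpoint lets the job survive. It only models the failure counting, not Flink's actual restart logic (in the log, the second failure arrives while the job is already RESTARTING).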
> Unfortunately, some tests within {{KafkaProducerExactlyOnceITCase}} allow only one restart attempt via {{FixedDelayRestartStrategy}}, which is why the IT case ultimately failed.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)