[
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264175#comment-16264175
]
Stefan Richter commented on FLINK-8098:
---------------------------------------
Parallel scripts were just another possibility; I have seen them cause a
similar problem before.
I think I know where the exception comes from: your task is failing because of
some other error, and Flink is going through the shutdown procedure. This
includes closing the HDFS output streams of ongoing checkpoints to terminate
potentially blocking IO calls. Closing a stream also deletes its file in HDFS
to avoid lingering checkpoint files. At the same time, it is possible that the
async checkpoint was already in the process of finalizing exactly this file
after the shutdown procedure's close had removed it. This means two things:
1. the exception is not the cause, but an effect of your job failing from
something else, and 2. it is a side effect of the fast shutdown procedure
closing and cleaning up things. While this logs an exception, it is actually
all good: the file was just cleaned up already by the shutdown, and the
already canceled (!) checkpoint fails because of this.
If you agree, can you please close the issue?
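To make the interleaving described above concrete, here is a minimal sketch in
plain Java. It is not Flink or HDFS code: CheckpointStream, closeAndDelete,
closeAndGetHandle and simulate are hypothetical names, a local temp file stands
in for the HDFS checkpoint file, and the two steps run sequentially on one
thread so that the ordering is deterministic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class CheckpointCancelRace {

    /** Hypothetical stand-in for a checkpoint output stream (not a Flink class). */
    static final class CheckpointStream {
        private final Path file;

        CheckpointStream(Path file) throws IOException {
            this.file = file;
            Files.write(file, new byte[] {1, 2, 3}); // pretend some state was written
        }

        /** Shutdown path: close the stream and delete the partial file. */
        void closeAndDelete() throws IOException {
            Files.deleteIfExists(file);
        }

        /** Async checkpoint path: finalize the file; the "handle" here is just its size. */
        long closeAndGetHandle() throws IOException {
            try {
                return Files.size(file);
            } catch (NoSuchFileException e) {
                throw new IOException("Could not flush and close " + file.getFileName(), e);
            }
        }
    }

    /** Replays the ordering: shutdown deletes the file, then finalization runs. */
    static String simulate() {
        try {
            Path dir = Files.createTempDirectory("chk-demo");
            CheckpointStream stream = new CheckpointStream(dir.resolve("state"));

            // 1. The task fails for an unrelated reason; shutdown cancels the
            //    checkpoint and closes (and thereby deletes) its output file.
            boolean checkpointCanceled = true;
            stream.closeAndDelete();

            // 2. The async checkpoint, already finalizing, tries to complete the file.
            String outcome;
            try {
                outcome = "checkpoint completed, bytes=" + stream.closeAndGetHandle();
            } catch (IOException e) {
                outcome = (checkpointCanceled
                        ? "secondary failure of canceled checkpoint: "
                        : "real checkpoint failure: ") + e.getMessage();
            }
            Files.deleteIfExists(dir); // directory is empty again at this point
            return outcome;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

In this sketch the failure in step 2 is only reported as a secondary effect
because the checkpoint had already been canceled in step 1, which mirrors the
reasoning above: the logged exception is a symptom of the shutdown, not the
root cause of the job failure.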
> LeaseExpiredException when using FsStateBackend for checkpointing due to
> multiple mappers tries to access the same file.
> ------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-8098
> URL: https://issues.apache.org/jira/browse/FLINK-8098
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP
> Reporter: Shashank Agarwal
>
> I am running a streaming application with parallelism 6. I have enabled
> checkpointing(1000), but the application crashes after 1-2 days. After
> analysing the logs I found the following trace.
> {code}
> 2017-11-17 11:19:06,696 WARN
> org.apache.flink.streaming.runtime.tasks.StreamTask - Could not
> properly clean up the async checkpoint runnable.
> java.lang.Exception: Could not properly cancel managed keyed state future.
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983)
> at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
> at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251)
> at
> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355)
> at
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> Could not flush and close the file system output stream to
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
> in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
> ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
> in order to obtain the stream state handle
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
> at
> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
> at
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease
> flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
> (inode 812148671): File does not exist. [Lease. Holder:
> DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161]
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3749)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3716)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:911)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:547)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2345)
> at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> at com.sun.proxy.$Proxy12.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:462)
> at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy13.complete(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2291)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2272)
> at
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:48)
> at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:319)
> ... 12 more
> {code}
> So maybe multiple mappers were trying to write the "same file".