[ 
https://issues.apache.org/jira/browse/FLINK-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15509469#comment-15509469
 ] 

Stephan Ewen commented on FLINK-4632:
-------------------------------------

Do you know which of the following two situations is the case?

(1) A task is in "CANCELING" and then the TaskManager for that specific task is 
killed. For some reason, Flink does not learn about the lost TaskManager and 
waits for the cancellation to complete.

(2) Flink tries to cancel a Task, but the Task can simply not cancel. Currently 
Flink relies on the cooperation of Tasks to cancel. If they cannot cancel, the 
cancelling may get stuck.


If it is (2), we are currently adding a safety net that kills the TaskManager 
is cancelling takes too long. (killing the process is the only guarnteed way in 
the JVM to terminate a thread).
That should in the YARN and Mesos cases help to recover.

> when yarn nodemanager lost,  flink hung
> ---------------------------------------
>
>                 Key: FLINK-4632
>                 URL: https://issues.apache.org/jira/browse/FLINK-4632
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, Streaming
>    Affects Versions: 1.2.0, 1.1.2
>         Environment: cdh5.5.1  jdk1.7 flink1.1.2  1.2-snapshot   kafka0.8.2
>            Reporter: 刘喆
>
> When run flink streaming on yarn,  using kafka as source,  it runs well when 
> start. But after long run, for example  8 hours, dealing 60,000,000+ 
> messages, it hung: no messages consumed,   one taskmanager is CANCELING, the 
> exception show:
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> connection timeout
>       at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:152)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>       at 
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>       at 
> io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>       at 
> io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:835)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:87)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:162)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: 连接超时
>       at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>       at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>       at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>       at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>       at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>       at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
>       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>       at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>       ... 6 more
> after apply    https://issues.apache.org/jira/browse/FLINK-4625   
> it show:
> java.lang.Exception: TaskManager was lost/killed: 
> ResourceID{resourceId='container_1471620986643_744852_01_001400'} @ 
> 38.slave.adh (dataPort=45349)
>       at 
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:162)
>       at 
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
>       at 
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:138)
>       at 
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>       at 
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:224)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1054)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:458)
>       at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>       at 
> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
>       at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>       at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>       at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to