[ https://issues.apache.org/jira/browse/FLINK-17560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116940#comment-17116940 ]
josson paul kalapparambath commented on FLINK-17560:
----------------------------------------------------

[~xintongsong] I was able to root-cause this problem to stuck tasks in a highly threaded environment. Below I explain how I was able to reproduce the issue (though not consistently).

*Scenario 1*

Job ID: JobID123 (parallelism 1). The TM has only a single slot.

h5. Step 1: Schedule the job

JobID123 is running on the TM in a slot with allocation ID *AllocationID123*. JobID123 is now mapped to *AllocationID123*.

h5. Step 2: Zookeeper stop

The task manager tries to cancel/fail all the tasks on *AllocationID123*, but some of the tasks get stuck and never stop, which means the *finally* block that cleans things up is never called. At this point the stuck tasks are in the *CANCELLING* state, but the slot is still in the *ALLOCATED* state and is still assigned to *JobID123*.

I can see that Flink has a task-canceler thread, an interrupter thread, and a watchdog thread, so why is this task still stuck? Below I have pasted lines from the thread dump of one instance where we had a stuck task. Are we hitting https://bugs.openjdk.java.net/browse/JDK-8227375? We are using Java 8.

Note: it is not always this task that gets stuck; it can be any task from the pipelines.

{code:java}
"OutputFlusher for Source: KUS/snmp_trap_proto/Kafka-Read/Read(KafkaUnboundedSource) -> FlatMap -> KUS/snmp_trap_proto/KafkaRecordToCTuple/ParMultiDo(KafkaRecordToCTuple) -> HSTrapParDoInit117/ParMultiDo(HealthScoreTrapProcParDo) -> ParDo(GroupDataByEntityId)/ParMultiDo(GroupDataByEntityId) -> ApplyHSEventFixedWindow118/Window.Assign.out -> ToKeyedWorkItem" #1542 daemon prio=5 os_prio=0 tid=0x00007f5c0daff000 nid=0x638 sleeping[0x00007f5b91267000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:362)

   Locked ownable synchronizers:
	- None

"deviceprocessor" #1538 prio=5 os_prio=0 tid=0x00007f5c0ed1a000 nid=0x634 runnable [0x00007f5bb45d6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x000000049fc13cd8> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
	at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:613)
	at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:228)
	at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:64)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:281)
	- locked <0x0000000499000420> (a java.lang.Object)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
	at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- None
{code}
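The "deviceprocessor" thread above is parked in SynchronousQueue.poll(). As a self-contained illustration of the failure mode I suspect (plain Java, not Flink or Beam code; the class name and queue are made up for the sketch): if whatever sits in that poll loop catches the InterruptedException raised by the canceler's interrupt and keeps looping, the thread never unwinds, the *finally* block never runs, and the task stays in *CANCELLING* forever.

{code:java}
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Minimal sketch, NOT Flink/Beam code: a task thread whose poll loop
// swallows interrupts can never be cancelled from the outside.
public class StuckTaskSketch {
    public static void main(String[] args) throws Exception {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread taskThread = new Thread(() -> {
            try {
                while (true) {
                    try {
                        // Stands in for the parked poll seen in the dump.
                        queue.poll(10, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // Interrupt swallowed: the cancellation signal is
                        // lost and the loop simply continues.
                    }
                }
            } finally {
                // The cleanup Flink relies on; never reached here.
                System.out.println("cleanup");
            }
        }, "deviceprocessor-sketch");
        taskThread.start();

        taskThread.interrupt();  // what the task-canceler thread effectively does
        taskThread.join(1000);
        System.out.println("still alive: " + taskThread.isAlive()); // prints true
    }
}
{code}

Whether the interrupt is really being swallowed somewhere below KafkaUnboundedReader.nextBatch(), or lost by the JVM as described in JDK-8227375, is exactly what I cannot tell from the dump.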
h5. Step 3: Zookeeper start

Even though the tasks are not fully cancelled, the slot is still in the *ALLOCATED* state. The TM now offers this slot (*AllocationID123*) to the JM. The JM doesn't care and happily deploys the tasks (*is this intended?*). All is well and the job runs fine.

h5. Step 4: Job manager restart (the problem happens at this stage)

The TM offers the same slot (allocation ID *AllocationID123*) again. This time, unfortunately, the JM throws an exception (a rare scenario that happened because of an internal change of ours in the scheduling part; we are fixing it). When this happens, the TM sees that it still has tasks running (the old stuck task) and changes the slot's status to *RELEASING*. Once that happens, the TM has no more slots to offer to the JM, and the pipeline ends up in a scheduling loop on the JM side. Even if the JM had not thrown any error, we would have run into a lost-slot scenario if we tried to cancel the pipeline: cancelling the pipeline would not clean up this slot, because of the stuck task.

*Scenario 2*

Everything is the same, except the TM has *2 slots* instead of 1.

h5. Step 1: Same as *Scenario 1*
h5. Step 2: Same as *Scenario 1*
h5. Step 3: Zookeeper start

At this point the JM requests a new slot. The TM associates both Slot1 (the old allocation ID) and Slot2 (a new allocation ID) with job ID *JobID123*. The JM schedules *JobID123* on Slot2 and tries to free up Slot1; Slot1 goes into the *RELEASING* state and never recovers because of the stuck task.

I am attaching the JM and TM logs as well as the thread dump.

Job ID: 3c632ca764b56741d2571ffae9b877f0
Allocation ID: 576a29115a9f0b44be4d61ab7dee233c
Execution ID: 10d5ea4915c3d50f96a3e8d1dad2c51f

grep "10d5ea4915c3d50f96a3e8d1dad2c51f" tm.log: you can see that *finally* never gets called. [^tm.log]

> No Slots available exception in Apache Flink Job Manager while Scheduling
> --------------------------------------------------------------------------
>
>                 Key: FLINK-17560
>                 URL: https://issues.apache.org/jira/browse/FLINK-17560
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.8.3
>        Environment: Flink version 1.8.3
>                     Session cluster
>            Reporter: josson paul kalapparambath
>            Priority: Major
>         Attachments: jobmgr.log, threaddump-tm.txt, tm.log
>
> Set up
> ------
> Flink version 1.8.3
> Zookeeper HA cluster
> 1 ResourceManager/Dispatcher (same node)
> 1 TaskManager
> 4 pipelines running with various parallelisms
>
> Issue
> -----
> Occasionally, when the Job Manager gets restarted, we noticed that the
> pipelines were not getting scheduled. The error reported by the Job
> Manager is 'not enough slots are available'. This should not be the case,
> because the task manager was deployed with sufficient slots for the
> number of pipelines/parallelisms we have.
> We further noticed that the slot report sent by the task manager contains
> slots filled with old CANCELLED job IDs. I am not sure why the task
> manager still holds the details of the old jobs. A thread dump on the
> task manager confirms that the old pipelines are not running.
> I am aware of https://issues.apache.org/jira/browse/FLINK-12865, but that
> is not the issue happening in this case.