[ 
https://issues.apache.org/jira/browse/FLINK-17560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17116940#comment-17116940
 ] 

josson paul kalapparambath commented on FLINK-17560:
----------------------------------------------------

[~xintongsong]

I am able to root cause this problem to stuck tasks in a highly threaded 
environment.

Below i have explained how I was able to re-produce this issue(not consistently 
though)

 

*Scenario: 1*

Job ID: JobID123 (parallelism 1) 

TM has only Single slot
h5. Step 1: Schedule jobA

JobID123 is running on TM on a slot with allocation ID: *'AllocationID123'.* 

Now *JobID123* is mapped to allocationID: *AllocationID123'*
h5. Step 2: Zookeeper stop 

  Task manager tries to cancel/fail all the tasks on *'AllocationID123'*. But 
some of the tasks got stuck and never stops. Which means the *'finally'* block 
which cleans up things never got called.

At this point, the above mentioned tasks are in *CANCELLING* state. But the 
status of the Slot is still in *'ALLOCATED'* state. And it is allocated to job 
id : *JobID123*

I can see that Flink has a Cancel task thread/Interrupter thread/ Watch dog 
thread. But why this task is still stuck. Below I have pasted lines from the 
thread dump from one such instance where we had a Task stuck. Are we hitting 
this issue -> [https://bugs.openjdk.java.net/browse/JDK-8227375 
.|https://bugs.openjdk.java.net/browse/JDK-8227375]

We are using Java-8

Note: This is not the task which is always stuck. It can be any task from the 
pipelines.
{code:java}
"OutputFlusher for Source: 
KUS/snmp_trap_proto/Kafka-Read/Read(KafkaUnboundedSource) -> FlatMap -> 
KUS/snmp_trap_proto/KafkaRecordToCTuple/ParMultiDo(KafkaRecordToCTuple) -> 
HSTrapParDoInit117/ParMultiDo(HealthScoreTrapProcParDo) -> 
ParDo(GroupDataByEntityId)/ParMultiDo(GroupDataByEntityId) -> 
ApplyHSEventFixedWindow118/Window.Assign.out -> ToKeyedWorkItem" #1542 daemon 
prio=5 os_prio=0 tid=0x00007f5c0daff000 nid=0x638 sleeping[0x00007f5b91267000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:362)


   Locked ownable synchronizers:
        - None


"deviceprocessor" #1538 prio=5 os_prio=0 tid=0x00007f5c0ed1a000 nid=0x634 
runnable [0x00007f5bb45d6000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000049fc13cd8> (a 
java.util.concurrent.SynchronousQueue$TransferStack)
        at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at 
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
        at 
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
        at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.nextBatch(KafkaUnboundedReader.java:613)
        at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:228)
        at 
org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeAdvance(ReaderInvocationUtil.java:64)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:281)
        - locked <0x0000000499000420> (a java.lang.Object)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at 
org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)


   Locked ownable synchronizers:
        - None {code}
h5. Step 3: Zookeeper start

  Even though the tasks are not fully cancelled, the the status of the Slot is 
still in 'ALLOCATED'. Now the TM offers this Slot (*AllocationID123*) to JM. JM 
doesn't care and happily deploys the tasks. (*Is this intended?.*). All are 
happy and the job is running fine
h5. Step 4: Job manager restart (Problem happens at this stage)

TM offers the same slot (with allocation id: *AllocationID123*). Now 
unfortunately JM throws an exception (This is a rare scenario and happened 
because of our internal change in the scheduling part. We are fixing it.). When 
this happens the TM see that it still has Tasks running (Old stuck task) and 
change the status of slot to *'RELEASING'.* Once this happens, the TM doesn't 
have any more slots to offer to the JM and the pipeline will be in a scheduling 
loop at JM side.

Even if the JM hadn't thrown any error, we would have got into a 'slot' lost 
scenario if we try to cancel the pipeline. Cancelling pipeline will not clean 
up this slot because of the stuck job.

*Scenario: 2*

Everything is same except we have *2 slots* instead of 1 at TM side.
h5.  Step1: Same as *Scenario: 1*
h5.  ** Step2: Same as *Scenario: 1*
h5.  ** Step3: zookeeper start

  At this point JM request for a 'New Slot'. TM associates both Slot1 (Old 
allocation id) and Slot2 (new allocation id) to the job ID: *Job123*. JM 
schedules the *Job123* on 'Slot2'. JM tries to free up 'Slot1' and 'Slot1' get 
into 'RELEASING' status and never recovers because of the stuck task. 

I am attaching the JM and TM logs and the thread dump also.

Job id: 3c632ca764b56741d2571ffae9b877f0

Allocation id: 576a29115a9f0b44be4d61ab7dee233c

Execution id: 10d5ea4915c3d50f96a3e8d1dad2c51f

grep "10d5ea4915c3d50f96a3e8d1dad2c51f" tm.log . You can see that 'finally' 
never gets called. [^tm.log]

> No Slots available exception in Apache Flink Job Manager while Scheduling
> -------------------------------------------------------------------------
>
>                 Key: FLINK-17560
>                 URL: https://issues.apache.org/jira/browse/FLINK-17560
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.8.3
>         Environment: Flink verson 1.8.3
> Session cluster
>            Reporter: josson paul kalapparambath
>            Priority: Major
>         Attachments: jobmgr.log, threaddump-tm.txt, tm.log
>
>
> Set up
> ------
> Flink verson 1.8.3
> Zookeeper HA cluster
> 1 ResourceManager/Dispatcher (Same Node)
> 1 TaskManager
> 4 pipelines running with various parallelism's
> Issue
> ------
> Occationally when the Job Manager gets restarted we noticed that all the 
> pipelines are not getting scheduled. The error that is reporeted by the Job 
> Manger is 'not enough slots are available'. This should not be the case 
> because task manager was deployed with sufficient slots for the number of 
> pipelines/parallelism we have.
> We further noticed that the slot report sent by the taskmanger contains solts 
> filled with old CANCELLED job Ids. I am not sure why the task manager still 
> holds the details of the old jobs. Thread dump on the task manager confirms 
> that old pipelines are not running.
> I am aware of https://issues.apache.org/jira/browse/FLINK-12865. But this is 
> not the issue happening in this case.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to