[ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16877073#comment-16877073
 ] 

zhijiang commented on FLINK-12852:
----------------------------------

[~StephanEwen] thanks for looking into this issue and sharing your suggestions.

The root cause of this issue is the mechanism for distributing global buffers 
among LocalBufferPools. As we know, each local pool has a core size and a max 
size per partition/gate. The core size must be satisfied, otherwise an 
exception is thrown. To make better use of resources, every local pool may use 
up to its max size if there are enough buffers in the global pool. But the 
precondition is that the extra buffers beyond the core size can eventually be 
returned after redistribution. This precondition/assumption is not always 
satisfied, especially in credit-based mode, for the following reasons (a rough 
arithmetic sketch follows the list):
 * In non-credit-based mode, the producer could always push data to the 
network until backpressure kicked in, so the extra buffers used in the 
partition's local pool could be recycled in time. But now the producer cannot 
send buffers to the network until the consumer has requested the exclusive 
buffers as initial credits.
 * In non-credit-based mode, the gate's local pool only needs a core size 
equal to numChannels, but in credit-based mode we need 2 * numChannels as the 
exclusive core size by default. So the probability of hitting this problem is 
higher than before.
 * In non-credit-based mode, buffer requests from the global pool are lazy and 
data-driven: only after the consumer receives data from the network does the 
local pool request a buffer from the global pool. But now the exclusive buffer 
request from the global pool is eager during startup, so the probability is 
also higher than before. Making the exclusive request lazy as well might 
relieve the deadlock issue.
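
To make the buffer accounting concrete, here is a rough, illustrative sketch 
(plain Java, not Flink code) that plugs in the numbers from the scenario 
reported in the issue description below; the 2-exclusive-buffers-per-channel 
value is the credit-based default discussed above:

{code:java}
/**
 * Back-of-the-envelope check of the reported scenario, illustrative only.
 * The numbers are taken from the issue description below; the class and
 * variable names are made up for this sketch.
 */
public class ExclusiveBufferDeadlockSketch {

    public static void main(String[] args) {
        int globalBuffers = 10696;          // network buffers per TaskManager
        int upstreamTasksOnTm = 9;          // producer tasks already running on this TM
        int buffersPerUpstreamTask = 990;   // roughly what each producer holds under backpressure
        int remoteChannels = 1491;          // remote input channels of the downstream task
        int exclusivePerChannel = 2;        // default exclusive buffers per channel (credit-based)

        int alreadyTaken = upstreamTasksOnTm * buffersPerUpstreamTask;   // 8910
        int available = globalBuffers - alreadyTaken;                    // 1786
        int eagerExclusiveDemand = remoteChannels * exclusivePerChannel; // well above 1786

        System.out.printf("available=%d, eager exclusive demand=%d%n",
                available, eagerExclusiveDemand);
        if (eagerExclusiveDemand > available) {
            // The downstream task blocks in requestMemorySegments(), the producers
            // cannot make progress without credits, and nobody returns buffers.
            System.out.println("=> setup blocks waiting for buffers: potential deadlock");
        }
    }
}
{code}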

I think the previous non-credit-based mode could not completely avoid the 
deadlock in theory either (not entirely sure, especially for some corner 
cases). But in credit-based mode, the factors above make the probability of 
deadlock much higher than before.

I agree with the implications mentioned by Stephan: the current blocking 
partition does not have this issue. For streaming jobs the possibility still 
exists, but I do not think it should be a blocker for release-1.9.

The currently proposed PR turns the deadlock into a failure instead. That is 
somewhat better than today because it tells users what happened, but it does 
not solve the issue at the root, and users might still have a bad experience. 
At Alibaba we previously avoided this issue via network resource matching in 
the ResourceProfile and a fixed size for the local pool.
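
For context, the essence of that workaround is that with fixed-size local 
pools (core size == max size) the network demand of the tasks on a TaskManager 
is deterministic, so it can be matched against the available buffers up front. 
A minimal sketch of such a check, with hypothetical pool sizes and names (not 
the actual implementation):

{code:java}
// Minimal sketch, assuming fixed-size local pools (core == max). The pool
// sizes below are hypothetical and only illustrate that the total demand
// becomes a simple, checkable sum.
import java.util.Arrays;
import java.util.List;

public class FixedSizePoolMatchingSketch {

    /** With fixed-size pools, the total demand is just the sum of the pool sizes. */
    static boolean fitsIntoGlobalPool(List<Integer> fixedPoolSizes, int globalBuffers) {
        int totalDemand = fixedPoolSizes.stream().mapToInt(Integer::intValue).sum();
        return totalDemand <= globalBuffers;
    }

    public static void main(String[] args) {
        int globalBuffers = 10696; // per-TaskManager network buffers, as in the scenario below
        // Hypothetical fixed sizes: 9 producer pools plus 1 consumer pool.
        List<Integer> pools = Arrays.asList(508, 508, 508, 508, 508, 508, 508, 508, 508, 3000);
        System.out.println("fits = " + fitsIntoGlobalPool(pools, globalBuffers));
    }
}
{code}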

I also agree with the general ideas Stephan proposed for solving this issue. 
Slot resource isolation (including shuffle resources) might be the right way 
to go in the future. For now we could make some improvements to decrease the 
probability, such as making the exclusive size 1 by default (a Jira for that 
was already created before) and making the exclusive request lazy, as the 
floating buffers are.
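
On the user side, a related lever already exists today via configuration; for 
example (illustrative only, normally these keys would go into flink-conf.yaml 
rather than being set programmatically):

{code:java}
import org.apache.flink.configuration.Configuration;

public class ReduceExclusiveBuffersExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Default is 2 exclusive buffers per remote input channel; lowering it to 1
        // roughly halves the eager demand at task startup (possibly at some
        // throughput cost).
        config.setInteger("taskmanager.network.memory.buffers-per-channel", 1);
        // Floating buffers per gate are requested lazily on demand already.
        config.setInteger("taskmanager.network.memory.floating-buffers-per-gate", 8);
        System.out.println(config);
    }
}
{code}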

 

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
>                 Key: FLINK-12852
>                 URL: https://issues.apache.org/jira/browse/FLINK-12852
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.7.2, 1.8.1, 1.9.0
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> When running tests with an upstream vertex and a downstream vertex, a 
> deadlock occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x00007f2cca81b000 
> nid=0x38845 waiting on condition [0x00007f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x000000073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required size and the max size of the local buffer pool 
> are not the same, so there can be over-allocation; when 
> assignExclusiveSegments is then called, no memory is available.
>  
> The detailed scenario is as follows: The parallelism of the upstream vertex 
> and the downstream vertex is 1000 and 500, respectively. There are 200 TMs, 
> and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 
> upstream tasks and 1 downstream task, the 9 upstream tasks start first with a 
> local buffer pool of {required = 500, max = 2 * 500 + 8 = 1008}; they produce 
> data quickly and each occupies about 990 buffers. Then the downstream task 
> starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers, but only 1786 are left. Since not 
> all downstream tasks can start, the job eventually blocks, no buffers can be 
> released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> the network memory in the ResourceProfile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
