[
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879173#comment-16879173
]
Stephan Ewen commented on FLINK-12852:
--------------------------------------
So, in summary we have these options.
1. Only allocate the minimum per producer, *which is one buffer per channel*.
This would keep the requirement similar to what we have at the
moment, but it is much less than what we recommend for the credit-based network data
exchange (2 * numChannels + floating).
==> *Not a good option in my opinion*
2. Coordinate the deployment sink-to-source such that receivers always have
their buffers first. This would be complex to implement and coordinate, and it would
break with many assumptions about tasks being independent (coordination-wise) on the
TaskManagers. Giving up that assumption would be a pretty big step and cause
lots of complexity in the future.
It will also increase deployment delays. Low deployment delays should be
a design goal in my opinion, as it will enable other features more easily, like
low-disruption upgrades, etc.
==> *Not a good option either, in my opinion*
3. Make buffers always revocable, by spilling.
This is tricky to implement very efficiently, especially because
- there is the logic that slices buffers for early sends for the
low-latency streaming stuff
- the spilling request will come from an asynchronous call. That will
probably stay like that even with the mailbox, because the main thread will be
frequently blocked on buffer allocation when this request comes.
==> I think this would explode in complexity and bugs, unless we rewrite
other parts significantly. *Not a good option either*
4. We allocate the recommended number for good throughput (2 * numChannels +
floating) per consumer and per producer.
No dynamic rebalancing anymore. This would increase the number of
required network buffers in certain high-parallelism scenarios quite a bit with
the default config (see the sketch below the option list). Users can reduce this by
configuring fewer buffers per channel, but it would break existing user setups and
require them to adjust the config when upgrading.
==> *Not a great option, in my opinion*
5. We make the network resource per slot and ask the scheduler to attach
information about how many producers and how many consumers will be in the
slot, worst case. We use that to pre-compute how many excess buffers the
producers may take.
This will also break with some assumptions and lead us to the point that
we have to pre-compute network buffers in the same way as managed memory.
Seeing how much pain it is with the managed memory, this seems not so great.
==> *Not a great option either*
This is tricky. At this point it looks like option (4) is the only one that is
feasible without severe performance issues or an explosion of complexity.
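To make the trade-off of option (4) concrete, here is a rough back-of-the-envelope sketch
using the numbers from the scenario reported below. It assumes the default 2 exclusive
buffers per channel and 8 floating buffers per gate; the class and constant names are
illustrative only, not actual Flink code or configuration keys.
{code:java}
// Illustrative sketch of per-task buffer demand under option (4):
// every input gate / result partition reserves 2 * numChannels + floating buffers up front.
public class BufferDemandSketch {

    // Defaults referenced in the discussion above (illustrative constants, not config keys).
    private static final int BUFFERS_PER_CHANNEL = 2;
    private static final int FLOATING_PER_GATE = 8;

    static int buffersFor(int numChannels) {
        return BUFFERS_PER_CHANNEL * numChannels + FLOATING_PER_GATE;
    }

    public static void main(String[] args) {
        // Scenario from this ticket: upstream parallelism 1500, downstream parallelism 500,
        // so each producer writes to 500 channels and each consumer reads from 1500 channels.
        int perProducer = buffersFor(500);    // 2 * 500 + 8  = 1008
        int perConsumer = buffersFor(1500);   // 2 * 1500 + 8 = 3008

        // The TM in the scenario runs 9 producers and 1 consumer and owns 10696 buffers.
        int demandOnOneTM = 9 * perProducer + perConsumer;   // 9 * 1008 + 3008 = 12080

        System.out.println("per producer: " + perProducer);
        System.out.println("per consumer: " + perConsumer);
        System.out.println("demand on TM: " + demandOnOneTM + " (TM has 10696)");
        // 12080 > 10696: with the default per-channel setting this setup would no longer fit,
        // which is why option (4) may force users to adjust their config when upgrading.
    }
}
{code}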
> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> ----------------------------------------------------------------------
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.7.2, 1.8.1, 1.9.0
> Reporter: Yun Gao
> Assignee: Yun Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> When running tests with an upstream vertex and a downstream vertex, a deadlock
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x00007f2cca81b000
> nid=0x38845 waiting on condition [0x00007f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000073ed6b6f0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x000000073fbc81f0> (a java.lang.Object)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not the
> same, so there may be over-allocation; when assignExclusiveSegments is called,
> no memory may be available.
>
> The detail of the scenario is as follows: the parallelism of the upstream
> vertex and the downstream vertex is 1500 and 500, respectively. There are 200 TMs,
> and each TM has 10696 buffers in total and 10 slots. For a TM that runs
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first, each with a
> local buffer pool of {required = 500, max = 2 * 500 + 8 = 1008}; they produce
> data quickly and each occupies about 990 buffers. Then the downstream task
> starts and tries to assign exclusive buffers for its 1500 - 9 = 1491 remote
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all
> downstream tasks can start, the job eventually blocks, no buffers can
> be released, and the deadlock occurs.
>
> I think that although increasing the network memory solves the problem, the
> deadlock may not be acceptable. Fine-grained resource management
> (FLINK-12761) could solve this problem, but AFAIK in 1.9 it will not include the
> network memory in the ResourceProfile.
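(For reference, the arithmetic of the scenario above can be checked with a minimal sketch
like the following; the values are taken directly from the description, and the 2 exclusive
buffers per remote channel correspond to the default mentioned there. The class is
illustrative, not Flink code.)
{code:java}
// Back-of-the-envelope check of the deadlock scenario described above.
public class DeadlockScenarioCheck {

    public static void main(String[] args) {
        int buffersPerTM = 10696;              // network buffers owned by one TaskManager
        int upstreamTasksOnTM = 9;             // producers running on this TM
        int buffersPerProducer = 990;          // roughly what each busy producer occupies
        int remoteChannels = 1500 - 9;         // 1491 remote input channels of the consumer
        int exclusivePerChannel = 2;           // default exclusive buffers per remote channel

        int occupied = upstreamTasksOnTM * buffersPerProducer;     // 8910
        int remaining = buffersPerTM - occupied;                   // 1786
        int required = remoteChannels * exclusivePerChannel;       // 2982 (~2981 in the text)

        System.out.println("remaining = " + remaining + ", required = " + required);
        // required > remaining, and the producers never release their buffers because the
        // consumer cannot start, so assignExclusiveSegments blocks forever -> deadlock.
    }
}
{code}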
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)