Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r141886467
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
---
@@ -390,7 +390,63 @@ public BufferProvider getBufferProvider() throws IOException {
return inputGate.getBufferProvider();
}
- public void onBuffer(Buffer buffer, int sequenceNumber) {
+ /**
+ * Requests buffer from input channel directly for receiving network data.
+ * It should always return an available buffer in credit-based mode.
+ *
+ * @return The available buffer.
+ */
+ public Buffer requestBuffer() {
+ synchronized (availableBuffers) {
+ return availableBuffers.poll();
+ }
+ }
+
+ /**
+ * Receives the backlog from producer's buffer response. If the number of available
+ * buffers is less than the backlog length, it will request floating buffers from buffer
+ * pool, and then notify unannounced credits to the producer.
+ *
+ * @param backlog The number of unsent buffers in the producer's sub partition.
+ */
+ private void onSenderBacklog(int backlog) {
+ int numRequestedBuffers = 0;
+
+ synchronized (availableBuffers) {
+ // Important: the isReleased check should be inside the synchronized block.
+ if (!isReleased.get()) {
+ senderBacklog.set(backlog);
+
+ while (senderBacklog.get() > availableBuffers.size() && !isWaitingForFloatingBuffers.get()) {
--- End diff --
I was thinking about it a bit more and talked to @StephanEwen about it, and
we think that it is actually fine to grab all the resources we need at the
moment. If there are not enough buffers at some point, the fair distribution
will start when buffers are recycled, i.e. via the callbacks of the new
`BufferListener`. Since each channel always has its own exclusive buffers, we
can guarantee that it always makes progress anyway! Additionally, we cannot
really achieve a fair distribution from the start, when receiving the first
backlog (since we do not know any of the other backlogs yet), unless we wait
for some time, which we also do not want.
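The greedy strategy described above can be illustrated with a minimal sketch, assuming a toy model that only counts buffers instead of holding real `Buffer` objects. `ToyFloatingPool`, `ToyChannel`, `tryRequest()`, `recycle()` and `freeBuffers()` are hypothetical names, not Flink's API; only the roles (exclusive buffers, a shared floating pool, a `BufferListener`-style callback) mirror the discussion. A channel grabs as many floating buffers as its backlog needs; once the pool is empty it registers as a listener, and recycled buffers are handed to waiting channels in FIFO order, which is where the fair distribution kicks in.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for the shared pool of floating buffers (counts only). */
final class ToyFloatingPool {
    private int free;
    // Channels waiting for a floating buffer; served in FIFO order on recycle.
    private final Queue<ToyChannel> listeners = new ArrayDeque<>();

    ToyFloatingPool(int numBuffers) {
        this.free = numBuffers;
    }

    /** Hands out one buffer if available; returns false if the pool is empty. */
    boolean tryRequest() {
        if (free == 0) {
            return false;
        }
        free--;
        return true;
    }

    void addListener(ToyChannel channel) {
        listeners.add(channel);
    }

    /** A recycled buffer goes to the longest-waiting channel that still needs it. */
    void recycle() {
        ToyChannel next;
        while ((next = listeners.poll()) != null) {
            if (next.notifyBufferAvailable()) {
                return; // buffer accepted by a waiting channel
            }
        }
        free++; // nobody needs it: back into the pool
    }

    int freeBuffers() {
        return free;
    }
}

/** Hypothetical channel that tracks how many buffers it holds, not the buffers. */
final class ToyChannel {
    private final ToyFloatingPool pool;
    int available;   // exclusive + floating buffers currently held
    int backlog;     // last backlog announced by the sender
    boolean waiting; // registered as a listener on the pool

    ToyChannel(ToyFloatingPool pool, int exclusiveBuffers) {
        this.pool = pool;
        this.available = exclusiveBuffers;
    }

    /** Greedily grabs every floating buffer needed to cover the backlog. */
    void onSenderBacklog(int newBacklog) {
        backlog = newBacklog;
        while (available < backlog && !waiting) {
            if (pool.tryRequest()) {
                available++;
            } else {
                // Pool exhausted: wait for a recycle callback instead of spinning.
                waiting = true;
                pool.addListener(this);
            }
        }
    }

    /** Pool callback; returns false if the buffer is no longer needed. */
    boolean notifyBufferAvailable() {
        waiting = false;
        if (available >= backlog) {
            return false;
        }
        available++;
        onSenderBacklog(backlog); // may register as a listener again
        return true;
    }
}
```

With a pool of 3 floating buffers and 2 exclusive buffers per channel, a first channel announcing a backlog of 5 takes all 3 floating buffers; a second channel announcing a backlog of 4 still owns its 2 exclusive buffers (so it keeps making progress) and receives floating buffers one by one as the first channel recycles them.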
I kind of like your idea of having a `numBuffersPerAllocation`. Let's keep
this in mind and evaluate the current solution first to see whether we
need this addition.
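The comment only names the `numBuffersPerAllocation` idea without defining it. A minimal sketch, assuming it means a cap on how many floating buffers a channel may request per backlog announcement; `CappedAllocation` and `buffersToRequest` are hypothetical names, not Flink code.

```java
/** Hypothetical per-announcement cap on floating-buffer requests. */
final class CappedAllocation {
    private CappedAllocation() {}

    /**
     * Number of floating buffers to request right now.
     *
     * @param backlog                 backlog announced by the sender
     * @param available               buffers the channel already holds
     * @param poolFree                floating buffers currently free in the pool
     * @param numBuffersPerAllocation assumed cap per backlog announcement
     */
    static int buffersToRequest(int backlog, int available, int poolFree, int numBuffersPerAllocation) {
        // Never request more than the backlog still needs...
        int needed = Math.max(0, backlog - available);
        // ...nor more than the pool has, nor more than the assumed cap.
        return Math.min(needed, Math.min(poolFree, numBuffersPerAllocation));
    }
}
```

Compared with the greedy approach, such a cap would leave more buffers in the pool for other channels on the first backlog, at the cost of extra round trips for channels with a large backlog.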
Regarding the formula (which I took from the network FLIP):
- From the FLIP, regarding the buffer distribution strategy: `Design
rationale 2: Each channel always tries to maintain a credit of
"backlog_length + initialCredit". That means that each channel tries to
build the receive window for its current backlog as much as possible from the
floating buffers, and use the exclusive "initialCredit" buffers as a means
to grow the window.` That way, the receiver always has some buffers available
immediately, so the sender can keep sending new buffers right away (as long as
the receiver has buffers available) and we do not have to wait for the
exclusive buffers to come back.
- Note that this would have to be changed in the various checks for
`availableBuffers.size() >= senderBacklog.get()`, e.g. in
`RemoteInputChannel#notifyBufferAvailable()`.
- Similarly, `RemoteInputChannel#recycle()` needs to be adapted for the case
that our exclusive buffers are in use and we requested `backlog_length +
initialCredit - currentCredit` *floating* buffers, so that we do not stack up
`2*initialCredit` buffers once `backlog == 0` again (plus a corresponding unit
test).
- What do you mean by `backlog-currentCredit` not being very accurate? We
guarantee that there are no more than `currentCredit` buffers on the wire (some
already in the channel, some only announced) and, at the time the buffer was
sent, `backlog` additional buffers were queued, so in order to send them, we
always need `backlog-currentCredit`, irrespective of how much credit is
announced vs. being on the wire. Or am I missing something here?
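The formulas in the list above reduce to plain integer arithmetic. The following is a hypothetical helper class (`CreditMath` and its method names are illustrative, not Flink code); it takes `backlog`, `initialCredit` and `currentCredit` as plain integers, and the recycle check assumes the adapted condition is "holding more buffers than the target credit".

```java
/** Hypothetical helpers illustrating the credit formulas from the discussion. */
final class CreditMath {
    private CreditMath() {}

    /** FLIP design rationale 2: the credit a channel tries to maintain. */
    static int targetCredit(int backlog, int initialCredit) {
        return backlog + initialCredit;
    }

    /** Floating buffers to request: backlog_length + initialCredit - currentCredit. */
    static int floatingToRequest(int backlog, int initialCredit, int currentCredit) {
        return Math.max(0, targetCredit(backlog, initialCredit) - currentCredit);
    }

    /**
     * Assumed recycle() adaptation: after an exclusive buffer comes back, hand a
     * floating buffer back to the pool if the channel holds more than the target.
     */
    static boolean releaseFloatingOnRecycle(int heldBuffers, int backlog, int initialCredit) {
        return heldBuffers > targetCredit(backlog, initialCredit);
    }

    /** Last bullet: buffers the current credit does not cover yet. */
    static int uncoveredBacklog(int backlog, int currentCredit) {
        return Math.max(0, backlog - currentCredit);
    }
}
```

For example, with `initialCredit = 2`, `backlog = 0` and both exclusive buffers in use (`currentCredit = 0`), `floatingToRequest(0, 2, 0)` asks for 2 floating buffers. Without the adaptation, the channel would end up holding 4 = `2*initialCredit` buffers once the exclusive buffers return; `releaseFloatingOnRecycle(3, 0, 2)` already returns true after the first exclusive buffer comes back, so floating buffers go back to the pool instead of accumulating.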
---