[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274376#comment-16274376 ]
ASF GitHub Bot commented on FLINK-7416:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4533#discussion_r154318950
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -280,94 +280,120 @@ public String toString() {
// ------------------------------------------------------------------------
/**
- * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
+ * Enqueue this input channel in the pipeline for notifying the producer of unannounced credit.
*/
void notifyCreditAvailable() {
- //TODO in next PR
+ checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
+
+ // We should skip the notification if this channel is already released.
+ if (!isReleased.get()) {
+ partitionRequestClient.notifyCreditAvailable(this);
+ }
}
/**
- * Exclusive buffer is recycled to this input channel directly and it may trigger notify
- * credit to producer.
+ * Exclusive buffer is recycled to this input channel directly and it may trigger return extra
+ * floating buffer and notify increased credit to the producer.
*
* @param segment The exclusive segment of this channel.
*/
@Override
public void recycle(MemorySegment segment) {
- synchronized (availableBuffers) {
- // Important: the isReleased check should be inside the synchronized block.
- // that way the segment can also be returned to global pool after added into
- // the available queue during releasing all resources.
+ int numAddedBuffers;
+
+ synchronized (bufferQueue) {
+ // Important: check the isReleased state inside synchronized block, so there is no
+ // race condition when recycle and releaseAllResources running in parallel.
if (isReleased.get()) {
try {
- inputGate.returnExclusiveSegments(Arrays.asList(segment));
+ inputGate.returnExclusiveSegments(Collections.singletonList(segment));
return;
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
- availableBuffers.add(new Buffer(segment, this));
+ numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers);
}
- if (unannouncedCredit.getAndAdd(1) == 0) {
+ if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(1) == 0) {
notifyCreditAvailable();
}
}
public int getNumberOfAvailableBuffers() {
- synchronized (availableBuffers) {
- return availableBuffers.size();
+ synchronized (bufferQueue) {
+ return bufferQueue.getAvailableBufferSize();
}
}
+ @VisibleForTesting
+ public int getNumberOfRequiredBuffers() {
--- End diff --
This could be package-private, in which case `@VisibleForTesting` can be removed.
> Implement Netty receiver outgoing pipeline for credit-based
> -----------------------------------------------------------
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to notify the incremental
> credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and
> contain as much credit as available for the channel at that point in time.
> Otherwise, it would only add latency to the announcements and not increase
> throughput.
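
The announcement scheme described in the issue can be sketched roughly as follows. This is a minimal, single-threaded illustration and not the Flink implementation: `Channel`, `Pipeline`, `addCredit` and `onWritable` are hypothetical names, and the "send" is simulated by recording a string instead of writing an {{AddCredit}} message to the network.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class CreditAnnouncementSketch {

    /** Hypothetical stand-in for an input channel; it only tracks unannounced credit. */
    static final class Channel {
        final String name;
        final AtomicInteger unannouncedCredit = new AtomicInteger();

        Channel(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for the receiver's outgoing pipeline (not thread-safe). */
    static final class Pipeline {
        private final Queue<Channel> pending = new ArrayDeque<>();
        final List<String> sent = new ArrayList<>();

        /** Adds credit; the channel is enqueued only when its credit rises from zero. */
        void addCredit(Channel channel, int credit) {
            if (channel.unannouncedCredit.getAndAdd(credit) == 0) {
                pending.add(channel);
            }
        }

        /** Called when the connection is writable: announce one channel's credit. */
        void onWritable() {
            Channel channel = pending.poll();
            if (channel != null) {
                // Reset to zero and announce everything accumulated so far.
                int credit = channel.unannouncedCredit.getAndSet(0);
                sent.add(channel.name + ":" + credit);
            }
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        Channel a = new Channel("a");
        pipeline.addCredit(a, 1); // credit rises from zero: channel is enqueued
        pipeline.addCredit(a, 2); // already enqueued: credit just accumulates
        pipeline.onWritable();    // one announcement carrying all credit
        pipeline.onWritable();    // nothing pending: no announcement
        System.out.println(pipeline.sent);
    }
}
```

In this sketch the two `addCredit` calls produce a single announcement carrying a credit of 3, which mirrors the point made in the description: credit accumulates while the connection is busy and is announced in one message once there is capacity.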