[
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16315835#comment-16315835
]
ASF GitHub Bot commented on FLINK-7456:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4552#discussion_r157760652
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
@@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) {
MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
- inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
-
- return true;
- }
- }
- finally {
- if (releaseNettyBuffer) {
- bufferOrEvent.releaseBuffer();
+ inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
}
+ } finally {
+ bufferOrEvent.releaseBuffer();
}
}
- /**
- * This class would be replaced by CreditBasedClientHandler in the final,
- * so we only implement this method in CreditBasedClientHandler.
- */
- void notifyCreditAvailable(RemoteInputChannel inputChannel) {
- }
-
- private class AsyncErrorNotificationTask implements Runnable {
-
- private final Throwable error;
-
- public AsyncErrorNotificationTask(Throwable error) {
- this.error = error;
- }
-
- @Override
- public void run() {
- notifyAllChannelsOfErrorAndClose(error);
- }
- }
-
- /**
- * A buffer availability listener, which subscribes/unsubscribes the NIO
- * read event.
- *
- * <p>If no buffer is available, the channel read event will be unsubscribed
- * until one becomes available again.
- *
- * <p>After a buffer becomes available again, the buffer is handed over by
- * the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O
- * thread, which then continues the processing of the staged buffer.
- */
- private class BufferListenerTask implements BufferListener, Runnable {
-
- private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();
-
- private NettyMessage.BufferResponse stagedBufferResponse;
-
- private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) {
-
- stagedBufferResponse = bufferResponse;
-
- if (bufferProvider.addBufferListener(this)) {
- if (ctx.channel().config().isAutoRead()) {
- ctx.channel().config().setAutoRead(false);
- }
-
- return true;
- }
- else {
- stagedBufferResponse = null;
-
- return false;
- }
- }
-
- private boolean hasStagedBufferOrEvent() {
- return stagedBufferResponse != null;
- }
-
- public void notifyBufferDestroyed() {
- // The buffer pool has been destroyed
- stagedBufferResponse = null;
-
- if (stagedMessages.isEmpty()) {
- ctx.channel().config().setAutoRead(true);
- ctx.channel().read();
- }
- else {
- ctx.channel().eventLoop().execute(stagedMessagesHandler);
- }
+ private void writeAndFlushNextMessageIfPossible(Channel channel) {
--- End diff --
Please re-add the comment here, too.
> Implement Netty sender incoming pipeline for credit-based
> ---------------------------------------------------------
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> On the sender side, each subpartition view maintains an atomic integer
> {{currentCredit}} that tracks the credit announced by the receiver. Whenever a
> {{PartitionRequest}} or {{AddCredit}} message is received, {{currentCredit}} is
> increased by the corresponding delta.
> Each view also maintains an atomic boolean field that marks it as registered
> as available for transfer, which ensures it is enqueued in the handler only
> once. If {{currentCredit}} increases from zero and there are buffers available
> in the subpartition, the corresponding view is enqueued for transferring
> data.
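
The bookkeeping described in the issue text above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: the class names CreditSketch, ViewSketch and HandlerSketch, the queuedBuffers counter, and the method names are all hypothetical. Only the ideas of an atomic currentCredit and an atomic "registered" flag come from the description.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these class or method names are taken from Flink.
class CreditSketch {

    /** Stand-in for a sender-side subpartition view. */
    static class ViewSketch {
        private final AtomicInteger currentCredit = new AtomicInteger(0);
        private final AtomicBoolean registeredAvailable = new AtomicBoolean(false);
        // Assumed placeholder for "there are available buffers in the subpartition".
        private final AtomicInteger queuedBuffers;

        ViewSketch(int queuedBuffers) {
            this.queuedBuffers = new AtomicInteger(queuedBuffers);
        }

        /**
         * Adds the credit delta carried by a PartitionRequest or AddCredit message.
         * Returns true only if the caller should enqueue this view now.
         */
        boolean addCredit(int delta) {
            int previous = currentCredit.getAndAdd(delta);
            boolean becameSendable = previous == 0 && delta > 0 && queuedBuffers.get() > 0;
            // compareAndSet lets at most one caller win the right to enqueue the view.
            return becameSendable && registeredAvailable.compareAndSet(false, true);
        }

        int credit() {
            return currentCredit.get();
        }

        boolean isRegistered() {
            return registeredAvailable.get();
        }
    }

    /** Stand-in for the handler's queue of views that are ready to send. */
    static class HandlerSketch {
        final Queue<ViewSketch> availableViews = new ArrayDeque<>();

        void onCredit(ViewSketch view, int delta) {
            if (view.addCredit(delta)) {
                availableViews.add(view);
            }
        }
    }

    public static void main(String[] args) {
        HandlerSketch handler = new HandlerSketch();
        ViewSketch view = new ViewSketch(3);
        handler.onCredit(view, 2); // credit goes from 0 to 2, so the view is enqueued
        handler.onCredit(view, 1); // credit goes from 2 to 3, the view is already registered
        System.out.println(handler.availableViews.size() + " view(s) queued, credit=" + view.credit());
    }
}
```

The sketch leaves out everything that happens after enqueueing: consuming credit when a buffer is sent, and resetting the flag once the view runs out of credit or data, would be needed before a view could be registered a second time.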