[
https://issues.apache.org/jira/browse/FLINK-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326435#comment-16326435
]
ASF GitHub Bot commented on FLINK-7456:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4552#discussion_r161559690
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
---
@@ -71,4 +77,95 @@ public void testProducerFailedException() throws
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse)
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+ /**
+ * Tests {@link
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+ * verifying the reader would be enqueued in the pipeline if the next
sending buffer is event, even
+ * though it has no available credits.
+ */
+ @Test
+ public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+ // setup
+ final ResultSubpartitionView view =
mock(ResultSubpartitionView.class);
+ when(view.nextBufferIsEvent()).thenReturn(true);
+
+ final ResultPartitionID partitionId = new ResultPartitionID();
+ final ResultPartitionProvider partitionProvider =
mock(ResultPartitionProvider.class);
+ when(partitionProvider.createSubpartitionView(
+ eq(partitionId),
+ eq(0),
+
any(BufferAvailabilityListener.class))).thenReturn(view);
--- End diff --
let's remove that mock ...
> Implement Netty sender incoming pipeline for credit-based
> ---------------------------------------------------------
>
> Key: FLINK-7456
> URL: https://issues.apache.org/jira/browse/FLINK-7456
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Major
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> On sender side, each subpartition view maintains an atomic integer
> {{currentCredit}} from receiver. Once receiving the messages of
> {{PartitionRequest}} and {{AddCredit}}, the {{currentCredit}} is added by
> deltas.
> Each view also maintains an atomic boolean field to mark it as registered
> available for transfer to make sure it is enqueued in handler only once. If
> the {{currentCredit}} increases from zero and there are available buffers in
> the subpartition, the corresponding view will be enqueued for transferring
> data.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)