[ https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153156#comment-16153156 ]
ASF GitHub Bot commented on FLINK-7378: --------------------------------------- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4485#discussion_r136904188 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java --- @@ -372,6 +375,52 @@ public void testRequestBackoffConfiguration() throws Exception { } } + /** + * Tests that input gate requests and assigns network buffers for remote input channel, and triggers + * this process after unknown input channel updates to remote input channel. + */ + @Test + public void testRequestBuffersForInputChannel() throws Exception { + final TaskIOMetricGroup metrics = new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup(); + final SingleInputGate inputGate = new SingleInputGate( + "t1", + new JobID(), + new IntermediateDataSetID(), + ResultPartitionType.PIPELINED_CREDIT_BASED, + 0, + 1, + mock(TaskActions.class), + metrics); + RemoteInputChannel remote = mock(RemoteInputChannel.class); + inputGate.setInputChannel(new IntermediateResultPartitionID(), remote); + + final int buffersPerChannel = 2; + NetworkBufferPool network = mock(NetworkBufferPool.class); + inputGate.assignExclusiveSegments(network, buffersPerChannel); + + verify(network, times(1)).requestMemorySegments(buffersPerChannel); + verify(remote, times(1)).assignExclusiveSegments(anyList()); + + final UnknownInputChannel unknown = new UnknownInputChannel( + inputGate, + 0, + new ResultPartitionID(), + new ResultPartitionManager(), + new TaskEventDispatcher(), + new LocalConnectionManager(), + 0, + 0, + metrics); + inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown); + + // Update to a remote channel and verify that requesting buffers is triggered + inputGate.updateInputChannel(new InputChannelDeploymentDescriptor( + unknown.partitionId, + ResultPartitionLocation.createRemote(mock(ConnectionID.class)))); + + verify(network, times(2)).requestMemorySegments(buffersPerChannel); --- End diff -- In order to verify `assignExclusiveSegments` for `UnknownInputChannel#toRemoteInputChannel`, I modified the `current.getClass() == UnknowInputChannel.class` to `current instanceof UnknownInputChannel` in `SingleInputGate#updateInputChannel`. To do so, I think it is friendly and easy for tests to mock `UnknownInputChannel`. Do you have other concerns of this modification? > Create a fix size (non rebalancing) buffer pool type for the floating buffers > ----------------------------------------------------------------------------- > > Key: FLINK-7378 > URL: https://issues.apache.org/jira/browse/FLINK-7378 > Project: Flink > Issue Type: Sub-task > Components: Core > Reporter: zhijiang > Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the number of network buffers in {{LocalBufferPool}} for > {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a > is the number of exclusive buffers for each channel and b is the number of > floating buffers shared by all channels. > Considering the credit-based flow control feature, we want to create a fix > size buffer pool used to manage the floating buffers for {{SingleInputGate}}. > And the exclusive buffers are assigned to {{InputChannel}}s directly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)