zhijiangW commented on a change in pull request #12994:
URL: https://github.com/apache/flink/pull/12994#discussion_r466927353
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
##########
@@ -161,13 +155,13 @@ public void recycle(MemorySegment segment) {
}
@Override
- public List<MemorySegment> requestMemorySegments() throws IOException {
+ public List<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest) throws IOException {
Review comment:
Do you think we should check up front that the passed argument is greater than `0`?
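A minimal sketch of such an up-front check, under stated assumptions: `SegmentPoolSketch` is a hypothetical stand-in rather than the real `NetworkBufferPool`, `byte[]` stands in for `MemorySegment`, and the exception type and message are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the pool; not the actual NetworkBufferPool code.
class SegmentPoolSketch {

    // Rejects non-positive counts before any allocation work starts.
    static List<byte[]> requestMemorySegments(int numberOfSegmentsToRequest) {
        if (numberOfSegmentsToRequest <= 0) {
            throw new IllegalArgumentException(
                "The number of required buffers should be larger than 0, but was "
                    + numberOfSegmentsToRequest);
        }

        List<byte[]> segments = new ArrayList<>(numberOfSegmentsToRequest);
        for (int i = 0; i < numberOfSegmentsToRequest; i++) {
            segments.add(new byte[32]); // placeholder for a MemorySegment
        }
        return segments;
    }
}
```

With the check at the top of the method, an invalid count fails fast instead of silently returning an empty list.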
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##########
@@ -1121,14 +1118,15 @@ public void testUnblockReleasedChannel() throws Exception {
remoteChannel.resumeConsumption();
}
- // ---------------------------------------------------------------------------------------------
-
- public static RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, int numExclusiveSegments) {
- return InputChannelBuilder.newBuilder()
- .setNetworkBuffersPerChannel(numExclusiveSegments)
- .buildRemoteChannel(inputGate);
+ @Test(expected = IllegalStateException.class)
+ public void testIllegalNumberOfExclusiveBuffers() throws Exception {
Review comment:
I guess this test should be adjusted as well, based on the comments above.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -128,8 +128,9 @@ public RemoteInputChannel(
void assignExclusiveSegments() throws IOException {
checkState(bufferManager.unsynchronizedGetAvailableExclusiveBuffers() == 0,
"Bug in input channel setup logic: exclusive buffers have already been set for this input channel.");
+ checkState(initialCredit > 0, "Number of exclusive buffers must be larger than 0.");
Review comment:
nit: if we want to check this argument, it would be better to do it in the
constructor during initialization.
I think we can simply remove it, since the `networkBuffersPerChannel` passed to
the constructor should already have been checked by an upstream component.
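If the check is kept, moving it into the constructor could look roughly like the sketch below. `ChannelSketch` is a hypothetical, heavily simplified class and not the real `RemoteInputChannel`; only the names `networkBuffersPerChannel` and `initialCredit` come from the diff, and the exception type is an assumption.

```java
// Hypothetical simplified channel; not the actual RemoteInputChannel.
class ChannelSketch {

    private final int initialCredit;

    ChannelSketch(int networkBuffersPerChannel) {
        // Validate once at construction time, so later methods can rely on the invariant.
        if (networkBuffersPerChannel <= 0) {
            throw new IllegalArgumentException(
                "Number of exclusive buffers must be larger than 0.");
        }
        this.initialCredit = networkBuffersPerChannel;
    }

    // No repeated argument check is needed here; the constructor already guarantees it.
    int assignExclusiveSegments() {
        return initialCredit;
    }
}
```

This keeps `assignExclusiveSegments()` limited to state checks, while invalid configuration is rejected before the channel exists.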
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
##########
@@ -1121,14 +1118,15 @@ public void testUnblockReleasedChannel() throws Exception {
remoteChannel.resumeConsumption();
}
- // ---------------------------------------------------------------------------------------------
-
- public static RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, int numExclusiveSegments) {
Review comment:
This change seems unnecessary?