Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4552#discussion_r152985049
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
---
@@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException {
     BufferPool bufferPool = null;
     try {
-        if (gate.getConsumedPartitionType().isCreditBased()) {
-            // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
-            bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
-            gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-        } else {
-            int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-                gate.getNumberOfInputChannels() * networkBuffersPerChannel +
-                    extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-            bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
-                maxNumberOfMemorySegments);
-        }
+        // Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
+        bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
+        gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --
What about the non-bounded partition type that we use for batch processing?
Shouldn't we use an unbounded number of floating buffers there, as previously?
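
A minimal sketch of the sizing this question points at, assuming the floating pool keeps `extraNetworkBuffersPerGate` as its minimum and only the maximum depends on whether the partition type is bounded. `FloatingPoolSizing` and `floatingPoolBounds` are hypothetical names used for illustration and are not part of Flink; the two returned values stand in for the two arguments of `createBufferPool` in the diff.

```java
// Hypothetical helper, not Flink code: computes {min, max} for the floating buffer pool of one gate.
final class FloatingPoolSizing {

    private FloatingPoolSizing() {
    }

    /**
     * @param partitionIsBounded         stand-in for gate.getConsumedPartitionType().isBounded()
     * @param extraNetworkBuffersPerGate number of floating buffers configured per gate
     * @return a two-element array: {minimum pool size, maximum pool size}
     */
    static int[] floatingPoolBounds(boolean partitionIsBounded, int extraNetworkBuffersPerGate) {
        // Assumption: bounded (pipelined) partitions get a fixed-size pool,
        // non-bounded (batch) partitions keep the old Integer.MAX_VALUE upper limit.
        int max = partitionIsBounded ? extraNetworkBuffersPerGate : Integer.MAX_VALUE;
        return new int[] {extraNetworkBuffersPerGate, max};
    }
}
```

With this shape, the exclusive buffers would still be assigned per channel as in the new code, and only the upper limit of the floating pool would differ between streaming and batch.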
---