This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 02978da  [FLINK-16645][network] Limit the maximum backlogs in 
subpartitions
02978da is described below

commit 02978da7c49b33def54f8cb8cb4ceb57b022cfd5
Author: Jiayi Liao <[email protected]>
AuthorDate: Mon Mar 30 20:55:30 2020 +0800

    [FLINK-16645][network] Limit the maximum backlogs in subpartitions
    
    In the case of data skew, most of the buffers in #LocalBufferPool can be 
requested away by one
    certain partition, which increases the in-flight data and slows down the 
buffer alignment.
    
    To solve this problem, we add a config to limit the maximum backlog per 
subpartition. In this
    way, we can check each subpartition's backlog and make the stream task 
unavailable if the
    limit is exceeded.
---
 .../generated/all_taskmanager_network_section.html |   8 +-
 .../mesos_task_manager_configuration.html          |   6 -
 .../netty_shuffle_environment_configuration.html   |   8 +-
 .../NettyShuffleEnvironmentOptions.java            |  21 ++-
 .../io/network/NettyShuffleServiceFactory.java     |   3 +-
 .../network/api/writer/BroadcastRecordWriter.java  |   2 +-
 .../api/writer/ChannelSelectorRecordWriter.java    |   2 +-
 .../io/network/api/writer/RecordWriter.java        |  15 +--
 .../network/api/writer/ResultPartitionWriter.java  |   4 +-
 .../runtime/io/network/buffer/BufferBuilder.java   |  11 ++
 .../runtime/io/network/buffer/BufferPool.java      |   2 +
 .../io/network/buffer/BufferPoolFactory.java       |  13 +-
 .../runtime/io/network/buffer/BufferProvider.java  |  32 ++++-
 .../runtime/io/network/buffer/LocalBufferPool.java | 143 ++++++++++++++++++---
 .../io/network/buffer/NetworkBufferPool.java       |  33 ++++-
 .../network/partition/PipelinedSubpartition.java   |   2 +-
 .../io/network/partition/ResultPartition.java      |   8 +-
 .../network/partition/ResultPartitionFactory.java  |  10 +-
 ...bleNotifyingResultPartitionWriterDecorator.java |   8 +-
 .../NettyShuffleEnvironmentConfiguration.java      |  18 ++-
 .../io/network/NettyShuffleEnvironmentBuilder.java |  10 +-
 .../AbstractCollectingResultPartitionWriter.java   |   8 +-
 .../api/writer/RecordWriterDelegateTest.java       |   2 +-
 .../io/network/api/writer/RecordWriterTest.java    |  46 ++++---
 .../io/network/buffer/LocalBufferPoolTest.java     |  37 ++++++
 .../io/network/buffer/NetworkBufferPoolTest.java   |   4 +-
 .../runtime/io/network/buffer/NoOpBufferPool.java  |  18 +++
 .../partition/MockResultPartitionWriter.java       |   5 +-
 .../PartialConsumePipelinedResultTest.java         |   2 +-
 .../network/partition/ResultPartitionBuilder.java  |   5 +-
 .../partition/ResultPartitionFactoryTest.java      |   3 +-
 .../io/network/partition/ResultPartitionTest.java  |  32 ++++-
 .../io/network/util/TestPooledBufferProvider.java  |  10 ++
 33 files changed, 425 insertions(+), 106 deletions(-)

diff --git a/docs/_includes/generated/all_taskmanager_network_section.html 
b/docs/_includes/generated/all_taskmanager_network_section.html
index 04a8566..797ac6f 100644
--- a/docs/_includes/generated/all_taskmanager_network_section.html
+++ b/docs/_includes/generated/all_taskmanager_network_section.html
@@ -30,7 +30,7 @@
             <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
             <td style="word-wrap: break-word;">2</td>
             <td>Integer</td>
-            <td>Maximum number of network buffers to use for each 
outgoing/incoming channel (subpartition/input channel).In credit-based flow 
control mode, this indicates how many credits are exclusive in each input 
channel. It should be configured at least 2 for good performance. 1 buffer is 
for receiving in-flight data in the subpartition and 1 buffer is for parallel 
serialization.</td>
+            <td>Number of exclusive network buffers to use for each 
outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow 
control model. It should be configured at least 2 for good performance. 1 
buffer is for receiving in-flight data in the subpartition and 1 buffer is for 
parallel serialization.</td>
         </tr>
         <tr>
             
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
@@ -39,6 +39,12 @@
             <td>Number of extra network buffers to use for each 
outgoing/incoming gate (result partition/input gate). In credit-based flow 
control mode, this indicates how many floating credits are shared among all the 
input channels. The floating buffers are distributed based on backlog 
(real-time output buffers in the subpartition) feedback, and can help relieve 
back-pressure caused by unbalanced data distribution among the subpartitions. 
This value should be increased in case of highe [...]
         </tr>
         <tr>
+            
<td><h5>taskmanager.network.memory.max-buffers-per-channel</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>Number of max buffers that can be used for each channel. If a 
channel exceeds the number of max buffers, it will make the task become 
unavailable, cause the back pressure and block the data processing. This might 
speed up checkpoint alignment by preventing excessive growth of the buffered 
in-flight data in case of data skew and high number of configured floating 
buffers. This limit is not strictly guaranteed, and can be ignored by things 
like flatMap operators, records sp [...]
+        </tr>
+        <tr>
             
<td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
             <td style="word-wrap: break-word;">120</td>
             <td>Integer</td>
diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html 
b/docs/_includes/generated/mesos_task_manager_configuration.html
index aa24d11..5d78138 100644
--- a/docs/_includes/generated/mesos_task_manager_configuration.html
+++ b/docs/_includes/generated/mesos_task_manager_configuration.html
@@ -86,11 +86,5 @@
             <td>String</td>
             <td>A comma separated list of URIs of custom artifacts to be 
downloaded into the sandbox of Mesos workers.</td>
         </tr>
-        <tr>
-            <td><h5>taskmanager.numberOfTaskSlots</h5></td>
-            <td style="word-wrap: break-word;">1</td>
-            <td>Integer</td>
-            <td>The number of parallel operator or user function instances 
that a single TaskManager can run. If this value is larger than 1, a single 
TaskManager takes multiple instances of a function or operator. That way, the 
TaskManager can utilize multiple CPU cores, but at the same time, the available 
memory is divided between the different operator or function instances. This 
value is typically proportional to the number of physical CPU cores that the 
TaskManager's machine has (e. [...]
-        </tr>
     </tbody>
 </table>
diff --git 
a/docs/_includes/generated/netty_shuffle_environment_configuration.html 
b/docs/_includes/generated/netty_shuffle_environment_configuration.html
index 18a4028..5e00e18 100644
--- a/docs/_includes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/_includes/generated/netty_shuffle_environment_configuration.html
@@ -48,7 +48,7 @@
             <td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
             <td style="word-wrap: break-word;">2</td>
             <td>Integer</td>
-            <td>Maximum number of network buffers to use for each 
outgoing/incoming channel (subpartition/input channel).In credit-based flow 
control mode, this indicates how many credits are exclusive in each input 
channel. It should be configured at least 2 for good performance. 1 buffer is 
for receiving in-flight data in the subpartition and 1 buffer is for parallel 
serialization.</td>
+            <td>Number of exclusive network buffers to use for each 
outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow 
control model. It should be configured at least 2 for good performance. 1 
buffer is for receiving in-flight data in the subpartition and 1 buffer is for 
parallel serialization.</td>
         </tr>
         <tr>
             
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
@@ -57,6 +57,12 @@
             <td>Number of extra network buffers to use for each 
outgoing/incoming gate (result partition/input gate). In credit-based flow 
control mode, this indicates how many floating credits are shared among all the 
input channels. The floating buffers are distributed based on backlog 
(real-time output buffers in the subpartition) feedback, and can help relieve 
back-pressure caused by unbalanced data distribution among the subpartitions. 
This value should be increased in case of highe [...]
         </tr>
         <tr>
+            
<td><h5>taskmanager.network.memory.max-buffers-per-channel</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>Number of max buffers that can be used for each channel. If a 
channel exceeds the number of max buffers, it will make the task become 
unavailable, cause the back pressure and block the data processing. This might 
speed up checkpoint alignment by preventing excessive growth of the buffered 
in-flight data in case of data skew and high number of configured floating 
buffers. This limit is not strictly guaranteed, and can be ignored by things 
like flatMap operators, records sp [...]
+        </tr>
+        <tr>
             
<td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
             <td style="word-wrap: break-word;">120</td>
             <td>Integer</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index c84aa34..cd6ce69 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -156,10 +156,9 @@ public class NettyShuffleEnvironmentOptions {
        public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
                key("taskmanager.network.memory.buffers-per-channel")
                        .defaultValue(2)
-                       .withDescription("Maximum number of network buffers to 
use for each outgoing/incoming channel (subpartition/input channel)." +
-                               "In credit-based flow control mode, this 
indicates how many credits are exclusive in each input channel. It should be" +
-                               " configured at least 2 for good performance. 1 
buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
-                               " for parallel serialization.");
+                       .withDescription("Number of exclusive network buffers 
to use for each outgoing/incoming channel (subpartition/inputchannel)" +
+                               " in the credit-based flow control model. It 
should be configured at least 2 for good performance." +
+                               " 1 buffer is for receiving in-flight data in 
the subpartition and 1 buffer is for parallel serialization.");
 
        /**
         * Number of extra network buffers to use for each outgoing/incoming 
gate (result partition/input gate).
@@ -175,6 +174,20 @@ public class NettyShuffleEnvironmentOptions {
                                " increased in case of higher round trip times 
between nodes and/or larger number of machines in the cluster.");
 
        /**
+        * Number of max buffers can be used for each output subparition.
+        */
+       @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+       public static final ConfigOption<Integer> 
NETWORK_MAX_BUFFERS_PER_CHANNEL =
+               key("taskmanager.network.memory.max-buffers-per-channel")
+                       .defaultValue(10)
+                       .withDescription("Number of max buffers that can be 
used for each channel. If a channel exceeds the number of max" +
+                               " buffers, it will make the task become 
unavailable, cause the back pressure and block the data processing. This" +
+                               " might speed up checkpoint alignment by 
preventing excessive growth of the buffered in-flight data in" +
+                               " case of data skew and high number of 
configured floating buffers. This limit is not strictly guaranteed," +
+                               " and can be ignored by things like flatMap 
operators, records spanning multiple buffers or single timer" +
+                               " producing large amount of data.");
+
+       /**
         * The timeout for requesting exclusive buffers for each channel.
         */
        @Documentation.ExcludeFromDocumentation("This option is purely 
implementation related, and may be removed as the implementation changes.")
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index afbaba2..9c4f95b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -107,7 +107,8 @@ public class NettyShuffleServiceFactory implements 
ShuffleServiceFactory<NettySh
                        config.networkBufferSize(),
                        config.isForcePartitionReleaseOnConsumption(),
                        config.isBlockingShuffleCompressionEnabled(),
-                       config.getCompressionCodec());
+                       config.getCompressionCodec(),
+                       config.getMaxBuffersPerChannel());
 
                SingleInputGateFactory singleInputGateFactory = new 
SingleInputGateFactory(
                        taskExecutorResourceId,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 93bc2c4..9964b20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -128,7 +128,7 @@ public final class BroadcastRecordWriter<T extends 
IOReadableWritable> extends R
        public BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
                checkState(bufferBuilder == null || bufferBuilder.isFinished());
 
-               BufferBuilder builder = getBufferBuilder();
+               BufferBuilder builder = 
super.requestNewBufferBuilder(targetChannel);
                if (randomTriggered) {
                        addBufferConsumer(builder.createBufferConsumer(), 
targetChannel);
                } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 8393409..298e357 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -100,7 +100,7 @@ public final class ChannelSelectorRecordWriter<T extends 
IOReadableWritable> ext
        public BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
                checkState(bufferBuilders[targetChannel] == null || 
bufferBuilders[targetChannel].isFinished());
 
-               BufferBuilder bufferBuilder = getBufferBuilder();
+               BufferBuilder bufferBuilder = 
super.requestNewBufferBuilder(targetChannel);
                addBufferConsumer(bufferBuilder.createBufferConsumer(), 
targetChannel);
                bufferBuilders[targetChannel] = bufferBuilder;
                return bufferBuilder;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index a678e5a..7d0f7de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -225,11 +225,6 @@ public abstract class RecordWriter<T extends 
IOReadableWritable> implements Avai
        abstract BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException;
 
        /**
-        * Requests a new {@link BufferBuilder} for the target channel and 
returns it.
-        */
-       abstract BufferBuilder requestNewBufferBuilder(int targetChannel) 
throws IOException, InterruptedException;
-
-       /**
         * Marks the current {@link BufferBuilder} as finished if present and 
clears the state for next one.
         */
        abstract void tryFinishCurrentBufferBuilder(int targetChannel);
@@ -289,12 +284,14 @@ public abstract class RecordWriter<T extends 
IOReadableWritable> implements Avai
                targetPartition.addBufferConsumer(consumer, targetChannel);
        }
 
-       @VisibleForTesting
-       public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
-               BufferBuilder builder = targetPartition.tryGetBufferBuilder();
+       /**
+        * Requests a new {@link BufferBuilder} for the target channel and 
returns it.
+        */
+       public BufferBuilder requestNewBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
+               BufferBuilder builder = 
targetPartition.tryGetBufferBuilder(targetChannel);
                if (builder == null) {
                        long start = System.currentTimeMillis();
-                       builder = targetPartition.getBufferBuilder();
+                       builder = 
targetPartition.getBufferBuilder(targetChannel);
                        
idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
                }
                return builder;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index efb35dc..f589e07 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -59,7 +59,7 @@ public interface ResultPartitionWriter extends AutoCloseable, 
AvailabilityProvid
        /**
         * Requests a {@link BufferBuilder} from this partition for writing 
data.
         */
-       BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException;
+       BufferBuilder getBufferBuilder(int targetChannel) throws IOException, 
InterruptedException;
 
 
        /**
@@ -67,7 +67,7 @@ public interface ResultPartitionWriter extends AutoCloseable, 
AvailabilityProvid
         *
         * <p>Returns <code>null</code> if no buffer is available or the buffer 
provider has been destroyed.
         */
-       BufferBuilder tryGetBufferBuilder() throws IOException;
+       BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException;
 
        /**
         * Adds the bufferConsumer to the subpartition with the given index.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 77d3526..b18569e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.MemorySegment;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -126,6 +127,16 @@ public class BufferBuilder {
                return memorySegment.size();
        }
 
+       @VisibleForTesting
+       public BufferRecycler getRecycler() {
+               return recycler;
+       }
+
+       @VisibleForTesting
+       public MemorySegment getMemorySegment() {
+               return memorySegment;
+       }
+
        /**
         * Holds a reference to the current writer position. Negative values 
indicate that writer ({@link BufferBuilder}
         * has finished. Value {@code Integer.MIN_VALUE} represents finished 
empty buffer.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 511eb65..c1112f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -73,4 +73,6 @@ public interface BufferPool extends BufferProvider, 
BufferRecycler {
         * Returns the number of used buffers of this buffer pool.
         */
        int bestEffortGetNumOfUsedBuffers();
+
+       BufferRecycler[] getSubpartitionBufferRecyclers();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index 7d623f8..1f600a1 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -48,10 +48,19 @@ public interface BufferPoolFactory {
         *              minimum number of network buffers in this pool
         * @param maxUsedBuffers
         *              maximum number of network buffers this pool offers
-        *      @param bufferPoolOwner
+        * @param bufferPoolOwner
         *          the owner of this buffer pool to release memory when needed
+        * @param numSubpartitions
+        *              number of subpartitions in this pool
+        * @param maxBuffersPerChannel
+        *              maximum number of buffers to use for each channel
         */
-       BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, 
BufferPoolOwner bufferPoolOwner) throws IOException;
+       BufferPool createBufferPool(
+               int numRequiredBuffers,
+               int maxUsedBuffers,
+               BufferPoolOwner bufferPoolOwner,
+               int numSubpartitions,
+               int maxBuffersPerChannel) throws IOException;
 
        /**
         * Destroy callback for updating factory book keeping.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 0551804..f2f7f5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.runtime.io.AvailabilityProvider;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
@@ -33,26 +35,46 @@ public interface BufferProvider extends 
AvailabilityProvider {
        /**
         * Returns a {@link Buffer} instance from the buffer provider, if one 
is available.
         *
-        * <p>Returns <code>null</code> if no buffer is available or the buffer 
provider has been destroyed.
+        * @return {@code null} if no buffer is available or the buffer 
provider has been destroyed.
         */
-       Buffer requestBuffer() throws IOException;
+       @Nullable Buffer requestBuffer() throws IOException;
 
        /**
-        * Returns a {@link BufferBuilder} instance from the buffer provider.
+        * Returns a {@link BufferBuilder} instance from the buffer provider. 
This equals to {@link #requestBufferBuilder(int)}
+        * with unknown target channel.
         *
-        * <p>Returns <code>null</code> if no buffer is available or the buffer 
provider has been destroyed.
+        * @return {@code null} if no buffer is available or the buffer 
provider has been destroyed.
         */
-       BufferBuilder requestBufferBuilder() throws IOException;
+       @Nullable BufferBuilder requestBufferBuilder() throws IOException;
 
        /**
         * Returns a {@link BufferBuilder} instance from the buffer provider.
         *
+        * @param targetChannel to which the request will be accounted to.
+        * @return {@code null} if no buffer is available or the buffer 
provider has been destroyed.
+        */
+       @Nullable BufferBuilder requestBufferBuilder(int targetChannel) throws 
IOException;
+
+       /**
+        * Returns a {@link BufferBuilder} instance from the buffer provider. 
This equals to {@link #requestBufferBuilderBlocking(int)}
+        * with unknown target channel.
+        *
         * <p>If there is no buffer available, the call will block until one 
becomes available again or the
         * buffer provider has been destroyed.
         */
        BufferBuilder requestBufferBuilderBlocking() throws IOException, 
InterruptedException;
 
        /**
+        * Returns a {@link BufferBuilder} instance from the buffer provider.
+        *
+        * <p>If there is no buffer available, the call will block until one 
becomes available again or the
+        * buffer provider has been destroyed.
+        *
+        * @param targetChannel to which the request will be accounted to.
+        */
+       BufferBuilder requestBufferBuilderBlocking(int targetChannel) throws 
IOException, InterruptedException;
+
+       /**
         * Adds a buffer availability listener to the buffer provider.
         *
         * <p>The operation fails with return value <code>false</code>, when 
there is a buffer available or
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a6af471..73e01b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.MemorySegment;
 import 
org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.util.ExceptionUtils;
@@ -51,6 +52,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 class LocalBufferPool implements BufferPool {
        private static final Logger LOG = 
LoggerFactory.getLogger(LocalBufferPool.class);
 
+       private static final int UNKNOWN_CHANNEL = -1;
+
        /** Global network buffer pool to get buffers from. */
        private final NetworkBufferPool networkBufferPool;
 
@@ -87,6 +90,14 @@ class LocalBufferPool implements BufferPool {
         */
        private int numberOfRequestedMemorySegments;
 
+       private final int maxBuffersPerChannel;
+
+       private final int[] subpartitionBuffersCount;
+
+       private final BufferRecycler[] subpartitionBufferRecyclers;
+
+       private int unavailableSubpartitionsCount = 0;
+
        private boolean isDestroyed;
 
        @Nullable
@@ -104,7 +115,13 @@ class LocalBufferPool implements BufferPool {
         *              minimum number of network buffers
         */
        LocalBufferPool(NetworkBufferPool networkBufferPool, int 
numberOfRequiredMemorySegments) {
-               this(networkBufferPool, numberOfRequiredMemorySegments, 
Integer.MAX_VALUE, null);
+               this(
+                       networkBufferPool,
+                       numberOfRequiredMemorySegments,
+                       Integer.MAX_VALUE,
+                       null,
+                       0,
+                       Integer.MAX_VALUE);
        }
 
        /**
@@ -120,7 +137,13 @@ class LocalBufferPool implements BufferPool {
         */
        LocalBufferPool(NetworkBufferPool networkBufferPool, int 
numberOfRequiredMemorySegments,
                        int maxNumberOfMemorySegments) {
-               this(networkBufferPool, numberOfRequiredMemorySegments, 
maxNumberOfMemorySegments, null);
+               this(
+                       networkBufferPool,
+                       numberOfRequiredMemorySegments,
+                       maxNumberOfMemorySegments,
+                       null,
+                       0,
+                       Integer.MAX_VALUE);
        }
 
        /**
@@ -133,14 +156,20 @@ class LocalBufferPool implements BufferPool {
         *              minimum number of network buffers
         * @param maxNumberOfMemorySegments
         *              maximum number of network buffers to allocate
-        *      @param bufferPoolOwner
+        * @param bufferPoolOwner
         *              the owner of this buffer pool to release memory when 
needed
+        * @param numberOfSubpartitions
+        *              number of subpartitions
+        * @param maxBuffersPerChannel
+        *              maximum number of buffers to use for each channel
         */
        LocalBufferPool(
-               NetworkBufferPool networkBufferPool,
-               int numberOfRequiredMemorySegments,
-               int maxNumberOfMemorySegments,
-               @Nullable BufferPoolOwner bufferPoolOwner) {
+                       NetworkBufferPool networkBufferPool,
+                       int numberOfRequiredMemorySegments,
+                       int maxNumberOfMemorySegments,
+                       @Nullable BufferPoolOwner bufferPoolOwner,
+                       int numberOfSubpartitions,
+                       int maxBuffersPerChannel) {
                checkArgument(maxNumberOfMemorySegments >= 
numberOfRequiredMemorySegments,
                        "Maximum number of memory segments (%s) should not be 
smaller than minimum (%s).",
                        maxNumberOfMemorySegments, 
numberOfRequiredMemorySegments);
@@ -157,6 +186,19 @@ class LocalBufferPool implements BufferPool {
                this.currentPoolSize = numberOfRequiredMemorySegments;
                this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
                this.bufferPoolOwner = bufferPoolOwner;
+
+               if (numberOfSubpartitions > 0) {
+                       checkArgument(maxBuffersPerChannel > 0,
+                               "Maximum number of buffers for each channel 
(%s) should be larger than 0.",
+                               maxBuffersPerChannel);
+               }
+
+               this.subpartitionBuffersCount = new int[numberOfSubpartitions];
+               subpartitionBufferRecyclers = new 
BufferRecycler[numberOfSubpartitions];
+               for (int i = 0; i < subpartitionBufferRecyclers.length; i++) {
+                       subpartitionBufferRecyclers[i] = new 
SubpartitionBufferRecycler(i, this);
+               }
+               this.maxBuffersPerChannel = maxBuffersPerChannel;
        }
 
        // 
------------------------------------------------------------------------
@@ -206,12 +248,22 @@ class LocalBufferPool implements BufferPool {
 
        @Override
        public BufferBuilder requestBufferBuilder() throws IOException {
-               return toBufferBuilder(requestMemorySegment());
+               return toBufferBuilder(requestMemorySegment(UNKNOWN_CHANNEL), 
UNKNOWN_CHANNEL);
+       }
+
+       @Override
+       public BufferBuilder requestBufferBuilder(int targetChannel) throws 
IOException {
+               return toBufferBuilder(requestMemorySegment(targetChannel), 
targetChannel);
        }
 
        @Override
        public BufferBuilder requestBufferBuilderBlocking() throws IOException, 
InterruptedException {
-               return toBufferBuilder(requestMemorySegmentBlocking());
+               return 
toBufferBuilder(requestMemorySegmentBlocking(UNKNOWN_CHANNEL), UNKNOWN_CHANNEL);
+       }
+
+       @Override
+       public BufferBuilder requestBufferBuilderBlocking(int targetChannel) 
throws IOException, InterruptedException {
+               return 
toBufferBuilder(requestMemorySegmentBlocking(targetChannel), targetChannel);
        }
 
        private Buffer toBuffer(MemorySegment memorySegment) {
@@ -221,16 +273,21 @@ class LocalBufferPool implements BufferPool {
                return new NetworkBuffer(memorySegment, this);
        }
 
-       private BufferBuilder toBufferBuilder(MemorySegment memorySegment) {
+       private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int 
targetChannel) {
                if (memorySegment == null) {
                        return null;
                }
-               return new BufferBuilder(memorySegment, this);
+
+               if (targetChannel == UNKNOWN_CHANNEL) {
+                       return new BufferBuilder(memorySegment, this);
+               } else {
+                       return new BufferBuilder(memorySegment, 
subpartitionBufferRecyclers[targetChannel]);
+               }
        }
 
-       private MemorySegment requestMemorySegmentBlocking() throws 
InterruptedException, IOException {
+       private MemorySegment requestMemorySegmentBlocking(int targetChannel) 
throws InterruptedException, IOException {
                MemorySegment segment;
-               while ((segment = requestMemorySegment()) == null) {
+               while ((segment = requestMemorySegment(targetChannel)) == null) 
{
                        try {
                                // wait until available
                                getAvailableFuture().get();
@@ -243,7 +300,7 @@ class LocalBufferPool implements BufferPool {
        }
 
        @Nullable
-       private MemorySegment requestMemorySegment() throws IOException {
+       private MemorySegment requestMemorySegment(int targetChannel) throws 
IOException {
                MemorySegment segment = null;
                synchronized (availableMemorySegments) {
                        returnExcessMemorySegments();
@@ -258,11 +315,23 @@ class LocalBufferPool implements BufferPool {
                        if (segment == null) {
                                availabilityHelper.resetUnavailable();
                        }
+
+                       if (segment != null && targetChannel != 
UNKNOWN_CHANNEL) {
+                               if (subpartitionBuffersCount[targetChannel]++ 
== maxBuffersPerChannel) {
+                                       unavailableSubpartitionsCount++;
+                                       availabilityHelper.resetUnavailable();
+                               }
+                       }
                }
                return segment;
        }
 
        @Nullable
+       private MemorySegment requestMemorySegment() throws IOException {
+               return requestMemorySegment(UNKNOWN_CHANNEL);
+       }
+
+       @Nullable
        private MemorySegment requestMemorySegmentFromGlobal() throws 
IOException {
                assert Thread.holdsLock(availableMemorySegments);
 
@@ -287,20 +356,32 @@ class LocalBufferPool implements BufferPool {
 
        @Override
        public void recycle(MemorySegment segment) {
+               recycle(segment, UNKNOWN_CHANNEL);
+       }
+
+       private void recycle(MemorySegment segment, int channel) {
                BufferListener listener;
                CompletableFuture<?> toNotify = null;
                NotificationResult notificationResult = 
NotificationResult.BUFFER_NOT_USED;
                while (!notificationResult.isBufferUsed()) {
                        synchronized (availableMemorySegments) {
+                               final int oldUnavailableSubpartitionsCount = 
unavailableSubpartitionsCount;
+                               if (channel != UNKNOWN_CHANNEL) {
+                                       if (--subpartitionBuffersCount[channel] 
== maxBuffersPerChannel) {
+                                               unavailableSubpartitionsCount--;
+                                       }
+                               }
+
                                if (isDestroyed || 
numberOfRequestedMemorySegments > currentPoolSize) {
                                        returnMemorySegment(segment);
                                        return;
                                } else {
                                        listener = registeredListeners.poll();
                                        if (listener == null) {
-                                               boolean wasUnavailable = 
availableMemorySegments.isEmpty();
+                                               boolean wasUnavailable = 
availableMemorySegments.isEmpty() || oldUnavailableSubpartitionsCount > 0;
                                                
availableMemorySegments.add(segment);
-                                               if (wasUnavailable) {
+                                               // only need to check 
unavailableSubpartitionsCount here because availableMemorySegments is not empty
+                                               if (wasUnavailable && 
unavailableSubpartitionsCount == 0) {
                                                        toNotify = 
availabilityHelper.getUnavailableToResetAvailable();
                                                }
                                                break;
@@ -413,7 +494,7 @@ class LocalBufferPool implements BufferPool {
 
        @Override
        public CompletableFuture<?> getAvailableFuture() {
-               if (numberOfRequestedMemorySegments >= currentPoolSize) {
+               if (numberOfRequestedMemorySegments >= currentPoolSize || 
unavailableSubpartitionsCount > 0) {
                        return availabilityHelper.getAvailableFuture();
                } else if (availabilityHelper.isApproximatelyAvailable() || 
networkBufferPool.isApproximatelyAvailable()) {
                        return AVAILABLE;
@@ -426,9 +507,11 @@ class LocalBufferPool implements BufferPool {
        public String toString() {
                synchronized (availableMemorySegments) {
                        return String.format(
-                               "[size: %d, required: %d, requested: %d, 
available: %d, max: %d, listeners: %d, destroyed: %s]",
+                               "[size: %d, required: %d, requested: %d, 
available: %d, max: %d, listeners: %d," +
+                                               "subpartitions: %d, 
maxBuffersPerChannel: %d, destroyed: %s]",
                                currentPoolSize, 
numberOfRequiredMemorySegments, numberOfRequestedMemorySegments,
-                               availableMemorySegments.size(), 
maxNumberOfMemorySegments, registeredListeners.size(), isDestroyed);
+                               availableMemorySegments.size(), 
maxNumberOfMemorySegments, registeredListeners.size(),
+                                       subpartitionBuffersCount.length, 
maxBuffersPerChannel, isDestroyed);
                }
        }
 
@@ -463,4 +546,26 @@ class LocalBufferPool implements BufferPool {
                        returnMemorySegment(segment);
                }
        }
+
+       @VisibleForTesting
+       @Override
+       public BufferRecycler[] getSubpartitionBufferRecyclers() {
+               return subpartitionBufferRecyclers;
+       }
+
+       private static class SubpartitionBufferRecycler implements 
BufferRecycler {
+
+               private int channel;
+               private LocalBufferPool bufferPool;
+
+               SubpartitionBufferRecycler(int channel, LocalBufferPool 
bufferPool) {
+                       this.channel = channel;
+                       this.bufferPool = bufferPool;
+               }
+
+               @Override
+               public void recycle(MemorySegment memorySegment) {
+                       bufferPool.recycle(memorySegment, channel);
+               }
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 6e0f19b..513308c 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -314,18 +314,35 @@ public class NetworkBufferPool implements 
BufferPoolFactory, MemorySegmentProvid
 
        @Override
        public BufferPool createBufferPool(int numRequiredBuffers, int 
maxUsedBuffers) throws IOException {
-               return internalCreateBufferPool(numRequiredBuffers, 
maxUsedBuffers, null);
+               return internalCreateBufferPool(
+                       numRequiredBuffers,
+                       maxUsedBuffers,
+                       null,
+                       0,
+                       Integer.MAX_VALUE);
        }
 
        @Override
-       public BufferPool createBufferPool(int numRequiredBuffers, int 
maxUsedBuffers, BufferPoolOwner bufferPoolOwner) throws IOException {
-               return internalCreateBufferPool(numRequiredBuffers, 
maxUsedBuffers, bufferPoolOwner);
+       public BufferPool createBufferPool(
+                       int numRequiredBuffers,
+                       int maxUsedBuffers,
+                       BufferPoolOwner bufferPoolOwner,
+                       int numSubpartitions,
+                       int maxBuffersPerChannel) throws IOException {
+               return internalCreateBufferPool(
+                       numRequiredBuffers,
+                       maxUsedBuffers,
+                       bufferPoolOwner,
+                       numSubpartitions,
+                       maxBuffersPerChannel);
        }
 
        private BufferPool internalCreateBufferPool(
                        int numRequiredBuffers,
                        int maxUsedBuffers,
-                       @Nullable BufferPoolOwner bufferPoolOwner) throws 
IOException {
+                       @Nullable BufferPoolOwner bufferPoolOwner,
+                       int numSubpartitions,
+                       int maxBuffersPerChannel) throws IOException {
 
                // It is necessary to use a separate lock from the one used for 
buffer
                // requests to ensure deadlock freedom for failure cases.
@@ -349,7 +366,13 @@ public class NetworkBufferPool implements 
BufferPoolFactory, MemorySegmentProvid
                        // We are good to go, create a new buffer pool and 
redistribute
                        // non-fixed size buffers.
                        LocalBufferPool localBufferPool =
-                               new LocalBufferPool(this, numRequiredBuffers, 
maxUsedBuffers, bufferPoolOwner);
+                               new LocalBufferPool(
+                                       this,
+                                       numRequiredBuffers,
+                                       maxUsedBuffers,
+                                       bufferPoolOwner,
+                                       numSubpartitions,
+                                       maxBuffersPerChannel);
 
                        allBufferPools.add(localBufferPool);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 95f1168..fa1da53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -104,7 +104,7 @@ public class PipelinedSubpartition extends 
ResultSubpartition {
        @Override
        public void initializeState(ChannelStateReader stateReader) throws 
IOException, InterruptedException {
                for (ReadResult readResult = ReadResult.HAS_MORE_DATA; 
readResult == ReadResult.HAS_MORE_DATA;) {
-                       BufferBuilder bufferBuilder = 
parent.getBufferPool().requestBufferBuilderBlocking();
+                       BufferBuilder bufferBuilder = 
parent.getBufferPool().requestBufferBuilderBlocking(subpartitionInfo.getSubPartitionIdx());
                        BufferConsumer bufferConsumer = 
bufferBuilder.createBufferConsumer();
                        readResult = 
stateReader.readOutputData(subpartitionInfo, bufferBuilder);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index a89b564..a8926dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -206,15 +206,15 @@ public class ResultPartition implements 
ResultPartitionWriter, BufferPoolOwner {
        // 
------------------------------------------------------------------------
 
        @Override
-       public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
+       public BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
                checkInProduceState();
 
-               return bufferPool.requestBufferBuilderBlocking();
+               return bufferPool.requestBufferBuilderBlocking(targetChannel);
        }
 
        @Override
-       public BufferBuilder tryGetBufferBuilder() throws IOException {
-               BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder();
+       public BufferBuilder tryGetBufferBuilder(int targetChannel) throws 
IOException {
+               BufferBuilder bufferBuilder = 
bufferPool.requestBufferBuilder(targetChannel);
                return bufferBuilder;
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index f52ffed..07e39e7 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -64,6 +64,8 @@ public class ResultPartitionFactory {
 
        private final String compressionCodec;
 
+       private final int maxBuffersPerChannel;
+
        public ResultPartitionFactory(
                ResultPartitionManager partitionManager,
                FileChannelManager channelManager,
@@ -74,7 +76,8 @@ public class ResultPartitionFactory {
                int networkBufferSize,
                boolean forcePartitionReleaseOnConsumption,
                boolean blockingShuffleCompressionEnabled,
-               String compressionCodec) {
+               String compressionCodec,
+               int maxBuffersPerChannel) {
 
                this.partitionManager = partitionManager;
                this.channelManager = channelManager;
@@ -86,6 +89,7 @@ public class ResultPartitionFactory {
                this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
                this.blockingShuffleCompressionEnabled = 
blockingShuffleCompressionEnabled;
                this.compressionCodec = compressionCodec;
+               this.maxBuffersPerChannel = maxBuffersPerChannel;
        }
 
        public ResultPartition create(
@@ -220,7 +224,9 @@ public class ResultPartitionFactory {
                        return bufferPoolFactory.createBufferPool(
                                numberOfSubpartitions + 1,
                                maxNumberOfMemorySegments,
-                               type.hasBackPressure() ? null : 
bufferPoolOwner);
+                               type.hasBackPressure() ? null : bufferPoolOwner,
+                               numberOfSubpartitions,
+                               maxBuffersPerChannel);
                };
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index 6dd507b..f2370aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -66,13 +66,13 @@ public class 
ConsumableNotifyingResultPartitionWriterDecorator implements Result
        }
 
        @Override
-       public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
-               return partitionWriter.getBufferBuilder();
+       public BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
+               return partitionWriter.getBufferBuilder(targetChannel);
        }
 
        @Override
-       public BufferBuilder tryGetBufferBuilder() throws IOException {
-               return partitionWriter.tryGetBufferBuilder();
+       public BufferBuilder tryGetBufferBuilder(int targetChannel) throws 
IOException {
+               return partitionWriter.tryGetBufferBuilder(targetChannel);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index c1c3056..feca864 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -74,6 +74,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
        private final String compressionCodec;
 
+       private final int maxBuffersPerChannel;
+
        public NettyShuffleEnvironmentConfiguration(
                        int numNetworkBuffers,
                        int networkBufferSize,
@@ -88,7 +90,8 @@ public class NettyShuffleEnvironmentConfiguration {
                        BoundedBlockingSubpartitionType 
blockingSubpartitionType,
                        boolean forcePartitionReleaseOnConsumption,
                        boolean blockingShuffleCompressionEnabled,
-                       String compressionCodec) {
+                       String compressionCodec,
+                       int maxBuffersPerChannel) {
 
                this.numNetworkBuffers = numNetworkBuffers;
                this.networkBufferSize = networkBufferSize;
@@ -104,6 +107,7 @@ public class NettyShuffleEnvironmentConfiguration {
                this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
                this.blockingShuffleCompressionEnabled = 
blockingShuffleCompressionEnabled;
                this.compressionCodec = 
Preconditions.checkNotNull(compressionCodec);
+               this.maxBuffersPerChannel = maxBuffersPerChannel;
        }
 
        // 
------------------------------------------------------------------------
@@ -164,6 +168,10 @@ public class NettyShuffleEnvironmentConfiguration {
                return compressionCodec;
        }
 
+       public int getMaxBuffersPerChannel() {
+               return maxBuffersPerChannel;
+       }
+
        // 
------------------------------------------------------------------------
 
        /**
@@ -199,6 +207,8 @@ public class NettyShuffleEnvironmentConfiguration {
                int buffersPerChannel = 
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
                int extraBuffersPerGate = 
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
 
+               int maxBuffersPerChannel = 
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL);
+
                boolean isNetworkDetailedMetrics = 
configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
 
                String[] tempDirs = 
ConfigurationUtils.parseTempDirectories(configuration);
@@ -229,7 +239,8 @@ public class NettyShuffleEnvironmentConfiguration {
                        blockingSubpartitionType,
                        forcePartitionReleaseOnConsumption,
                        blockingShuffleCompressionEnabled,
-                       compressionCodec);
+                       compressionCodec,
+                       maxBuffersPerChannel);
        }
 
        /**
@@ -351,6 +362,7 @@ public class NettyShuffleEnvironmentConfiguration {
                result = 31 * result + (forcePartitionReleaseOnConsumption ? 1 
: 0);
                result = 31 * result + (blockingShuffleCompressionEnabled ? 1 : 
0);
                result = 31 * result + Objects.hashCode(compressionCodec);
+               result = 31 * result + maxBuffersPerChannel;
                return result;
        }
 
@@ -376,6 +388,7 @@ public class NettyShuffleEnvironmentConfiguration {
                                        Arrays.equals(this.tempDirs, 
that.tempDirs) &&
                                        this.forcePartitionReleaseOnConsumption 
== that.forcePartitionReleaseOnConsumption &&
                                        this.blockingShuffleCompressionEnabled 
== that.blockingShuffleCompressionEnabled &&
+                                       this.maxBuffersPerChannel == 
that.maxBuffersPerChannel &&
                                        Objects.equals(this.compressionCodec, 
that.compressionCodec);
                }
        }
@@ -395,6 +408,7 @@ public class NettyShuffleEnvironmentConfiguration {
                                ", forcePartitionReleaseOnConsumption=" + 
forcePartitionReleaseOnConsumption +
                                ", blockingShuffleCompressionEnabled=" + 
blockingShuffleCompressionEnabled +
                                ", compressionCodec=" + compressionCodec +
+                               ", maxBuffersPerChannel=" + 
maxBuffersPerChannel +
                                '}';
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ec29f82..3a283a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -49,6 +49,8 @@ public class NettyShuffleEnvironmentBuilder {
 
        private int floatingNetworkBuffersPerGate = 8;
 
+       private int maxBuffersPerChannel = Integer.MAX_VALUE;
+
        private boolean blockingShuffleCompressionEnabled = false;
 
        private String compressionCodec = "LZ4";
@@ -89,6 +91,11 @@ public class NettyShuffleEnvironmentBuilder {
                return this;
        }
 
+       public NettyShuffleEnvironmentBuilder setMaxBuffersPerChannel(int 
maxBuffersPerChannel) {
+               this.maxBuffersPerChannel = maxBuffersPerChannel;
+               return this;
+       }
+
        public NettyShuffleEnvironmentBuilder 
setBlockingShuffleCompressionEnabled(boolean blockingShuffleCompressionEnabled) 
{
                this.blockingShuffleCompressionEnabled = 
blockingShuffleCompressionEnabled;
                return this;
@@ -125,7 +132,8 @@ public class NettyShuffleEnvironmentBuilder {
                                BoundedBlockingSubpartitionType.AUTO,
                                false,
                                blockingShuffleCompressionEnabled,
-                               compressionCodec),
+                               compressionCodec,
+                               maxBuffersPerChannel),
                        taskManagerLocation,
                        new TaskEventDispatcher(),
                        metricGroup);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 7b12a98..a6517c7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -45,13 +45,13 @@ public abstract class 
AbstractCollectingResultPartitionWriter extends MockResult
        }
 
        @Override
-       public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
-               return bufferProvider.requestBufferBuilderBlocking();
+       public BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
+               return 
bufferProvider.requestBufferBuilderBlocking(targetChannel);
        }
 
        @Override
-       public BufferBuilder tryGetBufferBuilder() throws IOException {
-               return bufferProvider.requestBufferBuilder();
+       public BufferBuilder tryGetBufferBuilder(int targetChannel) throws 
IOException {
+               return bufferProvider.requestBufferBuilder(targetChannel);
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 4f6f7f8..97d8053 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -125,7 +125,7 @@ public class RecordWriterDelegateTest extends TestLogger {
        }
 
        private RecordWriter createRecordWriter(NetworkBufferPool globalPool) 
throws Exception {
-               final BufferPool localPool = globalPool.createBufferPool(1, 1);
+               final BufferPool localPool = globalPool.createBufferPool(1, 1, 
null, 1, Integer.MAX_VALUE);
                final ResultPartitionWriter partition = new 
ResultPartitionBuilder()
                        .setBufferPoolFactory(p -> localPool)
                        .build();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ff06045..9beb01a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -413,7 +413,7 @@ public class RecordWriterTest {
        public void testIsAvailableOrNot() throws Exception {
                // setup
                final NetworkBufferPool globalPool = new NetworkBufferPool(10, 
128, 2);
-               final BufferPool localPool = globalPool.createBufferPool(1, 1);
+               final BufferPool localPool = globalPool.createBufferPool(1, 1, 
null, 1, Integer.MAX_VALUE);
                final ResultPartitionWriter resultPartition = new 
ResultPartitionBuilder()
                        .setBufferPoolFactory(p -> localPool)
                        .build();
@@ -430,7 +430,7 @@ public class RecordWriterTest {
                        assertTrue(recordWriter.getAvailableFuture().isDone());
 
                        // request one buffer from the local pool to make it 
unavailable afterwards
-                       final BufferBuilder bufferBuilder = 
resultPartition.getBufferBuilder();
+                       final BufferBuilder bufferBuilder = 
resultPartition.getBufferBuilder(0);
                        assertNotNull(bufferBuilder);
                        assertFalse(recordWriter.getAvailableFuture().isDone());
 
@@ -484,7 +484,11 @@ public class RecordWriterTest {
                                while ((bufferAndBacklog = 
view.getNextBuffer()) != null) {
                                        Buffer buffer = 
bufferAndBacklog.buffer();
                                        int[] expected = numConsumedBuffers < 
totalStates ? states : expectedRecordsInBuffer[numConsumedBuffers - 
totalStates];
-                                       
BufferBuilderAndConsumerTest.assertContent(buffer, partition.getBufferPool(), 
expected);
+                                       
BufferBuilderAndConsumerTest.assertContent(
+                                               buffer,
+                                               partition.getBufferPool()
+                                                       
.getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()],
+                                               expected);
 
                                        buffer.recycleBuffer();
                                        numConsumedBuffers++;
@@ -503,7 +507,7 @@ public class RecordWriterTest {
        public void testIdleTime() throws IOException, InterruptedException {
                // setup
                final NetworkBufferPool globalPool = new NetworkBufferPool(10, 
128, 2);
-               final BufferPool localPool = globalPool.createBufferPool(1, 1);
+               final BufferPool localPool = globalPool.createBufferPool(1, 1, 
null, 1, Integer.MAX_VALUE);
                final ResultPartitionWriter resultPartition = new 
ResultPartitionBuilder()
                        .setBufferPoolFactory(p -> localPool)
                        .build();
@@ -514,7 +518,9 @@ public class RecordWriterTest {
                        resultPartition,
                        new NoOpResultPartitionConsumableNotifier());
                final RecordWriter recordWriter = 
createRecordWriter(partitionWrapper);
-               BufferBuilder builder = recordWriter.getBufferBuilder();
+               BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
+               final Buffer buffer = 
BufferBuilderTestUtils.buildSingleBuffer(builder);
+               builder.finish();
 
                // idle time is zero when there is buffer available.
                assertEquals(0, 
recordWriter.getIdleTimeMsPerSecond().getCount());
@@ -528,7 +534,7 @@ public class RecordWriterTest {
                                        // notify that the request thread start 
to run.
                                        syncLock.countDown();
                                        // wait for buffer.
-                                       
asyncRequestResult.set(recordWriter.getBufferBuilder());
+                                       
asyncRequestResult.set(recordWriter.requestNewBufferBuilder(0));
                                } catch (Exception e) {
                                }
                        }
@@ -539,15 +545,13 @@ public class RecordWriterTest {
                syncLock.await();
 
                Thread.sleep(10);
-               //recycle the buffer
-               final Buffer buffer = 
BufferBuilderTestUtils.buildSingleBuffer(builder);
 
+               //recycle the buffer
                buffer.recycleBuffer();
                requestThread.join();
 
                assertThat(recordWriter.getIdleTimeMsPerSecond().getCount(), 
Matchers.greaterThan(0L));
                assertNotNull(asyncRequestResult.get());
-
        }
 
        private void verifyBroadcastBufferOrEventIndependence(boolean 
broadcastEvent) throws Exception {
@@ -634,13 +638,13 @@ public class RecordWriterTest {
                }
 
                @Override
-               public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
-                       return bufferProvider.requestBufferBuilderBlocking();
+               public BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
+                       return 
bufferProvider.requestBufferBuilderBlocking(targetChannel);
                }
 
                @Override
-               public BufferBuilder tryGetBufferBuilder() throws IOException {
-                       return bufferProvider.requestBufferBuilder();
+               public BufferBuilder tryGetBufferBuilder(int targetChannel) 
throws IOException {
+                       return 
bufferProvider.requestBufferBuilder(targetChannel);
                }
 
                @Override
@@ -672,13 +676,13 @@ public class RecordWriterTest {
                }
 
                @Override
-               public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
-                       return bufferProvider.requestBufferBuilderBlocking();
+               public BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
+                       return 
bufferProvider.requestBufferBuilderBlocking(targetChannel);
                }
 
                @Override
-               public BufferBuilder tryGetBufferBuilder() throws IOException {
-                       return bufferProvider.requestBufferBuilder();
+               public BufferBuilder tryGetBufferBuilder(int targetChannel) 
throws IOException {
+                       return 
bufferProvider.requestBufferBuilder(targetChannel);
                }
        }
 
@@ -691,13 +695,13 @@ public class RecordWriterTest {
                }
 
                @Override
-               public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
-                       return bufferProvider.requestBufferBuilderBlocking();
+               public BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
+                       return 
bufferProvider.requestBufferBuilderBlocking(targetChannel);
                }
 
                @Override
-               public BufferBuilder tryGetBufferBuilder() throws IOException {
-                       return bufferProvider.requestBufferBuilder();
+               public BufferBuilder tryGetBufferBuilder(int targetChannel) 
throws IOException {
+                       return 
bufferProvider.requestBufferBuilder(targetChannel);
                }
 
                @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index a0d1612..f263330 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -407,6 +407,43 @@ public class LocalBufferPoolTest extends TestLogger {
        }
 
        @Test
+       public void testMaxBuffersPerChannelAndAvailability() throws 
IOException, InterruptedException {
+               localBufferPool.lazyDestroy();
+               localBufferPool = new LocalBufferPool(networkBufferPool, 1, 
Integer.MAX_VALUE, null, 3, 1);
+               localBufferPool.setNumBuffers(10);
+
+               assertTrue(localBufferPool.getAvailableFuture().isDone());
+
+               // request one segment from subpartition-0 and subpartition-1 
respectively
+               final BufferBuilder bufferBuilder01 = 
localBufferPool.requestBufferBuilderBlocking(0);
+               final BufferBuilder bufferBuilder11 = 
localBufferPool.requestBufferBuilderBlocking(1);
+               assertTrue(localBufferPool.getAvailableFuture().isDone());
+
+               // request one segment from subpartition-0
+               final BufferBuilder bufferBuilder02 = 
localBufferPool.requestBufferBuilderBlocking(0);
+               assertFalse(localBufferPool.getAvailableFuture().isDone());
+
+               final BufferBuilder bufferBuilder03 = 
localBufferPool.requestBufferBuilderBlocking(0);
+               final BufferBuilder bufferBuilder21 = 
localBufferPool.requestBufferBuilderBlocking(2);
+               final BufferBuilder bufferBuilder22 = 
localBufferPool.requestBufferBuilderBlocking(2);
+               assertFalse(localBufferPool.getAvailableFuture().isDone());
+
+               // recycle segments
+               
bufferBuilder11.getRecycler().recycle(bufferBuilder11.getMemorySegment());
+               assertFalse(localBufferPool.getAvailableFuture().isDone());
+               
bufferBuilder21.getRecycler().recycle(bufferBuilder21.getMemorySegment());
+               assertFalse(localBufferPool.getAvailableFuture().isDone());
+               
bufferBuilder02.getRecycler().recycle(bufferBuilder02.getMemorySegment());
+               assertFalse(localBufferPool.getAvailableFuture().isDone());
+               
bufferBuilder01.getRecycler().recycle(bufferBuilder01.getMemorySegment());
+               assertTrue(localBufferPool.getAvailableFuture().isDone());
+               
bufferBuilder03.getRecycler().recycle(bufferBuilder03.getMemorySegment());
+               assertTrue(localBufferPool.getAvailableFuture().isDone());
+               
bufferBuilder22.getRecycler().recycle(bufferBuilder22.getMemorySegment());
+               assertTrue(localBufferPool.getAvailableFuture().isDone());
+       }
+
+       @Test
        public void testIsAvailableOrNot() throws Exception {
 
                // the local buffer pool should be in available state initially
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 7bdb884..8cbe0d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -327,7 +327,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                        // make releaseMemory calls always fail:
                        numBuffersToRecycle -> {
                                throw new TestIOException();
-               });
+               }, 0, Integer.MAX_VALUE);
 
                try {
                        // take all but one buffer
@@ -366,7 +366,7 @@ public class NetworkBufferPoolTest extends TestLogger {
                BufferPool bufferPool = networkBufferPool.createBufferPool(1, 
numBuffers,
                        numBuffersToRecycle -> {
                                throw new TestIOException();
-               });
+               }, 0, Integer.MAX_VALUE);
 
                try {
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
index aa23744..cffb35c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
@@ -50,6 +52,16 @@ public class NoOpBufferPool implements BufferPool {
        }
 
        @Override
+       public BufferBuilder requestBufferBuilder(int targetChannel) throws 
IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public BufferBuilder requestBufferBuilderBlocking(int targetChannel) 
throws IOException, InterruptedException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public boolean addBufferListener(BufferListener listener) {
                throw new UnsupportedOperationException();
        }
@@ -89,6 +101,12 @@ public class NoOpBufferPool implements BufferPool {
                throw new UnsupportedOperationException();
        }
 
+       @Nullable
+       @Override
+       public BufferRecycler[] getSubpartitionBufferRecyclers() {
+               return new BufferRecycler[0];
+       }
+
        @Override
        public void recycle(MemorySegment memorySegment) {
                throw new UnsupportedOperationException();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index c2ee06f..5432fd8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -70,7 +70,7 @@ public class MockResultPartitionWriter implements 
ResultPartitionWriter {
        }
 
        @Override
-       public BufferBuilder getBufferBuilder() throws IOException, 
InterruptedException {
+       public BufferBuilder getBufferBuilder(int targetChannel) throws 
IOException, InterruptedException {
                throw new UnsupportedOperationException();
        }
 
@@ -79,8 +79,7 @@ public class MockResultPartitionWriter implements 
ResultPartitionWriter {
                throw new UnsupportedOperationException();
        }
 
-       @Override
-       public BufferBuilder tryGetBufferBuilder() throws IOException {
+       public BufferBuilder tryGetBufferBuilder(int targetChannel) throws 
IOException {
                throw new UnsupportedOperationException();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 4a400cb..010aada 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -121,7 +121,7 @@ public class PartialConsumePipelinedResultTest extends 
TestLogger {
                        final ResultPartitionWriter writer = 
getEnvironment().getWriter(0);
 
                        for (int i = 0; i < 8; i++) {
-                               final BufferBuilder bufferBuilder = 
writer.getBufferBuilder();
+                               final BufferBuilder bufferBuilder = 
writer.getBufferBuilder(0);
                                
writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
                                Thread.sleep(50);
                                bufferBuilder.finish();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index a0c9faa..7c8966d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -56,6 +56,8 @@ public class ResultPartitionBuilder {
 
        private int floatingNetworkBuffersPerGate = 1;
 
+       private int maxBuffersPerChannel = Integer.MAX_VALUE;
+
        private int networkBufferSize = 1;
 
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@@ -167,7 +169,8 @@ public class ResultPartitionBuilder {
                        networkBufferSize,
                        releasedOnConsumption,
                        blockingShuffleCompressionEnabled,
-                       compressionCodec);
+                       compressionCodec,
+                       maxBuffersPerChannel);
 
                FunctionWithException<BufferPoolOwner, BufferPool, IOException> 
factory = bufferPoolFactory.orElseGet(() ->
                        
resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions, 
partitionType));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 7f27dbd..f5570c7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -100,7 +100,8 @@ public class ResultPartitionFactoryTest extends TestLogger {
                        SEGMENT_SIZE,
                        releasePartitionOnConsumption,
                        false,
-                       "LZ4");
+                       "LZ4",
+                       Integer.MAX_VALUE);
 
                final ResultPartitionDeploymentDescriptor descriptor = new 
ResultPartitionDeploymentDescriptor(
                        PartitionDescriptorBuilder
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index f3e512f..0dabb21 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -360,6 +360,32 @@ public class ResultPartitionTest {
                }
        }
 
+       /**
+        * Tests {@link ResultPartition#getAvailableFuture()}.
+        */
+       @Test
+       public void testIsAvailableOrNot() throws IOException, 
InterruptedException {
+               final int numAllBuffers = 10;
+               final NettyShuffleEnvironment network = new 
NettyShuffleEnvironmentBuilder()
+                               .setNumNetworkBuffers(numAllBuffers).build();
+               final ResultPartition resultPartition = 
createPartition(network, ResultPartitionType.PIPELINED, 1);
+
+               try {
+                       resultPartition.setup();
+
+                       resultPartition.getBufferPool().setNumBuffers(2);
+
+                       
assertTrue(resultPartition.getAvailableFuture().isDone());
+
+                       resultPartition.getBufferBuilder(0);
+                       resultPartition.getBufferBuilder(0);
+                       
assertFalse(resultPartition.getAvailableFuture().isDone());
+               } finally {
+                       resultPartition.release();
+                       network.close();
+               }
+       }
+
        @Test
        public void testPipelinedPartitionBufferPool() throws Exception {
                testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
@@ -470,7 +496,11 @@ public class ResultPartitionTest {
                                                
ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
                                                if (bufferAndBacklog != null) {
                                                        Buffer buffer = 
bufferAndBacklog.buffer();
-                                                       
BufferBuilderAndConsumerTest.assertContent(buffer, partition.getBufferPool(), 
states);
+                                                       
BufferBuilderAndConsumerTest.assertContent(
+                                                               buffer,
+                                                               
partition.getBufferPool()
+                                                                       
.getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()],
+                                                               states);
                                                        buffer.recycleBuffer();
                                                        numConsumedBuffers++;
                                                } else {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index e0ed290..4c2196d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -75,6 +75,11 @@ public class TestPooledBufferProvider implements 
BufferProvider {
                return null;
        }
 
+       @Override
+       public BufferBuilder requestBufferBuilder(int targetChannel) throws 
IOException {
+               return requestBufferBuilder();
+       }
+
        private Buffer requestBufferBlocking() throws IOException, 
InterruptedException {
                Buffer buffer = buffers.poll();
                if (buffer != null) {
@@ -96,6 +101,11 @@ public class TestPooledBufferProvider implements 
BufferProvider {
        }
 
        @Override
+       public BufferBuilder requestBufferBuilderBlocking(int targetChannel) 
throws IOException, InterruptedException {
+               return requestBufferBuilderBlocking();
+       }
+
+       @Override
        public boolean addBufferListener(BufferListener listener) {
                return bufferRecycler.registerListener(listener);
        }

Reply via email to