This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 02978da [FLINK-16645][network] Limit the maximum backlogs in
subpartitions
02978da is described below
commit 02978da7c49b33def54f8cb8cb4ceb57b022cfd5
Author: Jiayi Liao <[email protected]>
AuthorDate: Mon Mar 30 20:55:30 2020 +0800
[FLINK-16645][network] Limit the maximum backlogs in subpartitions
In the case of data skew, most of the buffers in #LocalBufferPool can be
requested away by a single subpartition, which increases the amount of
in-flight data and slows down checkpoint alignment.
To solve this problem, we add a config option that limits the maximum backlog
per subpartition. This way, we can check each subpartition's backlog and make
the stream task unavailable if the limit is exceeded.
---
.../generated/all_taskmanager_network_section.html | 8 +-
.../mesos_task_manager_configuration.html | 6 -
.../netty_shuffle_environment_configuration.html | 8 +-
.../NettyShuffleEnvironmentOptions.java | 21 ++-
.../io/network/NettyShuffleServiceFactory.java | 3 +-
.../network/api/writer/BroadcastRecordWriter.java | 2 +-
.../api/writer/ChannelSelectorRecordWriter.java | 2 +-
.../io/network/api/writer/RecordWriter.java | 15 +--
.../network/api/writer/ResultPartitionWriter.java | 4 +-
.../runtime/io/network/buffer/BufferBuilder.java | 11 ++
.../runtime/io/network/buffer/BufferPool.java | 2 +
.../io/network/buffer/BufferPoolFactory.java | 13 +-
.../runtime/io/network/buffer/BufferProvider.java | 32 ++++-
.../runtime/io/network/buffer/LocalBufferPool.java | 143 ++++++++++++++++++---
.../io/network/buffer/NetworkBufferPool.java | 33 ++++-
.../network/partition/PipelinedSubpartition.java | 2 +-
.../io/network/partition/ResultPartition.java | 8 +-
.../network/partition/ResultPartitionFactory.java | 10 +-
...bleNotifyingResultPartitionWriterDecorator.java | 8 +-
.../NettyShuffleEnvironmentConfiguration.java | 18 ++-
.../io/network/NettyShuffleEnvironmentBuilder.java | 10 +-
.../AbstractCollectingResultPartitionWriter.java | 8 +-
.../api/writer/RecordWriterDelegateTest.java | 2 +-
.../io/network/api/writer/RecordWriterTest.java | 46 ++++---
.../io/network/buffer/LocalBufferPoolTest.java | 37 ++++++
.../io/network/buffer/NetworkBufferPoolTest.java | 4 +-
.../runtime/io/network/buffer/NoOpBufferPool.java | 18 +++
.../partition/MockResultPartitionWriter.java | 5 +-
.../PartialConsumePipelinedResultTest.java | 2 +-
.../network/partition/ResultPartitionBuilder.java | 5 +-
.../partition/ResultPartitionFactoryTest.java | 3 +-
.../io/network/partition/ResultPartitionTest.java | 32 ++++-
.../io/network/util/TestPooledBufferProvider.java | 10 ++
33 files changed, 425 insertions(+), 106 deletions(-)
diff --git a/docs/_includes/generated/all_taskmanager_network_section.html
b/docs/_includes/generated/all_taskmanager_network_section.html
index 04a8566..797ac6f 100644
--- a/docs/_includes/generated/all_taskmanager_network_section.html
+++ b/docs/_includes/generated/all_taskmanager_network_section.html
@@ -30,7 +30,7 @@
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
- <td>Maximum number of network buffers to use for each
outgoing/incoming channel (subpartition/input channel).In credit-based flow
control mode, this indicates how many credits are exclusive in each input
channel. It should be configured at least 2 for good performance. 1 buffer is
for receiving in-flight data in the subpartition and 1 buffer is for parallel
serialization.</td>
+ <td>Number of exclusive network buffers to use for each
outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow
control model. It should be configured at least 2 for good performance. 1
buffer is for receiving in-flight data in the subpartition and 1 buffer is for
parallel serialization.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
@@ -39,6 +39,12 @@
<td>Number of extra network buffers to use for each
outgoing/incoming gate (result partition/input gate). In credit-based flow
control mode, this indicates how many floating credits are shared among all the
input channels. The floating buffers are distributed based on backlog
(real-time output buffers in the subpartition) feedback, and can help relieve
back-pressure caused by unbalanced data distribution among the subpartitions.
This value should be increased in case of highe [...]
</tr>
<tr>
+
<td><h5>taskmanager.network.memory.max-buffers-per-channel</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>Number of max buffers that can be used for each channel. If a
channel exceeds the number of max buffers, it will make the task become
unavailable, cause the back pressure and block the data processing. This might
speed up checkpoint alignment by preventing excessive growth of the buffered
in-flight data in case of data skew and high number of configured floating
buffers. This limit is not strictly guaranteed, and can be ignored by things
like flatMap operators, records sp [...]
+ </tr>
+ <tr>
<td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
<td style="word-wrap: break-word;">120</td>
<td>Integer</td>
diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html
b/docs/_includes/generated/mesos_task_manager_configuration.html
index aa24d11..5d78138 100644
--- a/docs/_includes/generated/mesos_task_manager_configuration.html
+++ b/docs/_includes/generated/mesos_task_manager_configuration.html
@@ -86,11 +86,5 @@
<td>String</td>
<td>A comma separated list of URIs of custom artifacts to be
downloaded into the sandbox of Mesos workers.</td>
</tr>
- <tr>
- <td><h5>taskmanager.numberOfTaskSlots</h5></td>
- <td style="word-wrap: break-word;">1</td>
- <td>Integer</td>
- <td>The number of parallel operator or user function instances
that a single TaskManager can run. If this value is larger than 1, a single
TaskManager takes multiple instances of a function or operator. That way, the
TaskManager can utilize multiple CPU cores, but at the same time, the available
memory is divided between the different operator or function instances. This
value is typically proportional to the number of physical CPU cores that the
TaskManager's machine has (e. [...]
- </tr>
</tbody>
</table>
diff --git
a/docs/_includes/generated/netty_shuffle_environment_configuration.html
b/docs/_includes/generated/netty_shuffle_environment_configuration.html
index 18a4028..5e00e18 100644
--- a/docs/_includes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/_includes/generated/netty_shuffle_environment_configuration.html
@@ -48,7 +48,7 @@
<td><h5>taskmanager.network.memory.buffers-per-channel</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
- <td>Maximum number of network buffers to use for each
outgoing/incoming channel (subpartition/input channel).In credit-based flow
control mode, this indicates how many credits are exclusive in each input
channel. It should be configured at least 2 for good performance. 1 buffer is
for receiving in-flight data in the subpartition and 1 buffer is for parallel
serialization.</td>
+ <td>Number of exclusive network buffers to use for each
outgoing/incoming channel (subpartition/inputchannel) in the credit-based flow
control model. It should be configured at least 2 for good performance. 1
buffer is for receiving in-flight data in the subpartition and 1 buffer is for
parallel serialization.</td>
</tr>
<tr>
<td><h5>taskmanager.network.memory.floating-buffers-per-gate</h5></td>
@@ -57,6 +57,12 @@
<td>Number of extra network buffers to use for each
outgoing/incoming gate (result partition/input gate). In credit-based flow
control mode, this indicates how many floating credits are shared among all the
input channels. The floating buffers are distributed based on backlog
(real-time output buffers in the subpartition) feedback, and can help relieve
back-pressure caused by unbalanced data distribution among the subpartitions.
This value should be increased in case of highe [...]
</tr>
<tr>
+
<td><h5>taskmanager.network.memory.max-buffers-per-channel</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>Number of max buffers that can be used for each channel. If a
channel exceeds the number of max buffers, it will make the task become
unavailable, cause the back pressure and block the data processing. This might
speed up checkpoint alignment by preventing excessive growth of the buffered
in-flight data in case of data skew and high number of configured floating
buffers. This limit is not strictly guaranteed, and can be ignored by things
like flatMap operators, records sp [...]
+ </tr>
+ <tr>
<td><h5>taskmanager.network.netty.client.connectTimeoutSec</h5></td>
<td style="word-wrap: break-word;">120</td>
<td>Integer</td>
diff --git
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index c84aa34..cd6ce69 100644
---
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -156,10 +156,9 @@ public class NettyShuffleEnvironmentOptions {
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2)
- .withDescription("Maximum number of network buffers to
use for each outgoing/incoming channel (subpartition/input channel)." +
- "In credit-based flow control mode, this
indicates how many credits are exclusive in each input channel. It should be" +
- " configured at least 2 for good performance. 1
buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
- " for parallel serialization.");
+ .withDescription("Number of exclusive network buffers
to use for each outgoing/incoming channel (subpartition/inputchannel)" +
+ " in the credit-based flow control model. It
should be configured at least 2 for good performance." +
+ " 1 buffer is for receiving in-flight data in
the subpartition and 1 buffer is for parallel serialization.");
/**
* Number of extra network buffers to use for each outgoing/incoming
gate (result partition/input gate).
@@ -175,6 +174,20 @@ public class NettyShuffleEnvironmentOptions {
" increased in case of higher round trip times
between nodes and/or larger number of machines in the cluster.");
/**
+ * Maximum number of buffers that can be used for each output subpartition.
+ */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ public static final ConfigOption<Integer>
NETWORK_MAX_BUFFERS_PER_CHANNEL =
+ key("taskmanager.network.memory.max-buffers-per-channel")
+ .defaultValue(10)
+ .withDescription("Number of max buffers that can be
used for each channel. If a channel exceeds the number of max" +
+ " buffers, it will make the task become
unavailable, cause the back pressure and block the data processing. This" +
+ " might speed up checkpoint alignment by
preventing excessive growth of the buffered in-flight data in" +
+ " case of data skew and high number of
configured floating buffers. This limit is not strictly guaranteed," +
+ " and can be ignored by things like flatMap
operators, records spanning multiple buffers or single timer" +
+ " producing large amount of data.");
+
+ /**
* The timeout for requesting exclusive buffers for each channel.
*/
@Documentation.ExcludeFromDocumentation("This option is purely
implementation related, and may be removed as the implementation changes.")
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index afbaba2..9c4f95b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -107,7 +107,8 @@ public class NettyShuffleServiceFactory implements
ShuffleServiceFactory<NettySh
config.networkBufferSize(),
config.isForcePartitionReleaseOnConsumption(),
config.isBlockingShuffleCompressionEnabled(),
- config.getCompressionCodec());
+ config.getCompressionCodec(),
+ config.getMaxBuffersPerChannel());
SingleInputGateFactory singleInputGateFactory = new
SingleInputGateFactory(
taskExecutorResourceId,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 93bc2c4..9964b20 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -128,7 +128,7 @@ public final class BroadcastRecordWriter<T extends
IOReadableWritable> extends R
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
checkState(bufferBuilder == null || bufferBuilder.isFinished());
- BufferBuilder builder = getBufferBuilder();
+ BufferBuilder builder =
super.requestNewBufferBuilder(targetChannel);
if (randomTriggered) {
addBufferConsumer(builder.createBufferConsumer(),
targetChannel);
} else {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 8393409..298e357 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -100,7 +100,7 @@ public final class ChannelSelectorRecordWriter<T extends
IOReadableWritable> ext
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
checkState(bufferBuilders[targetChannel] == null ||
bufferBuilders[targetChannel].isFinished());
- BufferBuilder bufferBuilder = getBufferBuilder();
+ BufferBuilder bufferBuilder =
super.requestNewBufferBuilder(targetChannel);
addBufferConsumer(bufferBuilder.createBufferConsumer(),
targetChannel);
bufferBuilders[targetChannel] = bufferBuilder;
return bufferBuilder;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index a678e5a..7d0f7de 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -225,11 +225,6 @@ public abstract class RecordWriter<T extends
IOReadableWritable> implements Avai
abstract BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException;
/**
- * Requests a new {@link BufferBuilder} for the target channel and
returns it.
- */
- abstract BufferBuilder requestNewBufferBuilder(int targetChannel)
throws IOException, InterruptedException;
-
- /**
* Marks the current {@link BufferBuilder} as finished if present and
clears the state for next one.
*/
abstract void tryFinishCurrentBufferBuilder(int targetChannel);
@@ -289,12 +284,14 @@ public abstract class RecordWriter<T extends
IOReadableWritable> implements Avai
targetPartition.addBufferConsumer(consumer, targetChannel);
}
- @VisibleForTesting
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
- BufferBuilder builder = targetPartition.tryGetBufferBuilder();
+ /**
+ * Requests a new {@link BufferBuilder} for the target channel and
returns it.
+ */
+ public BufferBuilder requestNewBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
+ BufferBuilder builder =
targetPartition.tryGetBufferBuilder(targetChannel);
if (builder == null) {
long start = System.currentTimeMillis();
- builder = targetPartition.getBufferBuilder();
+ builder =
targetPartition.getBufferBuilder(targetChannel);
idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
}
return builder;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index efb35dc..f589e07 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -59,7 +59,7 @@ public interface ResultPartitionWriter extends AutoCloseable,
AvailabilityProvid
/**
* Requests a {@link BufferBuilder} from this partition for writing
data.
*/
- BufferBuilder getBufferBuilder() throws IOException,
InterruptedException;
+ BufferBuilder getBufferBuilder(int targetChannel) throws IOException,
InterruptedException;
/**
@@ -67,7 +67,7 @@ public interface ResultPartitionWriter extends AutoCloseable,
AvailabilityProvid
*
* <p>Returns <code>null</code> if no buffer is available or the buffer
provider has been destroyed.
*/
- BufferBuilder tryGetBufferBuilder() throws IOException;
+ BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException;
/**
* Adds the bufferConsumer to the subpartition with the given index.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 77d3526..b18569e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.buffer;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import javax.annotation.concurrent.NotThreadSafe;
@@ -126,6 +127,16 @@ public class BufferBuilder {
return memorySegment.size();
}
+ @VisibleForTesting
+ public BufferRecycler getRecycler() {
+ return recycler;
+ }
+
+ @VisibleForTesting
+ public MemorySegment getMemorySegment() {
+ return memorySegment;
+ }
+
/**
* Holds a reference to the current writer position. Negative values
indicate that writer ({@link BufferBuilder}
* has finished. Value {@code Integer.MIN_VALUE} represents finished
empty buffer.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 511eb65..c1112f1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -73,4 +73,6 @@ public interface BufferPool extends BufferProvider,
BufferRecycler {
* Returns the number of used buffers of this buffer pool.
*/
int bestEffortGetNumOfUsedBuffers();
+
+ BufferRecycler[] getSubpartitionBufferRecyclers();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index 7d623f8..1f600a1 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -48,10 +48,19 @@ public interface BufferPoolFactory {
* minimum number of network buffers in this pool
* @param maxUsedBuffers
* maximum number of network buffers this pool offers
- * @param bufferPoolOwner
+ * @param bufferPoolOwner
* the owner of this buffer pool to release memory when needed
+ * @param numSubpartitions
+ * number of subpartitions in this pool
+ * @param maxBuffersPerChannel
+ * maximum number of buffers to use for each channel
*/
- BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers,
BufferPoolOwner bufferPoolOwner) throws IOException;
+ BufferPool createBufferPool(
+ int numRequiredBuffers,
+ int maxUsedBuffers,
+ BufferPoolOwner bufferPoolOwner,
+ int numSubpartitions,
+ int maxBuffersPerChannel) throws IOException;
/**
* Destroy callback for updating factory book keeping.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 0551804..f2f7f5d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.runtime.io.AvailabilityProvider;
+import javax.annotation.Nullable;
+
import java.io.IOException;
/**
@@ -33,26 +35,46 @@ public interface BufferProvider extends
AvailabilityProvider {
/**
* Returns a {@link Buffer} instance from the buffer provider, if one
is available.
*
- * <p>Returns <code>null</code> if no buffer is available or the buffer
provider has been destroyed.
+ * @return {@code null} if no buffer is available or the buffer
provider has been destroyed.
*/
- Buffer requestBuffer() throws IOException;
+ @Nullable Buffer requestBuffer() throws IOException;
/**
- * Returns a {@link BufferBuilder} instance from the buffer provider.
+ * Returns a {@link BufferBuilder} instance from the buffer provider.
This equals to {@link #requestBufferBuilder(int)}
+ * with unknown target channel.
*
- * <p>Returns <code>null</code> if no buffer is available or the buffer
provider has been destroyed.
+ * @return {@code null} if no buffer is available or the buffer
provider has been destroyed.
*/
- BufferBuilder requestBufferBuilder() throws IOException;
+ @Nullable BufferBuilder requestBufferBuilder() throws IOException;
/**
* Returns a {@link BufferBuilder} instance from the buffer provider.
*
+ * @param targetChannel to which the request will be accounted to.
+ * @return {@code null} if no buffer is available or the buffer
provider has been destroyed.
+ */
+ @Nullable BufferBuilder requestBufferBuilder(int targetChannel) throws
IOException;
+
+ /**
+ * Returns a {@link BufferBuilder} instance from the buffer provider.
This equals to {@link #requestBufferBuilderBlocking(int)}
+ * with unknown target channel.
+ *
* <p>If there is no buffer available, the call will block until one
becomes available again or the
* buffer provider has been destroyed.
*/
BufferBuilder requestBufferBuilderBlocking() throws IOException,
InterruptedException;
/**
+ * Returns a {@link BufferBuilder} instance from the buffer provider.
+ *
+ * <p>If there is no buffer available, the call will block until one
becomes available again or the
+ * buffer provider has been destroyed.
+ *
+ * @param targetChannel to which the request will be accounted to.
+ */
+ BufferBuilder requestBufferBuilderBlocking(int targetChannel) throws
IOException, InterruptedException;
+
+ /**
* Adds a buffer availability listener to the buffer provider.
*
* <p>The operation fails with return value <code>false</code>, when
there is a buffer available or
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a6af471..73e01b1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.buffer;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import
org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
import org.apache.flink.util.ExceptionUtils;
@@ -51,6 +52,8 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
class LocalBufferPool implements BufferPool {
private static final Logger LOG =
LoggerFactory.getLogger(LocalBufferPool.class);
+ private static final int UNKNOWN_CHANNEL = -1;
+
/** Global network buffer pool to get buffers from. */
private final NetworkBufferPool networkBufferPool;
@@ -87,6 +90,14 @@ class LocalBufferPool implements BufferPool {
*/
private int numberOfRequestedMemorySegments;
+ private final int maxBuffersPerChannel;
+
+ private final int[] subpartitionBuffersCount;
+
+ private final BufferRecycler[] subpartitionBufferRecyclers;
+
+ private int unavailableSubpartitionsCount = 0;
+
private boolean isDestroyed;
@Nullable
@@ -104,7 +115,13 @@ class LocalBufferPool implements BufferPool {
* minimum number of network buffers
*/
LocalBufferPool(NetworkBufferPool networkBufferPool, int
numberOfRequiredMemorySegments) {
- this(networkBufferPool, numberOfRequiredMemorySegments,
Integer.MAX_VALUE, null);
+ this(
+ networkBufferPool,
+ numberOfRequiredMemorySegments,
+ Integer.MAX_VALUE,
+ null,
+ 0,
+ Integer.MAX_VALUE);
}
/**
@@ -120,7 +137,13 @@ class LocalBufferPool implements BufferPool {
*/
LocalBufferPool(NetworkBufferPool networkBufferPool, int
numberOfRequiredMemorySegments,
int maxNumberOfMemorySegments) {
- this(networkBufferPool, numberOfRequiredMemorySegments,
maxNumberOfMemorySegments, null);
+ this(
+ networkBufferPool,
+ numberOfRequiredMemorySegments,
+ maxNumberOfMemorySegments,
+ null,
+ 0,
+ Integer.MAX_VALUE);
}
/**
@@ -133,14 +156,20 @@ class LocalBufferPool implements BufferPool {
* minimum number of network buffers
* @param maxNumberOfMemorySegments
* maximum number of network buffers to allocate
- * @param bufferPoolOwner
+ * @param bufferPoolOwner
* the owner of this buffer pool to release memory when
needed
+ * @param numberOfSubpartitions
+ * number of subpartitions
+ * @param maxBuffersPerChannel
+ * maximum number of buffers to use for each channel
*/
LocalBufferPool(
- NetworkBufferPool networkBufferPool,
- int numberOfRequiredMemorySegments,
- int maxNumberOfMemorySegments,
- @Nullable BufferPoolOwner bufferPoolOwner) {
+ NetworkBufferPool networkBufferPool,
+ int numberOfRequiredMemorySegments,
+ int maxNumberOfMemorySegments,
+ @Nullable BufferPoolOwner bufferPoolOwner,
+ int numberOfSubpartitions,
+ int maxBuffersPerChannel) {
checkArgument(maxNumberOfMemorySegments >=
numberOfRequiredMemorySegments,
"Maximum number of memory segments (%s) should not be
smaller than minimum (%s).",
maxNumberOfMemorySegments,
numberOfRequiredMemorySegments);
@@ -157,6 +186,19 @@ class LocalBufferPool implements BufferPool {
this.currentPoolSize = numberOfRequiredMemorySegments;
this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
this.bufferPoolOwner = bufferPoolOwner;
+
+ if (numberOfSubpartitions > 0) {
+ checkArgument(maxBuffersPerChannel > 0,
+ "Maximum number of buffers for each channel
(%s) should be larger than 0.",
+ maxBuffersPerChannel);
+ }
+
+ this.subpartitionBuffersCount = new int[numberOfSubpartitions];
+ subpartitionBufferRecyclers = new
BufferRecycler[numberOfSubpartitions];
+ for (int i = 0; i < subpartitionBufferRecyclers.length; i++) {
+ subpartitionBufferRecyclers[i] = new
SubpartitionBufferRecycler(i, this);
+ }
+ this.maxBuffersPerChannel = maxBuffersPerChannel;
}
//
------------------------------------------------------------------------
@@ -206,12 +248,22 @@ class LocalBufferPool implements BufferPool {
@Override
public BufferBuilder requestBufferBuilder() throws IOException {
- return toBufferBuilder(requestMemorySegment());
+ return toBufferBuilder(requestMemorySegment(UNKNOWN_CHANNEL),
UNKNOWN_CHANNEL);
+ }
+
+ @Override
+ public BufferBuilder requestBufferBuilder(int targetChannel) throws
IOException {
+ return toBufferBuilder(requestMemorySegment(targetChannel),
targetChannel);
}
@Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException,
InterruptedException {
- return toBufferBuilder(requestMemorySegmentBlocking());
+ return
toBufferBuilder(requestMemorySegmentBlocking(UNKNOWN_CHANNEL), UNKNOWN_CHANNEL);
+ }
+
+ @Override
+ public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
throws IOException, InterruptedException {
+ return
toBufferBuilder(requestMemorySegmentBlocking(targetChannel), targetChannel);
}
private Buffer toBuffer(MemorySegment memorySegment) {
@@ -221,16 +273,21 @@ class LocalBufferPool implements BufferPool {
return new NetworkBuffer(memorySegment, this);
}
- private BufferBuilder toBufferBuilder(MemorySegment memorySegment) {
+ private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int
targetChannel) {
if (memorySegment == null) {
return null;
}
- return new BufferBuilder(memorySegment, this);
+
+ if (targetChannel == UNKNOWN_CHANNEL) {
+ return new BufferBuilder(memorySegment, this);
+ } else {
+ return new BufferBuilder(memorySegment,
subpartitionBufferRecyclers[targetChannel]);
+ }
}
- private MemorySegment requestMemorySegmentBlocking() throws
InterruptedException, IOException {
+ private MemorySegment requestMemorySegmentBlocking(int targetChannel)
throws InterruptedException, IOException {
MemorySegment segment;
- while ((segment = requestMemorySegment()) == null) {
+ while ((segment = requestMemorySegment(targetChannel)) == null)
{
try {
// wait until available
getAvailableFuture().get();
@@ -243,7 +300,7 @@ class LocalBufferPool implements BufferPool {
}
@Nullable
- private MemorySegment requestMemorySegment() throws IOException {
+ private MemorySegment requestMemorySegment(int targetChannel) throws
IOException {
MemorySegment segment = null;
synchronized (availableMemorySegments) {
returnExcessMemorySegments();
@@ -258,11 +315,23 @@ class LocalBufferPool implements BufferPool {
if (segment == null) {
availabilityHelper.resetUnavailable();
}
+
+ if (segment != null && targetChannel !=
UNKNOWN_CHANNEL) {
+ if (subpartitionBuffersCount[targetChannel]++
== maxBuffersPerChannel) {
+ unavailableSubpartitionsCount++;
+ availabilityHelper.resetUnavailable();
+ }
+ }
}
return segment;
}
@Nullable
+ private MemorySegment requestMemorySegment() throws IOException {
+ return requestMemorySegment(UNKNOWN_CHANNEL);
+ }
+
+ @Nullable
private MemorySegment requestMemorySegmentFromGlobal() throws
IOException {
assert Thread.holdsLock(availableMemorySegments);
@@ -287,20 +356,32 @@ class LocalBufferPool implements BufferPool {
@Override
public void recycle(MemorySegment segment) {
+ recycle(segment, UNKNOWN_CHANNEL);
+ }
+
+ private void recycle(MemorySegment segment, int channel) {
BufferListener listener;
CompletableFuture<?> toNotify = null;
NotificationResult notificationResult =
NotificationResult.BUFFER_NOT_USED;
while (!notificationResult.isBufferUsed()) {
synchronized (availableMemorySegments) {
+ final int oldUnavailableSubpartitionsCount =
unavailableSubpartitionsCount;
+ if (channel != UNKNOWN_CHANNEL) {
+ if (--subpartitionBuffersCount[channel]
== maxBuffersPerChannel) {
+ unavailableSubpartitionsCount--;
+ }
+ }
+
if (isDestroyed ||
numberOfRequestedMemorySegments > currentPoolSize) {
returnMemorySegment(segment);
return;
} else {
listener = registeredListeners.poll();
if (listener == null) {
- boolean wasUnavailable =
availableMemorySegments.isEmpty();
+ boolean wasUnavailable =
availableMemorySegments.isEmpty() || oldUnavailableSubpartitionsCount > 0;
availableMemorySegments.add(segment);
- if (wasUnavailable) {
+ // only need to check
unavailableSubpartitionsCount here because availableMemorySegments is not empty
+ if (wasUnavailable &&
unavailableSubpartitionsCount == 0) {
toNotify =
availabilityHelper.getUnavailableToResetAvailable();
}
break;
@@ -413,7 +494,7 @@ class LocalBufferPool implements BufferPool {
@Override
public CompletableFuture<?> getAvailableFuture() {
- if (numberOfRequestedMemorySegments >= currentPoolSize) {
+ if (numberOfRequestedMemorySegments >= currentPoolSize ||
unavailableSubpartitionsCount > 0) {
return availabilityHelper.getAvailableFuture();
} else if (availabilityHelper.isApproximatelyAvailable() ||
networkBufferPool.isApproximatelyAvailable()) {
return AVAILABLE;
@@ -426,9 +507,11 @@ class LocalBufferPool implements BufferPool {
public String toString() {
synchronized (availableMemorySegments) {
return String.format(
- "[size: %d, required: %d, requested: %d,
available: %d, max: %d, listeners: %d, destroyed: %s]",
+ "[size: %d, required: %d, requested: %d,
available: %d, max: %d, listeners: %d," +
+ "subpartitions: %d,
maxBuffersPerChannel: %d, destroyed: %s]",
currentPoolSize,
numberOfRequiredMemorySegments, numberOfRequestedMemorySegments,
- availableMemorySegments.size(),
maxNumberOfMemorySegments, registeredListeners.size(), isDestroyed);
+ availableMemorySegments.size(),
maxNumberOfMemorySegments, registeredListeners.size(),
+ subpartitionBuffersCount.length,
maxBuffersPerChannel, isDestroyed);
}
}
@@ -463,4 +546,26 @@ class LocalBufferPool implements BufferPool {
returnMemorySegment(segment);
}
}
+
+ @VisibleForTesting
+ @Override
+ public BufferRecycler[] getSubpartitionBufferRecyclers() {
+ return subpartitionBufferRecyclers;
+ }
+
+ private static class SubpartitionBufferRecycler implements
BufferRecycler {
+
+ private int channel;
+ private LocalBufferPool bufferPool;
+
+ SubpartitionBufferRecycler(int channel, LocalBufferPool
bufferPool) {
+ this.channel = channel;
+ this.bufferPool = bufferPool;
+ }
+
+ @Override
+ public void recycle(MemorySegment memorySegment) {
+ bufferPool.recycle(memorySegment, channel);
+ }
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 6e0f19b..513308c 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -314,18 +314,35 @@ public class NetworkBufferPool implements
BufferPoolFactory, MemorySegmentProvid
@Override
public BufferPool createBufferPool(int numRequiredBuffers, int
maxUsedBuffers) throws IOException {
- return internalCreateBufferPool(numRequiredBuffers,
maxUsedBuffers, null);
+ return internalCreateBufferPool(
+ numRequiredBuffers,
+ maxUsedBuffers,
+ null,
+ 0,
+ Integer.MAX_VALUE);
}
@Override
- public BufferPool createBufferPool(int numRequiredBuffers, int
maxUsedBuffers, BufferPoolOwner bufferPoolOwner) throws IOException {
- return internalCreateBufferPool(numRequiredBuffers,
maxUsedBuffers, bufferPoolOwner);
+ public BufferPool createBufferPool(
+ int numRequiredBuffers,
+ int maxUsedBuffers,
+ BufferPoolOwner bufferPoolOwner,
+ int numSubpartitions,
+ int maxBuffersPerChannel) throws IOException {
+ return internalCreateBufferPool(
+ numRequiredBuffers,
+ maxUsedBuffers,
+ bufferPoolOwner,
+ numSubpartitions,
+ maxBuffersPerChannel);
}
private BufferPool internalCreateBufferPool(
int numRequiredBuffers,
int maxUsedBuffers,
- @Nullable BufferPoolOwner bufferPoolOwner) throws
IOException {
+ @Nullable BufferPoolOwner bufferPoolOwner,
+ int numSubpartitions,
+ int maxBuffersPerChannel) throws IOException {
// It is necessary to use a separate lock from the one used for
buffer
// requests to ensure deadlock freedom for failure cases.
@@ -349,7 +366,13 @@ public class NetworkBufferPool implements
BufferPoolFactory, MemorySegmentProvid
// We are good to go, create a new buffer pool and
redistribute
// non-fixed size buffers.
LocalBufferPool localBufferPool =
- new LocalBufferPool(this, numRequiredBuffers,
maxUsedBuffers, bufferPoolOwner);
+ new LocalBufferPool(
+ this,
+ numRequiredBuffers,
+ maxUsedBuffers,
+ bufferPoolOwner,
+ numSubpartitions,
+ maxBuffersPerChannel);
allBufferPools.add(localBufferPool);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 95f1168..fa1da53 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -104,7 +104,7 @@ public class PipelinedSubpartition extends
ResultSubpartition {
@Override
public void initializeState(ChannelStateReader stateReader) throws
IOException, InterruptedException {
for (ReadResult readResult = ReadResult.HAS_MORE_DATA;
readResult == ReadResult.HAS_MORE_DATA;) {
- BufferBuilder bufferBuilder =
parent.getBufferPool().requestBufferBuilderBlocking();
+ BufferBuilder bufferBuilder =
parent.getBufferPool().requestBufferBuilderBlocking(subpartitionInfo.getSubPartitionIdx());
BufferConsumer bufferConsumer =
bufferBuilder.createBufferConsumer();
readResult =
stateReader.readOutputData(subpartitionInfo, bufferBuilder);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index a89b564..a8926dc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -206,15 +206,15 @@ public class ResultPartition implements
ResultPartitionWriter, BufferPoolOwner {
//
------------------------------------------------------------------------
@Override
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
+ public BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
checkInProduceState();
- return bufferPool.requestBufferBuilderBlocking();
+ return bufferPool.requestBufferBuilderBlocking(targetChannel);
}
@Override
- public BufferBuilder tryGetBufferBuilder() throws IOException {
- BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder();
+ public BufferBuilder tryGetBufferBuilder(int targetChannel) throws
IOException {
+ BufferBuilder bufferBuilder =
bufferPool.requestBufferBuilder(targetChannel);
return bufferBuilder;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index f52ffed..07e39e7 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -64,6 +64,8 @@ public class ResultPartitionFactory {
private final String compressionCodec;
+ private final int maxBuffersPerChannel;
+
public ResultPartitionFactory(
ResultPartitionManager partitionManager,
FileChannelManager channelManager,
@@ -74,7 +76,8 @@ public class ResultPartitionFactory {
int networkBufferSize,
boolean forcePartitionReleaseOnConsumption,
boolean blockingShuffleCompressionEnabled,
- String compressionCodec) {
+ String compressionCodec,
+ int maxBuffersPerChannel) {
this.partitionManager = partitionManager;
this.channelManager = channelManager;
@@ -86,6 +89,7 @@ public class ResultPartitionFactory {
this.forcePartitionReleaseOnConsumption =
forcePartitionReleaseOnConsumption;
this.blockingShuffleCompressionEnabled =
blockingShuffleCompressionEnabled;
this.compressionCodec = compressionCodec;
+ this.maxBuffersPerChannel = maxBuffersPerChannel;
}
public ResultPartition create(
@@ -220,7 +224,9 @@ public class ResultPartitionFactory {
return bufferPoolFactory.createBufferPool(
numberOfSubpartitions + 1,
maxNumberOfMemorySegments,
- type.hasBackPressure() ? null :
bufferPoolOwner);
+ type.hasBackPressure() ? null : bufferPoolOwner,
+ numberOfSubpartitions,
+ maxBuffersPerChannel);
};
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index 6dd507b..f2370aa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -66,13 +66,13 @@ public class
ConsumableNotifyingResultPartitionWriterDecorator implements Result
}
@Override
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
- return partitionWriter.getBufferBuilder();
+ public BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
+ return partitionWriter.getBufferBuilder(targetChannel);
}
@Override
- public BufferBuilder tryGetBufferBuilder() throws IOException {
- return partitionWriter.tryGetBufferBuilder();
+ public BufferBuilder tryGetBufferBuilder(int targetChannel) throws
IOException {
+ return partitionWriter.tryGetBufferBuilder(targetChannel);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index c1c3056..feca864 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -74,6 +74,8 @@ public class NettyShuffleEnvironmentConfiguration {
private final String compressionCodec;
+ private final int maxBuffersPerChannel;
+
public NettyShuffleEnvironmentConfiguration(
int numNetworkBuffers,
int networkBufferSize,
@@ -88,7 +90,8 @@ public class NettyShuffleEnvironmentConfiguration {
BoundedBlockingSubpartitionType
blockingSubpartitionType,
boolean forcePartitionReleaseOnConsumption,
boolean blockingShuffleCompressionEnabled,
- String compressionCodec) {
+ String compressionCodec,
+ int maxBuffersPerChannel) {
this.numNetworkBuffers = numNetworkBuffers;
this.networkBufferSize = networkBufferSize;
@@ -104,6 +107,7 @@ public class NettyShuffleEnvironmentConfiguration {
this.forcePartitionReleaseOnConsumption =
forcePartitionReleaseOnConsumption;
this.blockingShuffleCompressionEnabled =
blockingShuffleCompressionEnabled;
this.compressionCodec =
Preconditions.checkNotNull(compressionCodec);
+ this.maxBuffersPerChannel = maxBuffersPerChannel;
}
//
------------------------------------------------------------------------
@@ -164,6 +168,10 @@ public class NettyShuffleEnvironmentConfiguration {
return compressionCodec;
}
+ public int getMaxBuffersPerChannel() {
+ return maxBuffersPerChannel;
+ }
+
//
------------------------------------------------------------------------
/**
@@ -199,6 +207,8 @@ public class NettyShuffleEnvironmentConfiguration {
int buffersPerChannel =
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
int extraBuffersPerGate =
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+ int maxBuffersPerChannel =
configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_MAX_BUFFERS_PER_CHANNEL);
+
boolean isNetworkDetailedMetrics =
configuration.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_DETAILED_METRICS);
String[] tempDirs =
ConfigurationUtils.parseTempDirectories(configuration);
@@ -229,7 +239,8 @@ public class NettyShuffleEnvironmentConfiguration {
blockingSubpartitionType,
forcePartitionReleaseOnConsumption,
blockingShuffleCompressionEnabled,
- compressionCodec);
+ compressionCodec,
+ maxBuffersPerChannel);
}
/**
@@ -351,6 +362,7 @@ public class NettyShuffleEnvironmentConfiguration {
result = 31 * result + (forcePartitionReleaseOnConsumption ? 1
: 0);
result = 31 * result + (blockingShuffleCompressionEnabled ? 1 :
0);
result = 31 * result + Objects.hashCode(compressionCodec);
+ result = 31 * result + maxBuffersPerChannel;
return result;
}
@@ -376,6 +388,7 @@ public class NettyShuffleEnvironmentConfiguration {
Arrays.equals(this.tempDirs,
that.tempDirs) &&
this.forcePartitionReleaseOnConsumption
== that.forcePartitionReleaseOnConsumption &&
this.blockingShuffleCompressionEnabled
== that.blockingShuffleCompressionEnabled &&
+ this.maxBuffersPerChannel ==
that.maxBuffersPerChannel &&
Objects.equals(this.compressionCodec,
that.compressionCodec);
}
}
@@ -395,6 +408,7 @@ public class NettyShuffleEnvironmentConfiguration {
", forcePartitionReleaseOnConsumption=" +
forcePartitionReleaseOnConsumption +
", blockingShuffleCompressionEnabled=" +
blockingShuffleCompressionEnabled +
", compressionCodec=" + compressionCodec +
+ ", maxBuffersPerChannel=" +
maxBuffersPerChannel +
'}';
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index ec29f82..3a283a1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -49,6 +49,8 @@ public class NettyShuffleEnvironmentBuilder {
private int floatingNetworkBuffersPerGate = 8;
+ private int maxBuffersPerChannel = Integer.MAX_VALUE;
+
private boolean blockingShuffleCompressionEnabled = false;
private String compressionCodec = "LZ4";
@@ -89,6 +91,11 @@ public class NettyShuffleEnvironmentBuilder {
return this;
}
+ public NettyShuffleEnvironmentBuilder setMaxBuffersPerChannel(int
maxBuffersPerChannel) {
+ this.maxBuffersPerChannel = maxBuffersPerChannel;
+ return this;
+ }
+
public NettyShuffleEnvironmentBuilder
setBlockingShuffleCompressionEnabled(boolean blockingShuffleCompressionEnabled)
{
this.blockingShuffleCompressionEnabled =
blockingShuffleCompressionEnabled;
return this;
@@ -125,7 +132,8 @@ public class NettyShuffleEnvironmentBuilder {
BoundedBlockingSubpartitionType.AUTO,
false,
blockingShuffleCompressionEnabled,
- compressionCodec),
+ compressionCodec,
+ maxBuffersPerChannel),
taskManagerLocation,
new TaskEventDispatcher(),
metricGroup);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 7b12a98..a6517c7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -45,13 +45,13 @@ public abstract class
AbstractCollectingResultPartitionWriter extends MockResult
}
@Override
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking();
+ public BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
+ return
bufferProvider.requestBufferBuilderBlocking(targetChannel);
}
@Override
- public BufferBuilder tryGetBufferBuilder() throws IOException {
- return bufferProvider.requestBufferBuilder();
+ public BufferBuilder tryGetBufferBuilder(int targetChannel) throws
IOException {
+ return bufferProvider.requestBufferBuilder(targetChannel);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 4f6f7f8..97d8053 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -125,7 +125,7 @@ public class RecordWriterDelegateTest extends TestLogger {
}
private RecordWriter createRecordWriter(NetworkBufferPool globalPool)
throws Exception {
- final BufferPool localPool = globalPool.createBufferPool(1, 1);
+ final BufferPool localPool = globalPool.createBufferPool(1, 1,
null, 1, Integer.MAX_VALUE);
final ResultPartitionWriter partition = new
ResultPartitionBuilder()
.setBufferPoolFactory(p -> localPool)
.build();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ff06045..9beb01a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -413,7 +413,7 @@ public class RecordWriterTest {
public void testIsAvailableOrNot() throws Exception {
// setup
final NetworkBufferPool globalPool = new NetworkBufferPool(10,
128, 2);
- final BufferPool localPool = globalPool.createBufferPool(1, 1);
+ final BufferPool localPool = globalPool.createBufferPool(1, 1,
null, 1, Integer.MAX_VALUE);
final ResultPartitionWriter resultPartition = new
ResultPartitionBuilder()
.setBufferPoolFactory(p -> localPool)
.build();
@@ -430,7 +430,7 @@ public class RecordWriterTest {
assertTrue(recordWriter.getAvailableFuture().isDone());
// request one buffer from the local pool to make it
unavailable afterwards
- final BufferBuilder bufferBuilder =
resultPartition.getBufferBuilder();
+ final BufferBuilder bufferBuilder =
resultPartition.getBufferBuilder(0);
assertNotNull(bufferBuilder);
assertFalse(recordWriter.getAvailableFuture().isDone());
@@ -484,7 +484,11 @@ public class RecordWriterTest {
while ((bufferAndBacklog =
view.getNextBuffer()) != null) {
Buffer buffer =
bufferAndBacklog.buffer();
int[] expected = numConsumedBuffers <
totalStates ? states : expectedRecordsInBuffer[numConsumedBuffers -
totalStates];
-
BufferBuilderAndConsumerTest.assertContent(buffer, partition.getBufferPool(),
expected);
+
BufferBuilderAndConsumerTest.assertContent(
+ buffer,
+ partition.getBufferPool()
+
.getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()],
+ expected);
buffer.recycleBuffer();
numConsumedBuffers++;
@@ -503,7 +507,7 @@ public class RecordWriterTest {
public void testIdleTime() throws IOException, InterruptedException {
// setup
final NetworkBufferPool globalPool = new NetworkBufferPool(10,
128, 2);
- final BufferPool localPool = globalPool.createBufferPool(1, 1);
+ final BufferPool localPool = globalPool.createBufferPool(1, 1,
null, 1, Integer.MAX_VALUE);
final ResultPartitionWriter resultPartition = new
ResultPartitionBuilder()
.setBufferPoolFactory(p -> localPool)
.build();
@@ -514,7 +518,9 @@ public class RecordWriterTest {
resultPartition,
new NoOpResultPartitionConsumableNotifier());
final RecordWriter recordWriter =
createRecordWriter(partitionWrapper);
- BufferBuilder builder = recordWriter.getBufferBuilder();
+ BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
+ final Buffer buffer =
BufferBuilderTestUtils.buildSingleBuffer(builder);
+ builder.finish();
// idle time is zero when there is buffer available.
assertEquals(0,
recordWriter.getIdleTimeMsPerSecond().getCount());
@@ -528,7 +534,7 @@ public class RecordWriterTest {
// notify that the request thread start
to run.
syncLock.countDown();
// wait for buffer.
-
asyncRequestResult.set(recordWriter.getBufferBuilder());
+
asyncRequestResult.set(recordWriter.requestNewBufferBuilder(0));
} catch (Exception e) {
}
}
@@ -539,15 +545,13 @@ public class RecordWriterTest {
syncLock.await();
Thread.sleep(10);
- //recycle the buffer
- final Buffer buffer =
BufferBuilderTestUtils.buildSingleBuffer(builder);
+ //recycle the buffer
buffer.recycleBuffer();
requestThread.join();
assertThat(recordWriter.getIdleTimeMsPerSecond().getCount(),
Matchers.greaterThan(0L));
assertNotNull(asyncRequestResult.get());
-
}
private void verifyBroadcastBufferOrEventIndependence(boolean
broadcastEvent) throws Exception {
@@ -634,13 +638,13 @@ public class RecordWriterTest {
}
@Override
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking();
+ public BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
+ return
bufferProvider.requestBufferBuilderBlocking(targetChannel);
}
@Override
- public BufferBuilder tryGetBufferBuilder() throws IOException {
- return bufferProvider.requestBufferBuilder();
+ public BufferBuilder tryGetBufferBuilder(int targetChannel)
throws IOException {
+ return
bufferProvider.requestBufferBuilder(targetChannel);
}
@Override
@@ -672,13 +676,13 @@ public class RecordWriterTest {
}
@Override
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking();
+ public BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
+ return
bufferProvider.requestBufferBuilderBlocking(targetChannel);
}
@Override
- public BufferBuilder tryGetBufferBuilder() throws IOException {
- return bufferProvider.requestBufferBuilder();
+ public BufferBuilder tryGetBufferBuilder(int targetChannel)
throws IOException {
+ return
bufferProvider.requestBufferBuilder(targetChannel);
}
}
@@ -691,13 +695,13 @@ public class RecordWriterTest {
}
@Override
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
- return bufferProvider.requestBufferBuilderBlocking();
+ public BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
+ return
bufferProvider.requestBufferBuilderBlocking(targetChannel);
}
@Override
- public BufferBuilder tryGetBufferBuilder() throws IOException {
- return bufferProvider.requestBufferBuilder();
+ public BufferBuilder tryGetBufferBuilder(int targetChannel)
throws IOException {
+ return
bufferProvider.requestBufferBuilder(targetChannel);
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index a0d1612..f263330 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -407,6 +407,43 @@ public class LocalBufferPoolTest extends TestLogger {
}
@Test
+ public void testMaxBuffersPerChannelAndAvailability() throws
IOException, InterruptedException {
+ localBufferPool.lazyDestroy();
+ localBufferPool = new LocalBufferPool(networkBufferPool, 1,
Integer.MAX_VALUE, null, 3, 1);
+ localBufferPool.setNumBuffers(10);
+
+ assertTrue(localBufferPool.getAvailableFuture().isDone());
+
+ // request one segment from subpartition-0 and subpartition-1
respectively
+ final BufferBuilder bufferBuilder01 =
localBufferPool.requestBufferBuilderBlocking(0);
+ final BufferBuilder bufferBuilder11 =
localBufferPool.requestBufferBuilderBlocking(1);
+ assertTrue(localBufferPool.getAvailableFuture().isDone());
+
+ // request one segment from subpartition-0
+ final BufferBuilder bufferBuilder02 =
localBufferPool.requestBufferBuilderBlocking(0);
+ assertFalse(localBufferPool.getAvailableFuture().isDone());
+
+ final BufferBuilder bufferBuilder03 =
localBufferPool.requestBufferBuilderBlocking(0);
+ final BufferBuilder bufferBuilder21 =
localBufferPool.requestBufferBuilderBlocking(2);
+ final BufferBuilder bufferBuilder22 =
localBufferPool.requestBufferBuilderBlocking(2);
+ assertFalse(localBufferPool.getAvailableFuture().isDone());
+
+ // recycle segments
+
bufferBuilder11.getRecycler().recycle(bufferBuilder11.getMemorySegment());
+ assertFalse(localBufferPool.getAvailableFuture().isDone());
+
bufferBuilder21.getRecycler().recycle(bufferBuilder21.getMemorySegment());
+ assertFalse(localBufferPool.getAvailableFuture().isDone());
+
bufferBuilder02.getRecycler().recycle(bufferBuilder02.getMemorySegment());
+ assertFalse(localBufferPool.getAvailableFuture().isDone());
+
bufferBuilder01.getRecycler().recycle(bufferBuilder01.getMemorySegment());
+ assertTrue(localBufferPool.getAvailableFuture().isDone());
+
bufferBuilder03.getRecycler().recycle(bufferBuilder03.getMemorySegment());
+ assertTrue(localBufferPool.getAvailableFuture().isDone());
+
bufferBuilder22.getRecycler().recycle(bufferBuilder22.getMemorySegment());
+ assertTrue(localBufferPool.getAvailableFuture().isDone());
+ }
+
+ @Test
public void testIsAvailableOrNot() throws Exception {
// the local buffer pool should be in available state initially
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 7bdb884..8cbe0d9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -327,7 +327,7 @@ public class NetworkBufferPoolTest extends TestLogger {
// make releaseMemory calls always fail:
numBuffersToRecycle -> {
throw new TestIOException();
- });
+ }, 0, Integer.MAX_VALUE);
try {
// take all but one buffer
@@ -366,7 +366,7 @@ public class NetworkBufferPoolTest extends TestLogger {
BufferPool bufferPool = networkBufferPool.createBufferPool(1,
numBuffers,
numBuffersToRecycle -> {
throw new TestIOException();
- });
+ }, 0, Integer.MAX_VALUE);
try {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
index aa23744..cffb35c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -50,6 +52,16 @@ public class NoOpBufferPool implements BufferPool {
}
@Override
+ public BufferBuilder requestBufferBuilder(int targetChannel) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
throws IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public boolean addBufferListener(BufferListener listener) {
throw new UnsupportedOperationException();
}
@@ -89,6 +101,12 @@ public class NoOpBufferPool implements BufferPool {
throw new UnsupportedOperationException();
}
+ @Nullable
+ @Override
+ public BufferRecycler[] getSubpartitionBufferRecyclers() {
+ return new BufferRecycler[0];
+ }
+
@Override
public void recycle(MemorySegment memorySegment) {
throw new UnsupportedOperationException();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index c2ee06f..5432fd8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -70,7 +70,7 @@ public class MockResultPartitionWriter implements
ResultPartitionWriter {
}
@Override
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
+ public BufferBuilder getBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
throw new UnsupportedOperationException();
}
@@ -79,8 +79,7 @@ public class MockResultPartitionWriter implements
ResultPartitionWriter {
throw new UnsupportedOperationException();
}
- @Override
- public BufferBuilder tryGetBufferBuilder() throws IOException {
+ public BufferBuilder tryGetBufferBuilder(int targetChannel) throws
IOException {
throw new UnsupportedOperationException();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 4a400cb..010aada 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -121,7 +121,7 @@ public class PartialConsumePipelinedResultTest extends
TestLogger {
final ResultPartitionWriter writer =
getEnvironment().getWriter(0);
for (int i = 0; i < 8; i++) {
- final BufferBuilder bufferBuilder =
writer.getBufferBuilder();
+ final BufferBuilder bufferBuilder =
writer.getBufferBuilder(0);
writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
Thread.sleep(50);
bufferBuilder.finish();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index a0c9faa..7c8966d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -56,6 +56,8 @@ public class ResultPartitionBuilder {
private int floatingNetworkBuffersPerGate = 1;
+ private int maxBuffersPerChannel = Integer.MAX_VALUE;
+
private int networkBufferSize = 1;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@@ -167,7 +169,8 @@ public class ResultPartitionBuilder {
networkBufferSize,
releasedOnConsumption,
blockingShuffleCompressionEnabled,
- compressionCodec);
+ compressionCodec,
+ maxBuffersPerChannel);
FunctionWithException<BufferPoolOwner, BufferPool, IOException>
factory = bufferPoolFactory.orElseGet(() ->
resultPartitionFactory.createBufferPoolFactory(numberOfSubpartitions,
partitionType));
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 7f27dbd..f5570c7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -100,7 +100,8 @@ public class ResultPartitionFactoryTest extends TestLogger {
SEGMENT_SIZE,
releasePartitionOnConsumption,
false,
- "LZ4");
+ "LZ4",
+ Integer.MAX_VALUE);
final ResultPartitionDeploymentDescriptor descriptor = new
ResultPartitionDeploymentDescriptor(
PartitionDescriptorBuilder
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index f3e512f..0dabb21 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -360,6 +360,32 @@ public class ResultPartitionTest {
}
}
+ /**
+ * Tests {@link ResultPartition#getAvailableFuture()}.
+ */
+ @Test
+ public void testIsAvailableOrNot() throws IOException, InterruptedException {
+ final int numAllBuffers = 10;
+ final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+ .setNumNetworkBuffers(numAllBuffers).build();
+ final ResultPartition resultPartition = createPartition(network, ResultPartitionType.PIPELINED, 1);
+
+ try {
+ resultPartition.setup();
+
+ resultPartition.getBufferPool().setNumBuffers(2);
+
+ assertTrue(resultPartition.getAvailableFuture().isDone());
+
+ resultPartition.getBufferBuilder(0);
+ resultPartition.getBufferBuilder(0);
+ assertFalse(resultPartition.getAvailableFuture().isDone());
+ } finally {
+ resultPartition.release();
+ network.close();
+ }
+ }
+
@Test
public void testPipelinedPartitionBufferPool() throws Exception {
testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
@@ -470,7 +496,11 @@ public class ResultPartitionTest {
ResultSubpartition.BufferAndBacklog bufferAndBacklog = view.getNextBuffer();
if (bufferAndBacklog != null) {
Buffer buffer = bufferAndBacklog.buffer();
- BufferBuilderAndConsumerTest.assertContent(buffer, partition.getBufferPool(), states);
+ BufferBuilderAndConsumerTest.assertContent(
+ buffer,
+ partition.getBufferPool()
+ .getSubpartitionBufferRecyclers()[subpartition.getSubPartitionIndex()],
+ states);
buffer.recycleBuffer();
numConsumedBuffers++;
} else {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index e0ed290..4c2196d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -75,6 +75,11 @@ public class TestPooledBufferProvider implements BufferProvider {
return null;
}
+ @Override
+ public BufferBuilder requestBufferBuilder(int targetChannel) throws IOException {
+ return requestBufferBuilder();
+ }
+
private Buffer requestBufferBlocking() throws IOException, InterruptedException {
Buffer buffer = buffers.poll();
if (buffer != null) {
@@ -96,6 +101,11 @@ public class TestPooledBufferProvider implements BufferProvider {
}
@Override
+ public BufferBuilder requestBufferBuilderBlocking(int targetChannel) throws IOException, InterruptedException {
+ return requestBufferBuilderBlocking();
+ }
+
+ @Override
public boolean addBufferListener(BufferListener listener) {
return bufferRecycler.registerListener(listener);
}