Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r152859775
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -715,4 +686,58 @@ private RemoteInputChannel createRemoteInputChannel(
initialAndMaxRequestBackoff._2(),
new
UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
+
+ private Callable recycleExclusiveBufferTask(RemoteInputChannel
inputChannel, int numExclusiveSegments) {
+ final List<Buffer> exclusiveBuffers = new
ArrayList<>(numExclusiveSegments);
+ // Exhaust all the exclusive buffers
+ for (int i = 0; i < numExclusiveSegments; i++) {
+ Buffer buffer = inputChannel.requestBuffer();
+ assertNotNull(buffer);
+ exclusiveBuffers.add(buffer);
+ }
+
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ for (Buffer buffer : exclusiveBuffers) {
+ buffer.recycle();
+ }
+
+ return null;
+ }
+ };
+ }
+
+ private Callable recycleFloatingBufferTask(BufferPool bufferPool, int
numFloatingBuffers) throws Exception {
+ final List<Buffer> floatingBuffers = new
ArrayList<>(numFloatingBuffers);
--- End diff --
please add a Javadoc
---