Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4735#discussion_r141660332
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
---
@@ -209,31 +209,30 @@ public void testSetLessThanRequiredNumBuffers()
throws IOException {
//
------------------------------------------------------------------------
@Test
- public void testPendingRequestWithListenerAfterRecycle() throws
Exception {
- BufferListener listener = spy(new BufferListener() {
- @Override
- public boolean notifyBufferAvailable(Buffer buffer) {
- buffer.recycle();
- return false;
- }
+ public void testPendingRequestWithListenersAfterRecycle() throws
Exception {
+ BufferListener twoTimesListener = createBufferListener(2);
+ BufferListener oneTimeListener = createBufferListener(1);
- @Override
- public void notifyBufferDestroyed() {
- }
- });
+ localBufferPool.setNumBuffers(2);
- localBufferPool.setNumBuffers(1);
+ Buffer available1 = localBufferPool.requestBuffer();
+ Buffer available2 = localBufferPool.requestBuffer();
- Buffer available = localBufferPool.requestBuffer();
- Buffer unavailable = localBufferPool.requestBuffer();
+ assertNull(localBufferPool.requestBuffer());
- assertNull(unavailable);
+ assertTrue(localBufferPool.addBufferListener(twoTimesListener));
+ assertTrue(localBufferPool.addBufferListener(oneTimeListener));
- assertTrue(localBufferPool.addBufferListener(listener));
+ // Recycle the first buffer to notify both of the above
listeners and the
+ // <tt>twoTimesListener</tt> will be added into the
<tt>registeredListeners</tt>
+ // queue of buffer pool again
+ available1.recycle();
--- End diff --
Can you also verify the notification right after this `recycle()` call, by
adding the following?
`verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));`
---