[FLINK-7699][core] Define the BufferListener interface to replace 
EventlListener in BufferProvider

This closes #4735.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8706c6f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8706c6f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8706c6f4

Branch: refs/heads/master
Commit: 8706c6f445fc92159d15f610500d4ae5d2c49757
Parents: d3cbba5
Author: Zhijiang <[email protected]>
Authored: Wed Sep 27 18:24:15 2017 +0800
Committer: zentol <[email protected]>
Committed: Tue Oct 10 16:53:20 2017 +0200

----------------------------------------------------------------------
 .../io/network/buffer/BufferListener.java       | 39 +++++++++++++
 .../io/network/buffer/BufferProvider.java       | 21 +++----
 .../io/network/buffer/LocalBufferPool.java      | 17 +++---
 .../netty/PartitionRequestClientHandler.java    | 61 ++++++++++----------
 .../partition/consumer/RemoteInputChannel.java  | 53 ++++++++++++++++-
 .../io/network/buffer/LocalBufferPoolTest.java  | 61 ++++++++++++++------
 .../PartitionRequestClientHandlerTest.java      | 14 ++---
 .../util/TestInfiniteBufferProvider.java        |  4 +-
 .../network/util/TestPooledBufferProvider.java  | 12 ++--
 9 files changed, 196 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
new file mode 100644
index 0000000..05b4156
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+/**
+ * Interface of the availability of buffers. Listeners can opt for a one-time 
only
+ * notification or to be notified repeatedly.
+ */
+public interface BufferListener {
+
+       /**
+        * Notification callback if a buffer is recycled and becomes available 
in buffer pool.
+        *
+        * @param buffer buffer that becomes available in buffer pool.
+        * @return true if the listener wants to be notified next time.
+        */
+       boolean notifyBufferAvailable(Buffer buffer);
+
+       /**
+        * Notification callback if the buffer provider is destroyed.
+        */
+       void notifyBufferDestroyed();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index c3373fa..9782584 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -18,43 +18,38 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.runtime.util.event.EventListener;
-
 import java.io.IOException;
 
 /**
  * A buffer provider to request buffers from in a synchronous or asynchronous 
fashion.
  *
- * <p> The data producing side (result partition writers) request buffers in a 
synchronous fashion,
+ * <p>The data producing side (result partition writers) request buffers in a 
synchronous fashion,
  * whereas the input side requests asynchronously.
  */
 public interface BufferProvider {
 
        /**
         * Returns a {@link Buffer} instance from the buffer provider, if one 
is available.
-        * <p>
-        * Returns <code>null</code> if no buffer is available or the buffer 
provider has been destroyed.
+        *
+        * <p>Returns <code>null</code> if no buffer is available or the buffer 
provider has been destroyed.
         */
        Buffer requestBuffer() throws IOException;
 
        /**
         * Returns a {@link Buffer} instance from the buffer provider.
-        * <p>
-        * If there is no buffer available, the call will block until one 
becomes available again or the
+        *
+        * <p>If there is no buffer available, the call will block until one 
becomes available again or the
         * buffer provider has been destroyed.
         */
        Buffer requestBufferBlocking() throws IOException, InterruptedException;
 
        /**
         * Adds a buffer availability listener to the buffer provider.
-        * <p>
-        * The operation fails with return value <code>false</code>, when there 
is a buffer available or
+        *
+        * <p>The operation fails with return value <code>false</code>, when 
there is a buffer available or
         * the buffer provider has been destroyed.
-        * <p>
-        * If the buffer provider gets destroyed while the listener is 
registered the listener will be
-        * notified with a <code>null</code> value.
         */
-       boolean addListener(EventListener<Buffer> listener);
+       boolean addBufferListener(BufferListener listener);
 
        /**
         * Returns whether the buffer provider has been destroyed.

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index b485fd1..a66373c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,13 +19,11 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.Queue;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,7 +62,7 @@ class LocalBufferPool implements BufferPool {
         * Buffer availability listeners, which need to be notified when a 
Buffer becomes available.
         * Listeners can only be registered at a time/state where no Buffer 
instance was available.
         */
-       private final Queue<EventListener<Buffer>> registeredListeners = new 
ArrayDeque<EventListener<Buffer>>();
+       private final ArrayDeque<BufferListener> registeredListeners = new 
ArrayDeque<>();
 
        /** Maximum number of network buffers to allocate. */
        private final int maxNumberOfMemorySegments;
@@ -239,7 +237,7 @@ class LocalBufferPool implements BufferPool {
                                returnMemorySegment(segment);
                        }
                        else {
-                               EventListener<Buffer> listener = 
registeredListeners.poll();
+                               BufferListener listener = 
registeredListeners.poll();
 
                                if (listener == null) {
                                        availableMemorySegments.add(segment);
@@ -247,7 +245,10 @@ class LocalBufferPool implements BufferPool {
                                }
                                else {
                                        try {
-                                               listener.onEvent(new 
Buffer(segment, this));
+                                               boolean needMoreBuffers = 
listener.notifyBufferAvailable(new Buffer(segment, this));
+                                               if (needMoreBuffers) {
+                                                       
registeredListeners.add(listener);
+                                               }
                                        }
                                        catch (Throwable ignored) {
                                                
availableMemorySegments.add(segment);
@@ -270,9 +271,9 @@ class LocalBufferPool implements BufferPool {
                                        returnMemorySegment(segment);
                                }
 
-                               EventListener<Buffer> listener;
+                               BufferListener listener;
                                while ((listener = registeredListeners.poll()) 
!= null) {
-                                       listener.onEvent(null);
+                                       listener.notifyBufferDestroyed();
                                }
 
                                isDestroyed = true;
@@ -283,7 +284,7 @@ class LocalBufferPool implements BufferPool {
        }
 
        @Override
-       public boolean addListener(EventListener<Buffer> listener) {
+       public boolean addBufferListener(BufferListener listener) {
                synchronized (availableMemorySegments) {
                        if (!availableMemorySegments.isEmpty() || isDestroyed) {
                                return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index e3097ba..566b215 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
@@ -29,7 +30,6 @@ import 
org.apache.flink.runtime.io.network.netty.exception.TransportException;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
@@ -347,15 +347,15 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
        /**
         * A buffer availability listener, which subscribes/unsubscribes the NIO
         * read event.
-        * <p>
-        * If no buffer is available, the channel read event will be 
unsubscribed
+        *
+        * <p>If no buffer is available, the channel read event will be 
unsubscribed
         * until one becomes available again.
-        * <p>
-        * After a buffer becomes available again, the buffer is handed over by
-        * the thread calling {@link #onEvent(Buffer)} to the network I/O
+        *
+        * <p>After a buffer becomes available again, the buffer is handed over 
by
+        * the thread calling {@link #notifyBufferAvailable(Buffer)} to the 
network I/O
         * thread, which then continues the processing of the staged buffer.
         */
-       private class BufferListenerTask implements EventListener<Buffer>, 
Runnable {
+       private class BufferListenerTask implements BufferListener, Runnable {
 
                private final AtomicReference<Buffer> availableBuffer = new 
AtomicReference<Buffer>();
 
@@ -365,7 +365,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
 
                        stagedBufferResponse = bufferResponse;
 
-                       if (bufferProvider.addListener(this)) {
+                       if (bufferProvider.addBufferListener(this)) {
                                if (ctx.channel().config().isAutoRead()) {
                                        
ctx.channel().config().setAutoRead(false);
                                }
@@ -383,34 +383,33 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
                        return stagedBufferResponse != null;
                }
 
+               public void notifyBufferDestroyed() {
+                       // The buffer pool has been destroyed
+                       stagedBufferResponse = null;
+
+                       if (stagedMessages.isEmpty()) {
+                               ctx.channel().config().setAutoRead(true);
+                               ctx.channel().read();
+                       }
+                       else {
+                               
ctx.channel().eventLoop().execute(stagedMessagesHandler);
+                       }
+               }
+
                // Called by the recycling thread (not network I/O thread)
                @Override
-               public void onEvent(Buffer buffer) {
+               public boolean notifyBufferAvailable(Buffer buffer) {
                        boolean success = false;
 
                        try {
-                               if (buffer != null) {
-                                       if (availableBuffer.compareAndSet(null, 
buffer)) {
-                                               
ctx.channel().eventLoop().execute(this);
+                               if (availableBuffer.compareAndSet(null, 
buffer)) {
+                                       ctx.channel().eventLoop().execute(this);
 
-                                               success = true;
-                                       }
-                                       else {
-                                               throw new 
IllegalStateException("Received a buffer notification, " +
-                                                               " but the 
previous one has not been handled yet.");
-                                       }
+                                       success = true;
                                }
                                else {
-                                       // The buffer pool has been destroyed
-                                       stagedBufferResponse = null;
-
-                                       if (stagedMessages.isEmpty()) {
-                                               
ctx.channel().config().setAutoRead(true);
-                                               ctx.channel().read();
-                                       }
-                                       else {
-                                               
ctx.channel().eventLoop().execute(stagedMessagesHandler);
-                                       }
+                                       throw new 
IllegalStateException("Received a buffer notification, " +
+                                                       " but the previous one 
has not been handled yet.");
                                }
                        }
                        catch (Throwable t) {
@@ -423,12 +422,14 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
                                        }
                                }
                        }
+
+                       return false;
                }
 
                /**
                 * Continues the decoding of a staged buffer after a buffer has 
become available again.
-                * <p>
-                * This task is executed by the network I/O thread.
+                *
+                * <p>This task is executed by the network I/O thread.
                 */
                @Override
                public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index ee6bfda..4e1eaef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
@@ -46,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * An input channel, which requests a remote partition queue.
  */
-public class RemoteInputChannel extends InputChannel implements BufferRecycler 
{
+public class RemoteInputChannel extends InputChannel implements 
BufferRecycler, BufferListener {
 
        /** ID to distinguish this channel from other channels sharing the same 
TCP connection. */
        private final InputChannelID id = new InputChannelID();
@@ -87,6 +88,12 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler {
        /** The number of available buffers that have not been announced to the 
producer yet. */
        private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
 
+       /** The number of unsent buffers in the producer's sub partition. */
+       private final AtomicInteger senderBacklog = new AtomicInteger(0);
+
+       /** The tag indicates whether this channel is waiting for additional 
floating buffers from the buffer pool. */
+       private final AtomicBoolean isWaitingForFloatingBuffers = new 
AtomicBoolean(false);
+
        public RemoteInputChannel(
                SingleInputGate inputGate,
                int channelIndex,
@@ -313,6 +320,50 @@ public class RemoteInputChannel extends InputChannel 
implements BufferRecycler {
                }
        }
 
+       /**
+        * The Buffer pool notifies this channel of an available floating 
buffer. If the channel is released or
+        * currently does not need extra buffers, the buffer should be recycled 
to the buffer pool. Otherwise,
+        * the buffer will be added into the <tt>availableBuffers</tt> queue 
and the unannounced credit is
+        * increased by one.
+        *
+        * @param buffer Buffer that becomes available in buffer pool.
+        * @return True when this channel is waiting for more floating buffers, 
otherwise false.
+        */
+       @Override
+       public boolean notifyBufferAvailable(Buffer buffer) {
+               checkState(isWaitingForFloatingBuffers.get(), "This channel 
should be waiting for floating buffers.");
+
+               synchronized (availableBuffers) {
+                       // Important: the isReleased check should be inside the 
synchronized block.
+                       if (isReleased.get() || availableBuffers.size() >= 
senderBacklog.get()) {
+                               isWaitingForFloatingBuffers.set(false);
+                               buffer.recycle();
+
+                               return false;
+                       }
+
+                       availableBuffers.add(buffer);
+
+                       if (unannouncedCredit.getAndAdd(1) == 0) {
+                               notifyCreditAvailable();
+                       }
+
+                       if (availableBuffers.size() >= senderBacklog.get()) {
+                               isWaitingForFloatingBuffers.set(false);
+                               return false;
+                       } else {
+                               return true;
+                       }
+               }
+       }
+
+       @Override
+       public void notifyBufferDestroyed() {
+               if (!isWaitingForFloatingBuffers.compareAndSet(true, false)) {
+                       throw new IllegalStateException("This channel should be 
waiting for floating buffers currently.");
+               }
+       }
+
        // 
------------------------------------------------------------------------
        // Network I/O notifications (called by network I/O thread)
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 03f82d8..7a309d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -27,7 +26,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Matchers;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -46,6 +44,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -210,32 +209,39 @@ public class LocalBufferPoolTest {
        // 
------------------------------------------------------------------------
 
        @Test
-       public void testPendingRequestWithListenerAfterRecycle() throws 
Exception {
-               EventListener<Buffer> listener = spy(new 
EventListener<Buffer>() {
-                       @Override
-                       public void onEvent(Buffer buffer) {
-                               buffer.recycle();
-                       }
-               });
+       public void testPendingRequestWithListenersAfterRecycle() throws 
Exception {
+               BufferListener twoTimesListener = createBufferListener(2);
+               BufferListener oneTimeListener = createBufferListener(1);
 
-               localBufferPool.setNumBuffers(1);
+               localBufferPool.setNumBuffers(2);
 
-               Buffer available = localBufferPool.requestBuffer();
-               Buffer unavailable = localBufferPool.requestBuffer();
+               Buffer available1 = localBufferPool.requestBuffer();
+               Buffer available2 = localBufferPool.requestBuffer();
 
-               assertNull(unavailable);
+               assertNull(localBufferPool.requestBuffer());
 
-               assertTrue(localBufferPool.addListener(listener));
+               assertTrue(localBufferPool.addBufferListener(twoTimesListener));
+               assertTrue(localBufferPool.addBufferListener(oneTimeListener));
 
-               available.recycle();
+               // Recycle the first buffer to notify both of the above 
listeners once
+               // and the twoTimesListener will be added into the 
registeredListeners
+               // queue of buffer pool again
+               available1.recycle();
+               
+               verify(oneTimeListener, 
times(1)).notifyBufferAvailable(any(Buffer.class));
+               verify(twoTimesListener, 
times(1)).notifyBufferAvailable(any(Buffer.class));
 
-               verify(listener, times(1)).onEvent(Matchers.any(Buffer.class));
+               // Recycle the second buffer to only notify the twoTimesListener
+               available2.recycle();
+
+               verify(oneTimeListener, 
times(1)).notifyBufferAvailable(any(Buffer.class));
+               verify(twoTimesListener, 
times(2)).notifyBufferAvailable(any(Buffer.class));
        }
 
        @Test
        @SuppressWarnings("unchecked")
        public void testCancelPendingRequestsAfterDestroy() throws IOException {
-               EventListener<Buffer> listener = 
Mockito.mock(EventListener.class);
+               BufferListener listener = Mockito.mock(BufferListener.class);
 
                localBufferPool.setNumBuffers(1);
 
@@ -244,13 +250,13 @@ public class LocalBufferPoolTest {
 
                assertNull(unavailable);
 
-               localBufferPool.addListener(listener);
+               localBufferPool.addBufferListener(listener);
 
                localBufferPool.lazyDestroy();
 
                available.recycle();
 
-               verify(listener, times(1)).onEvent(null);
+               verify(listener, times(1)).notifyBufferDestroyed();
        }
 
        // 
------------------------------------------------------------------------
@@ -396,6 +402,23 @@ public class LocalBufferPoolTest {
                return networkBufferPool.getTotalNumberOfMemorySegments() - 
networkBufferPool.getNumberOfAvailableMemorySegments();
        }
 
+       private BufferListener createBufferListener(int notificationTimes) {
+               return spy(new BufferListener() {
+                       int times = 0;
+
+                       @Override
+                       public boolean notifyBufferAvailable(Buffer buffer) {
+                               times++;
+                               buffer.recycle();
+                               return times < notificationTimes;
+                       }
+
+                       @Override
+                       public void notifyBufferDestroyed() {
+                       }
+               });
+       }
+
        private static class BufferRequesterTask implements Callable<Boolean> {
 
                private final BufferProvider bufferProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index f3f6feb..e1e5bd3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
@@ -30,7 +31,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
@@ -74,7 +74,7 @@ public class PartitionRequestClientHandlerTest {
                final BufferProvider bufferProvider = 
mock(BufferProvider.class);
                when(bufferProvider.requestBuffer()).thenReturn(null);
                when(bufferProvider.isDestroyed()).thenReturn(true);
-               
when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
+               
when(bufferProvider.addBufferListener(any(BufferListener.class))).thenReturn(false);
 
                final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
                when(inputChannel.getInputChannelId()).thenReturn(new 
InputChannelID());
@@ -179,14 +179,14 @@ public class PartitionRequestClientHandlerTest {
                PartitionRequestClientHandler handler = new 
PartitionRequestClientHandler();
                EmbeddedChannel channel = new EmbeddedChannel(handler);
 
-               final AtomicReference<EventListener<Buffer>> listener = new 
AtomicReference<>();
+               final AtomicReference<BufferListener> listener = new 
AtomicReference<>();
 
                BufferProvider bufferProvider = mock(BufferProvider.class);
-               
when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new 
Answer<Boolean>() {
+               
when(bufferProvider.addBufferListener(any(BufferListener.class))).thenAnswer(new
 Answer<Boolean>() {
                        @Override
                        @SuppressWarnings("unchecked")
                        public Boolean answer(InvocationOnMock invocation) 
throws Throwable {
-                               listener.set((EventListener<Buffer>) 
invocation.getArguments()[0]);
+                               listener.set((BufferListener) 
invocation.getArguments()[0]);
                                return true;
                        }
                });
@@ -221,11 +221,11 @@ public class PartitionRequestClientHandlerTest {
 
                // Notify about buffer => handle 1st msg
                Buffer availableBuffer = createBuffer(false);
-               listener.get().onEvent(availableBuffer);
+               listener.get().notifyBufferAvailable(availableBuffer);
 
                // Start processing of staged buffers (in run pending tasks). 
Make
                // sure that the buffer provider acts like it's destroyed.
-               
when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
+               
when(bufferProvider.addBufferListener(any(BufferListener.class))).thenReturn(false);
                when(bufferProvider.isDestroyed()).thenReturn(true);
 
                // Execute all tasks that are scheduled in the event loop. 
Further

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
index 976e63d..ad40a54 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -51,7 +51,7 @@ public class TestInfiniteBufferProvider implements 
BufferProvider {
        }
 
        @Override
-       public boolean addListener(EventListener<Buffer> listener) {
+       public boolean addBufferListener(BufferListener listener) {
                return false;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index d7e9643..c354eeb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.util.event.EventListener;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Queues;
 
@@ -91,7 +91,7 @@ public class TestPooledBufferProvider implements 
BufferProvider {
        }
 
        @Override
-       public boolean addListener(EventListener<Buffer> listener) {
+       public boolean addBufferListener(BufferListener listener) {
                return bufferRecycler.registerListener(listener);
        }
 
@@ -115,7 +115,7 @@ public class TestPooledBufferProvider implements 
BufferProvider {
 
                private final Queue<Buffer> buffers;
 
-               private final ConcurrentLinkedQueue<EventListener<Buffer>> 
registeredListeners =
+               private final ConcurrentLinkedQueue<BufferListener> 
registeredListeners =
                                Queues.newConcurrentLinkedQueue();
 
                public PooledBufferProviderRecycler(Queue<Buffer> buffers) {
@@ -127,18 +127,18 @@ public class TestPooledBufferProvider implements 
BufferProvider {
                        synchronized (listenerRegistrationLock) {
                                final Buffer buffer = new Buffer(segment, this);
 
-                               EventListener<Buffer> listener = 
registeredListeners.poll();
+                               BufferListener listener = 
registeredListeners.poll();
 
                                if (listener == null) {
                                        buffers.add(buffer);
                                }
                                else {
-                                       listener.onEvent(buffer);
+                                       listener.notifyBufferAvailable(buffer);
                                }
                        }
                }
 
-               boolean registerListener(EventListener<Buffer> listener) {
+               boolean registerListener(BufferListener listener) {
                        synchronized (listenerRegistrationLock) {
                                if (buffers.isEmpty()) {
                                        registeredListeners.add(listener);

Reply via email to