[FLINK-7699][core] Define the BufferListener interface to replace EventlListener in BufferProvider
This closes #4735. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8706c6f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8706c6f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8706c6f4 Branch: refs/heads/master Commit: 8706c6f445fc92159d15f610500d4ae5d2c49757 Parents: d3cbba5 Author: Zhijiang <[email protected]> Authored: Wed Sep 27 18:24:15 2017 +0800 Committer: zentol <[email protected]> Committed: Tue Oct 10 16:53:20 2017 +0200 ---------------------------------------------------------------------- .../io/network/buffer/BufferListener.java | 39 +++++++++++++ .../io/network/buffer/BufferProvider.java | 21 +++---- .../io/network/buffer/LocalBufferPool.java | 17 +++--- .../netty/PartitionRequestClientHandler.java | 61 ++++++++++---------- .../partition/consumer/RemoteInputChannel.java | 53 ++++++++++++++++- .../io/network/buffer/LocalBufferPoolTest.java | 61 ++++++++++++++------ .../PartitionRequestClientHandlerTest.java | 14 ++--- .../util/TestInfiniteBufferProvider.java | 4 +- .../network/util/TestPooledBufferProvider.java | 12 ++-- 9 files changed, 196 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java new file mode 100644 index 0000000..05b4156 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferListener.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.buffer; + +/** + * Interface of the availability of buffers. Listeners can opt for a one-time only + * notification or to be notified repeatedly. + */ +public interface BufferListener { + + /** + * Notification callback if a buffer is recycled and becomes available in buffer pool. + * + * @param buffer buffer that becomes available in buffer pool. + * @return true if the listener wants to be notified next time. + */ + boolean notifyBufferAvailable(Buffer buffer); + + /** + * Notification callback if the buffer provider is destroyed. + */ + void notifyBufferDestroyed(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java index c3373fa..9782584 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java @@ -18,43 +18,38 @@ package org.apache.flink.runtime.io.network.buffer; -import org.apache.flink.runtime.util.event.EventListener; - import java.io.IOException; /** * A buffer provider to request buffers from in a synchronous or asynchronous fashion. * - * <p> The data producing side (result partition writers) request buffers in a synchronous fashion, + * <p>The data producing side (result partition writers) request buffers in a synchronous fashion, * whereas the input side requests asynchronously. */ public interface BufferProvider { /** * Returns a {@link Buffer} instance from the buffer provider, if one is available. - * <p> - * Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed. + * + * <p>Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed. */ Buffer requestBuffer() throws IOException; /** * Returns a {@link Buffer} instance from the buffer provider. - * <p> - * If there is no buffer available, the call will block until one becomes available again or the + * + * <p>If there is no buffer available, the call will block until one becomes available again or the * buffer provider has been destroyed. */ Buffer requestBufferBlocking() throws IOException, InterruptedException; /** * Adds a buffer availability listener to the buffer provider. - * <p> - * The operation fails with return value <code>false</code>, when there is a buffer available or + * + * <p>The operation fails with return value <code>false</code>, when there is a buffer available or * the buffer provider has been destroyed. - * <p> - * If the buffer provider gets destroyed while the listener is registered the listener will be - * notified with a <code>null</code> value. */ - boolean addListener(EventListener<Buffer> listener); + boolean addBufferListener(BufferListener listener); /** * Returns whether the buffer provider has been destroyed. http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index b485fd1..a66373c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -19,13 +19,11 @@ package org.apache.flink.runtime.io.network.buffer; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.util.event.EventListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayDeque; -import java.util.Queue; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,7 +62,7 @@ class LocalBufferPool implements BufferPool { * Buffer availability listeners, which need to be notified when a Buffer becomes available. * Listeners can only be registered at a time/state where no Buffer instance was available. */ - private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>(); + private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>(); /** Maximum number of network buffers to allocate. */ private final int maxNumberOfMemorySegments; @@ -239,7 +237,7 @@ class LocalBufferPool implements BufferPool { returnMemorySegment(segment); } else { - EventListener<Buffer> listener = registeredListeners.poll(); + BufferListener listener = registeredListeners.poll(); if (listener == null) { availableMemorySegments.add(segment); @@ -247,7 +245,10 @@ class LocalBufferPool implements BufferPool { } else { try { - listener.onEvent(new Buffer(segment, this)); + boolean needMoreBuffers = listener.notifyBufferAvailable(new Buffer(segment, this)); + if (needMoreBuffers) { + registeredListeners.add(listener); + } } catch (Throwable ignored) { availableMemorySegments.add(segment); @@ -270,9 +271,9 @@ class LocalBufferPool implements BufferPool { returnMemorySegment(segment); } - EventListener<Buffer> listener; + BufferListener listener; while ((listener = registeredListeners.poll()) != null) { - listener.onEvent(null); + listener.notifyBufferDestroyed(); } isDestroyed = true; @@ -283,7 +284,7 @@ class LocalBufferPool implements BufferPool { } @Override - public boolean addListener(EventListener<Buffer> listener) { + public boolean addBufferListener(BufferListener listener) { synchronized (availableMemorySegments) { if (!availableMemorySegments.isEmpty() || isDestroyed) { return false; http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index e3097ba..566b215 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException; @@ -29,7 +30,6 @@ import org.apache.flink.runtime.io.network.netty.exception.TransportException; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; -import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; @@ -347,15 +347,15 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { /** * A buffer availability listener, which subscribes/unsubscribes the NIO * read event. - * <p> - * If no buffer is available, the channel read event will be unsubscribed + * + * <p>If no buffer is available, the channel read event will be unsubscribed * until one becomes available again. - * <p> - * After a buffer becomes available again, the buffer is handed over by - * the thread calling {@link #onEvent(Buffer)} to the network I/O + * + * <p>After a buffer becomes available again, the buffer is handed over by + * the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O * thread, which then continues the processing of the staged buffer. */ - private class BufferListenerTask implements EventListener<Buffer>, Runnable { + private class BufferListenerTask implements BufferListener, Runnable { private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>(); @@ -365,7 +365,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { stagedBufferResponse = bufferResponse; - if (bufferProvider.addListener(this)) { + if (bufferProvider.addBufferListener(this)) { if (ctx.channel().config().isAutoRead()) { ctx.channel().config().setAutoRead(false); } @@ -383,34 +383,33 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { return stagedBufferResponse != null; } + public void notifyBufferDestroyed() { + // The buffer pool has been destroyed + stagedBufferResponse = null; + + if (stagedMessages.isEmpty()) { + ctx.channel().config().setAutoRead(true); + ctx.channel().read(); + } + else { + ctx.channel().eventLoop().execute(stagedMessagesHandler); + } + } + // Called by the recycling thread (not network I/O thread) @Override - public void onEvent(Buffer buffer) { + public boolean notifyBufferAvailable(Buffer buffer) { boolean success = false; try { - if (buffer != null) { - if (availableBuffer.compareAndSet(null, buffer)) { - ctx.channel().eventLoop().execute(this); + if (availableBuffer.compareAndSet(null, buffer)) { + ctx.channel().eventLoop().execute(this); - success = true; - } - else { - throw new IllegalStateException("Received a buffer notification, " + - " but the previous one has not been handled yet."); - } + success = true; } else { - // The buffer pool has been destroyed - stagedBufferResponse = null; - - if (stagedMessages.isEmpty()) { - ctx.channel().config().setAutoRead(true); - ctx.channel().read(); - } - else { - ctx.channel().eventLoop().execute(stagedMessagesHandler); - } + throw new IllegalStateException("Received a buffer notification, " + + " but the previous one has not been handled yet."); } } catch (Throwable t) { @@ -423,12 +422,14 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter { } } } + + return false; } /** * Continues the decoding of a staged buffer after a buffer has become available again. - * <p> - * This task is executed by the network I/O thread. + * + * <p>This task is executed by the network I/O thread. */ @Override public void run() { http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index ee6bfda..4e1eaef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; @@ -46,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * An input channel, which requests a remote partition queue. */ -public class RemoteInputChannel extends InputChannel implements BufferRecycler { +public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener { /** ID to distinguish this channel from other channels sharing the same TCP connection. */ private final InputChannelID id = new InputChannelID(); @@ -87,6 +88,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler { /** The number of available buffers that have not been announced to the producer yet. */ private final AtomicInteger unannouncedCredit = new AtomicInteger(0); + /** The number of unsent buffers in the producer's sub partition. */ + private final AtomicInteger senderBacklog = new AtomicInteger(0); + + /** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */ + private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false); + public RemoteInputChannel( SingleInputGate inputGate, int channelIndex, @@ -313,6 +320,50 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler { } } + /** + * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or + * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise, + * the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is + * increased by one. + * + * @param buffer Buffer that becomes available in buffer pool. + * @return True when this channel is waiting for more floating buffers, otherwise false. + */ + @Override + public boolean notifyBufferAvailable(Buffer buffer) { + checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers."); + + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) { + isWaitingForFloatingBuffers.set(false); + buffer.recycle(); + + return false; + } + + availableBuffers.add(buffer); + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } + + if (availableBuffers.size() >= senderBacklog.get()) { + isWaitingForFloatingBuffers.set(false); + return false; + } else { + return true; + } + } + } + + @Override + public void notifyBufferDestroyed() { + if (!isWaitingForFloatingBuffers.compareAndSet(true, false)) { + throw new IllegalStateException("This channel should be waiting for floating buffers currently."); + } + } + // ------------------------------------------------------------------------ // Network I/O notifications (called by network I/O thread) // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index 03f82d8..7a309d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.buffer; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; @@ -27,7 +26,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; -import org.mockito.Matchers; import org.mockito.Mockito; import java.io.IOException; @@ -46,6 +44,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.spy; @@ -210,32 +209,39 @@ public class LocalBufferPoolTest { // ------------------------------------------------------------------------ @Test - public void testPendingRequestWithListenerAfterRecycle() throws Exception { - EventListener<Buffer> listener = spy(new EventListener<Buffer>() { - @Override - public void onEvent(Buffer buffer) { - buffer.recycle(); - } - }); + public void testPendingRequestWithListenersAfterRecycle() throws Exception { + BufferListener twoTimesListener = createBufferListener(2); + BufferListener oneTimeListener = createBufferListener(1); - localBufferPool.setNumBuffers(1); + localBufferPool.setNumBuffers(2); - Buffer available = localBufferPool.requestBuffer(); - Buffer unavailable = localBufferPool.requestBuffer(); + Buffer available1 = localBufferPool.requestBuffer(); + Buffer available2 = localBufferPool.requestBuffer(); - assertNull(unavailable); + assertNull(localBufferPool.requestBuffer()); - assertTrue(localBufferPool.addListener(listener)); + assertTrue(localBufferPool.addBufferListener(twoTimesListener)); + assertTrue(localBufferPool.addBufferListener(oneTimeListener)); - available.recycle(); + // Recycle the first buffer to notify both of the above listeners once + // and the twoTimesListener will be added into the registeredListeners + // queue of buffer pool again + available1.recycle(); + + verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class)); + verify(twoTimesListener, times(1)).notifyBufferAvailable(any(Buffer.class)); - verify(listener, times(1)).onEvent(Matchers.any(Buffer.class)); + // Recycle the second buffer to only notify the twoTimesListener + available2.recycle(); + + verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class)); + verify(twoTimesListener, times(2)).notifyBufferAvailable(any(Buffer.class)); } @Test @SuppressWarnings("unchecked") public void testCancelPendingRequestsAfterDestroy() throws IOException { - EventListener<Buffer> listener = Mockito.mock(EventListener.class); + BufferListener listener = Mockito.mock(BufferListener.class); localBufferPool.setNumBuffers(1); @@ -244,13 +250,13 @@ public class LocalBufferPoolTest { assertNull(unavailable); - localBufferPool.addListener(listener); + localBufferPool.addBufferListener(listener); localBufferPool.lazyDestroy(); available.recycle(); - verify(listener, times(1)).onEvent(null); + verify(listener, times(1)).notifyBufferDestroyed(); } // ------------------------------------------------------------------------ @@ -396,6 +402,23 @@ public class LocalBufferPoolTest { return networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(); } + private BufferListener createBufferListener(int notificationTimes) { + return spy(new BufferListener() { + int times = 0; + + @Override + public boolean notifyBufferAvailable(Buffer buffer) { + times++; + buffer.recycle(); + return times < notificationTimes; + } + + @Override + public void notifyBufferDestroyed() { + } + }); + } + private static class BufferRequesterTask implements Callable<Boolean> { private final BufferProvider bufferProvider; http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java index f3f6feb..e1e5bd3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.netty; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse; import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse; @@ -30,7 +31,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.io.network.util.TestBufferFactory; import org.apache.flink.runtime.testutils.DiscardingRecycler; -import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; @@ -74,7 +74,7 @@ public class PartitionRequestClientHandlerTest { final BufferProvider bufferProvider = mock(BufferProvider.class); when(bufferProvider.requestBuffer()).thenReturn(null); when(bufferProvider.isDestroyed()).thenReturn(true); - when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false); + when(bufferProvider.addBufferListener(any(BufferListener.class))).thenReturn(false); final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class); when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID()); @@ -179,14 +179,14 @@ public class PartitionRequestClientHandlerTest { PartitionRequestClientHandler handler = new PartitionRequestClientHandler(); EmbeddedChannel channel = new EmbeddedChannel(handler); - final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>(); + final AtomicReference<BufferListener> listener = new AtomicReference<>(); BufferProvider bufferProvider = mock(BufferProvider.class); - when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>() { + when(bufferProvider.addBufferListener(any(BufferListener.class))).thenAnswer(new Answer<Boolean>() { @Override @SuppressWarnings("unchecked") public Boolean answer(InvocationOnMock invocation) throws Throwable { - listener.set((EventListener<Buffer>) invocation.getArguments()[0]); + listener.set((BufferListener) invocation.getArguments()[0]); return true; } }); @@ -221,11 +221,11 @@ public class PartitionRequestClientHandlerTest { // Notify about buffer => handle 1st msg Buffer availableBuffer = createBuffer(false); - listener.get().onEvent(availableBuffer); + listener.get().notifyBufferAvailable(availableBuffer); // Start processing of staged buffers (in run pending tasks). Make // sure that the buffer provider acts like it's destroyed. - when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false); + when(bufferProvider.addBufferListener(any(BufferListener.class))).thenReturn(false); when(bufferProvider.isDestroyed()).thenReturn(true); // Execute all tasks that are scheduled in the event loop. Further http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java index 976e63d..ad40a54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestInfiniteBufferProvider.java @@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.util; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; -import org.apache.flink.runtime.util.event.EventListener; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedQueue; @@ -51,7 +51,7 @@ public class TestInfiniteBufferProvider implements BufferProvider { } @Override - public boolean addListener(EventListener<Buffer> listener) { + public boolean addBufferListener(BufferListener listener) { return false; } http://git-wip-us.apache.org/repos/asf/flink/blob/8706c6f4/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java index d7e9643..c354eeb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java @@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.util; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferListener; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; -import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.shaded.guava18.com.google.common.collect.Queues; @@ -91,7 +91,7 @@ public class TestPooledBufferProvider implements BufferProvider { } @Override - public boolean addListener(EventListener<Buffer> listener) { + public boolean addBufferListener(BufferListener listener) { return bufferRecycler.registerListener(listener); } @@ -115,7 +115,7 @@ public class TestPooledBufferProvider implements BufferProvider { private final Queue<Buffer> buffers; - private final ConcurrentLinkedQueue<EventListener<Buffer>> registeredListeners = + private final ConcurrentLinkedQueue<BufferListener> registeredListeners = Queues.newConcurrentLinkedQueue(); public PooledBufferProviderRecycler(Queue<Buffer> buffers) { @@ -127,18 +127,18 @@ public class TestPooledBufferProvider implements BufferProvider { synchronized (listenerRegistrationLock) { final Buffer buffer = new Buffer(segment, this); - EventListener<Buffer> listener = registeredListeners.poll(); + BufferListener listener = registeredListeners.poll(); if (listener == null) { buffers.add(buffer); } else { - listener.onEvent(buffer); + listener.notifyBufferAvailable(buffer); } } } - boolean registerListener(EventListener<Buffer> listener) { + boolean registerListener(BufferListener listener) { synchronized (listenerRegistrationLock) { if (buffers.isEmpty()) { registeredListeners.add(listener);
