Repository: flink Updated Branches: refs/heads/master a803dc7e7 -> 8706c6f44
[FLINK-7394][core] Manage exclusive buffers in RemoteInputChannel This closes #4499. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3cbba5a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3cbba5a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3cbba5a Branch: refs/heads/master Commit: d3cbba5ab4be498526ae028c2b3a5d5b8dfe4bbd Parents: 064a1e6 Author: Zhijiang <[email protected]> Authored: Mon Aug 14 14:30:47 2017 +0800 Committer: zentol <[email protected]> Committed: Tue Oct 10 16:53:19 2017 +0200 ---------------------------------------------------------------------- .../partition/consumer/RemoteInputChannel.java | 107 ++++++++++++++++++- .../consumer/RemoteInputChannelTest.java | 79 ++++++++++++++ 2 files changed, 182 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d3cbba5a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 58c9484..ee6bfda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -24,23 +24,29 @@ import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; +import org.apache.flink.util.ExceptionUtils; import java.io.IOException; import java.util.ArrayDeque; import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; /** * An input channel, which requests a remote partition queue. */ -public class RemoteInputChannel extends InputChannel { +public class RemoteInputChannel extends InputChannel implements BufferRecycler { /** ID to distinguish this channel from other channels sharing the same TCP connection. */ private final InputChannelID id = new InputChannelID(); @@ -72,6 +78,15 @@ public class RemoteInputChannel extends InputChannel { */ private int expectedSequenceNumber = 0; + /** The initial number of exclusive buffers assigned to this channel. */ + private int initialCredit; + + /** The current available buffers including both exclusive buffers and requested floating buffers. */ + private final ArrayDeque<Buffer> availableBuffers = new ArrayDeque<>(); + + /** The number of available buffers that have not been announced to the producer yet. */ + private final AtomicInteger unannouncedCredit = new AtomicInteger(0); + public RemoteInputChannel( SingleInputGate inputGate, int channelIndex, @@ -99,8 +114,24 @@ public class RemoteInputChannel extends InputChannel { this.connectionManager = checkNotNull(connectionManager); } + /** + * Assigns exclusive buffers to this input channel, and this method should be called only once + * after this input channel is created. + */ void assignExclusiveSegments(List<MemorySegment> segments) { - // TODO in next PR + checkState(this.initialCredit == 0, "Bug in input channel setup logic: exclusive buffers have " + + "already been set for this input channel."); + + checkNotNull(segments); + checkArgument(segments.size() > 0, "The number of exclusive buffers per channel should be larger than 0."); + + this.initialCredit = segments.size(); + + synchronized(availableBuffers) { + for (MemorySegment segment : segments) { + availableBuffers.add(new Buffer(segment, this)); + } + } } // ------------------------------------------------------------------------ @@ -183,18 +214,41 @@ public class RemoteInputChannel extends InputChannel { } /** - * Releases all received buffers and closes the partition request client. + * Releases all exclusive and floating buffers, closes the partition request client. */ @Override void releaseAllResources() throws IOException { if (isReleased.compareAndSet(false, true)) { + + // Gather all exclusive buffers and recycle them to global pool in batch + final List<MemorySegment> exclusiveRecyclingSegments = new ArrayList<>(); + synchronized (receivedBuffers) { Buffer buffer; while ((buffer = receivedBuffers.poll()) != null) { - buffer.recycle(); + if (buffer.getRecycler() == this) { + exclusiveRecyclingSegments.add(buffer.getMemorySegment()); + } else { + buffer.recycle(); + } + } + } + + synchronized (availableBuffers) { + Buffer buffer; + while ((buffer = availableBuffers.poll()) != null) { + if (buffer.getRecycler() == this) { + exclusiveRecyclingSegments.add(buffer.getMemorySegment()); + } else { + buffer.recycle(); + } } } + if (exclusiveRecyclingSegments.size() > 0) { + inputGate.returnExclusiveSegments(exclusiveRecyclingSegments); + } + // The released flag has to be set before closing the connection to ensure that // buffers received concurrently with closing are properly recycled. if (partitionRequestClient != null) { @@ -215,6 +269,51 @@ public class RemoteInputChannel extends InputChannel { } // ------------------------------------------------------------------------ + // Credit-based + // ------------------------------------------------------------------------ + + /** + * Enqueue this input channel in the pipeline for sending unannounced credits to producer. + */ + void notifyCreditAvailable() { + //TODO in next PR + } + + /** + * Exclusive buffer is recycled to this input channel directly and it may trigger notify + * credit to producer. + * + * @param segment The exclusive segment of this channel. + */ + @Override + public void recycle(MemorySegment segment) { + synchronized (availableBuffers) { + // Important: the isReleased check should be inside the synchronized block. + // that way the segment can also be returned to global pool after added into + // the available queue during releasing all resources. + if (isReleased.get()) { + try { + inputGate.returnExclusiveSegments(Arrays.asList(segment)); + return; + } catch (Throwable t) { + ExceptionUtils.rethrow(t); + } + } + availableBuffers.add(new Buffer(segment, this)); + } + + if (unannouncedCredit.getAndAdd(1) == 0) { + notifyCreditAvailable(); + } + } + + public int getNumberOfAvailableBuffers() { + synchronized (availableBuffers) { + return availableBuffers.size(); + } + } + + // ------------------------------------------------------------------------ // Network I/O notifications (called by network I/O thread) // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/d3cbba5a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 08bf5eb..bced9ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.ConnectionManager; @@ -34,6 +36,7 @@ import org.junit.Test; import scala.Tuple2; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -45,8 +48,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -295,6 +300,80 @@ public class RemoteInputChannelTest { ch.getNextBuffer(); } + /** + * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is + * recycled to available buffers directly and it triggers notify of announced credit. + */ + @Test + public void testRecycleExclusiveBufferBeforeReleased() throws Exception { + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + + // Recycle exclusive segment + inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + + assertEquals("There should be one buffer available after recycle.", + 1, inputChannel.getNumberOfAvailableBuffers()); + verify(inputChannel, times(1)).notifyCreditAvailable(); + + inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + + assertEquals("There should be two buffers available after recycle.", + 2, inputChannel.getNumberOfAvailableBuffers()); + // It should be called only once when increased from zero. + verify(inputChannel, times(1)).notifyCreditAvailable(); + } + + /** + * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying the exclusive segment is + * recycled to global pool via input gate when channel is released. + */ + @Test + public void testRecycleExclusiveBufferAfterReleased() throws Exception { + // Setup + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = spy(createRemoteInputChannel(inputGate)); + + inputChannel.releaseAllResources(); + + // Recycle exclusive segment after channel released + inputChannel.recycle(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + + assertEquals("Resource leak during recycling buffer after channel is released.", + 0, inputChannel.getNumberOfAvailableBuffers()); + verify(inputChannel, times(0)).notifyCreditAvailable(); + verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class)); + } + + /** + * Tests {@link RemoteInputChannel#releaseAllResources()}, verifying the exclusive segments are + * recycled to global pool via input gate and no resource leak. + */ + @Test + public void testReleaseExclusiveBuffers() throws Exception { + // Setup + final SingleInputGate inputGate = mock(SingleInputGate.class); + final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate); + + // Assign exclusive segments to channel + final List<MemorySegment> exclusiveSegments = new ArrayList<>(); + final int numExclusiveBuffers = 2; + for (int i = 0; i < numExclusiveBuffers; i++) { + exclusiveSegments.add(MemorySegmentFactory.allocateUnpooledSegment(1024, inputChannel)); + } + inputChannel.assignExclusiveSegments(exclusiveSegments); + + assertEquals("The number of available buffers is not equal to the assigned amount.", + numExclusiveBuffers, inputChannel.getNumberOfAvailableBuffers()); + + // Release this channel + inputChannel.releaseAllResources(); + + assertEquals("Resource leak after channel is released.", + 0, inputChannel.getNumberOfAvailableBuffers()); + verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class)); + } + // --------------------------------------------------------------------------------------------- private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
