Repository: asterixdb Updated Branches: refs/heads/master 43e0b15bc -> f184a1e7b
[ASTERIXDB-2478][NET] Calculate Buffer Remaining Before Reusing It - user model changes: no - storage format changes: no - interface changes: no Details: - When recycling a buffer, calculate the buffer remaining before releasing it for reuse to prevent other threads from changing its remaining. - Add test case. Change-Id: Icca3284feae800dd6c37694bdefec3516cd4c506 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3036 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/821c0723 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/821c0723 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/821c0723 Branch: refs/heads/master Commit: 821c072313633f1ef052b167fd0eb679bb7c6e52 Parents: 7311b03 Author: Murtadha Hubail <[email protected]> Authored: Tue Nov 20 14:05:53 2018 +0300 Committer: Murtadha Hubail <[email protected]> Committed: Wed Nov 28 12:43:39 2018 -0800 ---------------------------------------------------------------------- hyracks-fullstack/hyracks/hyracks-net/pom.xml | 6 + .../muxdemux/FullFrameChannelReadInterface.java | 4 +- .../FullFrameChannelReadInterfaceTest.java | 180 +++++++++++++++++++ 3 files changed, 188 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/821c0723/hyracks-fullstack/hyracks/hyracks-net/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml index 1040e81..4ca20ca 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml @@ -60,5 +60,11 @@ <artifactId>hyracks-util</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>2.0.2-beta</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/821c0723/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java index 32bf77e..3ba8627 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java @@ -36,7 +36,7 @@ public class FullFrameChannelReadInterface extends AbstractChannelReadInterface private final BlockingDeque<ByteBuffer> riEmptyStack; private final IChannelControlBlock ccb; - FullFrameChannelReadInterface(IChannelControlBlock ccb) { + public FullFrameChannelReadInterface(IChannelControlBlock ccb) { this.ccb = ccb; riEmptyStack = new LinkedBlockingDeque<>(); credits = 0; @@ -45,8 +45,8 @@ public class FullFrameChannelReadInterface extends AbstractChannelReadInterface if (ccb.isRemotelyClosed()) { return; } - riEmptyStack.push(buffer); final int delta = buffer.remaining(); + riEmptyStack.push(buffer); ccb.addPendingCredits(delta); }; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/821c0723/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java new file mode 100644 index 0000000..f9a610c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.net.tests; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hyracks.api.comm.IBufferFactory; +import org.apache.hyracks.api.comm.IChannelControlBlock; +import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; +import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock; +import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelReadInterface; +import org.apache.hyracks.util.StorageUtil; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +@RunWith(Parameterized.class) +public class FullFrameChannelReadInterfaceTest { + + private static final int TEST_RUNS = 100; + private static final int RECEIVER_BUFFER_COUNT = 50; + private static int FRAMES_TO_READ_COUNT = 10000; + private static final int FRAME_SIZE = StorageUtil.getIntSizeInBytes(32, StorageUtil.StorageUnit.KILOBYTE); + private static final int EXPECTED_CHANNEL_CREDIT = FRAME_SIZE * RECEIVER_BUFFER_COUNT; + + @Parameterized.Parameters + public static Object[][] data() { + return new Object[TEST_RUNS][0]; + } + + @Test + public void bufferRecycleTest() throws Exception { + final AtomicInteger channelCredit = new AtomicInteger(); + final IChannelControlBlock ccb = mockChannelControlBlock(channelCredit); + final ReadBufferFactory bufferFactory = new ReadBufferFactory(RECEIVER_BUFFER_COUNT, FRAME_SIZE); + final FullFrameChannelReadInterface readInterface = new FullFrameChannelReadInterface(ccb); + final LinkedBlockingDeque<ByteBuffer> fullBufferQ = new LinkedBlockingDeque<>(); + readInterface.setFullBufferAcceptor(new ReadFullBufferAcceptor(fullBufferQ)); + readInterface.setBufferFactory(bufferFactory, RECEIVER_BUFFER_COUNT, FRAME_SIZE); + Assert.assertEquals(EXPECTED_CHANNEL_CREDIT, channelCredit.get()); + final SocketChannel socketChannel = mockSocketChannel(ccb); + final Thread networkFrameReader = new Thread(() -> { + try { + int framesRead = FRAMES_TO_READ_COUNT; + while (framesRead > 0) { + while (channelCredit.get() == 0) { + synchronized (channelCredit) { + channelCredit.wait(10000); + if (channelCredit.get() == 0) { + System.err.println("Sender doesn't have any write credit"); + System.exit(1); + } + } + } + readInterface.read(socketChannel, FRAME_SIZE); + framesRead--; + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + + final Thread frameProcessor = new Thread(() -> { + int framesProcessed = 0; + try { + while (true) { + final ByteBuffer fullFrame = fullBufferQ.take(); + fullFrame.clear(); + readInterface.getEmptyBufferAcceptor().accept(fullFrame); + framesProcessed++; + if (framesProcessed == FRAMES_TO_READ_COUNT) { + return; + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + networkFrameReader.start(); + frameProcessor.start(); + networkFrameReader.join(); + frameProcessor.join(); + if (channelCredit.get() != EXPECTED_CHANNEL_CREDIT) { + System.err + .println("Expected channel credit " + EXPECTED_CHANNEL_CREDIT + " , found " + channelCredit.get()); + System.exit(1); + } + } + + private IChannelControlBlock mockChannelControlBlock(AtomicInteger credit) { + final ChannelControlBlock ccb = Mockito.mock(ChannelControlBlock.class); + Mockito.when(ccb.isRemotelyClosed()).thenReturn(false); + Mockito.doAnswer(invocation -> { + final Integer delta = invocation.getArgumentAt(0, Integer.class); + credit.addAndGet(delta); + synchronized (credit) { + credit.notifyAll(); + } + return null; + }).when(ccb).addPendingCredits(Mockito.anyInt()); + return ccb; + } + + private SocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException { + final SocketChannel sc = Mockito.mock(SocketChannel.class); + Mockito.when(sc.read(Mockito.any(ByteBuffer.class))).thenAnswer(invocation -> { + ccb.addPendingCredits(-FRAME_SIZE); + final ByteBuffer buffer = invocation.getArgumentAt(0, ByteBuffer.class); + while (buffer.hasRemaining()) { + buffer.put((byte) 0); + } + return FRAME_SIZE; + }); + return sc; + } + + private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor { + private final BlockingQueue<ByteBuffer> fullBufferQ; + + ReadFullBufferAcceptor(BlockingQueue<ByteBuffer> fullBuffer) { + this.fullBufferQ = fullBuffer; + } + + @Override + public void accept(ByteBuffer buffer) { + fullBufferQ.add(buffer); + } + + @Override + public void close() { + } + + @Override + public void error(int ecode) { + } + } + + public class ReadBufferFactory implements IBufferFactory { + private final int limit; + private final int frameSize; + private int counter = 0; + + ReadBufferFactory(int limit, int frameSize) { + this.limit = limit; + this.frameSize = frameSize; + } + + @Override + public ByteBuffer createBuffer() { + if (counter >= limit) { + throw new IllegalStateException("Buffer limit exceeded"); + } + counter++; + return ByteBuffer.allocate(frameSize); + } + } +} \ No newline at end of file
