This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 9554871aaee87a3bd5e528123eca50aa212749af Author: Murtadha Hubail <mhub...@apache.org> AuthorDate: Fri Aug 16 13:53:25 2019 -0700 [NO ISSUE][NET] Ensure Recycling Buffer and Notifying Sender is Atomic - user model changes: no - storage format changes: no - interface changes: no Details: - To avoid synchronization issues that might occur due to JVM reordering, ensure that both recycling read buffers and notifying the sender of their availability are done atomically before the next buffer is received from the sender. Change-Id: Ia3b1920f33bf7d4e7efbd2ea3405cbc4310a78c7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/3520 Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Till Westmann <ti...@apache.org> (cherry picked from commit 32eed5f384c5851eae1c613fcb3b9532744ed595) Reviewed-on: https://asterix-gerrit.ics.uci.edu/3525 Reviewed-by: Michael Blow <mb...@apache.org> --- .../muxdemux/FullFrameChannelReadInterface.java | 90 +++++++++++----------- 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java index 3ba8627..53be212 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java @@ -21,8 +21,8 @@ package org.apache.hyracks.net.protocols.muxdemux; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.ArrayDeque; +import java.util.Deque; import org.apache.hyracks.api.comm.IBufferFactory; import org.apache.hyracks.api.comm.IChannelControlBlock; @@ -33,65 +33,69 @@ import org.apache.logging.log4j.Logger; public class FullFrameChannelReadInterface extends AbstractChannelReadInterface { private static final Logger LOGGER = LogManager.getLogger(); - private final BlockingDeque<ByteBuffer> riEmptyStack; + private final Deque<ByteBuffer> riEmptyStack; private final IChannelControlBlock ccb; + private final Object bufferRecycleLock = new Object(); public FullFrameChannelReadInterface(IChannelControlBlock ccb) { this.ccb = ccb; - riEmptyStack = new LinkedBlockingDeque<>(); + riEmptyStack = new ArrayDeque<>(); credits = 0; - emptyBufferAcceptor = buffer -> { - if (ccb.isRemotelyClosed()) { - return; - } final int delta = buffer.remaining(); - riEmptyStack.push(buffer); - ccb.addPendingCredits(delta); + synchronized (bufferRecycleLock) { + if (ccb.isRemotelyClosed()) { + return; + } + riEmptyStack.push(buffer); + ccb.addPendingCredits(delta); + } }; } @Override public int read(SocketChannel sc, int size) throws IOException, NetException { - while (true) { - if (size <= 0) { - return size; - } - if (currentReadBuffer == null) { - currentReadBuffer = riEmptyStack.poll(); - //if current buffer == null and limit not reached - // factory.createBuffer factory - if (currentReadBuffer == null) { - currentReadBuffer = bufferFactory.createBuffer(); + synchronized (bufferRecycleLock) { + while (true) { + if (size <= 0) { + return size; } - } - if (currentReadBuffer == null) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size()); + if (currentReadBuffer == null) { + currentReadBuffer = riEmptyStack.poll(); + //if current buffer == null and limit not reached + // factory.createBuffer factory + if (currentReadBuffer == null) { + currentReadBuffer = bufferFactory.createBuffer(); + } } - throw new IllegalStateException(ccb + " read buffers exceeded"); - } - int rSize = Math.min(size, currentReadBuffer.remaining()); - if (rSize > 0) { - currentReadBuffer.limit(currentReadBuffer.position() + rSize); - int len; - try { - len = sc.read(currentReadBuffer); - if (len < 0) { - throw new NetException("Socket Closed"); + if (currentReadBuffer == null) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("{} read buffers exceeded. Current empty buffers: {}", ccb, riEmptyStack.size()); } - } finally { - currentReadBuffer.limit(currentReadBuffer.capacity()); + throw new IllegalStateException(ccb + " read buffers exceeded"); } - size -= len; - if (len < rSize) { + int rSize = Math.min(size, currentReadBuffer.remaining()); + if (rSize > 0) { + currentReadBuffer.limit(currentReadBuffer.position() + rSize); + int len; + try { + len = sc.read(currentReadBuffer); + if (len < 0) { + throw new NetException("Socket Closed"); + } + } finally { + currentReadBuffer.limit(currentReadBuffer.capacity()); + } + size -= len; + if (len < rSize) { + return size; + } + } else { return size; } - } else { - return size; - } - if (currentReadBuffer.remaining() <= 0) { - flush(); + if (currentReadBuffer.remaining() <= 0) { + flush(); + } } } }