This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 9554871aaee87a3bd5e528123eca50aa212749af
Author: Murtadha Hubail <mhub...@apache.org>
AuthorDate: Fri Aug 16 13:53:25 2019 -0700

    [NO ISSUE][NET] Ensure Recycling Buffer and Notifying Sender is Atomic
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - To avoid synchronization issues that might occur
      due to JVM reordering, ensure that both recycling
      read buffers and notifying the sender of their
      availability are done atomically before the next
      buffer is received from the sender.
    
    Change-Id: Ia3b1920f33bf7d4e7efbd2ea3405cbc4310a78c7
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3520
    Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Till Westmann <ti...@apache.org>
    (cherry picked from commit 32eed5f384c5851eae1c613fcb3b9532744ed595)
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3525
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 .../muxdemux/FullFrameChannelReadInterface.java    | 90 +++++++++++-----------
 1 file changed, 47 insertions(+), 43 deletions(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 3ba8627..53be212 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -21,8 +21,8 @@ package org.apache.hyracks.net.protocols.muxdemux;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.LinkedBlockingDeque;
+import java.util.ArrayDeque;
+import java.util.Deque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -33,65 +33,69 @@ import org.apache.logging.log4j.Logger;
 public class FullFrameChannelReadInterface extends 
AbstractChannelReadInterface {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private final BlockingDeque<ByteBuffer> riEmptyStack;
+    private final Deque<ByteBuffer> riEmptyStack;
     private final IChannelControlBlock ccb;
+    private final Object bufferRecycleLock = new Object();
 
     public FullFrameChannelReadInterface(IChannelControlBlock ccb) {
         this.ccb = ccb;
-        riEmptyStack = new LinkedBlockingDeque<>();
+        riEmptyStack = new ArrayDeque<>();
         credits = 0;
-
         emptyBufferAcceptor = buffer -> {
-            if (ccb.isRemotelyClosed()) {
-                return;
-            }
             final int delta = buffer.remaining();
-            riEmptyStack.push(buffer);
-            ccb.addPendingCredits(delta);
+            synchronized (bufferRecycleLock) {
+                if (ccb.isRemotelyClosed()) {
+                    return;
+                }
+                riEmptyStack.push(buffer);
+                ccb.addPendingCredits(delta);
+            }
         };
     }
 
     @Override
     public int read(SocketChannel sc, int size) throws IOException, 
NetException {
-        while (true) {
-            if (size <= 0) {
-                return size;
-            }
-            if (currentReadBuffer == null) {
-                currentReadBuffer = riEmptyStack.poll();
-                //if current buffer == null and limit not reached
-                // factory.createBuffer factory
-                if (currentReadBuffer == null) {
-                    currentReadBuffer = bufferFactory.createBuffer();
+        synchronized (bufferRecycleLock) {
+            while (true) {
+                if (size <= 0) {
+                    return size;
                 }
-            }
-            if (currentReadBuffer == null) {
-                if (LOGGER.isWarnEnabled()) {
-                    LOGGER.warn("{} read buffers exceeded. Current empty 
buffers: {}", ccb, riEmptyStack.size());
+                if (currentReadBuffer == null) {
+                    currentReadBuffer = riEmptyStack.poll();
+                    //if current buffer == null and limit not reached
+                    // factory.createBuffer factory
+                    if (currentReadBuffer == null) {
+                        currentReadBuffer = bufferFactory.createBuffer();
+                    }
                 }
-                throw new IllegalStateException(ccb + " read buffers 
exceeded");
-            }
-            int rSize = Math.min(size, currentReadBuffer.remaining());
-            if (rSize > 0) {
-                currentReadBuffer.limit(currentReadBuffer.position() + rSize);
-                int len;
-                try {
-                    len = sc.read(currentReadBuffer);
-                    if (len < 0) {
-                        throw new NetException("Socket Closed");
+                if (currentReadBuffer == null) {
+                    if (LOGGER.isWarnEnabled()) {
+                        LOGGER.warn("{} read buffers exceeded. Current empty 
buffers: {}", ccb, riEmptyStack.size());
                     }
-                } finally {
-                    currentReadBuffer.limit(currentReadBuffer.capacity());
+                    throw new IllegalStateException(ccb + " read buffers 
exceeded");
                 }
-                size -= len;
-                if (len < rSize) {
+                int rSize = Math.min(size, currentReadBuffer.remaining());
+                if (rSize > 0) {
+                    currentReadBuffer.limit(currentReadBuffer.position() + 
rSize);
+                    int len;
+                    try {
+                        len = sc.read(currentReadBuffer);
+                        if (len < 0) {
+                            throw new NetException("Socket Closed");
+                        }
+                    } finally {
+                        currentReadBuffer.limit(currentReadBuffer.capacity());
+                    }
+                    size -= len;
+                    if (len < rSize) {
+                        return size;
+                    }
+                } else {
                     return size;
                 }
-            } else {
-                return size;
-            }
-            if (currentReadBuffer.remaining() <= 0) {
-                flush();
+                if (currentReadBuffer.remaining() <= 0) {
+                    flush();
+                }
             }
         }
     }

Reply via email to