Author: violetagg
Date: Fri Apr 28 20:13:10 2017
New Revision: 1793147

URL: http://svn.apache.org/viewvc?rev=1793147&view=rev
Log:
Introduce new API - WsSession#suspend/WsSession#resume that can be used to 
suspend/resume reading of the incoming messages.

This closes #42

Modified:
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java
    tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
    tomcat/trunk/java/org/apache/tomcat/websocket/server/LocalStrings.properties
    tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java
    
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Fri Apr 28 
20:13:10 2017
@@ -880,6 +880,10 @@ public abstract class AbstractProtocol<S
                     if (status != SocketEvent.OPEN_WRITE) {
                         longPoll(wrapper, processor);
                     }
+                } else if (state == SocketState.SUSPENDED) {
+                    // Don't add sockets back to the poller.
+                    // The resumeProcessing() method will add this socket
+                    // to the poller.
                 } else {
                     // Connection closed. OK to recycle the processor. Upgrade
                     // processors are not recycled.

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AbstractEndpoint.java Fri Apr 
28 20:13:10 2017
@@ -68,7 +68,7 @@ public abstract class AbstractEndpoint<S
         public enum SocketState {
             // TODO Add a new state to the AsyncStateMachine and remove
             //      ASYNC_END (if possible)
-            OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED
+            OPEN, CLOSED, LONG, ASYNC_END, SENDFILE, UPGRADING, UPGRADED, 
SUSPENDED
         }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Fri 
Apr 28 20:13:10 2017
@@ -52,12 +52,15 @@ util.unknownDecoderType=The Decoder type
 # frames and therefore must be 123 bytes (not characters) or less in length.
 # Messages are encoded using UTF-8 where a single character may be encoded in
 # as many as 4 bytes.
+wsFrame.alreadyResumed=Message receiving has already been resumed.
+wsFrame.alreadySuspended=Message receiving has already been suspended.
 wsFrame.bufferTooSmall=No async message support and buffer too small. Buffer 
size: [{0}], Message size: [{1}]
 wsFrame.byteToLongFail=Too many bytes ([{0}]) were provided to be converted 
into a long
 wsFrame.closed=New frame received after a close control frame
 wsFrame.controlFragmented=A fragmented control frame was received but control 
frames may not be fragmented
 wsFrame.controlPayloadTooBig=A control frame was sent with a payload of size 
[{0}] which is larger than the maximum permitted of 125 bytes
 wsFrame.controlNoFin=A control frame was sent that did not have the fin bit 
set. Control frames are not permitted to use continuation frames.
+wsFrame.illegalReadState=Unexpected read state [{0}]
 wsFrame.invalidOpCode= A WebSocket frame was sent with an unrecognised opCode 
of [{0}]
 wsFrame.invalidUtf8=A WebSocket text frame was received that could not be 
decoded to UTF-8 because it contained invalid byte sequences
 wsFrame.invalidUtf8Close=A WebSocket close frame was received with a close 
reason that contained invalid UTF-8 byte sequences
@@ -68,6 +71,7 @@ wsFrame.notMasked=The client frame was n
 wsFrame.oneByteCloseCode=The client sent a close frame with a single byte 
payload which is not valid
 wsFrame.partialHeaderComplete=WebSocket frame received. fin [{0}], rsv [{1}], 
OpCode [{2}], payload length [{3}]
 wsFrame.sessionClosed=The client data cannot be processed because the session 
has already been closed
+wsFrame.suspendRequested=Suspend of the message receiving has already been 
requested.
 wsFrame.textMessageTooBig=The decoded text message was too big for the output 
buffer and the endpoint does not support partial messages
 wsFrame.wrongRsv=The client frame set the reserved bits to [{0}] for a message 
with opCode [{1}] which was not supported by this endpoint
 

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameBase.java Fri Apr 28 
20:13:10 2017
@@ -23,6 +23,7 @@ import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CoderResult;
 import java.nio.charset.CodingErrorAction;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import javax.websocket.CloseReason;
 import javax.websocket.CloseReason.CloseCodes;
@@ -84,11 +85,16 @@ public abstract class WsFrameBase {
     private volatile State state = State.NEW_FRAME;
     private volatile boolean open = true;
 
+    private static final AtomicReferenceFieldUpdater<WsFrameBase, ReadState> 
READ_STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(WsFrameBase.class, 
ReadState.class, "readState");
+    private volatile ReadState readState = ReadState.WAITING;
+
     public WsFrameBase(WsSession wsSession, Transformation transformation) {
         inputBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
         inputBuffer.position(0).limit(0);
         messageBufferBinary = 
ByteBuffer.allocate(wsSession.getMaxBinaryMessageBufferSize());
         messageBufferText = 
CharBuffer.allocate(wsSession.getMaxTextMessageBufferSize());
+        wsSession.setWsFrame(this);
         this.wsSession = wsSession;
         Transformation finalTransformation;
         if (isMasked()) {
@@ -106,7 +112,7 @@ public abstract class WsFrameBase {
 
 
     protected void processInputBuffer() throws IOException {
-        while (true) {
+        while (!isSuspended()) {
             wsSession.updateLastActive();
             if (state == State.NEW_FRAME) {
                 if (!processInitialHeader()) {
@@ -687,6 +693,205 @@ public abstract class WsFrameBase {
     }
 
 
+    /**
+     * WAITING            - not suspended
+     *                      Server case: waiting for a notification that data
+     *                      is ready to be read from the socket, the socket is
+     *                      registered to the poller
+     *                      Client case: data has been read from the socket and
+     *                      is waiting for data to be processed
+     * PROCESSING         - not suspended
+     *                      Server case: reading from the socket and processing
+     *                      the data
+     *                      Client case: processing the data if such has
+     *                      already been read and more data will be read from
+     *                      the socket
+     * SUSPENDING_WAIT    - suspended, a call to suspend() was made while in
+     *                      WAITING state. A call to resume() will do nothing
+     *                      and will transition to WAITING state
+     * SUSPENDING_PROCESS - suspended, a call to suspend() was made while in
+     *                      PROCESSING state. A call to resume() will do
+     *                      nothing and will transition to PROCESSING state
+     * SUSPENDED          - suspended
+     *                      Server case: processing data finished
+     *                      (SUSPENDING_PROCESS) / a notification was received
+     *                      that data is ready to be read from the socket
+     *                      (SUSPENDING_WAIT), socket is not registered to the
+     *                      poller
+     *                      Client case: processing data finished
+     *                      (SUSPENDING_PROCESS) / data has been read from the
+     *                      socket and is available for processing
+     *                      (SUSPENDING_WAIT)
+     *                      A call to resume() will:
+     *                      Server case: register the socket to the poller
+     *                      Client case: resume data processing
+     * CLOSING            - not suspended, a close will be send
+     *
+     * <pre>
+     *     resume           data to be        resume
+     *     no action        processed         no action
+     *  |---------------| |---------------| |----------|
+     *  |               v |               v v          |
+     *  |  |----------WAITING«--------PROCESSING----|  |
+     *  |  |             ^   processing             |  |
+     *  |  |             |   finished               |  |
+     *  |  |             |                          |  |
+     *  | suspend        |                     suspend |
+     *  |  |             |                          |  |
+     *  |  |          resume                        |  |
+     *  |  |    register socket to poller (server)  |  |
+     *  |  |    resume data processing (client)     |  |
+     *  |  |             |                          |  |
+     *  |  v             |                          v  |
+     * SUSPENDING_WAIT   |                  SUSPENDING_PROCESS
+     *  |                |                             |
+     *  | data available |        processing finished  |
+     *  |-------------»SUSPENDED«----------------------|
+     * </pre>
+     */
+    protected enum ReadState {
+        WAITING           (false),
+        PROCESSING        (false),
+        SUSPENDING_WAIT   (true),
+        SUSPENDING_PROCESS(true),
+        SUSPENDED         (true),
+        CLOSING           (false);
+
+        private final boolean isSuspended;
+
+        ReadState(boolean isSuspended) {
+            this.isSuspended = isSuspended;
+        }
+
+        public boolean isSuspended() {
+            return isSuspended;
+        }
+    }
+
+    public void suspend() {
+        while (true) {
+            switch (readState) {
+            case WAITING:
+                if (!READ_STATE_UPDATER.compareAndSet(this, ReadState.WAITING,
+                        ReadState.SUSPENDING_WAIT)) {
+                    continue;
+                }
+                return;
+            case PROCESSING:
+                if (!READ_STATE_UPDATER.compareAndSet(this, 
ReadState.PROCESSING,
+                        ReadState.SUSPENDING_PROCESS)) {
+                    continue;
+                }
+                return;
+            case SUSPENDING_WAIT:
+                if (readState != ReadState.SUSPENDING_WAIT) {
+                    continue;
+                } else {
+                    if (getLog().isWarnEnabled()) {
+                        
getLog().warn(sm.getString("wsFrame.suspendRequested"));
+                    }
+                }
+                return;
+            case SUSPENDING_PROCESS:
+                if (readState != ReadState.SUSPENDING_PROCESS) {
+                    continue;
+                } else {
+                    if (getLog().isWarnEnabled()) {
+                        
getLog().warn(sm.getString("wsFrame.suspendRequested"));
+                    }
+                }
+                return;
+            case SUSPENDED:
+                if (readState != ReadState.SUSPENDED) {
+                    continue;
+                } else {
+                    if (getLog().isWarnEnabled()) {
+                        
getLog().warn(sm.getString("wsFrame.alreadySuspended"));
+                    }
+                }
+                return;
+            case CLOSING:
+                return;
+            default:
+                throw new 
IllegalStateException(sm.getString("wsFrame.illegalReadState", state));
+            }
+        }
+    }
+
+    public void resume() {
+        while (true) {
+            switch (readState) {
+            case WAITING:
+                if (readState != ReadState.WAITING) {
+                    continue;
+                } else {
+                    if (getLog().isWarnEnabled()) {
+                        getLog().warn(sm.getString("wsFrame.alreadyResumed"));
+                    }
+                }
+                return;
+            case PROCESSING:
+                if (readState != ReadState.PROCESSING) {
+                    continue;
+                } else {
+                    if (getLog().isWarnEnabled()) {
+                        getLog().warn(sm.getString("wsFrame.alreadyResumed"));
+                    }
+                }
+                return;
+            case SUSPENDING_WAIT:
+                if (!READ_STATE_UPDATER.compareAndSet(this, 
ReadState.SUSPENDING_WAIT,
+                        ReadState.WAITING)) {
+                    continue;
+                }
+                return;
+            case SUSPENDING_PROCESS:
+                if (!READ_STATE_UPDATER.compareAndSet(this, 
ReadState.SUSPENDING_PROCESS,
+                        ReadState.PROCESSING)) {
+                    continue;
+                }
+                return;
+            case SUSPENDED:
+                if (!READ_STATE_UPDATER.compareAndSet(this, 
ReadState.SUSPENDED,
+                        ReadState.WAITING)) {
+                    continue;
+                }
+                resumeProcessing();
+                return;
+            case CLOSING:
+                return;
+            default:
+                throw new 
IllegalStateException(sm.getString("wsFrame.illegalReadState", state));
+            }
+        }
+    }
+
+    protected boolean isSuspended() {
+        return readState.isSuspended();
+    }
+
+    protected ReadState getReadState() {
+        return readState;
+    }
+
+    protected void changeReadState(ReadState newState) {
+        READ_STATE_UPDATER.set(this, newState);
+    }
+
+    protected boolean changeReadState(ReadState oldState, ReadState newState) {
+        return READ_STATE_UPDATER.compareAndSet(this, oldState, newState);
+    }
+
+    /**
+     * This method will be invoked when the read operation is resumed.
+     * As the suspend of the read operation can be invoked at any time, when
+     * implementing this method one should consider that there might still be
+     * data remaining into the internal buffers that needs to be processed
+     * before reading again from the socket.
+     */
+    protected abstract void resumeProcessing();
+
+
     private abstract class TerminalTransformation implements Transformation {
 
         @Override

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsFrameClient.java Fri Apr 28 
20:13:10 2017
@@ -57,36 +57,67 @@ public class WsFrameClient extends WsFra
 
 
     private void processSocketRead() throws IOException {
+        while (true) {
+            switch (getReadState()) {
+            case WAITING:
+                if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) 
{
+                    continue;
+                }
+                while (response.hasRemaining()) {
+                    if (isSuspended()) {
+                        if (!changeReadState(ReadState.SUSPENDING_PROCESS, 
ReadState.SUSPENDED)) {
+                            continue;
+                        }
+                        // There is still data available in the response buffer
+                        // Return here so that the response buffer will not be
+                        // cleared and there will be no data read from the
+                        // socket. Thus when the read operation is resumed 
first
+                        // the data left in the response buffer will be 
consumed
+                        // and then a new socket read will be performed
+                        return;
+                    }
+                    inputBuffer.mark();
+                    
inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity());
+
+                    int toCopy = Math.min(response.remaining(), 
inputBuffer.remaining());
+
+                    // Copy remaining bytes read in HTTP phase to input buffer 
used by
+                    // frame processing
+
+                    int orgLimit = response.limit();
+                    response.limit(response.position() + toCopy);
+                    inputBuffer.put(response);
+                    response.limit(orgLimit);
 
-        while (response.hasRemaining()) {
-            inputBuffer.mark();
-            
inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity());
-
-            int toCopy = Math.min(response.remaining(), 
inputBuffer.remaining());
-
-            // Copy remaining bytes read in HTTP phase to input buffer used by
-            // frame processing
-
-            int orgLimit = response.limit();
-            response.limit(response.position() + toCopy);
-            inputBuffer.put(response);
-            response.limit(orgLimit);
-
-            inputBuffer.limit(inputBuffer.position()).reset();
+                    inputBuffer.limit(inputBuffer.position()).reset();
 
-            // Process the data we have
-            processInputBuffer();
-        }
-        response.clear();
+                    // Process the data we have
+                    processInputBuffer();
+                }
+                response.clear();
 
-        // Get some more data
-        if (isOpen()) {
-            channel.read(response, null, handler);
+                // Get some more data
+                if (isOpen()) {
+                    channel.read(response, null, handler);
+                } else {
+                    changeReadState(ReadState.CLOSING);
+                }
+                return;
+            case SUSPENDING_WAIT:
+                if (!changeReadState(ReadState.SUSPENDING_WAIT, 
ReadState.SUSPENDED)) {
+                    continue;
+                }
+                return;
+            default:
+                throw new IllegalStateException(
+                        sm.getString("wsFrameServer.illegalReadState", 
getReadState()));
+            }
         }
     }
 
 
     private final void close(Throwable t) {
+        changeReadState(ReadState.CLOSING);
         CloseReason cr;
         if (t instanceof WsIOException) {
             cr = ((WsIOException) t).getCloseReason();
@@ -129,19 +160,7 @@ public class WsFrameClient extends WsFra
                 return;
             }
             response.flip();
-            try {
-                processSocketRead();
-            } catch (IOException e) {
-                // Only send a close message on an IOException if the client
-                // has not yet received a close control message from the server
-                // as the IOException may be in response to the client
-                // continuing to send a message after the server sent a close
-                // control message.
-                if (isOpen()) {
-                    log.debug(sm.getString("wsFrameClient.ioe"), e);
-                    close(e);
-                }
-            }
+            doResumeProcessing(true);
         }
 
         @Override
@@ -151,13 +170,58 @@ public class WsFrameClient extends WsFra
                 response = ByteBuffer
                         .allocate(((ReadBufferOverflowException) 
exc).getMinBufferSize());
                 response.flip();
-                try {
-                    processSocketRead();
-                } catch (IOException e) {
+                doResumeProcessing(false);
+            } else {
+                close(exc);
+            }
+        }
+
+        private void doResumeProcessing(boolean checkOpenOnError) {
+            while (true) {
+                switch (getReadState()) {
+                case PROCESSING:
+                    if (!changeReadState(ReadState.PROCESSING, 
ReadState.WAITING)) {
+                        continue;
+                    }
+                    resumeProcessing(checkOpenOnError);
+                    return;
+                case SUSPENDING_PROCESS:
+                    if (!changeReadState(ReadState.SUSPENDING_PROCESS, 
ReadState.SUSPENDED)) {
+                        continue;
+                    }
+                    return;
+                default:
+                    throw new IllegalStateException(
+                            sm.getString("wsFrame.illegalReadState", 
getReadState()));
+                }
+            }
+        }
+    }
+
+
+    @Override
+    protected void resumeProcessing() {
+        resumeProcessing(true);
+    }
+
+    private void resumeProcessing(boolean checkOpenOnError) {
+        try {
+            processSocketRead();
+        } catch (IOException e) {
+            if (checkOpenOnError) {
+                // Only send a close message on an IOException if the client
+                // has not yet received a close control message from the server
+                // as the IOException may be in response to the client
+                // continuing to send a message after the server sent a close
+                // control message.
+                if (isOpen()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug(sm.getString("wsFrameClient.ioe"), e);
+                    }
                     close(e);
                 }
             } else {
-                close(exc);
+                close(e);
             }
         }
     }

Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Fri Apr 28 
20:13:10 2017
@@ -792,4 +792,26 @@ public class WsSession implements Sessio
         OUTPUT_CLOSED,
         CLOSED
     }
+
+
+    private WsFrameBase wsFrame;
+    void setWsFrame(WsFrameBase wsFrame) {
+        this.wsFrame = wsFrame;
+    }
+
+
+    /**
+     * Suspends the reading of the incoming messages.
+     */
+    public void suspend() {
+        wsFrame.suspend();
+    }
+
+
+    /**
+     * Resumes the reading of the incoming messages.
+     */
+    public void resume() {
+        wsFrame.resume();
+    }
 }

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/LocalStrings.properties
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/LocalStrings.properties?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/tomcat/websocket/server/LocalStrings.properties 
(original)
+++ 
tomcat/trunk/java/org/apache/tomcat/websocket/server/LocalStrings.properties 
Fri Apr 28 20:13:10 2017
@@ -31,6 +31,7 @@ uriTemplate.invalidPath=The path [{0}] i
 uriTemplate.invalidSegment=The segment [{0}] is not valid in the provided path 
[{1}]
 
 wsFrameServer.bytesRead=Read [{0}] bytes into input buffer ready for processing
+wsFrameServer.illegalReadState=Unexpected read state [{0}]
 wsFrameServer.onDataAvailable=Method entry
 
 wsHttpUpgradeHandler.closeOnError=Closing WebSocket connection due to an error

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/server/WsFrameServer.java Fri 
Apr 28 20:13:10 2017
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.SocketEvent;
 import org.apache.tomcat.util.net.SocketWrapperBase;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.websocket.Transformation;
@@ -52,11 +54,18 @@ public class WsFrameServer extends WsFra
      * @throws IOException if an I/O error occurs while processing the 
available
      *                     data
      */
-    public void onDataAvailable() throws IOException {
+    private void onDataAvailable() throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("wsFrameServer.onDataAvailable");
         }
-        while (isOpen()) {
+        if (isOpen() && inputBuffer.hasRemaining() && !isSuspended()) {
+            // There might be a data that was left in the buffer when
+            // the read has been suspended.
+            // Consume this data before reading from the socket.
+            processInputBuffer();
+        }
+
+        while (isOpen() && !isSuspended()) {
             // Fill up the input buffer with as much data as we can
             inputBuffer.mark();
             
inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity());
@@ -124,4 +133,60 @@ public class WsFrameServer extends WsFra
             Thread.currentThread().setContextClassLoader(cl);
         }
     }
+
+
+    @Override
+    protected void resumeProcessing() {
+        socketWrapper.processSocket(SocketEvent.OPEN_READ, true);
+    }
+
+    SocketState notifyDataAvailable() throws IOException {
+        while (isOpen()) {
+            switch (getReadState()) {
+            case WAITING:
+                if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) 
{
+                    continue;
+                }
+                try {
+                    return doOnDataAvailable();
+                } catch (IOException e) {
+                    changeReadState(ReadState.CLOSING);
+                    throw e;
+                }
+            case SUSPENDING_WAIT:
+                if (!changeReadState(ReadState.SUSPENDING_WAIT, 
ReadState.SUSPENDED)) {
+                    continue;
+                }
+                return SocketState.SUSPENDED;
+            default:
+                throw new IllegalStateException(
+                        sm.getString("wsFrameServer.illegalReadState", 
getReadState()));
+            }
+        }
+
+        return SocketState.CLOSED;
+    }
+
+    private SocketState doOnDataAvailable() throws IOException {
+        onDataAvailable();
+        while (isOpen()) {
+            switch (getReadState()) {
+            case PROCESSING:
+                if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) 
{
+                    continue;
+                }
+                return SocketState.UPGRADED;
+            case SUSPENDING_PROCESS:
+                if (!changeReadState(ReadState.SUSPENDING_PROCESS, 
ReadState.SUSPENDED)) {
+                    continue;
+                }
+                return SocketState.SUSPENDED;
+            default:
+                throw new IllegalStateException(
+                        sm.getString("wsFrameServer.illegalReadState", 
getReadState()));
+            }
+        }
+
+        return SocketState.CLOSED;
+    }
 }

Modified: 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java 
(original)
+++ 
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java 
Fri Apr 28 20:13:10 2017
@@ -145,7 +145,7 @@ public class WsHttpUpgradeHandler implem
         switch (status) {
             case OPEN_READ:
                 try {
-                    wsFrame.onDataAvailable();
+                    return wsFrame.notifyDataAvailable();
                 } catch (WsIOException ws) {
                     close(ws.getCloseReason());
                 } catch (IOException ioe) {

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1793147&r1=1793146&r2=1793147&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Fri Apr 28 20:13:10 2017
@@ -94,6 +94,15 @@
       </fix>
     </changelog>
   </subsection>
+  <subsection name="WebSocket">
+    <changelog>
+      <add>
+        Introduce new API <code>o.a.tomcat.websocket.WsSession#suspend</code>/
+        <code>o.a.tomcat.websocket.WsSession#resume</code> that can be used to
+        suspend/resume reading of the incoming messages. (violetagg)
+      </add>
+    </changelog>
+  </subsection>
   <subsection name="Web Applications">
     <changelog>
       <add>



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to