Author: markt
Date: Fri Feb 27 15:01:06 2015
New Revision: 1662703
URL: http://svn.apache.org/r1662703
Log: Refactor blocking text messages to use blocking writes rather than the TextMessageSendHandler
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1662703&r1=1662702&r2=1662703&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java Fri Feb 27 15:01:06 2015 @@ -31,7 +31,6 @@ import java.util.concurrent.ExecutionExc import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import javax.websocket.DeploymentException; @@ -242,40 +241,38 @@ public abstract class WsRemoteEndpointIm void sendMessageBlock(CharBuffer part, boolean last) throws IOException { - try { - // Get the timeout before we send the message. The message may - // trigger a session close and depending on timing the client - // session may close before we can read the timeout. 
- long timeout = getBlockingSendTimeout(); - FutureToSendHandler f2sh = new FutureToSendHandler(wsSession); - TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, part, - last, encoder, encoderBuffer, this); - tmsh.write(); - if (timeout == -1) { - f2sh.get(); - } else { - f2sh.get(timeout, TimeUnit.MILLISECONDS); + long timeoutExpiry = getTimeoutExpiry(); + boolean isDone = false; + while (!isDone) { + encoderBuffer.clear(); + CoderResult cr = encoder.encode(part, encoderBuffer, true); + if (cr.isError()) { + throw new IllegalArgumentException(cr.toString()); } - } catch (InterruptedException | ExecutionException | - TimeoutException e) { - throw new IOException(e); + isDone = !cr.isOverflow(); + encoderBuffer.flip(); + sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last && isDone, timeoutExpiry); } + stateMachine.complete(last); } void sendMessageBlock(byte opCode, ByteBuffer payload, boolean last) throws IOException { + sendMessageBlock(opCode, payload, last, getTimeoutExpiry()); + } + + + private long getTimeoutExpiry() { // Get the timeout before we send the message. The message may // trigger a session close and depending on timing the client // session may close before we can read the timeout. 
long timeout = getBlockingSendTimeout(); - long timeoutExpiry; if (timeout < 0) { - timeoutExpiry = Long.MAX_VALUE; + return Long.MAX_VALUE; } else { - timeoutExpiry = System.currentTimeMillis() + timeout; + return System.currentTimeMillis() + timeout; } - sendMessageBlock(opCode, payload, last, timeoutExpiry); } @@ -863,7 +860,7 @@ public abstract class WsRemoteEndpointIm if (payloadLeft > outputSpace) { // Restore the original limit payload.limit(payloadLimit); - // Still more headers to write, need to flush + // Still more data to write, need to flush outputBuffer.flip(); endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); return; @@ -872,12 +869,12 @@ public abstract class WsRemoteEndpointIm if (flushRequired) { outputBuffer.flip(); if (outputBuffer.remaining() == 0) { - handler.onResult(new SendResult()); + handler.onResult(SENDRESULT_OK); } else { endpoint.doWrite(this, blockingWriteTimeoutExpiry, outputBuffer); } } else { - handler.onResult(new SendResult()); + handler.onResult(SENDRESULT_OK); } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org