Author: markt
Date: Fri Feb 27 15:01:06 2015
New Revision: 1662703
URL: http://svn.apache.org/r1662703
Log:
Refactor the sending of blocking text messages to use blocking writes rather
than the TextMessageSendHandler.
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java?rev=1662703&r1=1662702&r2=1662703&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
Fri Feb 27 15:01:06 2015
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutionExc
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.websocket.DeploymentException;
@@ -242,40 +241,38 @@ public abstract class WsRemoteEndpointIm
void sendMessageBlock(CharBuffer part, boolean last) throws IOException {
- try {
- // Get the timeout before we send the message. The message may
- // trigger a session close and depending on timing the client
- // session may close before we can read the timeout.
- long timeout = getBlockingSendTimeout();
- FutureToSendHandler f2sh = new FutureToSendHandler(wsSession);
- TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh,
part,
- last, encoder, encoderBuffer, this);
- tmsh.write();
- if (timeout == -1) {
- f2sh.get();
- } else {
- f2sh.get(timeout, TimeUnit.MILLISECONDS);
+ long timeoutExpiry = getTimeoutExpiry();
+ boolean isDone = false;
+ while (!isDone) {
+ encoderBuffer.clear();
+ CoderResult cr = encoder.encode(part, encoderBuffer, true);
+ if (cr.isError()) {
+ throw new IllegalArgumentException(cr.toString());
}
- } catch (InterruptedException | ExecutionException |
- TimeoutException e) {
- throw new IOException(e);
+ isDone = !cr.isOverflow();
+ encoderBuffer.flip();
+ sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last &&
isDone, timeoutExpiry);
}
+ stateMachine.complete(last);
}
void sendMessageBlock(byte opCode, ByteBuffer payload, boolean last)
throws IOException {
+ sendMessageBlock(opCode, payload, last, getTimeoutExpiry());
+ }
+
+
+ private long getTimeoutExpiry() {
// Get the timeout before we send the message. The message may
// trigger a session close and depending on timing the client
// session may close before we can read the timeout.
long timeout = getBlockingSendTimeout();
- long timeoutExpiry;
if (timeout < 0) {
- timeoutExpiry = Long.MAX_VALUE;
+ return Long.MAX_VALUE;
} else {
- timeoutExpiry = System.currentTimeMillis() + timeout;
+ return System.currentTimeMillis() + timeout;
}
- sendMessageBlock(opCode, payload, last, timeoutExpiry);
}
@@ -863,7 +860,7 @@ public abstract class WsRemoteEndpointIm
if (payloadLeft > outputSpace) {
// Restore the original limit
payload.limit(payloadLimit);
- // Still more headers to write, need to flush
+ // Still more data to write, need to flush
outputBuffer.flip();
endpoint.doWrite(this, blockingWriteTimeoutExpiry,
outputBuffer);
return;
@@ -872,12 +869,12 @@ public abstract class WsRemoteEndpointIm
if (flushRequired) {
outputBuffer.flip();
if (outputBuffer.remaining() == 0) {
- handler.onResult(new SendResult());
+ handler.onResult(SENDRESULT_OK);
} else {
endpoint.doWrite(this, blockingWriteTimeoutExpiry,
outputBuffer);
}
} else {
- handler.onResult(new SendResult());
+ handler.onResult(SENDRESULT_OK);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]