Author: remm
Date: Mon Apr 30 15:28:26 2018
New Revision: 1830592
URL: http://svn.apache.org/viewvc?rev=1830592&view=rev
Log:
Add async IO API use in websockets writes. Although I doubt there's an actual
benefit at the moment, the change is small and it still improves testing of the
API as the usage is different from HTTP/2. Tested with the testsuite, the
examples and Autobahn.
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
tomcat/trunk/webapps/docs/changelog.xml
Modified:
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java?rev=1830592&r1=1830591&r2=1830592&view=diff
==============================================================================
---
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
(original)
+++
tomcat/trunk/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
Mon Apr 30 15:28:26 2018
@@ -20,7 +20,10 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.InterruptedByTimeoutException;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
@@ -28,6 +31,10 @@ import javax.websocket.SendResult;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.net.SocketWrapperBase;
+import org.apache.tomcat.util.net.SocketWrapperBase.BlockingMode;
+import org.apache.tomcat.util.net.SocketWrapperBase.CompletionCheck;
+import org.apache.tomcat.util.net.SocketWrapperBase.CompletionHandlerCall;
+import org.apache.tomcat.util.net.SocketWrapperBase.CompletionState;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsRemoteEndpointImplBase;
@@ -62,20 +69,95 @@ public class WsRemoteEndpointImplServer
return false;
}
-
@Override
protected void doWrite(SendHandler handler, long
blockingWriteTimeoutExpiry,
ByteBuffer... buffers) {
- if (blockingWriteTimeoutExpiry == -1) {
- this.handler = handler;
- this.buffers = buffers;
- // This is definitely the same thread that triggered the write so a
- // dispatch will be required.
- onWritePossible(true);
+ if (socketWrapper.hasAsyncIO()) {
+ final boolean block = (blockingWriteTimeoutExpiry != -1);
+ long timeout = -1;
+ if (block) {
+ timeout = blockingWriteTimeoutExpiry -
System.currentTimeMillis();
+ if (timeout <= 0) {
+ SendResult sr = new SendResult(new
SocketTimeoutException());
+ handler.onResult(sr);
+ return;
+ }
+ } else {
+ this.handler = handler;
+ if (timeout > 0) {
+ // Register with timeout thread
+ timeoutExpiry = timeout + System.currentTimeMillis();
+ wsWriteTimeout.register(this);
+ }
+ timeout = getSendTimeout();
+ }
+ socketWrapper.write(block ? BlockingMode.BLOCK :
BlockingMode.SEMI_BLOCK, timeout,
+ TimeUnit.MILLISECONDS, null,
+ new CompletionCheck() {
+ @Override
+ public CompletionHandlerCall
callHandler(CompletionState state, ByteBuffer[] buffers,
+ int offset, int length) {
+ for (int i = 0; i < length; i++) {
+ if (buffers[offset + i].remaining() > 0) {
+ return CompletionHandlerCall.CONTINUE;
+ }
+ }
+ return CompletionHandlerCall.DONE;
+ }
+ },
+ new CompletionHandler<Long, Void>() {
+ @Override
+ public void completed(Long result, Void attachment) {
+ if (block) {
+ long timeout = blockingWriteTimeoutExpiry -
System.currentTimeMillis();
+ if (timeout <= 0) {
+ failed(new SocketTimeoutException(), null);
+ } else {
+ handler.onResult(SENDRESULT_OK);
+ }
+ } else {
+
wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
+ clearHandler(null, true);
+ if (close) {
+ close();
+ }
+ }
+ }
+ @Override
+ public void failed(Throwable exc, Void attachment) {
+ if (exc instanceof InterruptedByTimeoutException) {
+ exc = new SocketTimeoutException();
+ }
+ if (block) {
+ SendResult sr = new SendResult(exc);
+ handler.onResult(sr);
+ } else {
+
wsWriteTimeout.unregister(WsRemoteEndpointImplServer.this);
+ clearHandler(exc, true);
+ close();
+ }
+ }
+ }, buffers);
} else {
- // Blocking
- try {
- for (ByteBuffer buffer : buffers) {
+ if (blockingWriteTimeoutExpiry == -1) {
+ this.handler = handler;
+ this.buffers = buffers;
+ // This is definitely the same thread that triggered the write
so a
+ // dispatch will be required.
+ onWritePossible(true);
+ } else {
+ // Blocking
+ try {
+ for (ByteBuffer buffer : buffers) {
+ long timeout = blockingWriteTimeoutExpiry -
System.currentTimeMillis();
+ if (timeout <= 0) {
+ SendResult sr = new SendResult(new
SocketTimeoutException());
+ handler.onResult(sr);
+ return;
+ }
+ socketWrapper.setWriteTimeout(timeout);
+ socketWrapper.write(true, buffer);
+ }
long timeout = blockingWriteTimeoutExpiry -
System.currentTimeMillis();
if (timeout <= 0) {
SendResult sr = new SendResult(new
SocketTimeoutException());
@@ -83,26 +165,19 @@ public class WsRemoteEndpointImplServer
return;
}
socketWrapper.setWriteTimeout(timeout);
- socketWrapper.write(true, buffer);
- }
- long timeout = blockingWriteTimeoutExpiry -
System.currentTimeMillis();
- if (timeout <= 0) {
- SendResult sr = new SendResult(new
SocketTimeoutException());
+ socketWrapper.flush(true);
+ handler.onResult(SENDRESULT_OK);
+ } catch (IOException e) {
+ SendResult sr = new SendResult(e);
handler.onResult(sr);
- return;
}
- socketWrapper.setWriteTimeout(timeout);
- socketWrapper.flush(true);
- handler.onResult(SENDRESULT_OK);
- } catch (IOException e) {
- SendResult sr = new SendResult(e);
- handler.onResult(sr);
}
}
}
public void onWritePossible(boolean useDispatch) {
+ // Note: Unused for async IO
ByteBuffer[] buffers = this.buffers;
if (buffers == null) {
// Servlet 3.1 will call the write listener once even if nothing
Modified: tomcat/trunk/webapps/docs/changelog.xml
URL:
http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1830592&r1=1830591&r2=1830592&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Mon Apr 30 15:28:26 2018
@@ -57,6 +57,13 @@
</fix>
</changelog>
</subsection>
+ <subsection name="WebSocket">
+ <changelog>
+ <update>
+ Use NIO2 API for websockets writes. (remm)
+ </update>
+ </changelog>
+ </subsection>
</section>
<section name="Tomcat 9.0.8 (markt)" rtext="release in progress">
<subsection name="Catalina">
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]