Repository: cxf
Updated Branches:
  refs/heads/3.0.x-fixes 9db0b6856 -> bf04ea01f
Switch to use atmosphere's async websocket invocation executor

Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/bf04ea01
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/bf04ea01
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/bf04ea01

Branch: refs/heads/3.0.x-fixes
Commit: bf04ea01f9a5168902865cc68a40d3f4f19c4af4
Parents: 9db0b68
Author: Akitoshi Yoshida <[email protected]>
Authored: Wed Jun 10 13:59:37 2015 +0200
Committer: Akitoshi Yoshida <[email protected]>
Committed: Wed Jun 10 15:10:18 2015 +0200

----------------------------------------------------------------------
 .../atmosphere/AtmosphereWebSocketHandler.java | 57 +++++++-------------
 .../AtmosphereWebSocketJettyDestination.java | 37 +++----------
 .../AtmosphereWebSocketServletDestination.java | 41 +++-----------
 3 files changed, 32 insertions(+), 103 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cxf/blob/bf04ea01/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 1cf1124..1501666 100644
--- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -27,7 +27,6 @@ import java.security.Principal;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Locale;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;

@@ -101,49 +100,29 @@ public class AtmosphereWebSocketHandler implements WebSocketProtocol { protected List<AtmosphereRequest> invokeService(final WebSocket webSocket, final InputStream stream) { LOG.fine("invokeService(WebSocket, InputStream)"); - // invoke the service asynchronously as onMessage is synchronously blocked (in jetty) - // make sure the byte array passed to this method is immutable, as the websocket framework - // may corrupt the byte array after this method is returned (i.e., before the data is returned in - // the executor's thread. - executeServiceTask(new Runnable() { - @Override - public void run() { - HttpServletRequest request = null; - HttpServletResponse response = null; - try { - WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket); - response = createServletResponse(webSocketHolder); - request = createServletRequest(webSocketHolder, stream); - if (destination != null) { - String reqid = request.getHeader(requestIdKey); - if (reqid != null) { - response.setHeader(responseIdKey, reqid); - } - ((WebSocketDestinationService)destination).invokeInternal(null, - webSocket.resource().getRequest().getServletContext(), - request, response); - } - } catch (InvalidPathException ex) { - reportErrorStatus(response, 400); - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to invoke service", e); + HttpServletRequest request = null; + HttpServletResponse response = null; + try { + WebSocketServletHolder webSocketHolder = new AtmosphereWebSocketServletHolder(webSocket); + response = createServletResponse(webSocketHolder); + request = createServletRequest(webSocketHolder, stream); + if (destination != null) { + String reqid = request.getHeader(requestIdKey); + if (reqid != null) { + response.setHeader(responseIdKey, reqid); } + ((WebSocketDestinationService)destination).invokeInternal(null, + webSocket.resource().getRequest().getServletContext(), + request, response); } - }); + } catch (InvalidPathException ex) { 
+ reportErrorStatus(response, 400); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } return null; } - private void executeServiceTask(Runnable r) { - try { - destination.getExecutor().execute(r); - } catch (RejectedExecutionException e) { - LOG.warning( - "Executor queue is full, run the service invocation task in caller thread." - + " Users can specify a larger executor queue to avoid this."); - r.run(); - } - } - // may want to move this error reporting code to WebSocketServletHolder protected void reportErrorStatus(HttpServletResponse response, int status) { if (response != null) { http://git-wip-us.apache.org/repos/asf/cxf/blob/bf04ea01/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java index ec873cc..2abef6c 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java @@ -21,8 +21,6 @@ package org.apache.cxf.transport.websocket.atmosphere; import java.io.IOException; import java.net.URL; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -41,7 +39,6 @@ import org.apache.cxf.transport.http_jetty.JettyHTTPDestination; import org.apache.cxf.transport.http_jetty.JettyHTTPHandler; import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory; import 
org.apache.cxf.transport.websocket.WebSocketDestinationService; -import org.apache.cxf.workqueue.WorkQueueManager; import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.cpr.AtmosphereRequest; @@ -59,7 +56,6 @@ public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination im WebSocketDestinationService { private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketJettyDestination.class); private AtmosphereFramework framework; - private Executor executor; public AtmosphereWebSocketJettyDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, JettyHTTPServerEngineFactory serverEngineFactory) throws IOException { @@ -69,13 +65,10 @@ public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination im framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true"); framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, "true"); AtmosphereUtils.addInterceptors(framework, bus); framework.addAtmosphereHandler("/", new DestinationHandler()); framework.init(); - - // the executor for decoupling the service invocation from websocket's onMessage call which is - // synchronously blocked - executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue(); } @Override @@ -146,31 +139,15 @@ public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination im @Override public void onRequest(final AtmosphereResource resource) throws IOException { LOG.fine("onRequest"); - executeHandlerTask(new Runnable() { - @Override - public void run() { - try { - invokeInternal(null, - resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to invoke service", e); - } - } - }); + 
try { + invokeInternal(null, + resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } } } - private void executeHandlerTask(Runnable r) { - try { - executor.execute(r); - } catch (RejectedExecutionException e) { - LOG.warning( - "Executor queue is full, run the service invocation task in caller thread." - + " Users can specify a larger executor queue to avoid this."); - r.run(); - } - } - // used for internal tests AtmosphereFramework getAtmosphereFramework() { return framework; http://git-wip-us.apache.org/repos/asf/cxf/blob/bf04ea01/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java index 657a183..6459150 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java @@ -20,8 +20,6 @@ package org.apache.cxf.transport.websocket.atmosphere; import java.io.IOException; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,7 +35,6 @@ import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.http.DestinationRegistry; import org.apache.cxf.transport.servlet.ServletDestination; import org.apache.cxf.transport.websocket.WebSocketDestinationService; -import org.apache.cxf.workqueue.WorkQueueManager; 
import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.cpr.AtmosphereRequest; @@ -54,7 +51,6 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketServletDestination.class); private AtmosphereFramework framework; - private Executor executor; public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, String path) throws IOException { @@ -64,13 +60,10 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true"); framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, "true"); AtmosphereUtils.addInterceptors(framework, bus); framework.addAtmosphereHandler("/", new DestinationHandler()); framework.init(); - - // the executor for decoupling the service invocation from websocket's onMessage call which is - // synchronously blocked - executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue(); } @Override @@ -94,10 +87,6 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im super.invoke(config, context, req, resp); } - Executor getExecutor() { - return executor; - } - @Override public void shutdown() { try { @@ -114,31 +103,15 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im @Override public void onRequest(final AtmosphereResource resource) throws IOException { LOG.fine("onRequest"); - executeHandlerTask(new Runnable() { - @Override - public void run() { - try { - invokeInternal(null, - resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); - } catch (Exception e) { - 
LOG.log(Level.WARNING, "Failed to invoke service", e); - } - } - }); + try { + invokeInternal(null, + resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } } } - private void executeHandlerTask(Runnable r) { - try { - executor.execute(r); - } catch (RejectedExecutionException e) { - LOG.warning( - "Executor queue is full, run the service invocation task in caller thread." - + " Users can specify a larger executor queue to avoid this."); - r.run(); - } - } - // used for internal tests AtmosphereFramework getAtmosphereFramework() { return framework;
