Repository: cxf Updated Branches: refs/heads/master 7f7cc3c39 -> 5e4a14b4d
Switch to use atmosphere's async websocket invocation executor Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/5e4a14b4 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/5e4a14b4 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/5e4a14b4 Branch: refs/heads/master Commit: 5e4a14b4da1f1b27e4df811be80a7f11fc6dae8d Parents: 7f7cc3c Author: Akitoshi Yoshida <[email protected]> Authored: Wed Jun 10 13:59:37 2015 +0200 Committer: Akitoshi Yoshida <[email protected]> Committed: Wed Jun 10 13:59:37 2015 +0200 ---------------------------------------------------------------------- .../AtmosphereWebSocketJettyDestination.java | 37 ++++-------------- .../AtmosphereWebSocketServletDestination.java | 41 ++++---------------- 2 files changed, 14 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/5e4a14b4/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java index ec873cc..2abef6c 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java @@ -21,8 +21,6 @@ package org.apache.cxf.transport.websocket.atmosphere; import java.io.IOException; import java.net.URL; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -41,7 +39,6 @@ import org.apache.cxf.transport.http_jetty.JettyHTTPDestination; import org.apache.cxf.transport.http_jetty.JettyHTTPHandler; import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory; import org.apache.cxf.transport.websocket.WebSocketDestinationService; -import org.apache.cxf.workqueue.WorkQueueManager; import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.cpr.AtmosphereRequest; @@ -59,7 +56,6 @@ public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination im WebSocketDestinationService { private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketJettyDestination.class); private AtmosphereFramework framework; - private Executor executor; public AtmosphereWebSocketJettyDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, JettyHTTPServerEngineFactory serverEngineFactory) throws IOException { @@ -69,13 +65,10 @@ public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination im framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true"); framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, "true"); AtmosphereUtils.addInterceptors(framework, bus); framework.addAtmosphereHandler("/", new DestinationHandler()); framework.init(); - - // the executor for decoupling the service invocation from websocket's onMessage call which is - // synchronously blocked - executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue(); } @Override @@ -146,31 +139,15 @@ public class AtmosphereWebSocketJettyDestination extends JettyHTTPDestination im @Override public void onRequest(final AtmosphereResource resource) throws IOException { LOG.fine("onRequest"); - executeHandlerTask(new Runnable() { - @Override - public void run() { - try { - invokeInternal(null, - resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to invoke service", e); - } - } - }); + try { + invokeInternal(null, + resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } } } - private void executeHandlerTask(Runnable r) { - try { - executor.execute(r); - } catch (RejectedExecutionException e) { - LOG.warning( - "Executor queue is full, run the service invocation task in caller thread." - + " Users can specify a larger executor queue to avoid this."); - r.run(); - } - } - // used for internal tests AtmosphereFramework getAtmosphereFramework() { return framework; http://git-wip-us.apache.org/repos/asf/cxf/blob/5e4a14b4/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java ---------------------------------------------------------------------- diff --git a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java index 657a183..6459150 100644 --- a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java +++ b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java @@ -20,8 +20,6 @@ package org.apache.cxf.transport.websocket.atmosphere; import java.io.IOException; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,7 +35,6 @@ import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.http.DestinationRegistry; import org.apache.cxf.transport.servlet.ServletDestination; import org.apache.cxf.transport.websocket.WebSocketDestinationService; -import org.apache.cxf.workqueue.WorkQueueManager; import org.atmosphere.cpr.ApplicationConfig; import org.atmosphere.cpr.AtmosphereFramework; import org.atmosphere.cpr.AtmosphereRequest; @@ -54,7 +51,6 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im private static final Logger LOG = LogUtils.getL7dLogger(AtmosphereWebSocketServletDestination.class); private AtmosphereFramework framework; - private Executor executor; public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry registry, EndpointInfo ei, String path) throws IOException { @@ -64,13 +60,10 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, "true"); framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, "true"); framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true"); + framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, "true"); AtmosphereUtils.addInterceptors(framework, bus); framework.addAtmosphereHandler("/", new DestinationHandler()); framework.init(); - - // the executor for decoupling the service invocation from websocket's onMessage call which is - // synchronously blocked - executor = bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue(); } @Override @@ -94,10 +87,6 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im super.invoke(config, context, req, resp); } - Executor getExecutor() { - return executor; - } - @Override public void shutdown() { try { @@ -114,31 +103,15 @@ public class AtmosphereWebSocketServletDestination extends ServletDestination im @Override public void onRequest(final AtmosphereResource resource) throws IOException { LOG.fine("onRequest"); - executeHandlerTask(new Runnable() { - @Override - public void run() { - try { - invokeInternal(null, - resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); - } catch (Exception e) { - LOG.log(Level.WARNING, "Failed to invoke service", e); - } - } - }); + try { + invokeInternal(null, + resource.getRequest().getServletContext(), resource.getRequest(), resource.getResponse()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to invoke service", e); + } } } - private void executeHandlerTask(Runnable r) { - try { - executor.execute(r); - } catch (RejectedExecutionException e) { - LOG.warning( - "Executor queue is full, run the service invocation task in caller thread." - + " Users can specify a larger executor queue to avoid this."); - r.run(); - } - } - // used for internal tests AtmosphereFramework getAtmosphereFramework() { return framework;
