Repository: cxf
Updated Branches:
  refs/heads/3.0.x-fixes 9db0b6856 -> bf04ea01f


Switch to use atmosphere's async websocket invocation executor


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/bf04ea01
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/bf04ea01
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/bf04ea01

Branch: refs/heads/3.0.x-fixes
Commit: bf04ea01f9a5168902865cc68a40d3f4f19c4af4
Parents: 9db0b68
Author: Akitoshi Yoshida <[email protected]>
Authored: Wed Jun 10 13:59:37 2015 +0200
Committer: Akitoshi Yoshida <[email protected]>
Committed: Wed Jun 10 15:10:18 2015 +0200

----------------------------------------------------------------------
 .../atmosphere/AtmosphereWebSocketHandler.java  | 57 +++++++-------------
 .../AtmosphereWebSocketJettyDestination.java    | 37 +++----------
 .../AtmosphereWebSocketServletDestination.java  | 41 +++-----------
 3 files changed, 32 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/bf04ea01/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
index 1cf1124..1501666 100644
--- 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
+++ 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketHandler.java
@@ -27,7 +27,6 @@ import java.security.Principal;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Locale;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -101,49 +100,29 @@ public class AtmosphereWebSocketHandler implements 
WebSocketProtocol {
     
     protected List<AtmosphereRequest> invokeService(final WebSocket webSocket, 
 final InputStream stream) {
         LOG.fine("invokeService(WebSocket, InputStream)");
-        // invoke the service asynchronously as onMessage is synchronously 
blocked (in jetty)
-        // make sure the byte array passed to this method is immutable, as the 
websocket framework
-        // may corrupt the byte array after this method is returned (i.e., 
before the data is returned in
-        // the executor's thread.
-        executeServiceTask(new Runnable() {
-            @Override
-            public void run() {
-                HttpServletRequest request = null;
-                HttpServletResponse response = null;
-                try {
-                    WebSocketServletHolder webSocketHolder = new 
AtmosphereWebSocketServletHolder(webSocket);
-                    response = createServletResponse(webSocketHolder);
-                    request = createServletRequest(webSocketHolder, stream);
-                    if (destination != null) {
-                        String reqid = request.getHeader(requestIdKey);
-                        if (reqid != null) {
-                            response.setHeader(responseIdKey, reqid);
-                        }
-                        
((WebSocketDestinationService)destination).invokeInternal(null,
-                            
webSocket.resource().getRequest().getServletContext(),
-                            request, response);
-                    }
-                } catch (InvalidPathException ex) {
-                    reportErrorStatus(response, 400);
-                } catch (Exception e) {
-                    LOG.log(Level.WARNING, "Failed to invoke service", e);
+        HttpServletRequest request = null;
+        HttpServletResponse response = null;
+        try {
+            WebSocketServletHolder webSocketHolder = new 
AtmosphereWebSocketServletHolder(webSocket);
+            response = createServletResponse(webSocketHolder);
+            request = createServletRequest(webSocketHolder, stream);
+            if (destination != null) {
+                String reqid = request.getHeader(requestIdKey);
+                if (reqid != null) {
+                    response.setHeader(responseIdKey, reqid);
                 }
+                ((WebSocketDestinationService)destination).invokeInternal(null,
+                    webSocket.resource().getRequest().getServletContext(),
+                    request, response);
             }
-        });
+        } catch (InvalidPathException ex) {
+            reportErrorStatus(response, 400);
+        } catch (Exception e) {
+            LOG.log(Level.WARNING, "Failed to invoke service", e);
+        }
         return null;
     }
 
-    private void executeServiceTask(Runnable r) {
-        try {
-            destination.getExecutor().execute(r);
-        } catch (RejectedExecutionException e) {
-            LOG.warning(
-                "Executor queue is full, run the service invocation task in 
caller thread." 
-                + "  Users can specify a larger executor queue to avoid 
this.");
-            r.run();
-        }
-    }
-    
     // may want to move this error reporting code to WebSocketServletHolder
     protected void reportErrorStatus(HttpServletResponse response, int status) 
{
         if (response != null) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/bf04ea01/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
index ec873cc..2abef6c 100644
--- 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
+++ 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketJettyDestination.java
@@ -21,8 +21,6 @@ package org.apache.cxf.transport.websocket.atmosphere;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -41,7 +39,6 @@ import 
org.apache.cxf.transport.http_jetty.JettyHTTPDestination;
 import org.apache.cxf.transport.http_jetty.JettyHTTPHandler;
 import org.apache.cxf.transport.http_jetty.JettyHTTPServerEngineFactory;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
-import org.apache.cxf.workqueue.WorkQueueManager;
 import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
 import org.atmosphere.cpr.AtmosphereRequest;
@@ -59,7 +56,6 @@ public class AtmosphereWebSocketJettyDestination extends 
JettyHTTPDestination im
     WebSocketDestinationService {
     private static final Logger LOG = 
LogUtils.getL7dLogger(AtmosphereWebSocketJettyDestination.class);
     private AtmosphereFramework framework;
-    private Executor executor;
     
     public AtmosphereWebSocketJettyDestination(Bus bus, DestinationRegistry 
registry, EndpointInfo ei,
                                      JettyHTTPServerEngineFactory 
serverEngineFactory) throws IOException {
@@ -69,13 +65,10 @@ public class AtmosphereWebSocketJettyDestination extends 
JettyHTTPDestination im
         
framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, 
"true");
         framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, 
"true");
         framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, 
"true");
+        
framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, 
"true");
         AtmosphereUtils.addInterceptors(framework, bus);
         framework.addAtmosphereHandler("/", new DestinationHandler());
         framework.init();
-
-        // the executor for decoupling the service invocation from websocket's 
onMessage call which is
-        // synchronously blocked
-        executor = 
bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
     }
     
     @Override
@@ -146,31 +139,15 @@ public class AtmosphereWebSocketJettyDestination extends 
JettyHTTPDestination im
         @Override
         public void onRequest(final AtmosphereResource resource) throws 
IOException {
             LOG.fine("onRequest");
-            executeHandlerTask(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        invokeInternal(null, 
-                            resource.getRequest().getServletContext(), 
resource.getRequest(), resource.getResponse());
-                    } catch (Exception e) {
-                        LOG.log(Level.WARNING, "Failed to invoke service", e);
-                    }
-                }
-            });
+            try {
+                invokeInternal(null, 
+                    resource.getRequest().getServletContext(), 
resource.getRequest(), resource.getResponse());
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Failed to invoke service", e);
+            }
         }
     }
     
-    private void executeHandlerTask(Runnable r) {
-        try {
-            executor.execute(r);
-        } catch (RejectedExecutionException e) {
-            LOG.warning(
-                "Executor queue is full, run the service invocation task in 
caller thread." 
-                + "  Users can specify a larger executor queue to avoid 
this.");
-            r.run();
-        }
-    }
-
     // used for internal tests
     AtmosphereFramework getAtmosphereFramework() {
         return framework;

http://git-wip-us.apache.org/repos/asf/cxf/blob/bf04ea01/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
----------------------------------------------------------------------
diff --git 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
index 657a183..6459150 100644
--- 
a/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
+++ 
b/rt/transports/websocket/src/main/java/org/apache/cxf/transport/websocket/atmosphere/AtmosphereWebSocketServletDestination.java
@@ -20,8 +20,6 @@
 package org.apache.cxf.transport.websocket.atmosphere;
 
 import java.io.IOException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -37,7 +35,6 @@ import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.http.DestinationRegistry;
 import org.apache.cxf.transport.servlet.ServletDestination;
 import org.apache.cxf.transport.websocket.WebSocketDestinationService;
-import org.apache.cxf.workqueue.WorkQueueManager;
 import org.atmosphere.cpr.ApplicationConfig;
 import org.atmosphere.cpr.AtmosphereFramework;
 import org.atmosphere.cpr.AtmosphereRequest;
@@ -54,7 +51,6 @@ public class AtmosphereWebSocketServletDestination extends 
ServletDestination im
     private static final Logger LOG = 
LogUtils.getL7dLogger(AtmosphereWebSocketServletDestination.class);
 
     private AtmosphereFramework framework;
-    private Executor executor;
 
     public AtmosphereWebSocketServletDestination(Bus bus, DestinationRegistry 
registry, EndpointInfo ei, 
                                                  String path) throws 
IOException {
@@ -64,13 +60,10 @@ public class AtmosphereWebSocketServletDestination extends 
ServletDestination im
         
framework.addInitParameter(ApplicationConfig.PROPERTY_NATIVE_COMETSUPPORT, 
"true");
         framework.addInitParameter(ApplicationConfig.PROPERTY_SESSION_SUPPORT, 
"true");
         framework.addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, 
"true");
+        
framework.addInitParameter(ApplicationConfig.WEBSOCKET_PROTOCOL_EXECUTION, 
"true");
         AtmosphereUtils.addInterceptors(framework, bus);
         framework.addAtmosphereHandler("/", new DestinationHandler());
         framework.init();
-
-        // the executor for decoupling the service invocation from websocket's 
onMessage call which is
-        // synchronously blocked
-        executor = 
bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue();
     }
 
     @Override
@@ -94,10 +87,6 @@ public class AtmosphereWebSocketServletDestination extends 
ServletDestination im
         super.invoke(config, context, req, resp);
     }
 
-    Executor getExecutor() {
-        return executor;
-    }
-
     @Override
     public void shutdown() {
         try {
@@ -114,31 +103,15 @@ public class AtmosphereWebSocketServletDestination 
extends ServletDestination im
         @Override
         public void onRequest(final AtmosphereResource resource) throws 
IOException {
             LOG.fine("onRequest");
-            executeHandlerTask(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        invokeInternal(null, 
-                            resource.getRequest().getServletContext(), 
resource.getRequest(), resource.getResponse());
-                    } catch (Exception e) {
-                        LOG.log(Level.WARNING, "Failed to invoke service", e);
-                    }
-                }
-            });
+            try {
+                invokeInternal(null, 
+                    resource.getRequest().getServletContext(), 
resource.getRequest(), resource.getResponse());
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Failed to invoke service", e);
+            }
         }
     }
     
-    private void executeHandlerTask(Runnable r) {
-        try {
-            executor.execute(r);
-        } catch (RejectedExecutionException e) {
-            LOG.warning(
-                "Executor queue is full, run the service invocation task in 
caller thread." 
-                + "  Users can specify a larger executor queue to avoid 
this.");
-            r.run();
-        }
-    }
-
     // used for internal tests
     AtmosphereFramework getAtmosphereFramework() {
         return framework;

Reply via email to