[ 
https://issues.apache.org/jira/browse/CAMEL-11731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371857#comment-16371857
 ] 

ASF GitHub Bot commented on CAMEL-11731:
----------------------------------------

onderson closed pull request #2188: CAMEL-11731 - add asynccallback and sync camel and servlet async APIs
URL: https://github.com/apache/camel/pull/2188

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/camel-core/src/main/docs/eips/serviceCall-eip.adoc b/camel-core/src/main/docs/eips/serviceCall-eip.adoc
index c3ade43224b..b90460468b0 100644
--- a/camel-core/src/main/docs/eips/serviceCall-eip.adoc
+++ b/camel-core/src/main/docs/eips/serviceCall-eip.adoc
@@ -100,18 +100,18 @@ The Service Call EIP supports 14 options which are listed below:
 | Name | Description | Default | Type
 | *name* | *Required* Sets the name of the service to use |  | String
 | *uri* | The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression. |  | String
-| *component* | The component to use | http4 | String
+| *component* | The component to use. | http4 | String
 | *pattern* | Sets the optional ExchangePattern used to invoke this endpoint |  | ExchangePattern
 | *configurationRef* | Refers to a ServiceCall configuration to use |  | String
-| *serviceDiscoveryRef* | Sets a reference to a custom ServiceDiscovery to use |  | String
-| *serviceFilterRef* | Sets a reference to a custom ServiceFilter to use |  | String
-| *serviceChooserRef* | Sets a reference to a custom ServiceChooser to use |  | String
-| *loadBalancerRef* | Sets a reference to a custom ServiceLoadBalancer to use |  | String
-| *expressionRef* | Set a reference to a custom Expression to use |  | String
-| *serviceDiscovery Configuration* | *Required* Configures the ServiceDiscovery using the given configuration |  | ServiceCallService DiscoveryConfiguration
-| *serviceFilterConfiguration* | *Required* Configures the ServiceFilter using the given configuration |  | ServiceCallService FilterConfiguration
-| *loadBalancerConfiguration* | *Required* Configures the LoadBalancer using the given configuration |  | ServiceCallServiceLoad BalancerConfiguration
-| *expressionConfiguration* | *Required* Configures the Expression using the given configuration |  | ServiceCallExpression Configuration
+| *serviceDiscoveryRef* | Sets a reference to a custom ServiceDiscovery to use. |  | String
+| *serviceFilterRef* | Sets a reference to a custom ServiceFilter to use. |  | String
+| *serviceChooserRef* | Sets a reference to a custom ServiceChooser to use. |  | String
+| *loadBalancerRef* | Sets a reference to a custom ServiceLoadBalancer to use. |  | String
+| *expressionRef* | Set a reference to a custom Expression to use. |  | String
+| *serviceDiscovery Configuration* | *Required* Configures the ServiceDiscovery using the given configuration. |  | ServiceCallService DiscoveryConfiguration
+| *serviceFilterConfiguration* | *Required* Configures the ServiceFilter using the given configuration. |  | ServiceCallService FilterConfiguration
+| *loadBalancerConfiguration* | *Required* Configures the LoadBalancer using the given configuration. |  | ServiceCallServiceLoad BalancerConfiguration
+| *expressionConfiguration* | *Required* Configures the Expression using the given configuration. |  | ServiceCallExpression Configuration
 |===
 // eip options: END
 
diff --git a/components/camel-http-common/src/main/java/org/apache/camel/http/common/CamelServlet.java b/components/camel-http-common/src/main/java/org/apache/camel/http/common/CamelServlet.java
index 19755b490b8..abf5309d55e 100644
--- a/components/camel-http-common/src/main/java/org/apache/camel/http/common/CamelServlet.java
+++ b/components/camel-http-common/src/main/java/org/apache/camel/http/common/CamelServlet.java
@@ -30,6 +30,7 @@
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.RuntimeCamelException;
@@ -75,7 +76,19 @@ protected final void service(HttpServletRequest req, HttpServletResponse resp) t
             //run async
             context.start(() -> doServiceAsync(context));
         } else {
-            doService(req, resp);
+            try {
+                doService(req, resp);
+            } catch (Exception e) {
+                //An error shouldn't occur as we should handle most of error in doService
+                log.error("Error processing request", e);
+                try {
+                    resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                } catch (Exception e1) {
+                    log.debug("Cannot send reply to client!", e1);
+                }
+                //Need to wrap it in RuntimeException as it occurs in a Runnable
+                throw new RuntimeCamelException(e);
+            }
         }
     }
 
@@ -86,37 +99,174 @@ protected final void service(HttpServletRequest req, HttpServletResponse resp) t
     protected void doServiceAsync(AsyncContext context) {
         final HttpServletRequest request = (HttpServletRequest) context.getRequest();
         final HttpServletResponse response = (HttpServletResponse) context.getResponse();
+        doServiceAsync(request, response, new AsyncCallback() {
+              
+            @Override
+            public void done(boolean doneSync) {
+                if (!doneSync) {
+                    context.complete();
+                }
+            }
+        });
+    }
+
+    protected void doService(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        log.trace("Service: {}", request);
+
+        // Is there a consumer registered for the request.
+        HttpConsumer consumer = resolve(request);
+        boolean check = nullCheckConsumer(request, response, consumer);
+        if (check) {
+            return;
+        }
+        
+        // create exchange and set data on it
+        Exchange exchange = createCamelExchange(consumer);
+        
+        ClassLoader oldTccl = createTccl(request, response, exchange, consumer);
+
+        // we want to handle the UoW
         try {
-            doService(request, response);
+            consumer.createUoW(exchange);
         } catch (Exception e) {
-            //An error shouldn't occur as we should handle most of error in doService
             log.error("Error processing request", e);
-            try {
-                response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
-            } catch (Exception e1) {
-                log.debug("Cannot send reply to client!", e1);
+            throw new ServletException(e);
+        }
+
+        try {
+            if (log.isTraceEnabled()) {
+                log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
+            }
+            // process the exchange
+            consumer.getProcessor().process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        try {
+            // now lets output to the response
+            if (log.isTraceEnabled()) {
+                log.trace("Writing response for exchangeId: {}", exchange.getExchangeId());
             }
-            //Need to wrap it in RuntimeException as it occurs in a Runnable
-            throw new RuntimeCamelException(e);
+            Integer bs = consumer.getEndpoint().getResponseBufferSize();
+            if (bs != null) {
+                log.trace("Using response buffer size: {}", bs);
+                response.setBufferSize(bs);
+            }
+            consumer.getBinding().writeResponse(exchange, response);
+        } catch (IOException e) {
+            log.error("Error processing request", e);
+            throw e;
+        } catch (Exception e) {
+            log.error("Error processing request", e);
+            throw new ServletException(e);
         } finally {
-            context.complete();
+            consumer.doneUoW(exchange);
+            restoreTccl(exchange, oldTccl);
         }
     }
+    
+    private Exchange createCamelExchange(HttpConsumer consumer) {
+        Exchange exchange = consumer.getEndpoint().createExchange(ExchangePattern.InOut);
 
+        if (consumer.getEndpoint().isBridgeEndpoint()) {
+            exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
+            exchange.setProperty(Exchange.SKIP_WWW_FORM_URLENCODED, Boolean.TRUE);
+        }
+        if (consumer.getEndpoint().isDisableStreamCache()) {
+            exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
+        }
+        return exchange;
+    }
+    
     /**
      * This is the logical implementation to handle request with {@link CamelServlet}
-     * This is where most exceptions should be handled
+     * This is where all exceptions should be handled and no further thrown
      *
      * @param request the {@link HttpServletRequest}
      * @param response the {@link HttpServletResponse}
-     * @throws ServletException
-     * @throws IOException
+     * @param asyncCallback {@link AsyncCallback}
      */
-    protected void doService(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+    protected void doServiceAsync(HttpServletRequest request, HttpServletResponse response, AsyncCallback asyncCallback) {
         log.trace("Service: {}", request);
+        Exchange exchange = null;
+        HttpConsumer consumer = null;
+        ClassLoader oldTccl = null;
+        try {
+            // Is there a consumer registered for the request.
+            consumer = resolve(request);
+            boolean check = nullCheckConsumer(request, response, consumer);
+            if (check) {
+                return;
+            }
+            
+            // create exchange and set data on it
+            exchange = createCamelExchange(consumer);
 
-        // Is there a consumer registered for the request.
-        HttpConsumer consumer = resolve(request);
+            oldTccl = createTccl(request, response, exchange, consumer);
+            
+            // we want to handle the UoW
+            consumer.createUoW(exchange);
+            
+            if (log.isTraceEnabled()) {
+                log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
+            }
+            // process the exchange
+
+            consumer.getAsyncProcessor().process(exchange, asyncCallback);
+            
+            // now lets output to the response
+            if (log.isTraceEnabled()) {
+                log.trace("Writing response for exchangeId: {}", exchange.getExchangeId());
+            }
+            Integer bs = consumer.getEndpoint().getResponseBufferSize();
+            if (bs != null) {
+                log.trace("Using response buffer size: {}", bs);
+                response.setBufferSize(bs);
+            }
+            consumer.getBinding().writeResponse(exchange, response);
+            
+        } catch (Exception e) {
+            log.error("Error processing request", e);
+            if (exchange != null) {
+                exchange.setException(e);
+            }
+        } finally {
+            if (consumer != null) {
+                consumer.doneUoW(exchange);
+            }
+            if (exchange != null && oldTccl != null) {
+                restoreTccl(exchange, oldTccl);
+            }
+            if (asyncCallback != null) {
+                asyncCallback.done(false);
+            }
+        }
+    }
+
+    private ClassLoader createTccl(HttpServletRequest request, HttpServletResponse response, Exchange exchange,
+        HttpConsumer consumer) {
+        // we override the classloader before building the HttpMessage just in case the binding
+        // does some class resolution
+        ClassLoader oldTccl = overrideTccl(exchange);
+        HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
+        exchange.setIn(new HttpMessage(exchange, consumer.getEndpoint(), request, response));
+        // set context path as header
+        String contextPath = consumer.getEndpoint().getPath();
+        exchange.getIn().setHeader("CamelServletContextPath", contextPath);
+
+        String httpPath = (String)exchange.getIn().getHeader(Exchange.HTTP_PATH);
+        // here we just remove the CamelServletContextPath part from the HTTP_PATH
+        if (contextPath != null
+            && httpPath.startsWith(contextPath)) {
+            exchange.getIn().setHeader(Exchange.HTTP_PATH,
+                    httpPath.substring(contextPath.length()));
+        }
+        return oldTccl;
+    }
+
+    private boolean nullCheckConsumer(HttpServletRequest request, HttpServletResponse response, HttpConsumer consumer) throws IOException {
+        boolean isReturn = false;
         if (consumer == null) {
             // okay we cannot process this requires so return either 404 or 405.
             // to know if its 405 then we need to check if any other HTTP method would have a consumer for the "same" request
@@ -124,19 +274,18 @@ protected void doService(HttpServletRequest request, HttpServletResponse respons
             if (hasAnyMethod) {
                 log.debug("No consumer to service request {} as method {} is not allowed", request, request.getMethod());
                 response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
-                return;
+                return true;
             } else {
                 log.debug("No consumer to service request {} as resource is not found", request);
                 response.sendError(HttpServletResponse.SC_NOT_FOUND);
-                return;
+                return true;
             }
-        }       
-        
+        }
         // are we suspended?
         if (consumer.isSuspended()) {
             log.debug("Consumer suspended, cannot service request {}", request);
             response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
-            return;
+            return true;
         }
 
         // if its an OPTIONS request then return which method is allowed
@@ -150,87 +299,20 @@ protected void doService(HttpServletRequest request, HttpServletResponse respons
             }
             response.addHeader("Allow", s);
             response.setStatus(HttpServletResponse.SC_OK);
-            return;
+            return true;
         }
         
         if (consumer.getEndpoint().getHttpMethodRestrict() != null 
             && !consumer.getEndpoint().getHttpMethodRestrict().contains(request.getMethod())) {
             response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
-            return;
+            return true;
         }
 
         if ("TRACE".equals(request.getMethod()) && !consumer.isTraceEnabled()) {
             response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED);
-            return;
-        }
-        
-        // create exchange and set data on it
-        Exchange exchange = consumer.getEndpoint().createExchange(ExchangePattern.InOut);
-
-        if (consumer.getEndpoint().isBridgeEndpoint()) {
-            exchange.setProperty(Exchange.SKIP_GZIP_ENCODING, Boolean.TRUE);
-            exchange.setProperty(Exchange.SKIP_WWW_FORM_URLENCODED, Boolean.TRUE);
-        }
-        if (consumer.getEndpoint().isDisableStreamCache()) {
-            exchange.setProperty(Exchange.DISABLE_HTTP_STREAM_CACHE, Boolean.TRUE);
-        }
-
-        // we override the classloader before building the HttpMessage just in case the binding
-        // does some class resolution
-        ClassLoader oldTccl = overrideTccl(exchange);
-        HttpHelper.setCharsetFromContentType(request.getContentType(), exchange);
-        exchange.setIn(new HttpMessage(exchange, consumer.getEndpoint(), request, response));
-        // set context path as header
-        String contextPath = consumer.getEndpoint().getPath();
-        exchange.getIn().setHeader("CamelServletContextPath", contextPath);
-
-        String httpPath = (String)exchange.getIn().getHeader(Exchange.HTTP_PATH);
-        // here we just remove the CamelServletContextPath part from the HTTP_PATH
-        if (contextPath != null
-            && httpPath.startsWith(contextPath)) {
-            exchange.getIn().setHeader(Exchange.HTTP_PATH,
-                    httpPath.substring(contextPath.length()));
-        }
-
-        // we want to handle the UoW
-        try {
-            consumer.createUoW(exchange);
-        } catch (Exception e) {
-            log.error("Error processing request", e);
-            throw new ServletException(e);
-        }
-
-        try {
-            if (log.isTraceEnabled()) {
-                log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
-            }
-            // process the exchange
-            consumer.getProcessor().process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-
-        try {
-            // now lets output to the response
-            if (log.isTraceEnabled()) {
-                log.trace("Writing response for exchangeId: {}", exchange.getExchangeId());
-            }
-            Integer bs = consumer.getEndpoint().getResponseBufferSize();
-            if (bs != null) {
-                log.trace("Using response buffer size: {}", bs);
-                response.setBufferSize(bs);
-            }
-            consumer.getBinding().writeResponse(exchange, response);
-        } catch (IOException e) {
-            log.error("Error processing request", e);
-            throw e;
-        } catch (Exception e) {
-            log.error("Error processing request", e);
-            throw new ServletException(e);
-        } finally {
-            consumer.doneUoW(exchange);
-            restoreTccl(exchange, oldTccl);
+            return true;
         }
+        return isReturn;
     }
 
     /**


 



> Servlet Component isn't really async
> ------------------------------------
>
>                 Key: CAMEL-11731
>                 URL: https://issues.apache.org/jira/browse/CAMEL-11731
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-servlet
>    Affects Versions: 2.18.4, 2.19.2
>         Environment: All
>            Reporter: Nick Houghton
>            Assignee: Önder Sezgin
>            Priority: Major
>             Fix For: Future
>
>
> My assumption from reading the servlet component documentation is that with 
> 2.18+ and a Servlet 3+ container, the component supports async processing. It 
> kind of does, via the async=true init config, and there is even a method in 
> CamelServlet called "doServiceAsync". But from what I can tell, it doesn't 
> really work in an asynchronous manner, i.e. one where no threads are blocked 
> while a request is awaiting an async operation (an AHC call, for example).
> Looking at:
> https://github.com/apache/camel/blob/master/components/camel-http-common/src/main/java/org/apache/camel/http/common/CamelServlet.java
> While processing a request, we check whether we are in async mode and, if so, 
> call doServiceAsync():
> {code:java}
>     @Override
>     protected final void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
>         if (isAsync()) {
>             final AsyncContext context = req.startAsync();
>             //run async
>             context.start(() -> doServiceAsync(context));
>         } else {
>             doService(req, resp);
>         }
>     }
> {code}
> Then in doServiceAsync() we call doService(), which in turn calls 
> getProcessor().process(exchange) rather than process(exchange, asyncCallback), 
> which is the proper async method:
> {code:java}
>         try {
>             if (log.isTraceEnabled()) {
>                 log.trace("Processing request for exchangeId: {}", exchange.getExchangeId());
>             }
>             // process the exchange
>             consumer.getProcessor().process(exchange);
>         } catch (Exception e) {
>             exchange.setException(e);
>         }
> {code}
> So the actual behaviour is that an inbound request in async mode just blocks 
> while waiting for processing to complete, in a totally synchronous manner. I 
> presume this is not the desired behaviour?
> It seems the fix would be to move the doService() logic into doServiceAsync(), 
> have doService() call it, and use the AsyncProcessorConverterHelper. The 
> alternative would be to update the documentation to state explicitly that the 
> component is not actually async.
> I can probably submit a PR; I just wanted to get thoughts on the actual intention.
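
The callback-driven style the issue asks for can be sketched roughly as follows. This is a minimal, self-contained illustration and not Camel code: `Callback` and `AsyncProcessor` are hypothetical stand-ins for `org.apache.camel.AsyncCallback` and the processor obtained from `consumer.getAsyncProcessor()`, the exchange is reduced to a `StringBuilder`, and `slowBackend` is an invented example of work that finishes on another thread. The point it tries to show is that the calling thread returns immediately and the response is produced only when the callback fires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncServiceSketch {

    /** Hypothetical stand-in for org.apache.camel.AsyncCallback. */
    interface Callback {
        void done(boolean doneSync);
    }

    /** Hypothetical stand-in for an async-capable processor. */
    interface AsyncProcessor {
        // Returns true if processing finished on the calling thread,
        // false if it will finish later and then invoke the callback.
        boolean process(StringBuilder exchange, Callback callback);
    }

    /** Invented example processor: does its work on a pool thread. */
    static AsyncProcessor slowBackend(ExecutorService pool) {
        return (exchange, callback) -> {
            pool.execute(() -> {
                exchange.append(" -> processed");
                callback.done(false); // signal asynchronous completion
            });
            return false; // the caller must not assume the work is done yet
        };
    }

    /**
     * Starts processing and returns at once. The "response" is produced
     * inside the callback, so no thread waits for the backend.
     */
    static CompletableFuture<String> serviceAsync(AsyncProcessor processor, String body) {
        StringBuilder exchange = new StringBuilder(body);
        CompletableFuture<String> response = new CompletableFuture<>();
        processor.process(exchange, doneSync -> response.complete(exchange.toString()));
        return response;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> reply = serviceAsync(slowBackend(pool), "request");
            // Blocking here is only for the demo; a servlet would instead
            // complete its AsyncContext from within the callback.
            System.out.println(reply.get(5, TimeUnit.SECONDS));
        } finally {
            pool.shutdown();
        }
    }
}
```

In this sketch the response is written from inside the callback rather than straight after `process(...)` returns, since a return value of `false` means the exchange may not be finished yet.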


