Hi Wilson,I don't think you can leverage the JMS queue to achieve the cache the request for the slow service provider.
Since the CXF component consumer do not use the async processor to handler the request , the router's worker thread will drive the processor before it get the extern Web Services response back , then it will
return the response to the router client.If you have multi clients to invoke the router service, they will take up the Jetty bundle thread pool's threads to handle the request. When all the thread in the thread pool are busy, the router client request will be denied.
Camel supports the async processor[1], but because CXF still use the sync http servlet API , I do not find
a good way to introduce the async processor into the camel-cxf component. Any thoughts? Willem. Wilson wrote:
Hi Willem, I have a situation where there are a big amount of one way messages arriving (via SOAP) from an external client that must be routed to an external service provider (via SOAP). The service provider may not be able to consume the messages at the same speed they are arriving. I would like to save the arriving messages into a JMS queue. After a consumer would consume the messages from the queue and call the external web service. This way the external client is not blocked by the slow service provider. I am really new to Camel and I am not sure this is the right way to achieve this? Thank you, Wilson willem.jiang wrote:Hi WilsonThe first error is caused by the Exchange that you get from activeMQQueue misses the CXF message which is came from cxfRouter.My question why do you want to put the router message into the activeMQQueue and then put it to the cxfService endpoint. If you want the request to be put into the activeMQQueue, you could write your router configuration like this.--------JAVA CODE--BEGIN------------- from(cxfRouter).multicast(new UseLatestAggregationStrategy()) .to(activeMQQueue, cxfService);--------JAVA CODE--END-------------Hope this will help you :) Willem. Wilson wrote:Hi Willem, I've got the last 1.3 SPAPSHOT and the problem is gone.Now I have another one.It is possible to do this? --------JAVA CODE--BEGIN------------- from(cxfRouter).to(activeMQQueue); from(activeMQQueue).to(cxfService); --------JAVA CODE--END------------- When I try this configuration, I got this exception: --------EXCEPTION--BEGIN------------- 14/01/2008 15:16:21 org.apache.cxf.phase.PhaseInterceptorChain doIntercept INFO: Interceptor has thrown exception, unwinding now java.lang.NullPointerException at org.apache.cxf.interceptor.MessageSenderInterceptor.getConduit(MessageSenderInterceptor.java:71) at org.apache.cxf.interceptor.MessageSenderInterceptor.handleMessage(MessageSenderInterceptor.java:46) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207) at org.apache.camel.component.cxf.interceptors.AbstractInvokerInterceptor.handleMessage(AbstractInvokerInterceptor.java:95) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207) at org.apache.camel.component.cxf.CxfMessageObserver.onMessage(CxfMessageObserver.java:83) at org.apache.cxf.transport.http_jetty.JettyHTTPDestination.serviceRequest(JettyHTTPDestination.java:284) at org.apache.cxf.transport.http_jetty.JettyHTTPDestination.doService(JettyHTTPDestination.java:240) at org.apache.cxf.transport.http_jetty.JettyHTTPHandler.handle(JettyHTTPHandler.java:54) at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:712) at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:211) at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:139) at org.mortbay.jetty.Server.handle(Server.java:313) at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:506) at org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:844) at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:726) at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:205) at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:381) at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:396) at org.mortbay.thread.BoundedThreadPool$PoolThread.run(BoundedThreadPool.java:442) 14/01/2008 15:16:21 org.apache.cxf.phase.PhaseInterceptorChain doIntercept --------EXCEPTION--END------------- If I try this route: --------JAVA CODE--BEGIN------------- from(cxfRouter).to(activeMQQueue).to(cxfService); --------JAVA CODE--END------------- The external client is called but the message is note dequeued from the JMS queue. Thank you, Wilson willem.jiang wrote:Hi Wilson,I just traced the code and find a way to fix this issue and it will be in the trunk soon :)Willem. Wilson wrote:Hi Willem, Issue created: https://issues.apache.org/activemq/browse/CAMEL-286 Best regards, Wilson willem.jiang wrote:Hi Wilson,Yes , it must be a camel-cxf component's bug. Could you fill a JIRA[1] for it ?[1]http://issues.apache.org/activemq/browse/CAMEL Willem. Wilson wrote:Hi, I found a workaround for the problem. I added a processor after the CXF router endpoint: -------JAVA-CODE--BEGIN------------------------------------------- String anyEndpoint = "activemq:pedidos"; from(StringUtils.join(cxfRouterURI)).process(new Processor(){ public void process(Exchange exchange) throws Exception { exchange.setOut(exchange.getIn()); } }).to(anyEndpoint).to(StringUtils.join(cxfServiceURI)); -------JAVA-CODE--END------------------------------------------- Now it is working. I think there is a bug in CXF Camel code. -- Wilson Wilson wrote:Hi Willem, I am using Camel 1.3. I am using Maven to build the project. Today I updated to the snapshot released on 2008-01-09 but the problem is still there. Thank you. Wilson willem.jiang wrote:Hi Wilson, Which version of Camel are you using? Please try out Camel 1.3 snapshot which fixed a camel-cxfcomponent's issue which can't pass the exchange back form the cxf producer.Willem. Wilson wrote:Hi Willem, Thank you for your reply! I changed my code in order to use a SEI to describe the Web Service. Now it is working fine but I am having problems when I add an endpoint between the cxf endpoints. The (working) code looks like this: -------------------------------------------------------- package com.tc.eai; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.spring.Main; import org.apache.commons.lang.StringUtils; public class MyRouteBuilder extends RouteBuilder { private static String ROUTER_ADDRESS = "http://localhost:9003/pedidoService"; private static String DATA_FORMAT = "dataFormat=MESSAGE"; private static String SERVICE_NAME = "serviceName=%7bhttp://www.tc.com/pedido%7dpedido"; private static String SERVICE_CLASS = "serviceClass=com.tc.eai.PedidoService"; private static String PORT_NAME = "portName=%7bhttp://www.tc.com/pedido%7dpedidoSOAP"; private static String SERVICE_ADDRESS = "http://localhost:9000/pedidoService"; /** * A main() so we can easily run these routing rules in our IDE */ public static void main(String... args) { Main.main(args); } /** * Lets configure the Camel routing rules using Java code... */ public void configure() { //-- Router receives requests from external clients and send to channel String[] cxfRouterURI = { "cxf://" ,ROUTER_ADDRESS ,"?" ,DATA_FORMAT ,"&" ,SERVICE_NAME ,"&" ,SERVICE_CLASS ,"&" ,PORT_NAME }; //-- Service points to external web service. The request routed by theRouter //-- is send to the external service providerString[] cxfServiceURI = { "cxf://" ,SERVICE_ADDRESS ,"?" ,DATA_FORMAT ,"&" ,SERVICE_NAME ,"&" ,SERVICE_CLASS ,"&" ,PORT_NAME };from(StringUtils.join(cxfRouterURI)).to(StringUtils.join(cxfServiceURI));} } -------------------------------------------------------- When change the route this way: String anyEndpoint = "log:org.apache.camel?level=DEBUG";from(StringUtils.join(cxfRouterURI)).to(anyEndpoint).to(StringUtils.join(cxfServiceURI));I get this error: ------Exception Begin---------------------------- 09/01/2008 18:32:40 org.apache.cxf.phase.PhaseInterceptorChain doIntercept INFO: Interceptor has thrown exception, unwinding now org.apache.cxf.interceptor.Fault at org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor.handleMessage(RawMessageContentRedirectInterceptor.java:43) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207) at org.apache.camel.component.cxf.invoker.CxfClient.invokeWithMessageStream(CxfClient.java:137) at org.apache.camel.component.cxf.invoker.CxfClient.dispatch(CxfClient.java:89) at org.apache.camel.component.cxf.CxfProducer.process(CxfProducer.java:202) at org.apache.camel.component.cxf.CxfProducer.process(CxfProducer.java:152) at org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsynProcessorBridge.process(AsyncProcessorTypeConverter.java:44) at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:73) at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:143) at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:87) at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) at org.apache.camel.processor.Pipeline.process(Pipeline.java:85) at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:40) at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:44) at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:68) at org.apache.camel.component.cxf.CamelInvoker.invoke(CamelInvoker.java:71) at org.apache.camel.component.cxf.interceptors.AbstractInvokerInterceptor.handleMessage(AbstractInvokerInterceptor.java:65) at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207) at org.apache.camel.component.cxf.CxfMessageObserver.onMessage(CxfMessageObserver.java:83) at org.apache.cxf.transport.http_jetty.JettyHTTPDestination.serviceRequest(JettyHTTPDestination.java:284) at org.apache.cxf.transport.http_jetty.JettyHTTPDestination.doService(JettyHTTPDestination.java:240) at org.apache.cxf.transport.http_jetty.JettyHTTPHandler.handle(JettyHTTPHandler.java:54) at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:712) at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:211) at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:139) at org.mortbay.jetty.Server.handle(Server.java:313) at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:506) at org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:844) at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:726) at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:211) at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:381) at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:396) at org.mortbay.thread.BoundedThreadPool$PoolThread.run(BoundedThreadPool.java:442) Caused by: java.lang.NullPointerException at org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1025) at org.apache.commons.io.IOUtils.copy(IOUtils.java:999) at org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor.handleMessage(RawMessageContentRedirectInterceptor.java:39) ... 32 more ------Exception End---------------------------- The same problem happens with seda and activemq endpoints. Thank you, Wilson Hi Wilson, I am afraid you need to provide the SEI class for camel-cxfcomponent to initiate the endpoint.If you do not want to unmarshal the request message , I think youcould use soap message or raw message data formate. Now I am thinking to do some refactoring work to enable the web services endpoint without SEI class.Willem.
