OK , here is the whole story.
When we start up the CXF router endpoint in Camel , we just start a
Camel-CXF consumer which will listen to the address that we specified in
the endpoint URI. Now a CXF client call the router , the request message
will be passed through the CXF transport and some of the interceptors ,
then it will be handled by the CamelInvoker[1] .
The CamelInvoker is connection point of CXF and Camel core. We turn the
CXF message and exchange into the Camel message and exchange in this
place , and call the processor to process the Camel exchange.
Since all the request message will be send to CamelInvoker for further
processing even the oneway message, if we do not introduce the async
processor into the CamelInvoker to process the request message, we
could not get any benefit by the caching the message into the ActiveMq.
[1]
https://svn.apache.org/repos/asf/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
Willem.
Hiram Chirino wrote:
Hu? Could you clarify?
On Jan 17, 2008 3:10 AM, Willem Jiang <[EMAIL PROTECTED]> wrote:
Hi,
For the oneway invocation the router processor will be available after
the external webservice is called,
if we don't introduce the async processor in the CamelInvoker.
Willem.
Hiram Chirino wrote:
Hey...
Since those are one way soap operations I don't see why we shouldn't
be able to support that. In my oppinion it makes alot of sense to
send oneways to a JMS queue for persistance and then forward them on
to the final destination.
On Jan 16, 2008 1:47 AM, Willem Jiang <[EMAIL PROTECTED]> wrote:
Hi Wilson,
I don't think you can leverage the JMS queue to achieve the cache the
request for the slow service provider.
Since the CXF component consumer do not use the async processor to
handler the request , the router's
worker thread will drive the processor before it get the extern Web
Services response back , then it will
return the response to the router client.
I think he just want's to hand off the oneway from a cxf consumer to a
oneway jms producer. I don't think any 'camel async processing' is
needed since your not going to wait for a response from JMS.
If you have multi clients to invoke the router service, they will take
up the Jetty bundle thread pool's threads
to handle the request. When all the thread in the thread pool are busy,
the router client request will be denied.
once again I think this is only the case if request response is being done.
Camel supports the async processor[1], but because CXF still use the
sync http servlet API , I do not find
a good way to introduce the async processor into the camel-cxf component.
Implementing the CXF consumer as a camel Async processor would allow
us to do really scaleable request response operations. But I don't
think they are needed to do oneway operations, since you don't need to
wait for a response.
Any thoughts?
Willem.
Wilson wrote:
Hi Willem,
I have a situation where there are a big amount of one way messages arriving
(via SOAP) from an external client that must be routed to an external
service provider (via SOAP). The service provider may not be able to consume
the messages at the same speed they are arriving.
I would like to save the arriving messages into a JMS queue. After a
consumer would consume the messages from the queue and call the external web
service. This way the external client is not blocked by the slow service
provider.
I am really new to Camel and I am not sure this is the right way to achieve
this?
Thank you,
Wilson
willem.jiang wrote:
Hi Wilson
The first error is caused by the Exchange that you get from
activeMQQueue misses the CXF message which is came from cxfRouter.
My question why do you want to put the router message into the
activeMQQueue and then put it to the cxfService endpoint.
If you want the request to be put into the activeMQQueue, you could
write your router configuration like this.
--------JAVA CODE--BEGIN-------------
from(cxfRouter).multicast(new UseLatestAggregationStrategy())
.to(activeMQQueue, cxfService);
--------JAVA CODE--END-------------
Hope this will help you :)
Willem.
Wilson wrote:
Hi Willem,
I've got the last 1.3 SPAPSHOT and the problem is gone.
Now I have another one.
It is possible to do this?
--------JAVA CODE--BEGIN-------------
from(cxfRouter).to(activeMQQueue);
from(activeMQQueue).to(cxfService);
--------JAVA CODE--END-------------
When I try this configuration, I got this exception:
--------EXCEPTION--BEGIN-------------
14/01/2008 15:16:21 org.apache.cxf.phase.PhaseInterceptorChain
doIntercept
INFO: Interceptor has thrown exception, unwinding now
java.lang.NullPointerException
at
org.apache.cxf.interceptor.MessageSenderInterceptor.getConduit(MessageSenderInterceptor.java:71)
at
org.apache.cxf.interceptor.MessageSenderInterceptor.handleMessage(MessageSenderInterceptor.java:46)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.interceptors.AbstractInvokerInterceptor.handleMessage(AbstractInvokerInterceptor.java:95)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.CxfMessageObserver.onMessage(CxfMessageObserver.java:83)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.serviceRequest(JettyHTTPDestination.java:284)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.doService(JettyHTTPDestination.java:240)
at
org.apache.cxf.transport.http_jetty.JettyHTTPHandler.handle(JettyHTTPHandler.java:54)
at
org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:712)
at
org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:211)
at
org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:139)
at org.mortbay.jetty.Server.handle(Server.java:313)
at
org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:506)
at
org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:844)
at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:726)
at
org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:205)
at
org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:381)
at
org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:396)
at
org.mortbay.thread.BoundedThreadPool$PoolThread.run(BoundedThreadPool.java:442)
14/01/2008 15:16:21 org.apache.cxf.phase.PhaseInterceptorChain
doIntercept
--------EXCEPTION--END-------------
If I try this route:
--------JAVA CODE--BEGIN-------------
from(cxfRouter).to(activeMQQueue).to(cxfService);
--------JAVA CODE--END-------------
The external client is called but the message is note dequeued from the
JMS
queue.
Thank you,
Wilson
willem.jiang wrote:
Hi Wilson,
I just traced the code and find a way to fix this issue and it will be
in the trunk soon :)
Willem.
Wilson wrote:
Hi Willem,
Issue created: https://issues.apache.org/activemq/browse/CAMEL-286
Best regards,
Wilson
willem.jiang wrote:
Hi Wilson,
Yes , it must be a camel-cxf component's bug. Could you fill a JIRA[1]
for it ?
[1]http://issues.apache.org/activemq/browse/CAMEL
Willem.
Wilson wrote:
Hi,
I found a workaround for the problem. I added a processor after the
CXF
router endpoint:
-------JAVA-CODE--BEGIN-------------------------------------------
String anyEndpoint = "activemq:pedidos";
from(StringUtils.join(cxfRouterURI)).process(new Processor(){
public void process(Exchange exchange) throws Exception
{
exchange.setOut(exchange.getIn());
}
}).to(anyEndpoint).to(StringUtils.join(cxfServiceURI));
-------JAVA-CODE--END-------------------------------------------
Now it is working.
I think there is a bug in CXF Camel code.
--
Wilson
Wilson wrote:
Hi Willem,
I am using Camel 1.3. I am using Maven to build the project. Today I
updated to the snapshot released on 2008-01-09 but the problem is
still
there.
Thank you.
Wilson
willem.jiang wrote:
Hi Wilson,
Which version of Camel are you using?
Please try out Camel 1.3 snapshot which fixed a camel-cxf
component's
issue which can't pass the exchange back form the cxf producer.
Willem.
Wilson wrote:
Hi Willem,
Thank you for your reply!
I changed my code in order to use a SEI to describe the Web
Service.
Now
it
is working fine but I am having problems when I add an endpoint
between
the
cxf endpoints.
The (working) code looks like this:
--------------------------------------------------------
package com.tc.eai;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spring.Main;
import org.apache.commons.lang.StringUtils;
public class MyRouteBuilder extends RouteBuilder {
private static String ROUTER_ADDRESS =
"http://localhost:9003/pedidoService";
private static String DATA_FORMAT = "dataFormat=MESSAGE";
private static String SERVICE_NAME =
"serviceName=%7bhttp://www.tc.com/pedido%7dpedido";
private static String SERVICE_CLASS =
"serviceClass=com.tc.eai.PedidoService";
private static String PORT_NAME =
"portName=%7bhttp://www.tc.com/pedido%7dpedidoSOAP";
private static String SERVICE_ADDRESS =
"http://localhost:9000/pedidoService";
/**
* A main() so we can easily run these routing rules in our
IDE
*/
public static void main(String... args) {
Main.main(args);
}
/**
* Lets configure the Camel routing rules using Java code...
*/
public void configure() {
//-- Router receives requests from external clients and send
to
channel
String[] cxfRouterURI = {
"cxf://"
,ROUTER_ADDRESS
,"?"
,DATA_FORMAT
,"&"
,SERVICE_NAME
,"&"
,SERVICE_CLASS
,"&"
,PORT_NAME
};
//-- Service points to external web service. The request
routed
by
the
Router
//-- is send to the external service provider
String[] cxfServiceURI = {
"cxf://"
,SERVICE_ADDRESS
,"?"
,DATA_FORMAT
,"&"
,SERVICE_NAME
,"&"
,SERVICE_CLASS
,"&"
,PORT_NAME
};
from(StringUtils.join(cxfRouterURI)).to(StringUtils.join(cxfServiceURI));
}
}
--------------------------------------------------------
When change the route this way:
String anyEndpoint = "log:org.apache.camel?level=DEBUG";
from(StringUtils.join(cxfRouterURI)).to(anyEndpoint).to(StringUtils.join(cxfServiceURI));
I get this error:
------Exception Begin----------------------------
09/01/2008 18:32:40 org.apache.cxf.phase.PhaseInterceptorChain
doIntercept
INFO: Interceptor has thrown exception, unwinding now
org.apache.cxf.interceptor.Fault
at
org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor.handleMessage(RawMessageContentRedirectInterceptor.java:43)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.invoker.CxfClient.invokeWithMessageStream(CxfClient.java:137)
at
org.apache.camel.component.cxf.invoker.CxfClient.dispatch(CxfClient.java:89)
at
org.apache.camel.component.cxf.CxfProducer.process(CxfProducer.java:202)
at
org.apache.camel.component.cxf.CxfProducer.process(CxfProducer.java:152)
at
org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsynProcessorBridge.process(AsyncProcessorTypeConverter.java:44)
at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:73)
at
org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:143)
at
org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:87)
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:85)
at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:40)
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:44)
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:68)
at
org.apache.camel.component.cxf.CamelInvoker.invoke(CamelInvoker.java:71)
at
org.apache.camel.component.cxf.interceptors.AbstractInvokerInterceptor.handleMessage(AbstractInvokerInterceptor.java:65)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.CxfMessageObserver.onMessage(CxfMessageObserver.java:83)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.serviceRequest(JettyHTTPDestination.java:284)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.doService(JettyHTTPDestination.java:240)
at
org.apache.cxf.transport.http_jetty.JettyHTTPHandler.handle(JettyHTTPHandler.java:54)
at
org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:712)
at
org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:211)
at
org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:139)
at org.mortbay.jetty.Server.handle(Server.java:313)
at
org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:506)
at
org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:844)
at
org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:726)
at
org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:211)
at
org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:381)
at
org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:396)
at
org.mortbay.thread.BoundedThreadPool$PoolThread.run(BoundedThreadPool.java:442)
Caused by: java.lang.NullPointerException
at
org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1025)
at org.apache.commons.io.IOUtils.copy(IOUtils.java:999)
at
org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor.handleMessage(RawMessageContentRedirectInterceptor.java:39)
... 32 more
------Exception End----------------------------
The same problem happens with seda and activemq endpoints.
Thank you,
Wilson
Hi Wilson,
I am afraid you need to provide the SEI class for camel-cxf
component
to initiate the endpoint.
If you do not want to unmarshal the request message , I think you
could
use soap message or raw message data formate.
Now I am thinking to do some refactoring work to enable the web
services endpoint without SEI class.
Willem.