Yes, it is much easy to do with two router for the one way message.
Here is my question about the request and response message mode , how
can we implement it with two router way ?
Willem.
Daniel Kulp wrote:
Reading through this thread, I'm quite confused, but I think, from the
description, that this is really a "two route" problem.
The first route is the one from the exposed endpoint and into the JMS
queue.
The second would be from the JMS queue to the external service.
The first would be unthrottled, accepting the one way messages as fast as
the clients want to send them.
The second could be a single thread thing or something that would just pull
a message one at a time and send to the external service.
I think trying to do this with a single route doesn't make a whole lot of
sense. I think it over complicates things.
Does any of that make sense?
Dan
willem.jiang wrote:
OK , here is the whole story.
When we start up the CXF router endpoint in Camel , we just start a
Camel-CXF consumer which will listen to the address that we specified in
the endpoint URI. Now a CXF client call the router , the request message
will be passed through the CXF transport and some of the interceptors ,
then it will be handled by the CamelInvoker[1] .
The CamelInvoker is connection point of CXF and Camel core. We turn the
CXF message and exchange into the Camel message and exchange in this
place , and call the processor to process the Camel exchange.
Since all the request message will be send to CamelInvoker for further
processing even the oneway message, if we do not introduce the async
processor into the CamelInvoker to process the request message, we
could not get any benefit by the caching the message into the
ActiveMq.
[1]
https://svn.apache.org/repos/asf/activemq/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CamelInvoker.java
Willem.
Hiram Chirino wrote:
Hu? Could you clarify?
On Jan 17, 2008 3:10 AM, Willem Jiang <[EMAIL PROTECTED]> wrote:
Hi,
For the oneway invocation the router processor will be available after
the external webservice is called,
if we don't introduce the async processor in the CamelInvoker.
Willem.
Hiram Chirino wrote:
Hey...
Since those are one way soap operations I don't see why we shouldn't
be able to support that. In my oppinion it makes alot of sense to
send oneways to a JMS queue for persistance and then forward them on
to the final destination.
On Jan 16, 2008 1:47 AM, Willem Jiang <[EMAIL PROTECTED]> wrote:
Hi Wilson,
I don't think you can leverage the JMS queue to achieve the cache the
request for the slow service provider.
Since the CXF component consumer do not use the async processor to
handler the request , the router's
worker thread will drive the processor before it get the extern Web
Services response back , then it will
return the response to the router client.
I think he just want's to hand off the oneway from a cxf consumer to a
oneway jms producer. I don't think any 'camel async processing' is
needed since your not going to wait for a response from JMS.
If you have multi clients to invoke the router service, they will take
up the Jetty bundle thread pool's threads
to handle the request. When all the thread in the thread pool are
busy,
the router client request will be denied.
once again I think this is only the case if request response is being
done.
Camel supports the async processor[1], but because CXF still use the
sync http servlet API , I do not find
a good way to introduce the async processor into the camel-cxf
component.
Implementing the CXF consumer as a camel Async processor would allow
us to do really scaleable request response operations. But I don't
think they are needed to do oneway operations, since you don't need to
wait for a response.
Any thoughts?
Willem.
Wilson wrote:
Hi Willem,
I have a situation where there are a big amount of one way messages
arriving
(via SOAP) from an external client that must be routed to an external
service provider (via SOAP). The service provider may not be able to
consume
the messages at the same speed they are arriving.
I would like to save the arriving messages into a JMS queue. After a
consumer would consume the messages from the queue and call the
external web
service. This way the external client is not blocked by the slow
service
provider.
I am really new to Camel and I am not sure this is the right way to
achieve
this?
Thank you,
Wilson
willem.jiang wrote:
Hi Wilson
The first error is caused by the Exchange that you get from
activeMQQueue misses the CXF message which is came from cxfRouter.
My question why do you want to put the router message into the
activeMQQueue and then put it to the cxfService endpoint.
If you want the request to be put into the activeMQQueue, you could
write your router configuration like this.
--------JAVA CODE--BEGIN-------------
from(cxfRouter).multicast(new UseLatestAggregationStrategy())
.to(activeMQQueue, cxfService);
--------JAVA CODE--END-------------
Hope this will help you :)
Willem.
Wilson wrote:
Hi Willem,
I've got the last 1.3 SPAPSHOT and the problem is gone.
Now I have another one.
It is possible to do this?
--------JAVA CODE--BEGIN-------------
from(cxfRouter).to(activeMQQueue);
from(activeMQQueue).to(cxfService);
--------JAVA CODE--END-------------
When I try this configuration, I got this exception:
--------EXCEPTION--BEGIN-------------
14/01/2008 15:16:21 org.apache.cxf.phase.PhaseInterceptorChain
doIntercept
INFO: Interceptor has thrown exception, unwinding now
java.lang.NullPointerException
at
org.apache.cxf.interceptor.MessageSenderInterceptor.getConduit(MessageSenderInterceptor.java:71)
at
org.apache.cxf.interceptor.MessageSenderInterceptor.handleMessage(MessageSenderInterceptor.java:46)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.interceptors.AbstractInvokerInterceptor.handleMessage(AbstractInvokerInterceptor.java:95)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.CxfMessageObserver.onMessage(CxfMessageObserver.java:83)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.serviceRequest(JettyHTTPDestination.java:284)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.doService(JettyHTTPDestination.java:240)
at
org.apache.cxf.transport.http_jetty.JettyHTTPHandler.handle(JettyHTTPHandler.java:54)
at
org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:712)
at
org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:211)
at
org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:139)
at org.mortbay.jetty.Server.handle(Server.java:313)
at
org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:506)
at
org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:844)
at
org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:726)
at
org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:205)
at
org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:381)
at
org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:396)
at
org.mortbay.thread.BoundedThreadPool$PoolThread.run(BoundedThreadPool.java:442)
14/01/2008 15:16:21 org.apache.cxf.phase.PhaseInterceptorChain
doIntercept
--------EXCEPTION--END-------------
If I try this route:
--------JAVA CODE--BEGIN-------------
from(cxfRouter).to(activeMQQueue).to(cxfService);
--------JAVA CODE--END-------------
The external client is called but the message is note dequeued from
the
JMS
queue.
Thank you,
Wilson
willem.jiang wrote:
Hi Wilson,
I just traced the code and find a way to fix this issue and it
will be
in the trunk soon :)
Willem.
Wilson wrote:
Hi Willem,
Issue created:
https://issues.apache.org/activemq/browse/CAMEL-286
Best regards,
Wilson
willem.jiang wrote:
Hi Wilson,
Yes , it must be a camel-cxf component's bug. Could you fill a
JIRA[1]
for it ?
[1]http://issues.apache.org/activemq/browse/CAMEL
Willem.
Wilson wrote:
Hi,
I found a workaround for the problem. I added a processor after
the
CXF
router endpoint:
-------JAVA-CODE--BEGIN-------------------------------------------
String anyEndpoint = "activemq:pedidos";
from(StringUtils.join(cxfRouterURI)).process(new
Processor(){
public void process(Exchange exchange)
throws Exception {
exchange.setOut(exchange.getIn());
}
}).to(anyEndpoint).to(StringUtils.join(cxfServiceURI));
-------JAVA-CODE--END-------------------------------------------
Now it is working.
I think there is a bug in CXF Camel code.
--
Wilson
Wilson wrote:
Hi Willem,
I am using Camel 1.3. I am using Maven to build the project.
Today I
updated to the snapshot released on 2008-01-09 but the problem
is
still
there.
Thank you.
Wilson
willem.jiang wrote:
Hi Wilson,
Which version of Camel are you using?
Please try out Camel 1.3 snapshot which fixed a camel-cxf
component's
issue which can't pass the exchange back form the cxf
producer.
Willem.
Wilson wrote:
Hi Willem,
Thank you for your reply!
I changed my code in order to use a SEI to describe the Web
Service.
Now
it
is working fine but I am having problems when I add an
endpoint
between
the
cxf endpoints.
The (working) code looks like this:
--------------------------------------------------------
package com.tc.eai;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spring.Main;
import org.apache.commons.lang.StringUtils;
public class MyRouteBuilder extends RouteBuilder {
private static String ROUTER_ADDRESS =
"http://localhost:9003/pedidoService";
private static String DATA_FORMAT =
"dataFormat=MESSAGE";
private static String SERVICE_NAME =
"serviceName=%7bhttp://www.tc.com/pedido%7dpedido";
private static String SERVICE_CLASS =
"serviceClass=com.tc.eai.PedidoService";
private static String PORT_NAME =
"portName=%7bhttp://www.tc.com/pedido%7dpedidoSOAP";
private static String SERVICE_ADDRESS =
"http://localhost:9000/pedidoService";
/**
* A main() so we can easily run these routing rules in
our
IDE
*/
public static void main(String... args) {
Main.main(args);
}
/**
* Lets configure the Camel routing rules using Java
code...
*/
public void configure() {
//-- Router receives requests from external clients and
send
to
channel
String[] cxfRouterURI = {
"cxf://"
,ROUTER_ADDRESS
,"?"
,DATA_FORMAT
,"&"
,SERVICE_NAME
,"&"
,SERVICE_CLASS
,"&"
,PORT_NAME
};
//-- Service points to external web service. The
request
routed
by
the
Router
//-- is send to the external service provider
String[] cxfServiceURI = {
"cxf://"
,SERVICE_ADDRESS
,"?"
,DATA_FORMAT
,"&"
,SERVICE_NAME
,"&"
,SERVICE_CLASS
,"&"
,PORT_NAME
};
from(StringUtils.join(cxfRouterURI)).to(StringUtils.join(cxfServiceURI));
}
}
--------------------------------------------------------
When change the route this way:
String anyEndpoint =
"log:org.apache.camel?level=DEBUG";
from(StringUtils.join(cxfRouterURI)).to(anyEndpoint).to(StringUtils.join(cxfServiceURI));
I get this error:
------Exception Begin----------------------------
09/01/2008 18:32:40
org.apache.cxf.phase.PhaseInterceptorChain
doIntercept
INFO: Interceptor has thrown exception, unwinding now
org.apache.cxf.interceptor.Fault
at
org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor.handleMessage(RawMessageContentRedirectInterceptor.java:43)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.invoker.CxfClient.invokeWithMessageStream(CxfClient.java:137)
at
org.apache.camel.component.cxf.invoker.CxfClient.dispatch(CxfClient.java:89)
at
org.apache.camel.component.cxf.CxfProducer.process(CxfProducer.java:202)
at
org.apache.camel.component.cxf.CxfProducer.process(CxfProducer.java:152)
at
org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsynProcessorBridge.process(AsyncProcessorTypeConverter.java:44)
at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:73)
at
org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:143)
at
org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:87)
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at
org.apache.camel.processor.Pipeline.process(Pipeline.java:85)
at
org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:40)
at
org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:44)
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:68)
at
org.apache.camel.component.cxf.CamelInvoker.invoke(CamelInvoker.java:71)
at
org.apache.camel.component.cxf.interceptors.AbstractInvokerInterceptor.handleMessage(AbstractInvokerInterceptor.java:65)
at
org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:207)
at
org.apache.camel.component.cxf.CxfMessageObserver.onMessage(CxfMessageObserver.java:83)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.serviceRequest(JettyHTTPDestination.java:284)
at
org.apache.cxf.transport.http_jetty.JettyHTTPDestination.doService(JettyHTTPDestination.java:240)
at
org.apache.cxf.transport.http_jetty.JettyHTTPHandler.handle(JettyHTTPHandler.java:54)
at
org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:712)
at
org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:211)
at
org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:139)
at org.mortbay.jetty.Server.handle(Server.java:313)
at
org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:506)
at
org.mortbay.jetty.HttpConnection$RequestHandler.content(HttpConnection.java:844)
at
org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:726)
at
org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:211)
at
org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:381)
at
org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:396)
at
org.mortbay.thread.BoundedThreadPool$PoolThread.run(BoundedThreadPool.java:442)
Caused by: java.lang.NullPointerException
at
org.apache.commons.io.IOUtils.copyLarge(IOUtils.java:1025)
at
org.apache.commons.io.IOUtils.copy(IOUtils.java:999)
at
org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor.handleMessage(RawMessageContentRedirectInterceptor.java:39)
... 32 more
------Exception End----------------------------
The same problem happens with seda and activemq endpoints.
Thank you,
Wilson
Hi Wilson,
I am afraid you need to provide the SEI class for camel-cxf
component
to initiate the endpoint.
If you do not want to unmarshal the request message , I
think you
could
use soap message or raw message data formate.
Now I am thinking to do some refactoring work to enable the
web
services endpoint without SEI class.
Willem.