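Hi,

You have to use valid JMS selector syntax. This is standard JMS, so search for information on how to write selectors with JMS. What you can do with them is a bit limited: a selector can only test message headers and properties, not the message body.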
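As a rough sketch of what a valid selector could look like here (not tested against your setup): a selector cannot call body().contains(...), so this assumes the sender of the command message sets a string property on it. The property name telelinkId and the class SelectorUri are made up for the example. The selector option itself belongs to camel-jms, and the selector is URL-encoded because characters such as = otherwise clash with the URI syntax.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

public class SelectorUri {

    // Hypothetical property name (an assumption): the sender of the command
    // message would have to set it, because a JMS selector cannot look
    // inside the message body.
    public static final String ID_PROPERTY = "telelinkId";

    /** Builds a JMS selector such as telelinkId='550700111'. */
    public static String selectorFor(String telelinkId) {
        // String literals in a selector use single quotes; a quote inside
        // the literal is escaped by doubling it.
        return ID_PROPERTY + "='" + telelinkId.replace("'", "''") + "'";
    }

    /** Builds a camel-jms endpoint URI with the selector URL-encoded. */
    public static String endpointFor(String queue, String telelinkId) {
        try {
            String encoded = URLEncoder.encode(selectorFor(telelinkId), "UTF-8");
            return "jms:" + queue + "?selector=" + encoded;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(selectorFor("550700111"));
        System.out.println(endpointFor("toTelelinkQ", "550700111"));
    }
}
```

The resulting URI could then be passed to consumer.receiveBody(...) in place of the jms:toTelelinkQ.filter(...) string. Whether the polling consumer behind ConsumerTemplate honors the selector option may depend on the Camel version, so please verify that on 2.4.0.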
For example, for ActiveMQ:
http://activemq.apache.org/selectors.html
http://download.oracle.com/javaee/1.4/api/javax/jms/Message.html

On Wed, Sep 8, 2010 at 10:30 AM, RomualdoGobbo <romualdo.go...@newlog.it> wrote:
>
> Hi Claus,
> thank you very much for your suggestions.
> I've completely reviewed my code, but unfortunately I'm still making a
> mistake that I can't find: the message body doesn't return to the main
> context, as you can see in the code below, because of an exception.
>
> Below you can find the new camelContext.xml and the new bean
> TelelinkCommandBean.java, followed by the error dump.
>
> Thanks a lot for your help
>
> Best Regards,
> Romualdo
>
> ------------------------------------------------------------------------------
>
> camelContext.xml:
>
> TelelinkCommandBean.java:
>
> // TelelinkCommandBean.java
>
> package newlog.camel;
>
> import org.apache.camel.*;
>
> public class TelelinkCommandBean {
>     private ProducerTemplate producer;
>     private ConsumerTemplate consumer;
>
>     public void setConsumer(ConsumerTemplate consumer) {
>         this.consumer = consumer;
>     }
>
>     public void setProducer(ProducerTemplate producer) {
>         this.producer = producer;
>     }
>
>     public void getTelelinkCommand(Exchange exchange) {
>         String body = null;
>         String bodyTelelink = null;
>         String command = null;
>         String telelink = null;
>
>         // receive the message from payload exchange
>         body = exchange.getIn().getBody(String.class);
>         telelink = body.substring(5, 14);
>         System.out.println("after receiving payload: telelink ID = " + telelink);
>
>
>         // loop to empty queue
>         while (true) {
>
>             // receive the message from the COMMAND queue
>             bodyTelelink = consumer.receiveBody("jms:toTelelinkQ.filter(body().contains("+ telelink +"))", String.class);
>             if (bodyTelelink == null) {
>                 // no more messages in queue
>                 break;
>             }
>
>             if (bodyTelelink.contains("#")) {
>                 System.out.println("OK " + telelink + "=" + bodyTelelink);
>                 command = bodyTelelink.substring(10);
>             } else {
>                 System.out.println("NO OK " + telelink + "<>" + bodyTelelink);
>                 command = bodyTelelink;
>             }
>             // send it out the command
>             exchange.getIn().setBody(command);
>         }
>     }
> }
>
> Error received at run-time:
>
> [INFO] Classpath = [file:/C:/Documents and Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/camel/camel-spring/2.4.0/camel-spring-2.4.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/camel/camel-core/2.4.0/camel-core-2.4.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/commons-logging/commons-logging-api/1.1/commons-logging-api-1.1.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/fusesource/commonman/commons-management/1.0/commons-management-1.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-context/3.0.3.RELEASE/spring-context-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-aop/3.0.3.RELEASE/spring-aop-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-asm/3.0.3.RELEASE/spring-asm-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-beans/3.0.3.RELEASE/spring-beans-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-core/3.0.3.RELEASE/spring-core-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-expression/3.0.3.RELEASE/spring-expression-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-tx/3.0.3.RELEASE/spring-tx-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/mina/mina-core/1.1.7/mina-core-1.1.7.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/slf4j/slf4j-api/1.6.0/slf4j-api-1.6.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/camel/camel-saxon/2.4.0/camel-saxon-2.4.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/net/sf/saxon/saxon/9.1.0.8/saxon-9.1.0.8.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/net/sf/saxon/saxon-dom/9.1.0.8/saxon-dom-9.1.0.8.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/net/sf/saxon/saxon-sql/9.1.0.8/saxon-sql-9.1.0.8.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/net/sf/saxon/saxon-xpath/9.1.0.8/saxon-xpath-9.1.0.8.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/camel/camel-mina/2.4.0/camel-mina-2.4.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/camel/camel-netty/2.4.0/camel-netty-2.4.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/jboss/netty/netty/3.2.1.Final/netty-3.2.1.Final.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/camel/camel-stream/2.4.0/camel-stream-2.4.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/camel/camel-jms/2.4.0/camel-jms-2.4.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/spring-jms/3.0.3.RELEASE/spring-jms-3.0.3.RELEASE.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-jms_1.1_spec/1.1.1/geronimo-jms_1.1_spec-1.1.1.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/activemq/activemq-core/5.3.2/activemq-core-5.3.2.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/activemq/activeio-core/3.1.2/activeio-core-3.1.2.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.1_spec/1.0.1/geronimo-j2ee-management_1.1_spec-1.0.1.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/activemq/kahadb/5.3.2/kahadb-5.3.2.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/activemq/protobuf/activemq-protobuf/1.0/activemq-protobuf-1.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/osgi/org.osgi.core/4.1.0/org.osgi.core-4.1.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-core/1.2.0/spring-osgi-core-1.2.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/springframework/osgi/spring-osgi-io/1.2.0/spring-osgi-io-1.2.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/org/apache/geronimo/specs/geronimo-j2ee-management_1.0_spec/1.1/geronimo-j2ee-management_1.0_spec-1.1.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/commons-net/commons-net/2.0/commons-net-2.0.jar,
> file:/C:/Documents and Settings/Administrator/.m2/repository/log4j/log4j/1.2.16/log4j-1.2.16.jar,
> file:/C:/Programmi/Java/jdk1.6.0_20/jre/../lib/tools.jar]
> [pache.camel.spring.Main.main()] MainSupport INFO Apache Camel 2.4.0 starting
> [pache.camel.spring.Main.main()] ClassPathXmlApplicationContext INFO Refreshing org.springframework.context.support.classpathxmlapplicationcont...@fbcb70: startup date [Wed Sep 08 10:12:17 CEST 2010]; root of context hierarchy
> [pache.camel.spring.Main.main()] XmlBeanDefinitionReader INFO Loading XML bean definitions from file [C:\Documents and Settings\Administrator\workspace\nwlCamelTelelinkServer\target\classes\META-INF\spring\camelContext.xml]
> [pache.camel.spring.Main.main()] CamelNamespaceHandler INFO camel-osgi.jar/camel-spring-osgi.jar not detected in classpath
> [pache.camel.spring.Main.main()] DefaultListableBeanFactory INFO Pre-instantiating singletons in org.springframework.beans.factory.support.defaultlistablebeanfact...@302df5: defining beans [producer,consumer,Telelink-context:beanPostProcessor,Telelink-context,myBeanId,jms]; root of factory hierarchy
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache Camel 2.4.0 (CamelContext: Telelink-context) is starting
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Tracing is enabled on CamelContext: Telelink-context
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO JMX enabled. Using ManagedManagementStrategy.
> [pache.camel.spring.Main.main()] AnnotationTypeConverterLoader INFO Found 4 packages with 14 @Converter classes to load
> [pache.camel.spring.Main.main()] DefaultTypeConverter INFO Loaded 148 type converters in 0.311 seconds
> [pache.camel.spring.Main.main()] NettyConsumer INFO Netty consumer bound to: localhost:5000
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route: route1 started and consuming from: Endpoint[file://src/data]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route: route2 started and consuming from: Endpoint[tcp://localhost:5000]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route: route3 started and consuming from: Endpoint[jms://MyQueue]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Route: route4 started and consuming from: Endpoint[file://target/outputFiles]
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Started 4 routes
> [pache.camel.spring.Main.main()] DefaultCamelContext INFO Apache Camel 2.4.0 (CamelContext: Telelink-context) started in 1.432 seconds
> [ New I/O server worker #1-1] Tracer INFO
> 83cba678-3265-4cd2-9786-f07456fd9621 >>> (route2) from(tcp://localhost:5000)
> --> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
> Headers:{CamelNettyRemoteAddress=/127.0.0.1:2287,
> camelnettychannelhandlercontext=org.jboss.netty.channel.defaultchannelpipeline$defaultchannelhandlercont...@fdbc27,
> CamelNettyMessageEvent=[id: 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000]
> RECEIVED:
> $DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C},
> BodyType:String,
> Body:$DCS,550700111,100908081220,1,45.57576,9.39887,255,156.5,123456,0223,0,0,,1,00001,000,12.7,97,315,373C
> after receiving payload: telelink ID = 550700111
> [ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed delivery for exchangeId: 83cba678-3265-4cd2-9786-f07456fd9621. Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException
> java.lang.NullPointerException
>     at newlog.camel.TelelinkCommandBean.getTelelinkCommand(TelelinkCommandBean.java:35)[file:/C:/Documents and Settings/Administrator/workspace/nwlCamelTelelinkServer/target/classes/:]
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.6.0_20]
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)[:1.6.0_20]
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)[:1.6.0_20]
>     at java.lang.reflect.Method.invoke(Method.java:597)[:1.6.0_20]
>     at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:260)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:164)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:159)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:174)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:290)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:202)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:256)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:143)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.Pipeline.process(Pipeline.java:78)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:99)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:70)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:98)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:89)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:68)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:91)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:85)[camel-core-2.4.0.jar:2.4.0]
>     at org.apache.camel.component.netty.handlers.ServerChannelHandler.messageReceived(ServerChannelHandler.java:96)[camel-netty-2.4.0.jar:2.4.0]
>     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:76)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:754)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:302)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:317)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:299)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:80)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:545)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:540)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:349)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)[netty-3.2.1.Final.jar:]
>     at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)[netty-3.2.1.Final.jar:]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_20]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_20]
>     at java.lang.Thread.run(Thread.java:619)[:1.6.0_20]
> [ New I/O server worker #1-1] Tracer INFO
> 6954e702-b12f-4384-b548-c227910d0344 >>> (route2) from(tcp://localhost:5000)
> --> ref:myBeanId method: getTelelinkCommand <<< Pattern:InOut,
> Headers:{camelnettychannelhandlercontext=org.jboss.netty.channel.defaultchannelpipeline$defaultchannelhandlercont...@fdbc27,
> CamelNettyRemoteAddress=/127.0.0.1:2287,
> CamelNettyMessageEvent=[id: 0x008f5944, /127.0.0.1:2287 => /127.0.0.1:5000]
> RECEIVED:
> $DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE},
> BodyType:String,
> Body:$DCS,550700111,100908081222,1,45.57527,9.39625,256,156.5,123456,0224,0,0,,1,00001,000,12.7,97,316,D4DE
> after receiving payload: telelink ID = 550700111
> [ New I/O server worker #1-1] DefaultErrorHandler ERROR Failed delivery for exchangeId: 6954e702-b12f-4384-b548-c227910d0344. Exhausted after delivery attempt: 1 caught: java.lang.NullPointerException
> java.lang.NullPointerException.... and more
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Messages-queue-filtered-depending-on-info-received-by-another-queue-tp2805911p2807527.html
> Sent from the Camel Development mailing list archive at Nabble.com.
>

--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus