Hello,
I tried to create a priority queue with Camel inside ServiceMix, but after
two days I am stuck.
The general idea is:
1) one CXF endpoint on ServiceMix has priority 0
2) a second CXF endpoint on ServiceMix has priority 9
3) the destination service can process only one message at a time
I want to configure only CXF and Camel endpoints, without a JMS queue.
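To make the goal concrete, here is a minimal sketch in plain Java of the behaviour I am after. It is not ServiceMix or Camel code: the class PriorityDispatchSketch and its methods submit and drain are hypothetical names, and the call to the destination service is replaced by collecting message bodies in a list. It assumes priority 9 must be served before priority 0, and that messages with the same priority keep their arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration only; none of these names come from ServiceMix or Camel. */
public class PriorityDispatchSketch {

    /** A message body tagged with a priority (0 = lowest, 9 = highest) and an arrival number. */
    static final class Prioritized implements Comparable<Prioritized> {
        final int priority;
        final long sequence;
        final String body;

        Prioritized(int priority, long sequence, String body) {
            this.priority = priority;
            this.sequence = sequence;
            this.body = body;
        }

        @Override
        public int compareTo(Prioritized other) {
            // Higher priority sorts first; within one priority, earlier arrival sorts first.
            int byPriority = Integer.compare(other.priority, priority);
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    private final PriorityBlockingQueue<Prioritized> queue = new PriorityBlockingQueue<Prioritized>();
    private final AtomicLong arrivals = new AtomicLong();

    /** Called by either entry point; it only enqueues and does not wait for a reply. */
    public void submit(int priority, String body) {
        queue.put(new Prioritized(priority, arrivals.getAndIncrement(), body));
    }

    /** Single consumer: takes one message at a time, highest priority first. */
    public List<String> drain() {
        List<String> delivered = new ArrayList<String>();
        Prioritized next;
        while ((next = queue.poll()) != null) {
            delivered.add(next.body); // stand-in for the call to the destination service
        }
        return delivered;
    }

    public static void main(String[] args) {
        PriorityDispatchSketch dispatcher = new PriorityDispatchSketch();
        dispatcher.submit(0, "low-1");
        dispatcher.submit(9, "high-1");
        dispatcher.submit(0, "low-2");
        dispatcher.submit(9, "high-2");
        System.out.println(dispatcher.drain()); // prints [high-1, high-2, low-1, low-2]
    }
}
```

The explicit arrival number is there because PriorityBlockingQueue gives no ordering guarantee between elements that compare as equal.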
My routes:
package pl.bpsa.smx3.services.router;

import javax.xml.transform.dom.DOMSource;

import org.apache.camel.builder.RouteBuilder;

public class RoutesProvider extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("jbi:service:http://stock/high") // important messages
            .to("jms:queue:foo?priority=9");

        from("jbi:service:http://stock/low")
            .to("jms:queue:foo?priority=0");

        from("jms:queue:foo")
            .to("jbi:service:http://stock/destination");

        // endpoint only for testing
        from("jbi:service:http://stock/destination")
            .to("file:log");
    }
}
All processing stops when Camel tries to put a message on the queue. How can
I fix this issue?
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] Adding routes from: Routes: [Route[ [From[jbi:service:http://stock/high]] -> [To[jms:queue:foo?priority=9]]], Route[ [From[jbi:service:http://stock/low]] -> [To[jms:queue:foo?priority=0]]], Route[ [From[jms:queue:foo]] -> [To[jbi:service:http://stock/destination]]], Route[ [From[jbi:service:http://stock/destination]] -> [To[file:log]]]] routes: []
[DEBUG] [org.apache.camel.spring.SpringCamelContext] Starting the CamelContext now that the ApplicationContext has started
[DEBUG] [org.apache.camel.impl.DefaultComponentResolver] Found component: jbi in registry: [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] jbi:service:http://stock/high converted to endpoint: Endpoint[service:http://stock/high] by component: org.apache.servicemix.cam [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultComponentResolver] Found component: jms in registry: [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultComponent] Creating endpoint uri=[jms:queue:foo?priority=9], path=[queue:foo], parameters=[{priority=9}]
[DEBUG] [org.apache.camel.util.ResolverUtil] Searching for annotations of org.apache.camel.Converter in packages: [org.apache.activemq.camel.converter, org.apache.camel.converter, org.apache.camel.spring.converter]
[DEBUG] [org.apache.camel.util.ResolverUtil] Loading from jar: E:\repository_tmp\org\apache\activemq\activemq-core\5.0.0.15-fuse\activemq-core-5.0.0.15-fuse.jar
[DEBUG] [org.apache.camel.util.ResolverUtil] Loading from jar: E:\repository_tmp\org\apache\camel\camel-core\1.4.0.0-fuse\camel-core-1.4.0.0-fuse.jar
[DEBUG] [org.apache.camel.util.ResolverUtil] Loading from jar: E:\repository_tmp\org\apache\camel\camel-core\1.4.0.0-fuse\camel-core-1.4.0.0-fuse-tests.jar
[DEBUG] [org.apache.camel.util.ResolverUtil] Found: [class org.apache.camel.converter.CollectionConverter, class org.apache.camel.converter.CamelConverter, class org.apache.camel.converter.stream.StreamCacheConverter, class org.apache.camel.converter.IOConverter, class org.apache.camel.converter.jaxp.DomConverter, class org.apache.activemq.camel.converter.ActiveMQMessageConverter, class org.apache.activemq.camel.converter.ActiveMQConverter, class org.apache.camel.converter.NIOConverter, class org.apache.camel.converter.jaxp.XmlConverter, class org.apache.camel.converter.jaxp.StaxConverter, class org.apache.camel.converter.ObjectConverter]
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.CollectionConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.CamelConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.stream.StreamCacheConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.IOConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.jaxp.DomConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.activemq.camel.converter.ActiveMQMessageConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.activemq.camel.converter.ActiveMQConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.NIOConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.jaxp.XmlConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.jaxp.StaxConverter
[DEBUG] [org.apache.camel.impl.converter.AnnotationTypeConverterLoader] Loading converter class: org.apache.camel.converter.ObjectConverter
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] jms:queue:foo?priority=9 converted to endpoint: Endpoint[jms:queue:foo?priority=9] by component: org.apache.camel.component.jms. [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] jbi:service:http://stock/low converted to endpoint: Endpoint[service:http://stock/low] by component: org.apache.servicemix.camel [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultComponent] Creating endpoint uri=[jms:queue:foo?priority=0], path=[queue:foo], parameters=[{priority=0}]
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] jms:queue:foo?priority=0 converted to endpoint: Endpoint[jms:queue:foo?priority=0] by component: org.apache.camel.component.jms. [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultComponent] Creating endpoint uri=[jms:queue:foo], path=[queue:foo], parameters=[{}]
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] jms:queue:foo converted to endpoint: Endpoint[jms:queue:foo] by component: [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] jbi:service:http://stock/destination converted to endpoint: Endpoint[service:http://stock/destination] by component: org.apache. [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultComponentResolver] Found component: file via type: org.apache.camel.component.file.FileComponent via META-INF/services/org/apache/camel/component/file
[DEBUG] [org.apache.camel.impl.DefaultComponent] Creating endpoint uri=[file:log], path=[log], parameters=[{}]
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] file:log converted to endpoint: Endpoint[file:log] by component: [EMAIL PROTECTED]
[DEBUG] [org.apache.camel.impl.DefaultComponent] Creating endpoint uri=[event:default], path=[default], parameters=[{}]
[DEBUG] [org.apache.camel.impl.DefaultCamelContext] event:default converted to endpoint: Endpoint[event:default] by component: [EMAIL PROTECTED]
d
test
[DEBUG] [org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate] Executing callback on JMS Session: ActiveMQSession {id=ID:warc-8a0-3554-1220961055917-0:2:1,started=false}
[DEBUG] [org.apache.camel.component.jms.JmsProducer] Endpoint[jms:queue:foo?priority=0] sending JMS message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = temp-queue://ID:warc-8a0-3554-1220961055917-0:1:1, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [EMAIL PROTECTED], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false}
[DEBUG] [org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate] Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = temp-queue://ID:warc-8a0-3554-1220961055917-0:1:1, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [EMAIL PROTECTED], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false}
[DEBUG] [org.apache.camel.component.jms.JmsProducer] Future timed out: java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:211)
    at java.util.concurrent.FutureTask.get(FutureTask.java:85)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:202)
    at org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:43)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:75)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:146)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:90)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:39)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:41)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:66)
    at org.apache.servicemix.camel.CamelJbiEndpoint.handleActiveProviderExchange(CamelJbiEndpoint.java:101)
    at org.apache.servicemix.camel.CamelJbiEndpoint.process(CamelJbiEndpoint.java:74)
    at org.apache.servicemix.common.AsyncBaseLifeCycle.doProcess(AsyncBaseLifeCycle.java:540)
    at org.apache.servicemix.common.AsyncBaseLifeCycle.processExchange(AsyncBaseLifeCycle.java:492)
    at org.apache.servicemix.common.BaseLifeCycle.onMessageExchange(BaseLifeCycle.java:46)
    at org.apache.servicemix.jbi.messaging.DeliveryChannelImpl.processInBound(DeliveryChannelImpl.java:610)
    at org.apache.servicemix.jbi.nmr.flow.AbstractFlow.doRouting(AbstractFlow.java:172)
    at org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow.doRouting(SedaFlow.java:167)
    at org.apache.servicemix.jbi.nmr.flow.seda.SedaQueue$1.run(SedaQueue.java:134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
    at java.lang.Thread.run(Thread.java:595)
[ERROR] [org.apache.camel.processor.DeadLetterChannel] Failed delivery for exchangeId: ID-warc-8a0/3556-1220961056058/0-0. On delivery attempt: 0 caught: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis on the exchange: Exchange[JbiMessage: [EMAIL PROTECTED] erties: {JMSCorrelationID=ID-warc-8a0/3556-1220961056058/2-0}}]
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis on the exchange: Exchange[JbiMessage: org.apache.servicemix.jbi.messaging.Normaliz [EMAIL PROTECTED]: {JMSCorrelationID=ID-warc-8a0/3556-1220961056058/2-0}}]
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:221)
    at org.apache.camel.impl.converter.AsyncProcessorTypeConverter$ProcessorToAsyncProcessorBridge.process(AsyncProcessorTypeConverter.java:43)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:75)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:146)
    at org.apache.camel.processor.DeadLetterChannel.process(DeadLetterChannel.java:90)
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:39)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:41)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:66)
    at org.apache.servicemix.camel.CamelJbiEndpoint.handleActiveProviderExchange(CamelJbiEndpoint.java:101)
    at org.apache.servicemix.camel.CamelJbiEndpoint.process(CamelJbiEndpoint.java:74)
    at org.apache.servicemix.common.AsyncBaseLifeCycle.doProcess(AsyncBaseLifeCycle.java:540)
    at org.apache.servicemix.common.AsyncBaseLifeCycle.processExchange(AsyncBaseLifeCycle.java:492)
    at org.apache.servicemix.common.BaseLifeCycle.onMessageExchange(BaseLifeCycle.java:46)
    at org.apache.servicemix.jbi.messaging.DeliveryChannelImpl.processInBound(DeliveryChannelImpl.java:610)
    at org.apache.servicemix.jbi.nmr.flow.AbstractFlow.doRouting(AbstractFlow.java:172)
    at org.apache.servicemix.jbi.nmr.flow.seda.SedaFlow.doRouting(SedaFlow.java:167)
    at org.apache.servicemix.jbi.nmr.flow.seda.SedaQueue$1.run(SedaQueue.java:134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
    at java.lang.Thread.run(Thread.java:595)
[DEBUG] [org.apache.camel.processor.exceptionpolicy.DefaultExceptionPolicyStrategy] Finding best suited exception policy for thrown exception org.apache.camel.ExchangeTimedOutException
[DEBUG] [org.apache.camel.processor.exceptionpolicy.DefaultExceptionPolicyStrategy] No candidate found to be used as exception policy
[DEBUG] [org.apache.camel.processor.DeadLetterChannel] Sleeping for: 1000 millis until attempting redelivery
[DEBUG] [org.apache.camel.util.DefaultTimeoutMap] Evicting inactive request for correlationID: Entry for key: null
[DEBUG] [org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate] Executing callback on JMS Session: ActiveMQSession {id=ID:warc-8a0-3554-1220961055917-0:3:1,started=false}
[DEBUG] [org.apache.camel.component.jms.JmsProducer] Endpoint[jms:queue:foo?priority=0] sending JMS message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = temp-queue://ID:warc-8a0-3554-1220961055917-0:1:1, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [EMAIL PROTECTED], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false}
[DEBUG] [org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate] Sending created message: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = temp-queue://ID:warc-8a0-3554-1220961055917-0:1:1, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [EMAIL PROTECTED], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false}