On Sun, Jul 31, 2011 at 7:42 PM, Christian Müller <christian.muel...@gmail.com> wrote: > Hello Claus! > > As you know, the redelivery logic is part of Apache ActiveMQ, but we cannot > use it in Camel in the way I would like it. To only have one redelivery > policy in place is not sufficient for us (if you configure the redelivery > policy in your ActiveMQConnectionFactory). We have multiple applications > sharing the same broker with different requirements (online, batch, ...). > > I know it's also possible to configure it on the ActiveMQConnection [1], but > as I know we don't have this possibility in Camel right now. Correct me if > I'm wrong. May be this is a better solution for my needs, when we have to > possibilities to configure the redelivery policy as an option in the > ActiveMqComponent: > > from("activemq:queue:foo?redeliveryPolicy=#redeliveryPolicy")... > > or > > from("activemq:queue:foo?initialRedeliveryDelay=500&maximumRedeliveries=5&...")... > > But for this, I think I have to open a JIRA for ActiveMQ, right? > > [1] http://activemq.apache.org/message-redelivery-and-dlq-handling.html >
You can setup a number of activemq components, one for each that has different redelivery settings As show here: http://camel.apache.org/activemq Then just give the <bean> a different id, and have the brokerURL with the redelivery settings. <bean id="activemq" ...> </bean> <bean id="activemq-batch" ...> </bean> <bean id="activemq-online" ...> </bean> Then in the Camel routes you pick the id you want to use <from uri="activemq-online:queue:foo"/> ... > Best, > Christian > > On Sun, Jul 31, 2011 at 2:20 PM, Claus Ibsen <claus.ib...@gmail.com> wrote: > >> Frankly I thing the redelivery logic should be part of Apache ActiveMQ. >> >> Your solution is brittle in the fact that you consume the message, and >> then send it back to the broker. What happens if you cannot send the >> message back to the broker? >> It would be better if the broker handled all this out of the box. >> >> ActiveMQ already has redelivery policy where you can configure various >> delays options such as exponential backoff etc. >> >> And there is a JIRA ticket for ActiveMQ to support asynchronous >> scheduled redeliveries. >> https://issues.apache.org/jira/browse/AMQ-1853 >> Which has the potential for consumers to pickup the next message, and >> not block waiting to issue the redelivery. >> >> >> >> On Sun, Jul 31, 2011 at 1:49 PM, Christian Müller >> <christian.muel...@gmail.com> wrote: >> > Hello! >> > >> > I didn't get so much responses as I hoped. May it was not clear what I >> try >> > to do or what I miss at present in Camel. >> > Therefore a build a small unit test (and attached it) to demonstrate what >> > I'm looking for (for convenience I also paste the code into this mail at >> the >> > end). >> > >> > I build my own (simple) redelivery processor which I use to re-enqueue >> > messages into ActiveMQ after the default Camel error handler was kicked >> in >> > and catched the exception. I use the same logic as Camel does (but much >> more >> > simpler for this test) to trace the redelivery delay and count. But >> instead >> > to wait for the retry, I put the message message into ActiveMQ for >> > redelivery and set the "AMQ_SCHEDULED_DELAY" header which is used by >> > ActiveMQ to delay the delivery of the message: >> > >> > public class RedeliveryTest extends CamelTestSupport { >> > >> > @EndpointInject(uri = "mock:intercept") >> > private MockEndpoint intercept; >> > >> > @EndpointInject(uri = "mock:dlq") >> > private MockEndpoint dlq; >> > >> > @Before >> > public void setUp() throws Exception { >> > disableJMX(); >> > >> > super.setUp(); >> > } >> > >> > @Test >> > public void redelieryTest() throws Exception { >> > context.addRoutes(new RouteBuilder() { >> > @Override >> > public void configure() throws Exception { >> > from("activemq:queue:dlq") >> > .to("mock:dlq"); >> > } >> > }); >> > >> > intercept.expectedMessageCount(6); // 1 + 5 redeliveries >> > dlq.expectedMessageCount(1); >> > >> > template.sendBody("activemq:queue:start", "Hello Camel!"); >> > >> > intercept.assertIsSatisfied(); >> > dlq.assertIsSatisfied(); >> > >> > Exchange exchange = dlq.getExchanges().get(0); >> > assertEquals(new Long(5), >> > exchange.getIn().getHeader("CamelActiveMqRedeliveryCounter")); >> > assertEquals(new Long(3200l), >> > exchange.getIn().getHeader("CamelActiveMqRedeliveryDelay")); >> > } >> > >> > @Override >> > protected JndiRegistry createRegistry() throws Exception { >> > JndiRegistry registry = super.createRegistry(); >> > >> > ActiveMQComponent activemq = >> > >> ActiveMQComponent.activeMQComponent("vm://localhost?broker.schedulerSupport=true&broker.persistent=false&broker.useJmx=false"); >> > registry.bind("activemq", activemq); >> > >> > return registry; >> > } >> > >> > @Override >> > protected RouteBuilder createRouteBuilder() throws Exception { >> > return new RouteBuilder() { >> > @Override >> > public void configure() throws Exception { >> > context.addInterceptStrategy(new Tracer()); >> > >> > ActiveMqRedeliveryProcessor activeMqRedeliveryProcessor = >> > new ActiveMqRedeliveryProcessor(); >> > >> > >> activeMqRedeliveryProcessor.setRedeliveryEndpoint("activemq:queue:start"); >> > >> > activeMqRedeliveryProcessor.setDeadLetterEndpoint("activemq:queue:dlq"); >> > activeMqRedeliveryProcessor.setRedeliveryDelay(200); >> > activeMqRedeliveryProcessor.setBackOffMultiplier(2.0); >> > activeMqRedeliveryProcessor.setMaximumRedeliveries(5); >> > >> > onException(Exception.class) >> > .handled(true) >> > .bean(activeMqRedeliveryProcessor) >> > .end(); >> > >> > from("activemq:queue:start").routeId("main") >> > .to("mock:intercept") >> > .throwException(new Exception("forced Exception for >> > test!")); >> > } >> > }; >> > } >> > } >> > >> > And the ActiveMqRedeliveryProcessor is: >> > >> > public class ActiveMqRedeliveryProcessor { >> > >> > private String redeliveryEndpoint; >> > private String deadLetterEndpoint; >> > >> > private long redeliveryDelay = 0l; >> > private double backOffMultiplier = 1; >> > private int maximumRedeliveries = 0; >> > >> > public void process(Exchange exchange) throws Exception { >> > JmsMessage message = exchange.getIn(JmsMessage.class); >> > >> > Long delay = message.getHeader("CamelActiveMqRedeliveryDelay", >> > Long.class); >> > Long redeliveryCount = >> > message.getHeader("CamelActiveMqRedeliveryCounter", Long.class); >> > >> > if (redeliveryCount == null) { >> > redeliveryCount = new Long(0); >> > } >> > >> > ProducerTemplate template = new >> > DefaultProducerTemplate(exchange.getContext()); >> > template.start(); >> > >> > if (redeliveryCount < maximumRedeliveries) { >> > redeliveryCount = new Long(redeliveryCount + 1); >> > message.setHeader("CamelActiveMqRedeliveryCounter", >> > redeliveryCount); >> > >> > if (delay == null) { >> > delay = new Long(redeliveryDelay); >> > } else { >> > delay = new Long((long) (delay * backOffMultiplier)); >> > } >> > message.setHeader("scheduledJobId", null); >> > message.setHeader("CamelActiveMqRedeliveryDelay", delay); >> > message.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, >> delay); >> > >> > template.send(redeliveryEndpoint, exchange); >> > } else { >> > template.send(deadLetterEndpoint, exchange); >> > } >> > template.stop(); >> > } >> > >> > public void setRedeliveryDelay(long redeliveryDelay) { >> > this.redeliveryDelay = redeliveryDelay; >> > } >> > >> > public void setBackOffMultiplier(double backOffMultiplier) { >> > this.backOffMultiplier = backOffMultiplier; >> > } >> > >> > public void setMaximumRedeliveries(int maximumRedeliveries) { >> > this.maximumRedeliveries = maximumRedeliveries; >> > } >> > >> > public void setRedeliveryEndpoint(String redeliveryEndpoint) { >> > this.redeliveryEndpoint = redeliveryEndpoint; >> > } >> > >> > public void setDeadLetterEndpoint(String deadLetterEndpoint) { >> > this.deadLetterEndpoint = deadLetterEndpoint; >> > } >> > } >> > >> > I'm looking for a better integration into Camel for this feature, if >> other >> > people also think this is a common use case (let ActiveMQ handle the >> > redelivery instead of having long live inflight messages if the delay is >> > more than a few minutes). This integration could looks like: >> > >> > errorHandler( >> > activeMqDeadLetterChannel("activemq:queue:FOO", "activemq:queue:DLQ") >> > .maximumRedeliveries(8) >> > .deliveryDelay(60000) >> > .useExponentialBackOff() >> > .backOffMultiplier(2)); >> > Where "activemq:queue:FOO" is the queue for redelivery and >> > "activemq:queue:DLQ" is the dead letter queue. >> > >> > I'm really interested in your opinions whether this is also useful for >> other >> > people or is this only a special requirement from my site. >> > >> > Best, >> > Christian >> > >> >> >> >> -- >> Claus Ibsen >> ----------------- >> FuseSource >> Email: cib...@fusesource.com >> Web: http://fusesource.com >> Twitter: davsclaus, fusenews >> Blog: http://davsclaus.blogspot.com/ >> Author of Camel in Action: http://www.manning.com/ibsen/ >> > -- Claus Ibsen ----------------- FuseSource Email: cib...@fusesource.com Web: http://fusesource.com Twitter: davsclaus, fusenews Blog: http://davsclaus.blogspot.com/ Author of Camel in Action: http://www.manning.com/ibsen/