[
https://issues.apache.org/jira/browse/CAMEL-6784?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13790256#comment-13790256
]
Dmytro Puzhay commented on CAMEL-6784:
--------------------------------------
Hi Christian,
thanks for answer, please see below how my code looks like.
In my RouteBuilder constructor:
this.timer = new TimerComponent();
getContext().addComponent("timer", this.timer);
this.amqpComponent = (AMQPComponent)
AMQPComponent.amqpComponent("amqp://admin:admin@localhost/test?brokerlist='tcp://localhost:10315'");
getContext().addComponent("amqp", this.amqpComponent);
this.directComponent = new DirectComponent();
getContext().addComponent("direct", this.directComponent);
In my RouteBuilder configure:
Endpoint timerEndpoint =
this.timer.createEndpoint("timer:MyTimer?delay=0&repeatCount=1");
Endpoint amqpEndpoint =
this.amqpComponent.createEndpoint("amqp:queue:broadcast.MyTopic.MyQueue");
Endpoint directEndpoint =
this.directComponent.createEndpoint("direct:MyDirect");
MyPollingConsumer myConsumer = new MyPollingConsumer(amqpEndpoint,
directEndpoint);
from(timerEndpoint).bean(myConsumer);
from(directEndpoint) ...
MyPollingConsumer:
public class LoaderPollingConsumer
{
private PollingConsumer pollingConsumer = null;
private ProducerTemplate producerTemplate = null;
public LoaderPollingConsumer(Endpoint amqpEndpoint, Endpoint
directEndpoint) throws Exception
{
this.pollingConsumer = amqpEndpoint.createPollingConsumer();
ProducerTemplate directEndpointProducerTemplate =
directEndpoint.getCamelContext().createProducerTemplate();
directEndpointProducerTemplate.setDefaultEndpoint(directEndpoint);
this.producerTemplate = directEndpointProducerTemplate;
}
@Handler
public void onEvent() throws Exception
{
this.producerTemplate.start();
while (true)
{
Exchange exchange = this.pollingConsumer.receiveNoWait();
if (exchange != null)
{
this.producerTemplate.send(exchange);
}
else
{
this.producerTemplate.sendBody(null);
break;
}
}
this.producerTemplate.stop();
}
}
"AMQP component uses the JMS component under the covers (which uses Spring
DMLC)... I don't think you can force the DMLC to do a browse as it's event
driven and a browse is a static snapshot"
I thinks one can read in browsing mode with event-driven route, but can't with
polling consumer, because polling consumer technically makes new connection
with every poll, which simply forces amqp to start from first message.
"Or try using the BrowsableEndpoint like this"
I will try (probably this week) and write back about results.
> PollingConsumer.receiveNoWait() never returns null consuming from
> AMQPComponent endpoint in browsing mode
> ---------------------------------------------------------------------------------------------------------
>
> Key: CAMEL-6784
> URL: https://issues.apache.org/jira/browse/CAMEL-6784
> Project: Camel
> Issue Type: Bug
> Components: camel-amqp
> Affects Versions: 2.10.2
> Reporter: Dmytro Puzhay
>
> I create PollingConsumer for my AMQP queue and consume with receiveNoWait()
> until null is received. It worked nicely in default mode, but in browsing
> mode (queue name suffixed with "{mode: browse}", see
> https://qpid.apache.org/books/0.12/Programming-In-Apache-Qpid/html/ch02s04.html
> "2.4.3.3. browse" for details) my PollingConsumer keeps running infinitely.
--
This message was sent by Atlassian JIRA
(v6.1#6144)