Le 29/05/2010 17:47, Marnie McCormack a écrit :
What client are you using to consume the messages ? (I'm assuming not java since the accept-mode and acquire-mode settings are not ringing any bells with me !)
I'm using the Java client, but the non-JMS one in the org.apache.qpid.transport package. I know it isn't officially supported by it has some features missing in the JMS client that I need.
If your MessageCount (as opposed to ReceivedMessageCount) is not going down then the messages are likely not being ack'd by the consumer.
The message count and received message count are equals. By reading the AMQP spec I was under the impression that in implicit/pre-acquired mode the acknowledgment wasn't required, and the messages were removed automatically from the queue once they were transfered to the client. Is this correct?
On the java broker, all subscribers to a topic get their own private queue created as a temp queue - how exactly are you subscibing to the queue (whats the queue called?) - is it possible you have multiple subscribers and one of them is not ack-ing ?
There is exactly one producer and one subscriber. The private queue is declared like this:
// declare the queue String queue = username + "-" + System.currentTimeMillis();session.queueDeclare(queue, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE); session.messageSubscribe(queue, queue, MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, null, 0, null);
// issue creditsession.messageFlow(queue, MessageCreditUnit.BYTE, Session.UNLIMITED_CREDIT); session.messageFlow(queue, MessageCreditUnit.MESSAGE, Session.UNLIMITED_CREDIT);
// bind a topic to the queue
session.exchangeBind(queue, "amq.topic", "foo.bar", null);
And on the producer side the messages are sent with:
DeliveryProperties deliveryProps = new DeliveryProperties();
deliveryProps.setDiscardUnroutable(true);
deliveryProps.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
deliveryProps.setRoutingKey("foo.bar");
session.messageTransfer("amq.topic", MessageAcceptMode.NONE,
MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps), msg);
Emmanuel Bourg
smime.p7s
Description: S/MIME Cryptographic Signature
