Hi Emmanuel, I'm not familiar with the non-JMS API - but if your MessageCount and ReceivedMessageCount are equal then that indicates that the messages are still on the queue i.e. not ack'd.
Sorry I can't help more, but I'm not up on the logic with the lower level so I don't know how it could/should be used as you're making use of it. Anyone else know ? Thanks, Marnie ps What features do you need out of interest that the JMS API doesn't expose ? Thx On Mon, May 31, 2010 at 9:11 AM, Emmanuel Bourg <[email protected]> wrote: > Le 29/05/2010 17:47, Marnie McCormack a écrit : > > > What client are you using to consume the messages ? (I'm assuming not java >> since the accept-mode and acquire-mode settings are not ringing any bells >> with me !) >> > > I'm using the Java client, but the non-JMS one in the > org.apache.qpid.transport package. I know it isn't officially supported by > it has some features missing in the JMS client that I need. > > > > If your MessageCount (as opposed to ReceivedMessageCount) is not going down >> then the messages are likely not being ack'd by the consumer. >> > > The message count and received message count are equals. By reading the > AMQP spec I was under the impression that in implicit/pre-acquired mode the > acknowledgment wasn't required, and the messages were removed automatically > from the queue once they were transfered to the client. Is this correct? > > > > On the java broker, all subscribers to a topic get their own private queue >> created as a temp queue - how exactly are you subscibing to the queue >> (whats >> the queue called?) - is it possible you have multiple subscribers and one >> of >> them is not ack-ing ? >> > > There is exactly one producer and one subscriber. The private queue is > declared like this: > > // declare the queue > String queue = username + "-" + System.currentTimeMillis(); > session.queueDeclare(queue, null, null, Option.EXCLUSIVE, > Option.AUTO_DELETE); > session.messageSubscribe(queue, queue, MessageAcceptMode.NONE, > MessageAcquireMode.PRE_ACQUIRED, null, 0, null); > > // issue credit > session.messageFlow(queue, MessageCreditUnit.BYTE, > Session.UNLIMITED_CREDIT); > session.messageFlow(queue, MessageCreditUnit.MESSAGE, > Session.UNLIMITED_CREDIT); > > // bind a topic to the queue > session.exchangeBind(queue, "amq.topic", "foo.bar", null); > > > And on the producer side the messages are sent with: > > DeliveryProperties deliveryProps = new DeliveryProperties(); > deliveryProps.setDiscardUnroutable(true); > deliveryProps.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); > deliveryProps.setRoutingKey("foo.bar"); > > session.messageTransfer("amq.topic", MessageAcceptMode.NONE, > MessageAcquireMode.PRE_ACQUIRED, new Header(deliveryProps), msg); > > > Emmanuel Bourg > >
