[ 
https://issues.apache.org/activemq/browse/AMQ-2048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagath Vijayan Janakiraman updated AMQ-2048:
--------------------------------------------

    Issue Type: Bug  (was: Test)

> Messages are read from physical queue of virtual topic but not removed. "Messages Received" count remains zero.
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2048
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2048
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.1.0
>         Environment: Microsoft Windows Server 2003 - Standard x64 Edition - Service Pack 2
> Intel Xeon 1.60 GHz, 8 GB RAM
> ActiveMQ 5.1.0 JMS Message Broker
>            Reporter: Jagath Vijayan Janakiraman
>         Attachments: TestQueueAcknowledgement.java
>
>
> Virtual Topic: VirtualTopic.AckTest
> Physical Queue of Virtual Topic: Consumer.TestQueueAcknowledgement.VirtualTopic.AckTest
> The consumer is connected to the OpenWire TCP connector, and I am posting
> messages directly to the virtual topic.
> Even when message.acknowledge() is called, messages are read but not removed
> from the queue. The "Messages Received" count remains at zero. Is this the
> expected behavior when using Virtual Topics?
> Test Consumer:
> import java.util.Properties;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.QueueConnection;
> import javax.jms.QueueSession;
> import javax.jms.TextMessage;
> import javax.naming.InitialContext;
> public class TestQueueAcknowledgement {
>       private QueueConnection conn;
>       private QueueSession session;
>       
>       public void listen() {
>               try {
>                       Properties props = new Properties();
>                       props.put("java.naming.factory.initial",
>                                       "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>                       props.put("java.naming.provider.url",
>                                       "tcp://localhost:61616");
>                       props.put("queue.queueName",
>                                       "Consumer.TestQueueAcknowledgement.VirtualTopic.AckTest");
>                       
>                       javax.naming.Context ctx = new InitialContext(props);
>                       javax.jms.QueueConnectionFactory factory 
>                               = (javax.jms.QueueConnectionFactory) 
>                               ctx.lookup("ConnectionFactory");
>                       conn = factory.createQueueConnection();
>                       final javax.jms.Queue queue 
>                               = (javax.jms.Queue) ctx.lookup("queueName");
>                       session = conn.createQueueSession(false, 
>                                       QueueSession.AUTO_ACKNOWLEDGE);
>                       javax.jms.QueueReceiver receiver
>                               = session.createReceiver(queue);
>                       
>                       receiver.setMessageListener(new MessageListener() {
>                               public void onMessage(Message message) {
>                                       try {
>                                               if (message instanceof TextMessage) {
>                                                       TextMessage txtMsg = (TextMessage) message;
>                                                       String msg = txtMsg.getText();
>                                                       System.out.println(msg);
>                                               }
>                                               message.acknowledge();
>                                       } catch (Exception e) {
>                                               e.printStackTrace();
>                                       }
>                               }
>                       });
>                       conn.start();
>               } catch (Exception e) {
>                       e.printStackTrace();
>               }
>       }
>       protected void finalize() throws Throwable {
>               if (session != null) {
>                       session.close();
>               }
>       }
>       
>       public static void main(String[] args) {
>               TestQueueAcknowledgement ack = new TestQueueAcknowledgement();
>               ack.listen();
>       }
> }
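
The physical queue name in the report follows ActiveMQ's default virtual topic convention: a consumer reads from the queue Consumer.<consumerName>.<virtualTopicName>, where the topic name begins with "VirtualTopic.". A minimal sketch of that naming rule is given here for reference. The class VirtualTopicNames and its method consumerQueue are hypothetical helpers for illustration only; they are not part of ActiveMQ or of the attached test, and the sketch assumes the broker uses the default virtual destination prefixes.

```java
// Hypothetical helper, not part of ActiveMQ or of TestQueueAcknowledgement.java.
// Assumes the broker's default virtual topic naming:
//   topic: VirtualTopic.<name>
//   queue: Consumer.<consumerName>.VirtualTopic.<name>
final class VirtualTopicNames {

    private VirtualTopicNames() {
        // static utility, no instances
    }

    /** Builds the physical queue name a named consumer reads from. */
    static String consumerQueue(String consumerName, String virtualTopic) {
        if (consumerName == null || consumerName.isEmpty()) {
            throw new IllegalArgumentException("consumerName must not be empty");
        }
        if (virtualTopic == null || !virtualTopic.startsWith("VirtualTopic.")) {
            throw new IllegalArgumentException(
                    "virtualTopic must start with \"VirtualTopic.\"");
        }
        return "Consumer." + consumerName + "." + virtualTopic;
    }
}
```

Called with the consumer name "TestQueueAcknowledgement" and the topic "VirtualTopic.AckTest", the helper yields the queue name that the test consumer binds under "queue.queueName".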

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
