On 08/17/2017 07:53 AM, nayanateja9 wrote:
Hi All,

I am trying to use consumer.prefetchSize on a destination. I want to limit
the number of messages sent from the broker to the consumer on a per-destination
basis, but it is not working for me. However, when I set "<policyEntry queue="&gt;"
queuePrefetch="2" >" in activemq.xml, it works properly. Please find
the program below. I am not acknowledging the messages in CLIENT_ACKNOWLEDGE
mode; in this case I am receiving more than 2 messages from TEST.QUEUE5 (the
queue has 3 messages), but the same program works fine when queuePrefetch is
applied at the broker level. Please help.

import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQQueue;


/**
 * Minimal reproduction client: registers itself as a {@link MessageListener}
 * on {@code TEST.QUEUE5} through a {@code CLIENT_ACKNOWLEDGE} session, using the
 * destination option {@code consumer.prefetchSize=2}, and never acknowledges
 * the messages it receives.
 */
public class Test implements MessageListener {

    /**
     * Connects to the broker at {@code tcp://127.0.0.1:61616}, prints the
     * consumer's prefetch number and installs this class as the listener.
     *
     * @param args unused
     * @throws Exception if closing the session or connection fails
     */
    public static void main(String[] args) throws Exception {
        ActiveMQConnection connection = null;
        ActiveMQSession session = null;

        try {
            Test listener = new Test();
            ActiveMQConnectionFactory factory =
                    new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
            session = (ActiveMQSession)
                    connection.createSession(false, ActiveMQSession.CLIENT_ACKNOWLEDGE);

            // The prefetch limit is requested per destination via the queue name option.
            ActiveMQQueue queue = new ActiveMQQueue("TEST.QUEUE5?consumer.prefetchSize=2");
            ActiveMQMessageConsumer consumer =
                    (ActiveMQMessageConsumer) session.createConsumer(queue);
            System.out.println(consumer.getPrefetchNumber());
            consumer.setMessageListener(listener);

            // NOTE(review): nothing blocks here, so the finally block below runs
            // right after the listener is registered - confirm this is intended.
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Either resource may still be null if setup failed part-way; guard
            // so that cleanup does not throw a NullPointerException on top of
            // the original failure.
            try {
                if (session != null) {
                    session.close();
                }
            } finally {
                // Shut the connection down even if closing the session threw.
                if (connection != null) {
                    connection.stop();
                    connection.close();
                }
            }
        }
    }

    /**
     * Prints each delivered message to standard output; the message is
     * deliberately not acknowledged.
     *
     * @param message the message delivered to this listener
     */
    @Override
    public void onMessage(Message message) {
        System.out.println(message);
    }

}

Output:

2
ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId =
ID:DA15060505-63172-1502968777734-3:1:1:1:1, originalDestination = null,
originalTransactionId = null, producerId =
ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
1502969052352, arrival = 0, brokerInTime = 1502969052355, brokerOutTime =
1502970437587, correlationId = , replyTo = null, persistent = false, type =
, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
org.apache.activemq.util.ByteSequence@282ba1e, dataStructure = null,
redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=1},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
Test Teja}

ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId =
ID:DA15060505-63172-1502968777734-3:1:1:1:2, originalDestination = null,
originalTransactionId = null, producerId =
ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
1502969052352, arrival = 0, brokerInTime = 1502969052356, brokerOutTime =
1502970437587, correlationId = , replyTo = null, persistent = false, type =
, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
org.apache.activemq.util.ByteSequence@13b6d03, dataStructure = null,
redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=2},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
Test Teja}

ActiveMQTextMessage {commandId = 7, responseRequired = false, messageId =
ID:DA15060505-63172-1502968777734-3:1:1:1:3, originalDestination = null,
originalTransactionId = null, producerId =
ID:DA15060505-63172-1502968777734-3:1:1:1, destination =
queue://TEST.QUEUE5, transactionId = null, expiration = 0, timestamp =
1502969052352, arrival = 0, brokerInTime = 1502969052358, brokerOutTime =
1502970437601, correlationId = , replyTo = null, persistent = false, type =
, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
org.apache.activemq.util.ByteSequence@f5f2bb7, dataStructure = null,
redeliveryCounter = 10, size = 0, properties = {JMSXMessageCounter=3},
readOnlyProperties = true, readOnlyBody = true, droppable = false, text =
Test Teja}






--
View this message in context: 
http://activemq.2283324.n4.nabble.com/consumer-prefetchSize-option-on-destination-is-not-working-tp4729727.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
.

Support questions should only be directed to the users list; the dev list is where ActiveMQ development discussions are held.


--
Tim Bish

Reply via email to