Author: dejanb
Date: Fri Aug 31 12:53:53 2012
New Revision: 1379433

URL: http://svn.apache.org/viewvc?rev=1379433&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3986 - respect prefetch values for 
MDBs

Modified:
    
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java

Modified: 
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java?rev=1379433&r1=1379432&r2=1379433&view=diff
==============================================================================
--- 
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
 (original)
+++ 
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
 Fri Aug 31 12:53:53 2012
@@ -21,6 +21,7 @@ import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
+import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -97,67 +98,76 @@ public class ActiveMQEndpointWorker {
             public void run() {
                 currentReconnectDelay = INITIAL_RECONNECT_DELAY;
                 MessageActivationSpec activationSpec = 
endpointActivationKey.getActivationSpec();
-                if ( LOG.isInfoEnabled() ) {
+                if (LOG.isInfoEnabled()) {
                     LOG.info("Establishing connection to broker [" + 
adapter.getInfo().getServerUrl() + "]");
                 }
 
-                while ( connecting.get() && running ) {
-                try {
-                    connection = adapter.makeConnection(activationSpec);
-                    connection.setExceptionListener(new ExceptionListener() {
-                        public void onException(JMSException error) {
-                            if (!serverSessionPool.isClosing()) {
+                while (connecting.get() && running) {
+                    try {
+                        connection = adapter.makeConnection(activationSpec);
+                        connection.setExceptionListener(new 
ExceptionListener() {
+                            public void onException(JMSException error) {
+                                if (!serverSessionPool.isClosing()) {
                                     // initiate reconnection only once, i.e. 
on initial exception
                                     // and only if not already trying to 
connect
                                     LOG.error("Connection to broker failed: " 
+ error.getMessage(), error);
-                                    if ( connecting.compareAndSet(false, true) 
) {
-                                        synchronized ( connectWork ) {
+                                    if (connecting.compareAndSet(false, true)) 
{
+                                        synchronized (connectWork) {
                                             disconnect();
                                             
serverSessionPool.closeIdleSessions();
                                             connect();
-                            }
+                                        }
                                     } else {
                                         // connection attempt has already been 
initiated
                                         LOG.info("Connection attempt already 
in progress, ignoring connection exception");
-                        }
+                                    }
                                 }
                             }
-                    });
+                        });
                         connection.start();
 
-                        int prefetchSize = 
activationSpec.getMaxMessagesPerSessionsIntValue() * 
activationSpec.getMaxSessionsIntValue();
-                    if (activationSpec.isDurableSubscription()) {
+                        if (activationSpec.isDurableSubscription()) {
                             consumer = 
connection.createDurableConnectionConsumer(
                                     (Topic) dest,
-                                    activationSpec.getSubscriptionName(), 
+                                    activationSpec.getSubscriptionName(),
                                     
emptyToNull(activationSpec.getMessageSelector()),
-                                    serverSessionPool, 
-                                    prefetchSize,
+                                    serverSessionPool,
+                                    
connection.getPrefetchPolicy().getDurableTopicPrefetch(),
                                     activationSpec.getNoLocalBooleanValue());
-                    } else {
+                        } else {
                             consumer = connection.createConnectionConsumer(
-                                    dest, 
-                                    
emptyToNull(activationSpec.getMessageSelector()), 
-                                    serverSessionPool, 
-                                    prefetchSize,
-                                                                       
activationSpec.getNoLocalBooleanValue());
-                    }
+                                    dest,
+                                    
emptyToNull(activationSpec.getMessageSelector()),
+                                    serverSessionPool,
+                                    getPrefetch(activationSpec, connection, 
dest),
+                                    activationSpec.getNoLocalBooleanValue());
+                        }
 
 
-                        if ( connecting.compareAndSet(true, false) ) {
-                            if ( LOG.isInfoEnabled() ) {
+                        if (connecting.compareAndSet(true, false)) {
+                            if (LOG.isInfoEnabled()) {
                                 LOG.info("Successfully established connection 
to broker [" + adapter.getInfo().getServerUrl() + "]");
                             }
                         } else {
                             LOG.error("Could not release connection lock");
                         }
-                } catch (JMSException error) {
-                        if ( LOG.isDebugEnabled() ) {
+                    } catch (JMSException error) {
+                        if (LOG.isDebugEnabled()) {
                             LOG.debug("Failed to connect: " + 
error.getMessage(), error);
-                }
+                        }
                         disconnect();
                         pause(error);
+                    }
+                }
             }
+
+            private int getPrefetch(MessageActivationSpec activationSpec, 
ActiveMQConnection connection, ActiveMQDestination destination) {
+                if (destination.isTopic()) {
+                    return connection.getPrefetchPolicy().getTopicPrefetch();
+                } else if (destination.isQueue()) {
+                    return connection.getPrefetchPolicy().getQueuePrefetch();
+                } else {
+                    return activationSpec.getMaxMessagesPerSessionsIntValue() 
* activationSpec.getMaxSessionsIntValue();
                 }
             }
             

Modified: 
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=1379433&r1=1379432&r2=1379433&view=diff
==============================================================================
--- 
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java 
(original)
+++ 
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java 
Fri Aug 31 12:53:53 2012
@@ -48,7 +48,12 @@ import javax.transaction.xa.Xid;
 
 import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
 
 public class MDBTest extends TestCase {
 
@@ -133,10 +138,14 @@ public class MDBTest extends TestCase {
 
         ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
         Connection connection = factory.createConnection();
+        connection.start();
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
+        MessageConsumer advisory = 
session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new 
ActiveMQQueue("TEST")));
+
         ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
         adapter.setServerUrl("vm://localhost?broker.persistent=false");
+        adapter.setQueuePrefetch(1);
         adapter.start(new StubBootstrapContext());
 
         final CountDownLatch messageDelivered = new CountDownLatch(1);
@@ -168,16 +177,17 @@ public class MDBTest extends TestCase {
         // Activate an Endpoint
         adapter.endpointActivation(messageEndpointFactory, activationSpec);
 
-        // Give endpoint a chance to setup and register its listeners
-        try {
-            Thread.sleep(1000);
-        } catch (Exception e) {
-
+        ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
+        if (msg != null) {
+            assertEquals("Prefetch size hasn't been set", 1, 
((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
+        } else {
+            fail("Consumer hasn't been created");
         }
 
         // Send the broker a message to that endpoint
         MessageProducer producer = session.createProducer(new 
ActiveMQQueue("TEST"));
         producer.send(session.createTextMessage("Hello!"));
+
         connection.close();
 
         // Wait for the message to be delivered.


Reply via email to