Author: dejanb
Date: Fri Aug 31 12:53:53 2012
New Revision: 1379433
URL: http://svn.apache.org/viewvc?rev=1379433&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3986 - respect prefetch values for MDBs
Modified:
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
Modified:
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java?rev=1379433&r1=1379432&r2=1379433&view=diff
==============================================================================
---
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
(original)
+++
activemq/trunk/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
Fri Aug 31 12:53:53 2012
@@ -21,6 +21,7 @@ import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
+import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -97,67 +98,76 @@ public class ActiveMQEndpointWorker {
public void run() {
currentReconnectDelay = INITIAL_RECONNECT_DELAY;
MessageActivationSpec activationSpec =
endpointActivationKey.getActivationSpec();
- if ( LOG.isInfoEnabled() ) {
+ if (LOG.isInfoEnabled()) {
LOG.info("Establishing connection to broker [" +
adapter.getInfo().getServerUrl() + "]");
}
- while ( connecting.get() && running ) {
- try {
- connection = adapter.makeConnection(activationSpec);
- connection.setExceptionListener(new ExceptionListener() {
- public void onException(JMSException error) {
- if (!serverSessionPool.isClosing()) {
+ while (connecting.get() && running) {
+ try {
+ connection = adapter.makeConnection(activationSpec);
+ connection.setExceptionListener(new
ExceptionListener() {
+ public void onException(JMSException error) {
+ if (!serverSessionPool.isClosing()) {
// initiate reconnection only once, i.e.
on initial exception
// and only if not already trying to
connect
LOG.error("Connection to broker failed: "
+ error.getMessage(), error);
- if ( connecting.compareAndSet(false, true)
) {
- synchronized ( connectWork ) {
+ if (connecting.compareAndSet(false, true))
{
+ synchronized (connectWork) {
disconnect();
serverSessionPool.closeIdleSessions();
connect();
- }
+ }
} else {
// connection attempt has already been
initiated
LOG.info("Connection attempt already
in progress, ignoring connection exception");
- }
+ }
}
}
- });
+ });
connection.start();
- int prefetchSize =
activationSpec.getMaxMessagesPerSessionsIntValue() *
activationSpec.getMaxSessionsIntValue();
- if (activationSpec.isDurableSubscription()) {
+ if (activationSpec.isDurableSubscription()) {
consumer =
connection.createDurableConnectionConsumer(
(Topic) dest,
- activationSpec.getSubscriptionName(),
+ activationSpec.getSubscriptionName(),
emptyToNull(activationSpec.getMessageSelector()),
- serverSessionPool,
- prefetchSize,
+ serverSessionPool,
+
connection.getPrefetchPolicy().getDurableTopicPrefetch(),
activationSpec.getNoLocalBooleanValue());
- } else {
+ } else {
consumer = connection.createConnectionConsumer(
- dest,
-
emptyToNull(activationSpec.getMessageSelector()),
- serverSessionPool,
- prefetchSize,
-
activationSpec.getNoLocalBooleanValue());
- }
+ dest,
+
emptyToNull(activationSpec.getMessageSelector()),
+ serverSessionPool,
+ getPrefetch(activationSpec, connection,
dest),
+ activationSpec.getNoLocalBooleanValue());
+ }
- if ( connecting.compareAndSet(true, false) ) {
- if ( LOG.isInfoEnabled() ) {
+ if (connecting.compareAndSet(true, false)) {
+ if (LOG.isInfoEnabled()) {
LOG.info("Successfully established connection
to broker [" + adapter.getInfo().getServerUrl() + "]");
}
} else {
LOG.error("Could not release connection lock");
}
- } catch (JMSException error) {
- if ( LOG.isDebugEnabled() ) {
+ } catch (JMSException error) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Failed to connect: " +
error.getMessage(), error);
- }
+ }
disconnect();
pause(error);
+ }
+ }
}
+
+ private int getPrefetch(MessageActivationSpec activationSpec,
ActiveMQConnection connection, ActiveMQDestination destination) {
+ if (destination.isTopic()) {
+ return connection.getPrefetchPolicy().getTopicPrefetch();
+ } else if (destination.isQueue()) {
+ return connection.getPrefetchPolicy().getQueuePrefetch();
+ } else {
+ return activationSpec.getMaxMessagesPerSessionsIntValue()
* activationSpec.getMaxSessionsIntValue();
}
}
Modified:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=1379433&r1=1379432&r2=1379433&view=diff
==============================================================================
---
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
(original)
+++
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
Fri Aug 31 12:53:53 2012
@@ -48,7 +48,12 @@ import javax.transaction.xa.Xid;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
public class MDBTest extends TestCase {
@@ -133,10 +138,14 @@ public class MDBTest extends TestCase {
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
Connection connection = factory.createConnection();
+ connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer advisory =
session.createConsumer(AdvisorySupport.getConsumerAdvisoryTopic(new
ActiveMQQueue("TEST")));
+
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
adapter.setServerUrl("vm://localhost?broker.persistent=false");
+ adapter.setQueuePrefetch(1);
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(1);
@@ -168,16 +177,17 @@ public class MDBTest extends TestCase {
// Activate an Endpoint
adapter.endpointActivation(messageEndpointFactory, activationSpec);
- // Give endpoint a chance to setup and register its listeners
- try {
- Thread.sleep(1000);
- } catch (Exception e) {
-
+ ActiveMQMessage msg = (ActiveMQMessage)advisory.receive(1000);
+ if (msg != null) {
+ assertEquals("Prefetch size hasn't been set", 1,
((ConsumerInfo)msg.getDataStructure()).getPrefetchSize());
+ } else {
+ fail("Consumer hasn't been created");
}
// Send the broker a message to that endpoint
MessageProducer producer = session.createProducer(new
ActiveMQQueue("TEST"));
producer.send(session.createTextMessage("Hello!"));
+
connection.close();
// Wait for the message to be delivered.