Author: gtully
Date: Tue Feb 24 14:05:28 2009
New Revision: 747384
URL: http://svn.apache.org/viewvc?rev=747384&view=rev
Log:
apply patch from AMQ-2109, with thanks
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=747384&r1=747383&r2=747384&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Tue Feb 24 14:05:28 2009
@@ -82,18 +82,14 @@
}
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
- // not matched so create a new one
- // but first, if it's durable - changed set the
- // ConsumerId here - so it won't be removed if the
- // durable subscriber goes away on the other end
- if (info.isDurable() || (info.getDestination().isQueue() &&
!info.getDestination().isTemporary())) {
- info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator
- .getNextSequenceId()));
- }
+
if (info.isDurable()) {
// set the subscriber name to something reproducible
-
info.setSubscriptionName(getSubscriberName(info.getDestination()));
+ // and override the consumerId with something unique so that it
won't
+ // be removed if the durable subscriber (at the other end) goes
away
+ info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator
+ .getNextSequenceId()));
}
info.setSelector(null);
return doCreateDemandSubscription(info);
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=747384&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
Tue Feb 24 14:05:28 2009
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.MalformedURLException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Starts a local broker with a network connector to a second ("networked")
+ * broker, attaches a queue consumer to the networked broker and checks, through
+ * JMX, that the consumer count of the queue on the local broker goes to 1 and
+ * drops back to 0 once the remote consumer's connection is closed.
+ *
+ * All resources opened by the test (JMS connection, JMX connector, both
+ * brokers) are released in {@link #tearDown()} so that a failed assertion does
+ * not leave the fixed TCP ports bound.
+ */
+public class NetworkBrokerDetachTest extends TestCase {
+
+    private static final String BROKER_NAME = "broker";
+    private static final String REM_BROKER_NAME = "networkedBroker";
+    private static final String QUEUE_NAME = "testQ";
+    private static final int NUM_CONSUMERS = 1;
+
+    // NOTE(review): presumably the JMX connector the brokers expose by default - confirm.
+    private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi";
+
+    // Fixed pause that gives the network bridge time to react to a consumer change.
+    private static final long SETTLE_MILLIS = 5000;
+
+    protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class);
+    protected final int numRestarts = 3;
+
+    // Held as fields so tearDown() can release them even when the test fails half way.
+    private BrokerService broker;
+    private BrokerService networkedBroker;
+    private Connection consumerConnection;
+    private JMXConnector jmxConnector;
+
+    /**
+     * Creates (but does not start) the local broker, listening on port 61617
+     * and holding a non-duplex network connector to the broker on port 62617.
+     *
+     * @return the configured, not yet started, local broker
+     * @throws Exception if a connector cannot be added
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(BROKER_NAME);
+        broker.addConnector("tcp://localhost:61617");
+        NetworkConnector networkConnector = broker.addNetworkConnector(
+            "static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
+        networkConnector.setDuplex(false);
+        return broker;
+    }
+
+    /**
+     * Creates (but does not start) the networked broker, listening on port 62617.
+     *
+     * @return the configured, not yet started, networked broker
+     * @throws Exception if the connector cannot be added
+     */
+    protected BrokerService createNetworkedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(REM_BROKER_NAME);
+        broker.addConnector("tcp://localhost:62617");
+        return broker;
+    }
+
+    /**
+     * Releases everything the test opened. Each step runs in its own
+     * try/finally so that one failing cleanup step does not skip the others.
+     */
+    protected void tearDown() throws Exception {
+        try {
+            if (consumerConnection != null) {
+                // Harmless if the test already closed it.
+                consumerConnection.close();
+            }
+        } finally {
+            try {
+                if (jmxConnector != null) {
+                    jmxConnector.close();
+                }
+            } finally {
+                try {
+                    stopBroker(networkedBroker);
+                } finally {
+                    try {
+                        stopBroker(broker);
+                    } finally {
+                        super.tearDown();
+                    }
+                }
+            }
+        }
+    }
+
+    public void testNetworkedBrokerDetach() throws Exception {
+        broker = createBroker();
+        broker.start();
+
+        networkedBroker = createNetworkedBroker();
+        networkedBroker.start();
+
+        LOG.info("Creating Consumer on the networked broker ...");
+        // Create a consumer on the networked broker
+        ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
+        consumerConnection = consFactory.createConnection();
+        Session consSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        for (int i = 0; i < NUM_CONSUMERS; i++) {
+            consSession.createConsumer(consSession.createQueue(QUEUE_NAME));
+        }
+
+        Thread.sleep(SETTLE_MILLIS);
+
+        MBeanServerConnection mbsc = getMBeanServerConnection();
+        // We should have 1 consumer for the queue on the local broker
+        assertEquals(1L, queueConsumerCount(mbsc));
+
+        LOG.info("Stopping Consumer on the networked broker ...");
+        // Closing the connection will also close the consumer
+        consumerConnection.close();
+
+        Thread.sleep(SETTLE_MILLIS);
+
+        // We should have 0 consumer for the queue on the local broker
+        assertEquals(0L, queueConsumerCount(mbsc));
+    }
+
+    /**
+     * Builds a connection factory that targets the first transport connector
+     * of the given broker.
+     *
+     * @param broker the broker to connect to
+     * @return a connection factory configured for that broker
+     * @throws Exception if the connect URI cannot be obtained
+     */
+    protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
+        String url = ((TransportConnector) broker.getTransportConnectors().get(0))
+            .getServer().getConnectURI().toString();
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
+        connectionFactory.setOptimizedMessageDispatch(true);
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setUseCompression(false);
+        connectionFactory.setDispatchAsync(false);
+        connectionFactory.setUseAsyncSend(false);
+        connectionFactory.setOptimizeAcknowledge(false);
+        connectionFactory.setWatchTopicAdvisories(true);
+        ActiveMQPrefetchPolicy qPrefetchPolicy = new ActiveMQPrefetchPolicy();
+        qPrefetchPolicy.setQueuePrefetch(100);
+        qPrefetchPolicy.setTopicPrefetch(1000);
+        connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
+        connectionFactory.setAlwaysSyncSend(true);
+        return connectionFactory;
+    }
+
+    /** Stops the given broker and waits until it has stopped; a null broker is ignored. */
+    private static void stopBroker(BrokerService service) throws Exception {
+        if (service != null) {
+            service.stop();
+            service.waitUntilStopped();
+        }
+    }
+
+    // JMX Helper Methods
+
+    /**
+     * Opens the JMX connection used to inspect the local broker. The connector
+     * is remembered so that tearDown() can close it.
+     *
+     * @return a live MBean server connection, never null
+     * @throws Exception if the JMX service cannot be reached; the failure is
+     *         reported here instead of surfacing later as a NullPointerException
+     */
+    private MBeanServerConnection getMBeanServerConnection() throws Exception {
+        final JMXServiceURL url = new JMXServiceURL(JMX_URL);
+        jmxConnector = JMXConnectorFactory.connect(url, null);
+        return jmxConnector.getMBeanServerConnection();
+    }
+
+    /** Reads, logs and returns the ConsumerCount attribute of the test queue on the local broker. */
+    private long queueConsumerCount(MBeanServerConnection mbsc) throws Exception {
+        Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
+        LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
+        return ((Long) consumers).longValue();
+    }
+
+    /** Reads one attribute of the local broker's MBean identified by type and pattern. */
+    private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName)
+        throws Exception {
+        return mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName);
+    }
+
+    /** Builds the ObjectName "org.apache.activemq:BrokerName=..,Type=..,&lt;pattern&gt;". */
+    private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
+        return new ObjectName(
+            "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type + "," + pattern);
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date