Author: rajdavies
Date: Wed Jul 9 12:14:35 2008
New Revision: 675314
URL: http://svn.apache.org/viewvc?rev=675314&view=rev
Log:
patch from https://issues.apache.org/activemq/browse/AMQ-1848
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=675314&r1=675313&r2=675314&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Wed Jul 9 12:14:35 2008
@@ -18,7 +18,6 @@
import java.io.IOException;
import java.net.URI;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -32,7 +31,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
@@ -1152,7 +1150,7 @@
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
- duplexBridge.duplexStart(brokerInfo, info);
+ duplexBridge.duplexStart(this,brokerInfo, info);
LOG.info("Created Duplex Bridge back to " +
info.getBrokerName());
return null;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=675314&r1=675313&r2=675314&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Jul 9 12:14:35 2008
@@ -28,7 +28,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
@@ -120,6 +122,7 @@
private AtomicBoolean started = new AtomicBoolean();
+ private TransportConnection duplexInitiatingConnection;
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration
configuration, Transport localBroker, Transport remoteBroker) {
this.configuration = configuration;
@@ -127,9 +130,10 @@
this.remoteBroker = remoteBroker;
}
- public void duplexStart(BrokerInfo localBrokerInfo, BrokerInfo
remoteBrokerInfo) throws Exception {
+ public void duplexStart(TransportConnection connection, BrokerInfo
localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
this.localBrokerInfo = localBrokerInfo;
this.remoteBrokerInfo = remoteBrokerInfo;
+ this.duplexInitiatingConnection = connection;
start();
serviceRemoteCommand(remoteBrokerInfo);
}
@@ -381,7 +385,7 @@
LOG.debug("The remote Exception was: " + error, error);
ASYNC_TASKS.execute(new Runnable() {
public void run() {
- ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+ ServiceSupport.dispose(getControllingService());
}
});
fireBridgeFailed();
@@ -533,13 +537,17 @@
LOG.debug("The local Exception was:" + error, error);
ASYNC_TASKS.execute(new Runnable() {
public void run() {
- ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+ ServiceSupport.dispose(getControllingService());
}
});
fireBridgeFailed();
}
}
+    /**
+     * Picks the service that the failure handlers of this bridge dispose.
+     *
+     * @return the connection recorded by {@code duplexStart} when one was
+     *         supplied, otherwise this bridge itself
+     */
+    protected Service getControllingService() {
+        if (duplexInitiatingConnection == null) {
+            // No duplex-initiating connection was recorded: the bridge is its own controller.
+            return this;
+        }
+        return duplexInitiatingConnection;
+    }
+
protected void addSubscription(DemandSubscription sub) throws IOException {
if (sub != null) {
localBroker.oneway(sub.getLocalInfo());
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java?rev=675314&r1=675313&r2=675314&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
Wed Jul 9 12:14:35 2008
@@ -33,6 +33,7 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.XATransactionId;
/**
* Used to simulate the recovery that occurs when a broker shuts down.
@@ -235,7 +236,7 @@
Message m = receiveMessage(connection);
assertNull(m);
}
-
+
public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart()
throws Exception {
ActiveMQDestination destination = new ActiveMQTopic("TEST");
@@ -403,6 +404,7 @@
assertNoMessagesLeft(connection);
}
+
public void testQueuePersistentCommitedAcksNotLostOnRestart() throws
Exception {
@@ -456,6 +458,8 @@
Message m = receiveMessage(connection);
assertNull(m);
}
+
+
public void testQueuePersistentUncommitedAcksLostOnRestart() throws
Exception {
@@ -512,6 +516,62 @@
assertNoMessagesLeft(connection);
}
+
+    /**
+     * Sends persistent messages to a queue, acknowledges every one of them
+     * inside an XA transaction that is never committed, restarts the broker
+     * and then checks that all messages are delivered again.
+     */
+    public void testQueuePersistentXAUncommitedAcksLostOnRestart() throws Exception {
+        final int messageCount = 100;
+        final ActiveMQDestination queue = new ActiveMQQueue("TEST");
+
+        // Register connection, session and producer with the broker.
+        StubConnection firstConnection = createConnection();
+        ConnectionInfo firstConnectionInfo = createConnectionInfo();
+        SessionInfo firstSessionInfo = createSessionInfo(firstConnectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(firstSessionInfo);
+        firstConnection.send(firstConnectionInfo);
+        firstConnection.send(firstSessionInfo);
+        firstConnection.send(producerInfo);
+
+        // Publish the persistent messages.
+        for (int sent = 0; sent < messageCount; sent++) {
+            Message outgoing = createMessage(producerInfo, queue);
+            outgoing.setPersistent(true);
+            firstConnection.send(outgoing);
+        }
+
+        // Attach a consumer to the queue.
+        ConsumerInfo firstConsumerInfo = createConsumerInfo(firstSessionInfo, queue);
+        firstConnection.send(firstConsumerInfo);
+
+        // Open the XA transaction and acknowledge each message within it.
+        XATransactionId xid = createXATransaction(firstSessionInfo);
+        firstConnection.send(createBeginTransaction(firstConnectionInfo, xid));
+        for (int acked = 0; acked < messageCount; acked++) {
+            Message delivered = receiveMessage(firstConnection);
+            assertNotNull(delivered);
+            MessageAck txAck = createAck(firstConsumerInfo, delivered, 1, MessageAck.STANDARD_ACK_TYPE);
+            txAck.setTransactionId(xid);
+            firstConnection.send(txAck);
+        }
+        // The transaction is deliberately left uncommitted.
+
+        restartBroker();
+
+        // Reconnect with a fresh connection, session and consumer.
+        StubConnection secondConnection = createConnection();
+        ConnectionInfo secondConnectionInfo = createConnectionInfo();
+        SessionInfo secondSessionInfo = createSessionInfo(secondConnectionInfo);
+        secondConnection.send(secondConnectionInfo);
+        secondConnection.send(secondSessionInfo);
+        ConsumerInfo secondConsumerInfo = createConsumerInfo(secondSessionInfo, queue);
+        secondConnection.send(secondConsumerInfo);
+
+        // Every message must come back because its ack was never committed.
+        for (int redelivered = 0; redelivered < messageCount; redelivered++) {
+            assertNotNull(receiveMessage(secondConnection));
+        }
+
+        assertNoMessagesLeft(secondConnection);
+    }
public static Test suite() {
return suite(RecoveryBrokerTest.class);
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java?rev=675314&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
Wed Jul 9 12:14:35 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Checks which ActiveMQ MBeans are registered while either end of a duplex
+ * network connection is repeatedly started and stopped.
+ * <p>
+ * Two brokers are used: a plain one named {@code broker} listening on
+ * {@code tcp://localhost:61617}, and one named {@code networkedBroker}
+ * listening on {@code tcp://localhost:62617} that opens a duplex network
+ * connector to the first. MBeans are counted through a JMX connection to
+ * {@code localhost:1099}.
+ */
+public class DuplexNetworkMBeanTest extends TestCase {
+
+    protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
+    /** Number of start/stop cycles each test performs on the restarted broker. */
+    protected final int numRestarts = 2;
+
+    /**
+     * Builds (but does not start) the broker that accepts the network connection.
+     *
+     * @return a broker named {@code broker} with a TCP connector on port 61617
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker");
+        broker.addConnector("tcp://localhost:61617");
+
+        return broker;
+    }
+
+    /**
+     * Builds (but does not start) the broker that initiates the duplex network connection.
+     *
+     * @return a broker named {@code networkedBroker} with a TCP connector on
+     *         port 62617 and a duplex network connector to port 61617
+     */
+    protected BrokerService createNetworkedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("networkedBroker");
+        broker.addConnector("tcp://localhost:62617");
+        NetworkConnector networkConnector = broker.addNetworkConnector("static://tcp://localhost:61617");
+        networkConnector.setDuplex(true);
+        return broker;
+    }
+
+    /**
+     * Keeps {@code broker} running while {@code networkedBroker} is started
+     * and stopped {@link #numRestarts} times, checking the MBean counts on
+     * both sides during and after each cycle.
+     */
+    public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception {
+        BrokerService broker = createBroker();
+        broker.start();
+        // Stop the broker even when an assertion fails so that its port and
+        // MBeans do not leak into the tests that run afterwards.
+        try {
+            assertEquals(1, countMbeans(broker, "Connector"));
+            assertEquals(0, countMbeans(broker, "Connection"));
+            BrokerService networkedBroker = null;
+            for (int i = 0; i < numRestarts; i++) {
+                networkedBroker = createNetworkedBroker();
+                networkedBroker.start();
+                try {
+                    assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 2000));
+                    assertEquals(1, countMbeans(broker, "Connection"));
+                } finally {
+                    networkedBroker.stop();
+                    networkedBroker.waitUntilStopped();
+                }
+                // NOTE(review): "stopped" is not a Type queried anywhere else in
+                // this test; presumably this only checks that JMX still answers
+                // after the stop - confirm the intent.
+                assertEquals(0, countMbeans(networkedBroker, "stopped"));
+            }
+
+            assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
+            assertEquals(0, countMbeans(networkedBroker, "Connector"));
+            assertEquals(0, countMbeans(networkedBroker, "Connection"));
+            assertEquals(1, countMbeans(broker, "Connector"));
+        } finally {
+            broker.stop();
+        }
+    }
+
+    /**
+     * Keeps {@code networkedBroker} running while {@code broker} is started
+     * and stopped {@link #numRestarts} times, checking the MBean counts on
+     * both sides during each cycle and after the last one.
+     */
+    public void testMbeanPresenceOnBrokerRestart() throws Exception {
+
+        BrokerService networkedBroker = createNetworkedBroker();
+        networkedBroker.start();
+        // Stop the broker even when an assertion fails so that its port and
+        // MBeans do not leak into the tests that run afterwards.
+        try {
+            assertEquals(1, countMbeans(networkedBroker, "Connector"));
+            assertEquals(0, countMbeans(networkedBroker, "Connection"));
+
+            BrokerService broker = null;
+            for (int i = 0; i < numRestarts; i++) {
+                broker = createBroker();
+                broker.start();
+                try {
+                    assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 5000));
+                    assertEquals(1, countMbeans(broker, "Connection"));
+                } finally {
+                    broker.stop();
+                    broker.waitUntilStopped();
+                }
+            }
+
+            assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
+            assertEquals(1, countMbeans(networkedBroker, "Connector"));
+            assertEquals(0, countMbeans(networkedBroker, "Connection"));
+            assertEquals(0, countMbeans(broker, "Connection"));
+        } finally {
+            networkedBroker.stop();
+        }
+    }
+
+    /**
+     * Counts the MBeans of the given type with a single query and no waiting.
+     *
+     * @param broker broker whose name is used in the MBean name pattern
+     * @param type value of the {@code Type} key to match
+     * @return number of matching MBeans
+     */
+    private int countMbeans(BrokerService broker, String type) throws Exception {
+        return countMbeans(broker, type, 0);
+    }
+
+    /**
+     * Counts the MBeans matching
+     * {@code org.apache.activemq:BrokerName=<name>,Type=<type>,*}.
+     * <p>
+     * With a positive timeout the query is repeated, sleeping 100 ms before
+     * each attempt, until at least one MBean matches or the timeout has
+     * elapsed. With a timeout of zero the query normally runs once.
+     *
+     * @param broker broker whose name is used in the MBean name pattern
+     * @param type value of the {@code Type} key to match
+     * @param timeout maximum time in milliseconds to wait for a non-empty result
+     * @return number of matching MBeans found by the last query
+     * @throws Exception if the JMX connection cannot be opened, queried or closed
+     */
+    private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
+        final long expiryTime = System.currentTimeMillis() + timeout;
+        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
+        JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
+        // Each call opens its own connector; close it so that repeated calls
+        // do not pile up open RMI connections.
+        try {
+            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+
+            // Log every registered MBean to make failures easier to diagnose.
+            Set<?> all = mbsc.queryMBeans(null, null);
+            LOG.info("MBean total=" + all.size());
+            for (Object o : all) {
+                ObjectInstance bean = (ObjectInstance)o;
+                LOG.info(bean.getObjectName());
+            }
+            ObjectName beanName = new ObjectName("org.apache.activemq:BrokerName="
+                    + broker.getBrokerName() + ",Type=" + type + ",*");
+            Set<?> mbeans = null;
+            do {
+                if (timeout > 0) {
+                    Thread.sleep(100);
+                }
+                mbeans = mbsc.queryMBeans(beanName, null);
+            } while (mbeans.isEmpty() && expiryTime > System.currentTimeMillis());
+            return mbeans.size();
+        } finally {
+            jmxc.close();
+        }
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
------------------------------------------------------------------------------
svn:eol-style = native