Author: dejanb
Date: Wed Dec 29 14:32:37 2010
New Revision: 1053641
URL: http://svn.apache.org/viewvc?rev=1053641&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3107 - advisories for network bridge
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Wed Dec 29 14:32:37 2010
@@ -28,19 +28,8 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.*;
+import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@@ -402,6 +391,44 @@ public class AdvisoryBroker extends Brok
}
}
+ @Override
+ public void networkBridgeStarted(BrokerInfo brokerInfo) {
+ try {
+ if (brokerInfo != null) {
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setBooleanProperty("started", true);
+
+ ActiveMQTopic topic =
AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+ ConnectionContext context = new ConnectionContext();
+
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+ context.setBroker(getBrokerService().getBroker());
+ fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to fire network bridge advisory");
+ }
+ }
+
+ @Override
+ public void networkBridgeStopped(BrokerInfo brokerInfo) {
+ try {
+ if (brokerInfo != null) {
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setBooleanProperty("started", false);
+
+ ActiveMQTopic topic =
AdvisorySupport.getNetworkBridgeAdvisoryTopic();
+
+ ConnectionContext context = new ConnectionContext();
+
context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
+ context.setBroker(getBrokerService().getBroker());
+ fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to fire network bridge advisory");
+ }
+ }
+
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic
topic, Command command) throws Exception {
fireAdvisory(context, topic, command, null);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
Wed Dec 29 14:32:37 2010
@@ -47,6 +47,7 @@ public final class AdvisorySupport {
public static final String MESSAGE_CONSUMED_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
public static final String MESSAGE_DLQ_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
public static final String MASTER_BROKER_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "MasterBroker";
+ public static final String NETWORK_BRIDGE_TOPIC_PREFIX =
ADVISORY_TOPIC_PREFIX + "NetworkBridge";
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final String MSG_PROPERTY_ORIGIN_BROKER_ID =
"originBrokerId";
@@ -201,6 +202,10 @@ public final class AdvisorySupport {
return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
}
+ public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
+ return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
+ }
+
public static ActiveMQTopic getFullAdvisoryTopic(Destination destination)
throws JMSException {
return
getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Wed Dec 29 14:32:37 2010
@@ -33,6 +33,7 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
@@ -379,5 +380,9 @@ public interface Broker extends Region,
ThreadPoolExecutor getExecutor();
+ void networkBridgeStarted(BrokerInfo brokerInfo);
+
+ void networkBridgeStopped(BrokerInfo brokerInfo);
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Wed Dec 29 14:32:37 2010
@@ -40,6 +40,7 @@ import org.apache.activemq.command.Remov
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.network.NetworkBridge;
import org.apache.activemq.store.kahadb.plist.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
@@ -310,4 +311,12 @@ public class BrokerFilter implements Bro
public ThreadPoolExecutor getExecutor() {
return next.getExecutor();
}
+
+ public void networkBridgeStarted(BrokerInfo brokerInfo) {
+ next.networkBridgeStarted(brokerInfo);
+ }
+
+ public void networkBridgeStopped(BrokerInfo brokerInfo) {
+ next.networkBridgeStopped(brokerInfo);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Wed Dec 29 14:32:37 2010
@@ -282,6 +282,12 @@ public class EmptyBroker implements Brok
public void nowMasterBroker() {
}
+ public void networkBridgeStarted(BrokerInfo brokerInfo) {
+ }
+
+ public void networkBridgeStopped(BrokerInfo brokerInfo) {
+ }
+
public void processConsumerControl(ConsumerBrokerExchange consumerExchange,
ConsumerControl control) {
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Wed Dec 29 14:32:37 2010
@@ -312,4 +312,12 @@ public class ErrorBroker implements Brok
public ThreadPoolExecutor getExecutor() {
throw new BrokerStoppedException(this.message);
}
+
+ public void networkBridgeStarted(BrokerInfo brokerInfo) {
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public void networkBridgeStopped(BrokerInfo brokerInfo) {
+ throw new BrokerStoppedException(this.message);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Wed Dec 29 14:32:37 2010
@@ -322,4 +322,11 @@ public class MutableBrokerFilter impleme
return getNext().getExecutor();
}
+ public void networkBridgeStarted(BrokerInfo brokerInfo) {
+ getNext().networkBridgeStarted(brokerInfo);
+ }
+
+ public void networkBridgeStopped(BrokerInfo brokerInfo) {
+ getNext().networkBridgeStopped(brokerInfo);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1053641&r1=1053640&r2=1053641&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Dec 29 14:32:37 2010
@@ -309,6 +309,7 @@ public abstract class DemandForwardingBr
localSessionInfo = new SessionInfo(localConnectionInfo, 1);
localBroker.oneway(localSessionInfo);
+
brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo);
LOG.info("Network connection between " + localBroker + "
and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
} else {
@@ -419,6 +420,7 @@ public abstract class DemandForwardingBr
ss.throwFirstException();
}
}
+ brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
LOG.info(configuration.getBrokerName() + " bridge to " +
remoteBrokerName + " stopped");
remoteBrokerNameKnownLatch.countDown();
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java?rev=1053641&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
Wed Dec 29 14:32:37 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.advisory;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.BrokerInfo;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.net.URI;
+
+public class AdvisoryNetworkBridgeTest extends TestCase {
+
+ BrokerService broker1;
+ BrokerService broker2;
+
+
+ public void testAdvisory() throws Exception {
+ broker1 = BrokerFactory.createBroker(new
URI("xbean:org/apache/activemq/network/reconnect-broker1.xml"));
+ broker1.start();
+ broker1.waitUntilStarted();
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://broker1");
+ Connection conn = factory.createConnection();
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+ MessageConsumer consumer =
sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
+
+ Thread.sleep(1000);
+
+ broker2 = BrokerFactory.createBroker(new
URI("xbean:org/apache/activemq/network/reconnect-broker2.xml"));
+ broker2.start();
+ broker2.waitUntilStarted();
+
+ ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);
+ assertNotNull(advisory);
+ assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+ assertTrue(advisory.getBooleanProperty("started"));
+
+ broker2.stop();
+ broker2.waitUntilStopped();
+
+ advisory = (ActiveMQMessage)consumer.receive(2000);
+ assertNotNull(advisory);
+ assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+ assertFalse(advisory.getBooleanProperty("started"));
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ broker1.stop();
+ broker1.waitUntilStopped();
+
+ broker2.stop();
+ broker2.waitUntilStopped();
+ }
+}