Author: rajdavies
Date: Tue Mar 13 08:53:24 2007
New Revision: 517741
URL: http://svn.apache.org/viewvc?view=rev&rev=517741
Log:
add method to retrieve the URI used by the local VMTransport
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Tue Mar 13 08:53:24 2007
@@ -17,6 +17,7 @@
*/
package org.apache.activemq.broker;
+import java.net.URI;
import java.util.Set;
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination;
@@ -239,6 +240,14 @@
* @param adminConnectionContext
*/
public abstract void setAdminConnectionContext(ConnectionContext
adminConnectionContext);
-
+
+ /**
+ * @return the temp data store
+ */
public Store getTempDataStore();
+
+ /**
+ * @return the URI that can be used to connect to the local Broker
+ */
+ public URI getVmConnectorURI();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Tue Mar 13 08:53:24 2007
@@ -38,6 +38,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
+import java.net.URI;
import java.util.Map;
import java.util.Set;
@@ -238,5 +239,8 @@
return next.getTempDataStore();
}
+ public URI getVmConnectorURI(){
+ return next.getVmConnectorURI();
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Tue Mar 13 08:53:24 2007
@@ -38,6 +38,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
+import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -234,6 +235,10 @@
public Store getTempDataStore() {
+ return null;
+ }
+
+ public URI getVmConnectorURI(){
return null;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Tue Mar 13 08:53:24 2007
@@ -17,6 +17,7 @@
*/
package org.apache.activemq.broker;
+import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -233,6 +234,10 @@
}
public Store getTempDataStore() {
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public URI getVmConnectorURI(){
throw new BrokerStoppedException(this.message);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Tue Mar 13 08:53:24 2007
@@ -38,6 +38,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
+import java.net.URI;
import java.util.Map;
import java.util.Set;
@@ -248,6 +249,10 @@
public Store getTempDataStore() {
return getNext().getTempDataStore();
+ }
+
+ public URI getVmConnectorURI(){
+ return getNext().getVmConnectorURI();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Tue Mar 13 08:53:24 2007
@@ -22,6 +22,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -66,6 +67,8 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.network.DemandForwardingBridge;
+import org.apache.activemq.network.NetworkBridgeConfiguration;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConsumerState;
@@ -77,6 +80,8 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -129,6 +134,7 @@
private ConnectionContext context;
private boolean networkConnection;
private AtomicInteger protocolVersion=new
AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+ private DemandForwardingBridge duplexBridge = null;
static class ConnectionState extends
org.apache.activemq.state.ConnectionState{
@@ -464,6 +470,10 @@
if(seq>producerState.getLastSequenceId()){
producerState.setLastSequenceId(seq);
broker.send(producerExchange,messageSend);
+ }else {
+ if (log.isDebugEnabled()) {
+ log.debug("Discarding duplicate: " + messageSend);
+ }
}
}else{
// producer not local to this broker
@@ -1063,6 +1073,19 @@
masterBroker=new MasterBroker(parent,transport);
masterBroker.startProcessing();
log.info("Slave Broker "+info.getBrokerName()+" is attached");
+ }else if (info.isNetworkConnection() && info.isDuplexConnection()) {
+ //so this TransportConnection is the rear end of a network bridge
+ //We have been requested to create a two way pipe ...
+ try{
+ Properties props =
MarshallingSupport.stringToProperties(info.getNetworkProperties());
+ NetworkBridgeConfiguration config = new
NetworkBridgeConfiguration();
+ IntrospectionSupport.setProperties(config,props,null);
+ config.setLocalBrokerName(broker.getBrokerName());
+
+
+ }catch(IOException e){
+ log.error("Creating duplex network bridge",e);
+ }
}
// We only expect to get one broker info command per connection
if(this.brokerInfo!=null){
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Tue Mar 13 08:53:24 2007
@@ -18,6 +18,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -607,5 +608,9 @@
public Store getTempDataStore() {
return brokerService.getTempDataStore();
+ }
+
+ public URI getVmConnectorURI(){
+ return brokerService.getVmConnectorURI();
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=517741&r1=517740&r2=517741
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
Tue Mar 13 08:53:24 2007
@@ -39,6 +39,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
+import java.net.URI;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
@@ -231,4 +232,8 @@
public Store getTempDataStore() {
return null;
}
+
+ public URI getVmConnectorURI(){
+ return null;
+ }
}