Author: chirino
Date: Tue Dec 20 10:05:38 2005
New Revision: 358056
URL: http://svn.apache.org/viewcvs?rev=358056&view=rev
Log:
- updated openwire marshalers.
- added some more toString() methods to the transports
- Fixed up the advisories so that duplicate consumer infos are not sent
- Changed the demand forwarding bridge so that loop back message filtering
occurs in the broker instead of on the bridge
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Message.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/MessageList.java
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/URISupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java
incubator/activemq/trunk/activemq-core/src/test/resources/jndi.properties
incubator/activemq/trunk/activemq-core/src/test/resources/spring-embedded.xml
incubator/activemq/trunk/activemq-core/src/test/resources/spring-jndi.xml
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/ActiveMQConnectionFactory.java
Tue Dec 20 10:05:38 2005
@@ -66,8 +66,6 @@
protected String password;
protected String clientID;
- protected boolean useEmbeddedBroker;
-
// optimization flags
private ActiveMQPrefetchPolicy prefetchPolicy = new
ActiveMQPrefetchPolicy();
private boolean disableTimeStampsByDefault = false;
@@ -315,14 +313,6 @@
public void setUseAsyncSend(boolean useAsyncSend) {
this.useAsyncSend = useAsyncSend;
- }
-
- public boolean isUseEmbeddedBroker() {
- return useEmbeddedBroker;
- }
-
- public void setUseEmbeddedBroker(boolean useEmbeddedBroker) {
- this.useEmbeddedBroker = useEmbeddedBroker;
}
public String getUserName() {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/advisory/AdvisoryBroker.java
Tue Dec 20 10:05:38 2005
@@ -118,7 +118,7 @@
for (Iterator iter = consumers.values().iterator();
iter.hasNext();) {
ConsumerInfo value = (ConsumerInfo) iter.next();
ActiveMQTopic topic =
AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
- fireConsumerAdvisory(context, topic, value);
+ fireConsumerAdvisory(context, topic, value,
info.getConsumerId());
}
}
}
@@ -194,9 +194,12 @@
}
protected void fireConsumerAdvisory(ConnectionContext context,
ActiveMQTopic topic, Command command) throws Throwable {
+ fireConsumerAdvisory(context, topic, command, null);
+ }
+ protected void fireConsumerAdvisory(ConnectionContext context,
ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws
Throwable {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setIntProperty("consumerCount", consumers.size());
- fireAdvisory(context, topic, command, null, advisoryMessage);
+ fireAdvisory(context, topic, command, targetConsumerId,
advisoryMessage);
}
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic
topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage
advisoryMessage) throws Throwable {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/AbstractConnection.java
Tue Dec 20 10:05:38 2005
@@ -78,7 +78,6 @@
protected final List dispatchQueue = Collections.synchronizedList(new
LinkedList());
protected final TaskRunner taskRunner;
protected final Connector connector;
- protected boolean demandForwardingBridge;
private ConnectionStatistics statistics = new ConnectionStatistics();
protected final ConcurrentHashMap connectionStates = new
ConcurrentHashMap();
@@ -326,7 +325,6 @@
public Response processMessage(Message messageSend) throws Throwable {
- messageSend.setRecievedByDFBridge(demandForwardingBridge);
broker.send(lookupConnectionState(messageSend.getProducerId()).getContext(),
messageSend);
return null;
}
@@ -337,7 +335,6 @@
}
public Response processBrokerInfo(BrokerInfo info) {
- demandForwardingBridge = true;
return null;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/broker/TransportConnection.java
Tue Dec 20 10:05:38 2005
@@ -21,10 +21,6 @@
import java.io.IOException;
import org.activemq.command.Command;
-import org.activemq.command.CommandTypes;
-import org.activemq.command.ConsumerInfo;
-import org.activemq.command.Message;
-import org.activemq.command.MessageDispatch;
import org.activemq.command.Response;
import org.activemq.thread.TaskRunnerFactory;
import org.activemq.transport.Transport;
@@ -183,34 +179,15 @@
protected void dispatch(Command command){
- if(isValidForNetwork(command)){
- try{
- setMarkedCandidate(true);
- transport.oneway(command);
- getStatistics().onCommand(command);
- }catch(IOException e){
- serviceException(e);
- }finally{
- setMarkedCandidate(false);
- }
- }
- }
-
- protected boolean isValidForNetwork(Command command){
- boolean result=true;
- if(demandForwardingBridge&&command.isMessageDispatch()){
- MessageDispatch md=(MessageDispatch) command;
- Message message=md.getMessage();
- if(message.isAdvisory()&&message.getDataStructure()!=null
-
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
- ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
- if(info.isNetworkSubscription()){
- // don't want to forward these
- result=false;
- }
- }
+ try{
+ setMarkedCandidate(true);
+ transport.oneway(command);
+ getStatistics().onCommand(command);
+ }catch(IOException e){
+ serviceException(e);
+ }finally{
+ setMarkedCandidate(false);
}
- return result;
- }
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/BaseCommand.java
Tue Dec 20 10:05:38 2005
@@ -20,6 +20,7 @@
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Arrays;
import java.util.LinkedHashMap;
/**
@@ -118,7 +119,14 @@
}
try {
- map.put(field.getName(), field.get(this));
+ Object o = field.get(this);
+ if( o!=null && o.getClass().isArray() ) {
+ try {
+ o = Arrays.asList((Object[]) o);
+ } catch (Throwable e) {
+ }
+ }
+ map.put(field.getName(), o);
} catch (Throwable e) {
e.printStackTrace();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/ConsumerInfo.java
Tue Dec 20 10:05:38 2005
@@ -278,6 +278,7 @@
}
/**
+ * @openwire:property version=1
* @return Returns the networkSubscription.
*/
public boolean isNetworkSubscription(){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/DiscoveryEvent.java
Tue Dec 20 10:05:38 2005
@@ -1,5 +1,4 @@
/**
-* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
*
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
*
@@ -49,16 +48,17 @@
this.serviceName = serviceName;
}
+ /**
+ * @openwire:property version=1
+ */
public String getBrokerName(){
return brokerName;
}
-
public void setBrokerName(String name){
this.brokerName = name;
}
public boolean isMarshallAware() {
return false;
- }
-
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Message.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Message.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Message.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/command/Message.java
Tue Dec 20 10:05:38 2005
@@ -604,6 +604,7 @@
}
/**
+ * @openwire:property version=1
* @return Returns the recievedByDFBridge.
*/
public boolean isRecievedByDFBridge(){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/DemandForwardingBridge.java
Tue Dec 20 10:05:38 2005
@@ -19,6 +19,9 @@
package org.activemq.network;
import java.io.IOException;
+
+import javax.jms.JMSException;
+
import org.activemq.advisory.AdvisorySupport;
import org.activemq.command.ActiveMQTopic;
import org.activemq.command.BrokerId;
@@ -37,9 +40,12 @@
import org.activemq.command.RemoveInfo;
import org.activemq.command.SessionInfo;
import org.activemq.command.ShutdownInfo;
+import org.activemq.filter.BooleanExpression;
+import org.activemq.filter.MessageEvaluationContext;
import org.activemq.transport.Transport;
import org.activemq.transport.TransportListener;
import org.activemq.util.IdGenerator;
+import org.activemq.util.JMSExceptionSupport;
import org.activemq.util.LongSequenceGenerator;
import org.activemq.util.ServiceStopper;
import org.activemq.util.ServiceSupport;
@@ -231,18 +237,22 @@
// Create a new local subscription
ConsumerInfo info = (ConsumerInfo) data;
BrokerId[] path = info.getBrokerPath();
- String pathStr = "{";
- for (int i =0; path != null && i < path.length; i++){
- pathStr += path[i] + " , ";
+
+ if( (path!=null && path.length>0) || info.isNetworkSubscription()
) {
+ // Ignore: We only support directly connected brokers for now.
+ return;
}
- pathStr += "}";
if( contains(info.getBrokerPath(), localBrokerPath[0]) ) {
// Ignore this consumer as it's a consumer we locally sent to
the broker.
return;
}
-
+
+ if( log.isTraceEnabled() )
+ log.trace("Forwarding sub on " + localBroker + " from " +
remoteBroker + " on "+info);
+
// Update the packet to show where it came from.
+ info = info.copy();
info.setBrokerPath( appendToBrokerPath(info.getBrokerPath(),
remoteBrokerPath) );
DemandSubscription sub = new DemandSubscription(info);
@@ -259,6 +269,21 @@
subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(), sub);
sub.localInfo.setBrokerPath(info.getBrokerPath());
sub.localInfo.setNetworkSubscription(true);
+ // This works for now since we use a VM connection to the local
broker.
+ // may need to change if we ever subscribe to a remote broker.
+ sub.localInfo.setAdditionalPredicate(new BooleanExpression(){
+ public boolean matches(MessageEvaluationContext message)
throws JMSException {
+ try {
+ return matchesForwardingFilter(message.getMessage());
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+ public Object evaluate(MessageEvaluationContext message)
throws JMSException {
+ return matches(message) ? Boolean.TRUE : Boolean.FALSE;
+ }
+ });
+
localBroker.oneway(sub.localInfo);
}
if( data.getClass() == RemoveInfo.class ) {
@@ -275,30 +300,35 @@
log.info("Network connection between " + localBroker + " and " +
remoteBroker + " shutdown: "+error.getMessage(), error);
ServiceSupport.dispose(this);
}
+
+ boolean matchesForwardingFilter(Message message) {
+ if( message.isRecievedByDFBridge() ||
contains(message.getBrokerPath(), remoteBrokerPath[0]) )
+ return false;
+
+ // Don't propagate advisory messages about network subscriptions
+ if( message.isAdvisory()
+ && message.getDataStructure()!=null
+ &&
message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO) {
+
+ ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
+ if(info.isNetworkSubscription()) {
+ return false;
+ }
+ }
+ return true;
+ }
protected void serviceLocalCommand(Command command) {
+ boolean trace = log.isTraceEnabled();
try {
if( command.isMessageDispatch() ) {
MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
- //only allow one network hop for this type of bridge
- if (message.isRecievedByDFBridge()){
- return;
- }
- if (message.isAdvisory() && message.getDataStructure() != null
&&
message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
- ConsumerInfo info =
(ConsumerInfo)message.getDataStructure();
- if (info.isNetworkSubscription()){
- //don't want to forward these
- return;
- }
- }
DemandSubscription sub =
(DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId());
if( sub!=null ) {
- if( contains(message.getBrokerPath(), remoteBrokerPath[0])
) {
- // Don't send the message back to the originator
- return;
- }
+ message = message.copy();
+
// Update the packet to show where it came from.
message.setBrokerPath(
appendToBrokerPath(message.getBrokerPath(), localBrokerPath) );
@@ -308,9 +338,14 @@
if( message.getOriginalTransactionId()==null )
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
+ message.setRecievedByDFBridge(true);
message.evictMarshlledForm();
+ if( trace )
+ log.trace("bridging " + localBroker + " -> " +
remoteBroker + ": "+message);
+
remoteBroker.oneway( message );
+
sub.dispatched++;
if( sub.dispatched > (sub.localInfo.getPrefetchSize()*.75)
) {
localBroker.oneway(new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/network/NetworkConnector.java
Tue Dec 20 10:05:38 2005
@@ -48,6 +48,7 @@
private ConcurrentHashMap bridges = new ConcurrentHashMap();
private String brokerName;
+ boolean failover=true;
public NetworkConnector() {
}
@@ -88,7 +89,17 @@
if (bridges.containsKey(uri) || localURI.equals(uri))
return;
- log.info("Establishing network connection between " + localURI + "
and " + event.getBrokerName() + " at " + uri);
+ URI connectUri = uri;
+ if( failover ) {
+ try {
+ connectUri = new URI("failover:"+connectUri);
+ } catch (URISyntaxException e) {
+ log.warn("Could not create failover URI: "+connectUri);
+ return;
+ }
+ }
+
+ log.info("Establishing network connection between " + localURI + "
and " + event.getBrokerName() + " at " + connectUri);
Transport localTransport;
try {
@@ -101,15 +112,15 @@
Transport remoteTransport;
try {
- remoteTransport = TransportFactory.connect(uri);
+ remoteTransport = TransportFactory.connect(connectUri);
}
catch (Exception e) {
ServiceSupport.dispose(localTransport);
- log.warn("Could not connect to remote URI: " + uri + ": " + e,
e);
+ log.warn("Could not connect to remote URI: " + connectUri + ":
" + e, e);
return;
}
- Bridge bridge = createBridge(localTransport, remoteTransport);
+ Bridge bridge = createBridge(localTransport, remoteTransport,
event);
bridges.put(uri, bridge);
try {
bridge.start();
@@ -170,8 +181,17 @@
// Implementation methods
//
-------------------------------------------------------------------------
- protected Bridge createBridge(Transport localTransport, Transport
remoteTransport) {
- return new DemandForwardingBridge(localTransport, remoteTransport);
+ protected Bridge createBridge(Transport localTransport, Transport
remoteTransport, final DiscoveryEvent event) {
+ return new DemandForwardingBridge(localTransport, remoteTransport) {
+ protected void serviceRemoteException(IOException error) {
+ super.serviceRemoteException(error);
+ try {
+ // Notify the discovery agent that the remote broker
failed.
+ discoveryAgent.serviceFailed(event);
+ } catch (IOException e) {
+ }
+ }
+ };
}
public void setBrokerName(String brokerName) {
@@ -179,6 +199,14 @@
if( discoveryAgent!=null ) {
discoveryAgent.setBrokerName(brokerName);
}
+ }
+
+ public boolean isFailover() {
+ return failover;
+ }
+
+ public void setFailover(boolean reliable) {
+ this.failover = reliable;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/ConsumerInfoMarshaller.java
Tue Dec 20 10:05:38 2005
@@ -89,6 +89,7 @@
info.setBrokerPath(null);
}
+ info.setNetworkSubscription(bs.readBoolean());
}
@@ -113,6 +114,7 @@
bs.writeBoolean(info.isRetroactive());
rc += marshalObjectArray(wireFormat, info.getBrokerPath(), bs);
+ bs.writeBoolean(info.isNetworkSubscription());
return rc+5;
}
@@ -140,6 +142,7 @@
bs.readBoolean();
dataOut.writeByte(info.getPriority());
marshalObjectArray(wireFormat, info.getBrokerPath(), dataOut, bs);
+ bs.readBoolean();
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/openwire/v1/MessageMarshaller.java
Tue Dec 20 10:05:38 2005
@@ -107,6 +107,7 @@
info.setArrival(unmarshalLong(wireFormat, dataIn, bs));
info.setUserID(readString(dataIn, bs));
+ info.setRecievedByDFBridge(bs.readBoolean());
info.afterUnmarshall(wireFormat);
@@ -153,6 +154,7 @@
rc += marshalObjectArray(wireFormat, info.getBrokerPath(), bs);
rc+=marshal1Long(wireFormat, info.getArrival(), bs);
rc += writeString(info.getUserID(), bs);
+ bs.writeBoolean(info.isRecievedByDFBridge());
return rc+9;
}
@@ -204,6 +206,7 @@
marshalObjectArray(wireFormat, info.getBrokerPath(), dataOut, bs);
marshal2Long(wireFormat, info.getArrival(), dataOut, bs);
writeString(info.getUserID(), dataOut, bs);
+ bs.readBoolean();
info.afterMarshall(wireFormat);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/MutexTransport.java
Tue Dec 20 10:05:38 2005
@@ -53,4 +53,8 @@
}
}
+ public String toString() {
+ return next.toString();
+ }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/ResponseCorrelator.java
Tue Dec 20 10:05:38 2005
@@ -84,4 +84,9 @@
commandListener.onCommand(command);
}
}
+
+ public String toString() {
+ return next.toString();
+ }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/TransportLogger.java
Tue Dec 20 10:05:38 2005
@@ -52,4 +52,8 @@
}
next.oneway(command);
}
+
+ public String toString() {
+ return next.toString();
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/WireFormatNegotiator.java
Tue Dec 20 10:05:38 2005
@@ -106,4 +106,8 @@
}
commandListener.onCommand(command);
}
+
+ public String toString() {
+ return next.toString();
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgent.java
Tue Dec 20 10:05:38 2005
@@ -23,6 +23,7 @@
import javax.jms.JMSException;
import org.activemq.Service;
+import org.activemq.command.DiscoveryEvent;
/**
* An agent used to discover other instances of a service.
@@ -47,6 +48,12 @@
*/
void registerService(String name) throws IOException;
+ /**
+ * A process actively using a service may see it go down before the
DiscoveryAgent notices the
+ * service's failure. That process can use this method to notify the
DiscoveryAgent of the failure
+ * so that other listeners of this DiscoveryAgent can also be made aware
of the failure.
+ */
+ void serviceFailed(DiscoveryEvent event) throws IOException;
String getGroup();
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Tue Dec 20 10:05:38 2005
@@ -348,4 +348,8 @@
}
return result;
}
+
+ public void serviceFailed(DiscoveryEvent event) throws IOException {
+ processDead(event.getBrokerName(), event.getServiceName());
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgent.java
Tue Dec 20 10:05:38 2005
@@ -236,4 +236,8 @@
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
+
+ public void serviceFailed(DiscoveryEvent event) throws IOException {
+ // TODO: is there a way to notify the JmDNS that the service failed?
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Tue Dec 20 10:05:38 2005
@@ -82,4 +82,7 @@
public void setBrokerName(String brokerName) {
}
+ public void serviceFailed(DiscoveryEvent event) throws IOException {
+ }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/failover/FailoverTransport.java
Tue Dec 20 10:05:38 2005
@@ -397,4 +397,8 @@
this.useExponentialBackOff = useExponentialBackOff;
}
+ public String toString() {
+ return connectedTransportURI==null ? "unconnected" :
connectedTransportURI.toString();
+ }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
Tue Dec 20 10:05:38 2005
@@ -67,7 +67,7 @@
private VMTransportFactory createTransportFactory(URI location) throws
IOException {
try {
String group = location.getHost();
- String broker = location.getPath();
+ String broker = URISupport.stripPrefix(location.getPath(), "/");
if( group == null ) {
group = "default";
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/tcp/TcpTransport.java
Tue Dec 20 10:05:38 2005
@@ -121,7 +121,7 @@
* @return pretty print of 'this'
*/
public String toString() {
- return "TcpTransport: " + socket;
+ return "tcp://"+socket.getInetAddress()+":"+socket.getPort();
}
/**
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransport.java
Tue Dec 20 10:05:38 2005
@@ -16,10 +16,12 @@
package org.activemq.transport.vm;
import java.io.IOException;
+import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+
import org.activemq.command.Command;
import org.activemq.command.Response;
import org.activemq.transport.FutureResponse;
@@ -27,6 +29,8 @@
import org.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;
/**
* A Transport implementation that uses direct method invocations.
*
@@ -34,13 +38,21 @@
*/
public class VMTransport implements Transport{
private static final Log log=LogFactory.getLog(VMTransport.class);
+ private static final AtomicLong nextId = new AtomicLong(0);
+
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
protected boolean marshal;
protected boolean network;
protected List queue = Collections.synchronizedList(new LinkedList());
+ protected final URI location;
+ protected final long id;
+ public VMTransport(URI location) {
+ this.location = location;
+ this.id=nextId.getAndIncrement();
+ }
synchronized public VMTransport getPeer(){
return peer;
@@ -115,6 +127,10 @@
public void setNetwork(boolean network){
this.network=network;
+ }
+
+ public String toString() {
+ return location+"#"+id;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportServer.java
Tue Dec 20 10:05:38 2005
@@ -74,7 +74,7 @@
throw new IOException("Server TransportAcceptListener is null.");
connectionCount.incrementAndGet();
- VMTransport client = new VMTransport() {
+ VMTransport client = new VMTransport(location) {
public void stop() throws Exception {
if( disposed )
return;
@@ -85,7 +85,7 @@
};
};
- VMTransport server = new VMTransport();
+ VMTransport server = new VMTransport(location);
client.setPeer(server);
server.setPeer(client);
al.onAccept(configure(server));
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/MessageList.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/MessageList.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/MessageList.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/MessageList.java
Tue Dec 20 10:05:38 2005
@@ -17,14 +17,16 @@
**/
package org.activemq.util;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
import javax.jms.Message;
import javax.jms.MessageListener;
+import javax.jms.TextMessage;
import junit.framework.Assert;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* A simple container for performing testing and rendezvous style code.
*
@@ -57,6 +59,21 @@
public synchronized List getMessages() {
synchronized (semaphore) {
return new ArrayList(messages);
+ }
+ }
+
+ public synchronized List getTextMessages() {
+ synchronized (semaphore) {
+ ArrayList l = new ArrayList();
+ for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ try {
+ TextMessage m = (TextMessage) iter.next();
+ l.add(m.getText());
+ } catch (Throwable e) {
+ l.add(""+e);
+ }
+ }
+ return l;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/URISupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/URISupport.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/URISupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/util/URISupport.java
Tue Dec 20 10:05:38 2005
@@ -236,7 +236,7 @@
return rc;
}
- private static String stripPrefix(String value, String prefix) {
+ public static String stripPrefix(String value, String prefix) {
if( value.startsWith(prefix) )
return value.substring(prefix.length());
return value;
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/jndi/JNDITestSupport.java
Tue Dec 20 10:05:38 2005
@@ -91,7 +91,6 @@
}
protected void configureEnvironment() {
- environment.put("useEmbeddedBroker", "true");
environment.put("brokerURL", "vm://localhost");
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/peer/PeerTransportTest.java
Tue Dec 20 10:05:38 2005
@@ -48,27 +48,28 @@
protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
protected MessageProducer[] producers;
protected Connection[] connections;
- protected MessageList messageList = new MessageList();
+ protected MessageList messageList[];
protected void setUp() throws Exception {
- messageList.setVerbose(true);
connections = new Connection[NUMBER_IN_CLUSTER];
producers = new MessageProducer[NUMBER_IN_CLUSTER];
+ messageList = new MessageList[NUMBER_IN_CLUSTER];
Destination destination = createDestination();
String root = System.getProperty("activemq.store.dir");
for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
- System.setProperty("activemq.store.dir", root + "_broker_" + i);
- connections[i] = createConnection();
+ connections[i] = createConnection(i);
connections[i].setClientID("ClusterTest" + i);
connections[i].start();
+
Session session = connections[i].createSession(false,
Session.AUTO_ACKNOWLEDGE);
producers[i] = session.createProducer(destination);
producers[i].setDeliveryMode(deliveryMode);
MessageConsumer consumer = createMessageConsumer(session,
destination);
- consumer.setMessageListener(messageList);
+ messageList[i] = new MessageList();
+ consumer.setMessageListener(messageList[i]);
}
System.out.println("Sleeping to ensure cluster is fully connected");
Thread.sleep(10000);
@@ -87,13 +88,9 @@
return session.createConsumer(destination);
}
- protected int expectedReceiveCount() {
- return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
- }
-
- protected Connection createConnection() throws JMSException {
+ protected Connection createConnection(int i) throws JMSException {
System.err.println("creating connection ....");
- ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory("peer://" + getClass().getName());
+ ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory("peer://" + getClass().getName()+"/node"+i);
return fac.createConnection();
}
@@ -120,10 +117,16 @@
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("MSG-NO: " + i + " in cluster: " + x);
producers[x].send(textMessage);
- // System.out.println("SENT MSG: " + textMessage);
}
}
- messageList.assertMessagesReceived(expectedReceiveCount());
+ for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
+ messageList[i].assertMessagesReceived(expectedReceiveCount());
+ }
+ }
+
+ protected int expectedReceiveCount() {
+ return MESSAGE_COUNT * NUMBER_IN_CLUSTER;
}
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/resources/jndi.properties
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/jndi.properties?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/jndi.properties
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/jndi.properties
Tue Dec 20 10:05:38 2005
@@ -5,9 +5,6 @@
# use the following property to configure the default connector
java.naming.provider.url = vm://localhost
-# use the following property to embed a broker inside this JVM
-#useEmbeddedBroker = true
-
# use the following property to specify a class path resource or URL
# used to configure an embedded broker using the XML configuration file
#brokerXmlConfig = file:src/conf/sample-conf/default.xml
Modified:
incubator/activemq/trunk/activemq-core/src/test/resources/spring-embedded.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/spring-embedded.xml?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/resources/spring-embedded.xml
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/resources/spring-embedded.xml
Tue Dec 20 10:05:38 2005
@@ -12,9 +12,6 @@
<!-- JMS ConnectionFactory to use, configuring the embedded broker using
XML -->
<bean id="jmsFactory" class="org.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost" />
- <property name="useEmbeddedBroker">
- <value>true</value>
- </property>
</bean>
<!-- Spring JMS Template -->
Modified:
incubator/activemq/trunk/activemq-core/src/test/resources/spring-jndi.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/spring-jndi.xml?rev=358056&r1=358055&r2=358056&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/spring-jndi.xml
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/spring-jndi.xml
Tue Dec 20 10:05:38 2005
@@ -12,8 +12,6 @@
<property name="environment">
<props>
<prop
key="java.naming.factory.initial">org.activemq.jndi.ActiveMQInitialContextFactory</prop>
- <prop key="useEmbeddedBroker">true</prop>
-
<!-- lets register some destinations -->
<prop key="topic.MyTopic">example.Spring.MyTopic</prop>
</props>