Author: rajdavies
Date: Thu Jan 5 06:58:35 2006
New Revision: 366191
URL: http://svn.apache.org/viewcvs?rev=366191&view=rev
Log:
When sending, use the quality of service applicable to the message and subscriber.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=366191&r1=366190&r2=366191&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Thu Jan 5 06:58:35 2006
@@ -31,11 +31,13 @@
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
@@ -300,7 +302,7 @@
}
}
- protected void serviceLocalException(IOException error) {
+ protected void serviceLocalException(Throwable error) {
log.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown: "+error.getMessage(), error);
ServiceSupport.dispose(this);
}
@@ -347,8 +349,16 @@
if( trace )
log.trace("bridging " + localBroker + " -> " + remoteBroker + ": "+message);
-
+ if (!message.isPersistent() || !sub.remoteInfo.isDurable()){
remoteBroker.oneway( message );
+ }else{
+ Response response = remoteBroker.request(message);
+ if (response.isException()) {
+ ExceptionResponse er = (ExceptionResponse) response;
+ serviceLocalException(er.getException());
+
+ }
+ }
localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched));
}
@@ -373,7 +383,7 @@
log.warn("Unexpected local command: "+command);
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
serviceLocalException(e);
}
}