Author: chirino
Date: Fri Feb 23 12:22:54 2007
New Revision: 511080
URL: http://svn.apache.org/viewvc?view=rev&rev=511080
Log:
[EMAIL PROTECTED]: chirino | 2007-02-23 14:48:10 -0500
Flag a ConnectionContext as being a network connection if it sends us a
BrokerInfo.
Disable flow control if the producer is on a network connection, to try to
get around a network deadlock.
Modified:
activemq/branches/activemq-4.1/ (props changed)
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Fri Feb 23 12:22:54 2007
@@ -1 +1 @@
-635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:235
+635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:236
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Fri Feb 23 12:22:54 2007
@@ -55,6 +55,7 @@
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
private boolean dontSendReponse;
+ private boolean networkConnection;
private final MessageEvaluationContext messageEvaluationContext = new
MessageEvaluationContext();
@@ -253,6 +254,14 @@
public void setDontSendReponse(boolean dontSendReponse) {
this.dontSendReponse = dontSendReponse;
+ }
+
+ public synchronized boolean isNetworkConnection() {
+ return networkConnection;
+ }
+
+ public synchronized void setNetworkConnection(boolean
networkConnection) {
+ this.networkConnection = networkConnection;
}
}
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Feb 23 12:22:54 2007
@@ -125,6 +125,7 @@
private CountDownLatch stopLatch = new CountDownLatch(1);
protected final AtomicBoolean asyncException = new AtomicBoolean(false);
private ConnectionContext context;
+ private boolean networkConnection;
static class ConnectionState extends
org.apache.activemq.state.ConnectionState {
private final ConnectionContext context;
@@ -693,6 +694,7 @@
context.setUserName(info.getUserName());
context.setConnectionId(info.getConnectionId());
context.setWireFormatInfo(wireFormatInfo);
+ context.setNetworkConnection(networkConnection);
context.incrementReference();
this.manageable = info.isManageable();
@@ -1058,6 +1060,12 @@
this.brokerInfo = info;
broker.addBroker(this, info);
+ networkConnection = true;
+ for (Iterator iter = localConnectionStates.values().iterator();
iter.hasNext();) {
+ ConnectionState cs = (ConnectionState) iter.next();
+ cs.getContext().setNetworkConnection(true);
+ }
+
return null;
}
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Feb 23 12:22:54 2007
@@ -280,7 +280,7 @@
public void send(final ConnectionContext context, final Message message)
throws Exception {
- if (context.isProducerFlowControl()) {
+ if (context.isProducerFlowControl() && !context.isNetworkConnection())
{
if( message.isResponseRequired() ) {
if( usageManager.isFull() ) {
// System.out.println("Registering callback...");
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Feb 23 12:22:54 2007
@@ -232,7 +232,7 @@
public void send(final ConnectionContext context, final Message message)
throws Exception {
- if (context.isProducerFlowControl()) {
+ if (context.isProducerFlowControl() && !context.isNetworkConnection()
) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
} else {
Modified:
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
---
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
(original)
+++
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
Fri Feb 23 12:22:54 2007
@@ -56,9 +56,13 @@
}
- public boolean matches(MessageEvaluationContext message) throws
JMSException{
+ public boolean matches(MessageEvaluationContext mec) throws JMSException{
try{
- return matchesForwardingFilter(message.getMessage());
+ //for Queues - the message can be acknowledged and dropped whilst still
+ //in the dispatch loop
+ //so need to get the reference to it
+ Message message = mec.getMessage();
+ return message != null && matchesForwardingFilter(message);
}catch(IOException e){
throw JMSExceptionSupport.create(e);
}
@@ -132,4 +136,4 @@
this.networkBrokerId = remoteBrokerPath;
}
-}
\ No newline at end of file
+}