Author: chirino
Date: Fri Feb 23 12:22:54 2007
New Revision: 511080

URL: http://svn.apache.org/viewvc?view=rev&rev=511080
Log:
 [EMAIL PROTECTED]:  chirino | 2007-02-23 14:48:10 -0500
 Flag a ConnectionContext as being a network connection if it sends us a 
BrokerInfo.
 Disable flow control if the producer is on a network connection, in an 
attempt to get around a network deadlock.
 
 

Modified:
    activemq/branches/activemq-4.1/   (props changed)
    
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java

Propchange: activemq/branches/activemq-4.1/
------------------------------------------------------------------------------
--- svk:merge (original)
+++ svk:merge Fri Feb 23 12:22:54 2007
@@ -1 +1 @@
-635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:235
+635f1f41-eb29-0410-ac9d-be9e2c357fdd:/local/amq-4.1-port:236

Modified: 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: 
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
--- 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 (original)
+++ 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 Fri Feb 23 12:22:54 2007
@@ -55,6 +55,7 @@
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     private AtomicInteger referenceCounter = new AtomicInteger();
     private boolean dontSendReponse;
+    private boolean networkConnection;
     
     private final MessageEvaluationContext messageEvaluationContext = new 
MessageEvaluationContext();
     
@@ -253,6 +254,14 @@
 
        public void setDontSendReponse(boolean dontSendReponse) {
                this.dontSendReponse = dontSendReponse;
+       }
+
+       public synchronized boolean isNetworkConnection() {
+               return networkConnection;
+       }
+
+       public synchronized void setNetworkConnection(boolean 
networkConnection) {
+               this.networkConnection = networkConnection;
        }
 
 }

Modified: 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
--- 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 (original)
+++ 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 Fri Feb 23 12:22:54 2007
@@ -125,6 +125,7 @@
     private CountDownLatch stopLatch = new CountDownLatch(1);
     protected final AtomicBoolean asyncException = new AtomicBoolean(false);
     private ConnectionContext context;
+    private boolean networkConnection;
     
     static class ConnectionState extends 
org.apache.activemq.state.ConnectionState {
         private final ConnectionContext context;
@@ -693,6 +694,7 @@
         context.setUserName(info.getUserName());
         context.setConnectionId(info.getConnectionId());
         context.setWireFormatInfo(wireFormatInfo);
+        context.setNetworkConnection(networkConnection);
         context.incrementReference();
         this.manageable = info.isManageable();
         
@@ -1058,6 +1060,12 @@
         
         this.brokerInfo = info;
         broker.addBroker(this, info);
+        networkConnection = true;
+        for (Iterator iter = localConnectionStates.values().iterator(); 
iter.hasNext();) {
+            ConnectionState cs = (ConnectionState) iter.next();
+            cs.getContext().setNetworkConnection(true);
+        }       
+
         return null;
     }
 

Modified: 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
--- 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Fri Feb 23 12:22:54 2007
@@ -280,7 +280,7 @@
 
     public void send(final ConnectionContext context, final Message message) 
throws Exception {
 
-        if (context.isProducerFlowControl()) {
+        if (context.isProducerFlowControl() && !context.isNetworkConnection()) 
{
             if( message.isResponseRequired() ) {
                if( usageManager.isFull() ) {
 //                     System.out.println("Registering callback...");

Modified: 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
--- 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Fri Feb 23 12:22:54 2007
@@ -232,7 +232,7 @@
 
     public void send(final ConnectionContext context, final Message message) 
throws Exception {
 
-        if (context.isProducerFlowControl()) {
+        if (context.isProducerFlowControl()  && !context.isNetworkConnection() 
) {
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager 
memory limit reached");
             } else {

Modified: 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java?view=diff&rev=511080&r1=511079&r2=511080
==============================================================================
--- 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
 (original)
+++ 
activemq/branches/activemq-4.1/activemq-core/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java
 Fri Feb 23 12:22:54 2007
@@ -56,9 +56,13 @@
     }
 
 
-    public boolean matches(MessageEvaluationContext message) throws 
JMSException{
+    public boolean matches(MessageEvaluationContext mec) throws JMSException{
         try{
-            return matchesForwardingFilter(message.getMessage());
+            //for Queues - the message can be acknowledged and dropped whilst 
still
+            //in the dispatch loop
+            //so need to get the reference to it
+            Message message = mec.getMessage();
+            return message != null &&  matchesForwardingFilter(message);
         }catch(IOException e){
             throw JMSExceptionSupport.create(e);
         }
@@ -132,4 +136,4 @@
         this.networkBrokerId = remoteBrokerPath;
     }
 
-}
\ No newline at end of file
+}


Reply via email to