Author: jlim
Date: Mon Mar  5 08:26:08 2007
New Revision: 514720

URL: http://svn.apache.org/viewvc?view=rev&rev=514720
Log:
ported fix to trunk : 
http://issues.apache.org/activemq/browse/AMQ-1181

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
 Mon Mar  5 08:26:08 2007
@@ -54,7 +54,7 @@
     private boolean producerFlowControl=true;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     private AtomicInteger referenceCounter = new AtomicInteger();
-    
+    private boolean networkConnection;
     private final MessageEvaluationContext messageEvaluationContext = new 
MessageEvaluationContext();
     
     public ConnectionContext() {
@@ -246,4 +246,11 @@
                return referenceCounter.decrementAndGet();
        }
 
+       public synchronized boolean isNetworkConnection() {
+               return networkConnection;
+       }
+
+       public synchronized void setNetworkConnection(boolean 
networkConnection) {
+               this.networkConnection = networkConnection;
+       }       
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 Mon Mar  5 08:26:08 2007
@@ -119,7 +119,8 @@
     private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = 
new HashMap<ConsumerId,ConsumerBrokerExchange>();
     private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
-
+    private boolean networkConnection;
+    
     static class ConnectionState extends 
org.apache.activemq.state.ConnectionState{
 
         private final ConnectionContext context;
@@ -627,6 +628,7 @@
         context.setUserName(info.getUserName());
         context.setConnectionId(info.getConnectionId());
         context.setWireFormatInfo(wireFormatInfo);
+        context.setNetworkConnection(networkConnection);
         context.incrementReference();
         this.manageable=info.isManageable();
         state=new ConnectionState(info,context,this);
@@ -1027,6 +1029,12 @@
         }
         this.brokerInfo=info;
         broker.addBroker(this,info);
+        networkConnection = true;
+        for (Iterator iter = localConnectionStates.values().iterator(); 
iter.hasNext();) {
+            ConnectionState cs = (ConnectionState) iter.next();
+            cs.getContext().setNetworkConnection(true);
+        }   
+        
         return null;
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Mon Mar  5 08:26:08 2007
@@ -325,7 +325,7 @@
             }
             return;
         }
-        if(context.isProducerFlowControl()){
+        if (context.isProducerFlowControl() && !context.isNetworkConnection()) 
{
             if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
                 throw new javax.jms.ResourceAllocationException("Usage Manager 
memory limit reached");
             }else{

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Mon Mar  5 08:26:08 2007
@@ -243,7 +243,7 @@
        if( message.isExpired() ) {
                return;
        }
-        if (context.isProducerFlowControl()) {
+       if (context.isProducerFlowControl()  && !context.isNetworkConnection() 
) {
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager 
memory limit reached");
             } else {


Reply via email to