Author: jlim
Date: Mon Mar 5 09:03:36 2007
New Revision: 514734
URL: http://svn.apache.org/viewvc?view=rev&rev=514734
Log:
ported fix to trunk :
http://issues.apache.org/activemq/browse/AMQ-1179
http://issues.apache.org/activemq/browse/AMQ-1180
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=514734&r1=514733&r2=514734
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Mon Mar 5 09:03:36 2007
@@ -18,6 +18,7 @@
package org.apache.activemq.broker;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.region.MessageReference;
@@ -55,6 +56,7 @@
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
private boolean networkConnection;
+ private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new
MessageEvaluationContext();
public ConnectionContext() {
@@ -253,4 +255,9 @@
public synchronized void setNetworkConnection(boolean
networkConnection) {
this.networkConnection = networkConnection; // plain field store, done while holding this object's monitor (method is synchronized)
}
+
+ public AtomicBoolean getStopping() {
+ return stopping; // returns the live AtomicBoolean itself (not a copy of its value), so callers can both read and set the flag
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514734&r1=514733&r2=514734
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Mon Mar 5 09:03:36 2007
@@ -111,7 +111,8 @@
private boolean starting;
private boolean pendingStop;
private long timeStamp=0;
- private AtomicBoolean stopped=new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private final AtomicBoolean transportDisposed = new AtomicBoolean();
private final AtomicBoolean disposed=new AtomicBoolean(false);
private CountDownLatch stopLatch=new CountDownLatch(1);
private final AtomicBoolean asyncException=new AtomicBoolean(false);
@@ -846,8 +847,24 @@
transport.stop();
active=false;
if(disposed.compareAndSet(false,true)){
- taskRunner.wakeup();
- dispatchStoppedLatch.await();
+
+ // Let all the connection contexts know we are shutting down
+ // so that in progress operations can notice and unblock.
+ ArrayList l=new ArrayList(localConnectionStates.values());
+ for(Iterator iter=l.iterator();iter.hasNext();){
+ ConnectionState cs=(ConnectionState) iter.next();
+ cs.getContext().getStopping().set(true);
+ }
+
+ if( taskRunner!=null ) {
+ taskRunner.wakeup();
+ // Give it a chance to stop gracefully.
+ dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
+ disposeTransport();
+ taskRunner.shutdown();
+ } else {
+ disposeTransport();
+ }
if( taskRunner!=null )
taskRunner.shutdown();
@@ -868,7 +885,7 @@
// Remove all logical connection associated with this
connection
// from the broker.
if(!broker.isStopped()){
- ArrayList l=new ArrayList(localConnectionStates.keySet());
+ l=new ArrayList(localConnectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId)iter.next();
try{
@@ -884,7 +901,6 @@
}
stopLatch.countDown();
}
- log.debug("Stopped connection: "+transport.getRemoteAddress());
}
}
@@ -1122,4 +1138,16 @@
consumerExchanges.remove(id);
}
}
+
+ protected void disposeTransport() { // Stops the transport; the guard below makes repeated calls no-ops.
+ if( transportDisposed.compareAndSet(false, true) ) { // only the first caller flips the flag and runs the body
+ try {
+ transport.stop();
+ active = false; // reached only when transport.stop() returned without throwing
+ log.debug("Stopped connection:
"+transport.getRemoteAddress());
+ } catch (Exception e) { // a failure here is logged at debug level and not rethrown
+ log.debug("Could not stop transport: "+e,e);
+ }
+ }
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=514734&r1=514733&r2=514734
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Mar 5 09:03:36 2007
@@ -329,7 +329,10 @@
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
}else{
- usageManager.waitForSpace();
+ while( !usageManager.waitForSpace(1000) ) {
+ if( context.getStopping().get() )
+ throw new IOException("Connection closed, send
aborted.");
+ }
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if(message.isExpired()){
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=514734&r1=514733&r2=514734
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Mon Mar 5 09:03:36 2007
@@ -247,6 +247,10 @@
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
} else {
+ while( !usageManager.waitForSpace(1000) ) {
+ if( context.getStopping().get() )
+ throw new IOException("Connection closed, send
aborted.");
+ }
usageManager.waitForSpace();
// The usage manager could have delayed us by the time
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=514734&r1=514733&r2=514734
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon Mar 5 09:03:36 2007
@@ -175,7 +175,7 @@
public void afterCommit() throws Exception{
synchronized(TopicSubscription.this){
- if(singleDestination){
+ if( singleDestination && destination!=null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
@@ -184,7 +184,7 @@
}
});
}else{
- if(singleDestination){
+ if( singleDestination && destination!=null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=514734&r1=514733&r2=514734
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Mon Mar 5 09:03:36 2007
@@ -116,6 +116,24 @@
}
}
}
+
+ /**
+ * Waits for a bounded time until usage is below 100 percent.
+ * If a parent is set, its waitForSpace(timeout) is called first, and a
+ * false result from the parent is returned immediately. Then, if
+ * percentUsage is at or above 100, the caller waits once on usageMutex
+ * for up to the given timeout. The parent wait and the local wait each
+ * receive the full timeout, so one call can take longer than timeout.
+ *
+ * @param timeout passed unchanged to the parent and to Object.wait(long),
+ *        i.e. milliseconds. NOTE(review): a value of 0 would make
+ *        Object.wait block without a time limit; confirm no caller passes 0.
+ * @return true if percentUsage is below 100 when re-checked after the
+ *         wait, false if it is still at or above 100 or the parent
+ *         reported no space
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ public boolean waitForSpace(long timeout) throws InterruptedException {
+ if(parent!=null) {
+ if( !parent.waitForSpace(timeout) )
+ return false;
+ }
+ synchronized (usageMutex) {
+ if( percentUsage >= 100 ) {
+ usageMutex.wait(timeout); // waits once, not in a loop; the return below re-checks the usage
+ }
+ return percentUsage < 100;
+ }
+ }
/**
* Increases the usage by the value amount.