Author: jlim
Date: Mon Mar 5 07:41:17 2007
New Revision: 514694
URL: http://svn.apache.org/viewvc?view=rev&rev=514694
Log:
ported fix to trunk :
http://issues.apache.org/activemq/browse/AMQ-1172
http://issues.apache.org/activemq/browse/AMQ-1174
http://issues.apache.org/activemq/browse/AMQ-1175
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
Mon Mar 5 07:41:17 2007
@@ -105,5 +105,7 @@
public String getRemoteAddress();
public void serviceExceptionAsync(IOException e);
+
+ public String getConnectionId();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Mon Mar 5 07:41:17 2007
@@ -26,6 +26,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
@@ -97,7 +98,7 @@
// Used to do async dispatch.. this should perhaps be pushed down into the
transport layer..
protected final List dispatchQueue=Collections.synchronizedList(new
LinkedList());
protected final TaskRunner taskRunner;
- protected IOException transportException;
+ protected final AtomicReference transportException = new AtomicReference();
private boolean inServiceException=false;
private ConnectionStatistics statistics=new ConnectionStatistics();
private boolean manageable;
@@ -116,6 +117,8 @@
private final AtomicBoolean asyncException=new AtomicBoolean(false);
private final Map<ProducerId,ProducerBrokerExchange>producerExchanges =
new HashMap<ProducerId,ProducerBrokerExchange>();
private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges =
new HashMap<ConsumerId,ConsumerBrokerExchange>();
+ private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
+ protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
static class ConnectionState extends
org.apache.activemq.state.ConnectionState{
@@ -166,7 +169,7 @@
Command command=(Command)o;
Response response=service(command);
if(response!=null){
- dispatch(response);
+ dispatchSync(response);
}
}
@@ -186,7 +189,7 @@
public void serviceTransportException(IOException e){
if(!disposed.get()){
- transportException=e;
+ transportException.set(e);
if(transportLog.isDebugEnabled())
transportLog.debug("Transport failed: "+e,e);
ServiceSupport.dispose(this);
@@ -683,47 +686,96 @@
}
public void dispatchSync(Command message){
- processDispatch(message);
+ getStatistics().getEnqueues().increment();
+ try {
+ processDispatch(message);
+ } catch (IOException e) {
+ serviceExceptionAsync(e);
+ }
}
public void dispatchAsync(Command message){
- if(taskRunner==null){
- dispatchSync(message);
- }else{
- dispatchQueue.add(message);
- try{
- taskRunner.wakeup();
- }catch(InterruptedException e){
- Thread.currentThread().interrupt();
+ if( !disposed.get() ) {
+ getStatistics().getEnqueues().increment();
+ if( taskRunner==null ) {
+ dispatchSync( message );
+ } else {
+ dispatchQueue.add(message);
+ try {
+ taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
+ } else {
+ if(message.isMessageDispatch()) {
+ MessageDispatch md=(MessageDispatch) message;
+ Runnable sub=(Runnable) md.getConsumer();
+ broker.processDispatch(md);
+ if(sub!=null){
+ sub.run();
+ }
+ }
}
}
- protected void processDispatch(Command command){
- if(command.isMessageDispatch()){
- MessageDispatch md=(MessageDispatch)command;
- Runnable sub=(Runnable)md.getConsumer();
- broker.processDispatch(md);
- try{
- dispatch(command);
- }finally{
+ protected void processDispatch(Command command) throws IOException {
+ try {
+ if( !disposed.get() ) {
+ dispatch(command);
+ }
+ } finally {
+
+ if(command.isMessageDispatch()){
+ MessageDispatch md=(MessageDispatch) command;
+ Runnable sub=(Runnable) md.getConsumer();
+ broker.processDispatch(md);
if(sub!=null){
sub.run();
}
}
- }else{
- dispatch(command);
+
+ getStatistics().getDequeues().increment();
}
- }
+ }
+
+
public boolean iterate(){
- if(dispatchQueue.isEmpty()||broker.isStopped()){
- return false;
- }else{
- Command command=(Command)dispatchQueue.remove(0);
- processDispatch(command);
- return true;
- }
+ try {
+ if( disposed.get() ) {
+ if( dispatchStopped.compareAndSet(false, true)) {
+ if( transportException.get()==null ) {
+ try {
+ dispatch(new ShutdownInfo());
+ } catch (Throwable ignore) {
+ }
+ }
+ dispatchStoppedLatch.countDown();
+ }
+ return false;
+ }
+
+ if( !dispatchStopped.get() ) {
+
+ if( dispatchQueue.isEmpty() ) {
+ return false;
+ } else {
+ Command command = (Command) dispatchQueue.remove(0);
+ processDispatch( command );
+ return true;
+ }
+ } else {
+ return false;
+ }
+
+ } catch (IOException e) {
+ if( dispatchStopped.compareAndSet(false, true)) {
+ dispatchStoppedLatch.countDown();
+ }
+ serviceExceptionAsync(e);
+ return false;
+ }
}
/**
@@ -792,11 +844,24 @@
transport.stop();
active=false;
if(disposed.compareAndSet(false,true)){
- if(taskRunner!=null)
- taskRunner.shutdown();
- // Clear out the dispatch queue to release any memory that
- // is being held on to.
- dispatchQueue.clear();
+ taskRunner.wakeup();
+ dispatchStoppedLatch.await();
+
+ if( taskRunner!=null )
+ taskRunner.shutdown();
+
+ // Run the MessageDispatch callbacks so that message
references get cleaned up.
+ for (Iterator iter = dispatchQueue.iterator();
iter.hasNext();) {
+ Command command = (Command) iter.next();
+ if(command.isMessageDispatch()) {
+ MessageDispatch md=(MessageDispatch) command;
+ Runnable sub=(Runnable) md.getConsumer();
+ broker.processDispatch(md);
+ if(sub!=null){
+ sub.run();
+ }
+ }
+ }
//
// Remove all logical connection associated with this
connection
// from the broker.
@@ -965,13 +1030,10 @@
return null;
}
- protected void dispatch(Command command){
+ protected void dispatch(Command command) throws IOException{
try{
setMarkedCandidate(true);
transport.oneway(command);
- getStatistics().onCommand(command);
- }catch(IOException e){
- serviceExceptionAsync(e);
}finally{
setMarkedCandidate(false);
}
@@ -980,6 +1042,17 @@
public String getRemoteAddress(){
return transport.getRemoteAddress();
}
+
+ public String getConnectionId() {
+ Iterator iterator = localConnectionStates.values().iterator();
+ ConnectionState object = (ConnectionState) iterator.next();
+ if( object == null ) {
+ return null;
+ }
+ if( object.getInfo().getClientId() !=null )
+ return object.getInfo().getClientId();
+ return object.getInfo().getConnectionId().toString();
+ }
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id){
ProducerBrokerExchange result=producerExchanges.get(id);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ConnectionView.java
Mon Mar 5 07:41:17 2007
@@ -102,4 +102,8 @@
return connection.getRemoteAddress();
}
+ public String getConnectionId() {
+ return connection.getConnectionId();
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/ConnectionStatistics.java
Mon Mar 5 07:41:17 2007
@@ -18,8 +18,7 @@
package org.apache.activemq.broker.region;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Message;
+
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
@@ -73,16 +72,5 @@
}
}
- /**
- * Updates the statistics as a command is dispatched into the connection
- */
- public void onCommand(Command command) {
- if (command.isMessageDispatch()) {
- enqueues.increment();
- }
- }
- public void onMessageDequeue(Message message) {
- dequeues.increment();
- }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Mar 5 07:41:17 2007
@@ -453,7 +453,6 @@
if(node.getRegionDestination()!=null){
if(node!=QueueMessageReference.NULL_MESSAGE){
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
-
context.getConnection().getStatistics().onMessageDequeue(message);
}
try{
dispatchMatched();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=514694&r1=514693&r2=514694
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon Mar 5 07:41:17 2007
@@ -44,8 +44,8 @@
private static final AtomicLong cursorNameCounter=new AtomicLong(0);
protected PendingMessageCursor matched;
final protected UsageManager usageManager;
- protected AtomicLong dispatched=new AtomicLong();
- protected AtomicLong delivered=new AtomicLong();
+ protected AtomicLong dispatchedCounter=new AtomicLong();
+ protected AtomicLong prefetchExtension=new AtomicLong();
private int maximumPendingMessages=-1;
private MessageEvictionStrategy messageEvictionStrategy=new
OldestMessageEvictionStrategy();
private int discarded=0;
@@ -136,7 +136,7 @@
MessageReference node=matched.next();
if(node.isExpired()){
matched.remove();
- dispatched.incrementAndGet();
+ dispatchedCounter.incrementAndGet();
node.decrementReferenceCount();
break;
}
@@ -154,7 +154,7 @@
MessageReference node=matched.next();
if(node.getMessageId().equals(mdn.getMessageId())){
matched.remove();
- dispatched.incrementAndGet();
+ dispatchedCounter.incrementAndGet();
node.decrementReferenceCount();
break;
}
@@ -170,7 +170,7 @@
boolean wasFull=isFull();
if(ack.isStandardAck()||ack.isPoisonAck()){
if(context.isInTransaction()){
- delivered.addAndGet(ack.getMessageCount());
+ prefetchExtension.addAndGet(ack.getMessageCount());
context.getTransaction().addSynchronization(new
Synchronization(){
public void afterCommit() throws Exception{
@@ -180,8 +180,7 @@
}
}
dequeueCounter.addAndGet(ack.getMessageCount());
- dispatched.addAndGet(-ack.getMessageCount());
-
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
+ prefetchExtension.addAndGet(ack.getMessageCount());
}
});
}else{
@@ -189,8 +188,7 @@
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
- dispatched.addAndGet(-ack.getMessageCount());
-
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
+ prefetchExtension.addAndGet(ack.getMessageCount());
}
if(wasFull&&!isFull()){
dispatchMatched();
@@ -198,7 +196,7 @@
return;
}else if(ack.isDeliveredAck()){
// Message was delivered but not acknowledged: update pre-fetch
counters.
- delivered.addAndGet(ack.getMessageCount());
+ prefetchExtension.addAndGet(ack.getMessageCount());
if(wasFull&&!isFull()){
dispatchMatched();
}
@@ -217,7 +215,7 @@
}
public int getDispatchedQueueSize(){
- return (int)(dispatched.get()-delivered.get());
+ return (int)(dispatchedCounter.get()-dequeueCounter.get());
}
public int getMaximumPendingMessages(){
@@ -225,7 +223,7 @@
}
public long getDispatchedCounter(){
- return dispatched.get();
+ return dispatchedCounter.get();
}
public long getEnqueueCounter(){
@@ -277,21 +275,21 @@
// Implementation methods
//
-------------------------------------------------------------------------
private boolean isFull(){
- return dispatched.get()-delivered.get()>=info.getPrefetchSize();
+ return
getDispatchedQueueSize()-prefetchExtension.get()>=info.getPrefetchSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark(){
- return (dispatched.get()-delivered.get())<=(info.getPrefetchSize()*.4);
+ return (getDispatchedQueueSize()-prefetchExtension.get()) <=
(info.getPrefetchSize() *.4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark(){
- return (dispatched.get()-delivered.get())>=(info.getPrefetchSize()*.9);
+ return (getDispatchedQueueSize()-prefetchExtension.get()) >=
(info.getPrefetchSize() *.9);
}
/**
@@ -386,7 +384,7 @@
md.setMessage(message);
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
- dispatched.incrementAndGet();
+ dispatchedCounter.incrementAndGet();
// Keep track if this subscription is receiving messages from a single
destination.
if(singleDestination){
if(destination==null){
@@ -429,6 +427,8 @@
}
}
-
+ public int getPrefetchSize() {
+ return (int) (info.getPrefetchSize() + prefetchExtension.get());
+ }
}