Author: chirino
Date: Fri Apr 28 14:24:06 2006
New Revision: 398015
URL: http://svn.apache.org/viewcvs?rev=398015&view=rev
Log:
- Gaurd access to dispatched list ( a sync was missing).
- Added better exception messages to know what happened when a slave
subscription gets out of sync with the master.
- Implemented a simpler isFull()
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=398015&r1=398014&r2=398015&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Fri Apr 28 14:24:06 2006
@@ -60,35 +60,31 @@
synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++;
- if(!isFull()&&!isSlaveBroker()){
+ if(!isFull()){
dispatch(node);
}else{
optimizePrefetch();
synchronized(pending){
- if( pending.isEmpty() )
- if (log.isDebugEnabled()){
- log.debug("Prefetch limit.");
- }
+ if( pending.isEmpty() ) {
+ log.debug("Prefetch limit.");
+ }
pending.addLast(node);
}
}
}
- public void processMessageDispatchNotification(MessageDispatchNotification
mdn){
+ synchronized public void
processMessageDispatchNotification(MessageDispatchNotification mdn) throws
Exception {
synchronized(pending){
for(Iterator i=pending.iterator();i.hasNext();){
MessageReference node=(MessageReference) i.next();
if(node.getMessageId().equals(mdn.getMessageId())){
i.remove();
- try{
- MessageDispatch
md=createMessageDispatch(node,node.getMessage());
- dispatched.addLast(node);
- }catch(Exception e){
- log.error("Problem processing
MessageDispatchNotification: "+mdn,e);
- }
- break;
+ createMessageDispatch(node,node.getMessage());
+ dispatched.addLast(node);
+ return;
}
}
+ throw new JMSException("Slave broker out of sync with master:
Dispatched message ("+mdn.getMessageId()+") was not in the pending list:
"+pending);
}
}
@@ -178,7 +174,12 @@
}
throw new JMSException("Could not correlate acknowledgment with
dispatched message: "+ack);
}
- throw new JMSException("Invalid acknowledgment: "+ack);
+
+ if( isSlaveBroker() ) {
+ throw new JMSException("Slave broker out of sync with master:
Acknowledgment ("+ack+") was not in the dispatch list: "+dispatched);
+ } else {
+ throw new JMSException("Invalid acknowledgment: "+ack);
+ }
}
/**
@@ -201,8 +202,12 @@
}
}
+ /**
+ * Used to determine if the broker can dispatch to the consumer.
+ * @return
+ */
protected boolean isFull(){
- return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
+ return isSlaveBroker() ||
dispatched.size()-prefetchExtension>=info.getPrefetchSize();
}
/**
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=398015&r1=398014&r2=398015&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Fri Apr 28 14:24:06 2006
@@ -95,8 +95,9 @@
/**
* Used by a Slave Broker to update dispatch infomation
* @param mdn
+ * @throws Exception
*/
- void processMessageDispatchNotification(MessageDispatchNotification mdn);
+ void processMessageDispatchNotification(MessageDispatchNotification mdn)
throws Exception;
/**
* @return true if the broker is currently in slave mode