Author: rajdavies
Date: Thu Mar 1 16:37:19 2012
New Revision: 1295662
URL: http://svn.apache.org/viewvc?rev=1295662&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3750 - add hint to storage of
messages to enable concurrent store and dispatch
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Mar 1 16:37:19 2012
@@ -98,6 +98,7 @@ public abstract class BaseDestination im
private boolean reduceMemoryFootprint = false;
protected final Scheduler scheduler;
private boolean disposed = false;
+ private boolean doOptimzeMessageStorage = true;
/**
* @param brokerService
@@ -714,6 +715,15 @@ public abstract class BaseDestination im
return this.reduceMemoryFootprint;
}
+ public boolean isDoOptimzeMessageStorage() {
+ return doOptimzeMessageStorage;
+ }
+
+ public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
+ this.doOptimzeMessageStorage = doOptimzeMessageStorage;
+ }
+
+
public abstract List<Subscription> getConsumers();
protected boolean hasRegularConsumers(List<Subscription> consumers) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Thu Mar 1 16:37:19 2012
@@ -230,4 +230,7 @@ public interface Destination extends Ser
boolean isPrioritizedMessages();
SlowConsumerStrategy getSlowConsumerStrategy();
+
+ boolean isDoOptimzeMessageStorage();
+ void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Thu Mar 1 16:37:19 2012
@@ -302,4 +302,12 @@ public class DestinationFilter implement
return next.getSlowConsumerStrategy();
}
+ public boolean isDoOptimzeMessageStorage() {
+ return next.isDoOptimzeMessageStorage();
+ }
+
+ public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
+ next.setDoOptimzeMessageStorage(doOptimzeMessageStorage);
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Mar 1 16:37:19 2012
@@ -721,7 +721,7 @@ public class Queue extends BaseDestinati
if (store != null && message.isPersistent()) {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
if (messages.isCacheEnabled()) {
- result = store.asyncAddQueueMessage(context, message);
+ result = store.asyncAddQueueMessage(context, message,
isOptimizeStorage());
} else {
store.addMessage(context, message);
}
@@ -2137,4 +2137,33 @@ public class Queue extends BaseDestinati
protected Logger getLog() {
return LOG;
}
+
+ protected boolean isOptimizeStorage(){
+ boolean result = false;
+ if (isDoOptimzeMessageStorage()){
+ consumersLock.readLock().lock();
+ try{
+ if (consumers.isEmpty()==false){
+ result = true;
+ for (Subscription s : consumers) {
+ if (s.getPrefetchSize()==0){
+ result = false;
+ break;
+ }
+ if (s.isSlowConsumer()){
+ result = false;
+ break;
+ }
+ if (s.getInFlightUsage() > 10){
+ result = false;
+ break;
+ }
+ }
+ }
+ }finally {
+ consumersLock.readLock().unlock();
+ }
+ }
+ return result;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Mar 1 16:37:19 2012
@@ -428,7 +428,7 @@ public class Topic extends BaseDestinati
waitForSpace(context, systemUsage.getStoreUsage(),
getStoreUsageHighWaterMark(), logMessage);
}
- result = topicStore.asyncAddTopicMessage(context, message);
+ result = topicStore.asyncAddTopicMessage(context,
message,isOptimizeStorage());
}
message.incrementReferenceCount();
@@ -688,4 +688,31 @@ public class Topic extends BaseDestinati
protected Logger getLog() {
return LOG;
}
-}
+
+ protected boolean isOptimizeStorage(){
+ boolean result = false;
+
+ if (isDoOptimzeMessageStorage() && durableSubcribers.isEmpty()==false){
+ result = true;
+ for (DurableTopicSubscription s : durableSubcribers.values()) {
+ if (s.isActive()== false){
+ result = false;
+ break;
+ }
+ if (s.getPrefetchSize()==0){
+ result = false;
+ break;
+ }
+ if (s.isSlowConsumer()){
+ result = false;
+ break;
+ }
+ if (s.getInFlightUsage() > 10){
+ result = false;
+ break;
+ }
+ }
+ }
+ return result;
+ }
+}
\ No newline at end of file
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Mar 1 16:37:19 2012
@@ -96,6 +96,7 @@ public class PolicyEntry extends Destina
private long inactiveTimoutBeforeGC =
BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean reduceMemoryFootprint;
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
+ private boolean doOptimzeMessageStorage = true;
public void configure(Broker broker,Queue queue) {
@@ -171,6 +172,7 @@ public class PolicyEntry extends Destina
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
+ destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
}
public void configure(Broker broker, SystemUsage memoryManager,
TopicSubscription subscription) {
@@ -832,4 +834,12 @@ public class PolicyEntry extends Destina
public NetworkBridgeFilterFactory getNetworkBridgeFilterFactory() {
return networkBridgeFilterFactory;
}
+
+ public boolean isDoOptimzeMessageStorage() {
+ return doOptimzeMessageStorage;
+ }
+
+ public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
+ this.doOptimzeMessageStorage = doOptimzeMessageStorage;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java
Thu Mar 1 16:37:19 2012
@@ -73,12 +73,28 @@ abstract public class AbstractMessageSto
return this.prioritizedMessages;
}
+
+ public void addMessage(final ConnectionContext context, final Message
message,final boolean canOptimizeHint) throws IOException{
+ addMessage(context,message);
+ }
+
+
public Future<Object> asyncAddQueueMessage(final ConnectionContext
context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
}
+ public Future<Object> asyncAddQueueMessage(final ConnectionContext
context, final Message message,final boolean canOptimizeHint) throws
IOException {
+ addMessage(context, message,canOptimizeHint);
+ return FUTURE;
+ }
+
+ public Future<Object> asyncAddTopicMessage(final ConnectionContext
context, final Message message,final boolean canOptimizeHint) throws
IOException {
+ addMessage(context, message,canOptimizeHint);
+ return FUTURE;
+ }
+
public Future<Object> asyncAddTopicMessage(final ConnectionContext
context, final Message message) throws IOException {
addMessage(context, message);
return FUTURE;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
Thu Mar 1 16:37:19 2012
@@ -41,6 +41,16 @@ public interface MessageStore extends Se
* @throws IOException
*/
void addMessage(ConnectionContext context, Message message) throws
IOException;
+
+ /**
+ * Adds a message to the message store
+ *
+ * @param context context
+ * @param message
+ * @param canOptimizeHint - give a hint to the store that the message may
be consumed before it hits the disk
+ * @throws IOException
+ */
+ void addMessage(ConnectionContext context, Message message,boolean
canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
@@ -52,6 +62,18 @@ public interface MessageStore extends Se
* @throws IOException
*/
Future<Object> asyncAddQueueMessage(ConnectionContext context, Message
message) throws IOException;
+
+ /**
+ * Adds a message to the message store
+ *
+ * @param context context
+ * @param message
+ * @param canOptimizeHint - give a hint to the store that the message may
be consumed before it hits the disk
+ * @return a Future to track when this is complete
+ * @throws IOException
+ * @throws IOException
+ */
+ Future<Object> asyncAddQueueMessage(ConnectionContext context, Message
message,boolean canOptimizeHint) throws IOException;
/**
* Adds a message to the message store
@@ -64,6 +86,19 @@ public interface MessageStore extends Se
*/
Future<Object> asyncAddTopicMessage(ConnectionContext context, Message
message) throws IOException;
+/**
+ * Adds a message to the message store
+ *
+ * @param context context
+ * @param message
+ * @param canOptimizeHint - give a hint to the store that the message may
be consumed before it hits the disk
+ * @return a Future to track when this is complete
+ * @throws IOException
+ * @throws IOException
+ */
+ Future<Object> asyncAddTopicMessage(ConnectionContext context, Message
message,boolean canOptimizeHint) throws IOException;
+
+
/**
* Looks up a message using either the String messageID or the
* messageNumber. Implementations are encouraged to fill in the missing key
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
Thu Mar 1 16:37:19 2012
@@ -44,6 +44,10 @@ public class ProxyMessageStore implement
delegate.addMessage(context, message);
}
+ public void addMessage(ConnectionContext context, Message message, boolean
canOptimizeHint) throws IOException {
+ delegate.addMessage(context,message,canOptimizeHint);
+ }
+
public Message getMessage(MessageId identity) throws IOException {
return delegate.getMessage(identity);
}
@@ -105,11 +109,19 @@ public class ProxyMessageStore implement
public Future<Object> asyncAddQueueMessage(ConnectionContext context,
Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
-
+
+ public Future<Object> asyncAddQueueMessage(ConnectionContext context,
Message message, boolean canOptimizeHint) throws IOException {
+ return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
+ }
+
public Future<Object> asyncAddTopicMessage(ConnectionContext context,
Message message) throws IOException {
return delegate.asyncAddTopicMessage(context, message);
}
-
+
+ public Future<Object> asyncAddTopicMessage(ConnectionContext context,
Message message, boolean canOptimizeHint) throws IOException {
+ return asyncAddTopicMessage(context,message,canOptimizeHint);
+ }
+
public void removeAsyncMessage(ConnectionContext context, MessageAck ack)
throws IOException {
delegate.removeAsyncMessage(context, ack);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=1295662&r1=1295661&r2=1295662&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Thu Mar 1 16:37:19 2012
@@ -45,6 +45,10 @@ public class ProxyTopicMessageStore impl
delegate.addMessage(context, message);
}
+ public void addMessage(ConnectionContext context, Message message, boolean
canOptimizeHint) throws IOException {
+ delegate.addMessage(context,message,canOptimizeHint);
+ }
+
public Message getMessage(MessageId identity) throws IOException {
return delegate.getMessage(identity);
}
@@ -146,10 +150,18 @@ public class ProxyTopicMessageStore impl
return delegate.asyncAddTopicMessage(context, message);
}
+ public Future<Object> asyncAddTopicMessage(ConnectionContext context,
Message message, boolean canOptimizeHint) throws IOException {
+ return delegate.asyncAddTopicMessage(context,message,canOptimizeHint);
+ }
+
public Future<Object> asyncAddQueueMessage(ConnectionContext context,
Message message) throws IOException {
return delegate.asyncAddQueueMessage(context, message);
}
+ public Future<Object> asyncAddQueueMessage(ConnectionContext context,
Message message, boolean canOptimizeHint) throws IOException {
+ return delegate.asyncAddQueueMessage(context,message,canOptimizeHint);
+ }
+
public void removeAsyncMessage(ConnectionContext context, MessageAck ack)
throws IOException {
delegate.removeAsyncMessage(context, ack);
}