Author: rajdavies
Date: Fri Oct 19 12:01:10 2007
New Revision: 586580
URL: http://svn.apache.org/viewvc?rev=586580&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1452 and
https://issues.apache.org/activemq/browse/AMQ-729
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
Fri Oct 19 12:01:10 2007
@@ -16,8 +16,6 @@
*/
package org.apache.activemq;
-import java.util.Map;
-
import javax.jms.JMSException;
import javax.jms.Message;
@@ -37,8 +35,9 @@
private static final int DEFAULT_WINDOW_SIZE = 1024;
private static final int MAXIMUM_PRODUCER_COUNT = 128;
- private int windowSize;
- private Map<Object, BitArrayBin> map;
+ private int auditDepth;
+ private int maximumNumberOfProducersToTrack;
+ private LRUCache<Object, BitArrayBin> map;
/**
* Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack =
@@ -51,13 +50,44 @@
/**
* Construct a MessageAudit
*
- * @param windowSize range of ids to track
+ * @param auditDepth range of ids to track
* @param maximumNumberOfProducersToTrack number of producers expected in
* the system
*/
- public ActiveMQMessageAudit(int windowSize, final int
maximumNumberOfProducersToTrack) {
- this.windowSize = windowSize;
- map = new LRUCache<Object,
BitArrayBin>(maximumNumberOfProducersToTrack, maximumNumberOfProducersToTrack,
0.75f, true);
+ public ActiveMQMessageAudit(int auditDepth, final int
maximumNumberOfProducersToTrack) {
+ this.auditDepth = auditDepth;
+ this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
+ this.map = new LRUCache<Object, BitArrayBin>(0,
maximumNumberOfProducersToTrack, 0.75f, true);
+ }
+
+ /**
+ * @return the auditDepth
+ */
+ public int getAuditDepth() {
+ return auditDepth;
+ }
+
+ /**
+ * @param auditDepth the auditDepth to set
+ */
+ public void setAuditDepth(int auditDepth) {
+ this.auditDepth = auditDepth;
+ }
+
+ /**
+ * @return the maximumNumberOfProducersToTrack
+ */
+ public int getMaximumNumberOfProducersToTrack() {
+ return maximumNumberOfProducersToTrack;
+ }
+
+ /**
+ * @param maximumNumberOfProducersToTrack the
maximumNumberOfProducersToTrack to set
+ */
+ public void setMaximumNumberOfProducersToTrack(
+ int maximumNumberOfProducersToTrack) {
+ this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
+ this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
}
/**
@@ -67,7 +97,7 @@
* @return true if the message is a duplicate
* @throws JMSException
*/
- public boolean isDuplicateMessage(Message message) throws JMSException {
+ public boolean isDuplicate(Message message) throws JMSException {
return isDuplicate(message.getJMSMessageID());
}
@@ -84,7 +114,7 @@
if (seed != null) {
BitArrayBin bab = map.get(seed);
if (bab == null) {
- bab = new BitArrayBin(windowSize);
+ bab = new BitArrayBin(auditDepth);
map.put(seed, bab);
}
long index = IdGenerator.getSequenceFromId(id);
@@ -101,9 +131,9 @@
* @param message
* @return true if the message is a duplicate
*/
- public boolean isDuplicateMessageReference(final MessageReference message)
{
+ public boolean isDuplicate(final MessageReference message) {
MessageId id = message.getMessageId();
- return isDuplicateMessageId(id);
+ return isDuplicate(id);
}
/**
@@ -112,7 +142,7 @@
* @param id
* @return true if the message is a duplicate
*/
- public synchronized boolean isDuplicateMessageId(final MessageId id) {
+ public synchronized boolean isDuplicate(final MessageId id) {
boolean answer = false;
if (id != null) {
@@ -120,7 +150,7 @@
if (pid != null) {
BitArrayBin bab = map.get(pid);
if (bab == null) {
- bab = new BitArrayBin(windowSize);
+ bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
}
answer = bab.setBit(id.getProducerSequenceId(), true);
@@ -134,9 +164,9 @@
*
* @param message
*/
- public void rollbackMessageReference(final MessageReference message) {
+ public void rollback(final MessageReference message) {
MessageId id = message.getMessageId();
- rollbackMessageId(id);
+ rollback(id);
}
/**
@@ -144,7 +174,7 @@
*
* @param id
*/
- public synchronized void rollbackMessageId(final MessageId id) {
+ public synchronized void rollback(final MessageId id) {
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
@@ -155,4 +185,58 @@
}
}
}
+
+ /**
+ * Check the message is in order
+ * @param msg
+ * @return
+ * @throws JMSException
+ */
+ public boolean isInOrder(Message msg) throws JMSException {
+ return isInOrder(msg.getJMSMessageID());
+ }
+
+ /**
+ * Check the message id is in order
+ * @param id
+ * @return
+ */
+ public synchronized boolean isInOrder(final String id) {
+ boolean answer = true;
+
+ if (id != null) {
+ String seed = IdGenerator.getSeedFromId(id);
+ if (seed != null) {
+ BitArrayBin bab = map.get(seed);
+ if (bab != null) {
+ long index = IdGenerator.getSequenceFromId(id);
+ answer = bab.isInOrder(index);
+ }
+
+ }
+ }
+ return answer;
+ }
+
+ /**
+ * Check the MessageId is in order
+ * @param id
+ * @return
+ */
+ public synchronized boolean isInOrder(final MessageId id) {
+ boolean answer = true;
+
+ if (id != null) {
+ ProducerId pid = id.getProducerId();
+ if (pid != null) {
+ BitArrayBin bab = map.get(pid);
+ if (bab != null) {
+ answer = bab.isInOrder(id.getProducerSequenceId());
+ }
+
+ }
+ }
+ return answer;
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
Fri Oct 19 12:01:10 2007
@@ -44,7 +44,7 @@
audit = new ActiveMQMessageAudit();
destinations.put(destination, audit);
}
- boolean result =
audit.isDuplicateMessageReference(message);
+ boolean result = audit.isDuplicate(message);
return result;
}
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
@@ -52,7 +52,7 @@
audit = new ActiveMQMessageAudit();
dispatchers.put(dispatcher, audit);
}
- boolean result = audit.isDuplicateMessageReference(message);
+ boolean result = audit.isDuplicate(message);
return result;
}
}
@@ -66,12 +66,12 @@
if (destination.isQueue()) {
ActiveMQMessageAudit audit = destinations.get(destination);
if (audit != null) {
- audit.rollbackMessageReference(message);
+ audit.rollback(message);
}
} else {
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
if (audit != null) {
- audit.rollbackMessageReference(message);
+ audit.rollback(message);
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
Fri Oct 19 12:01:10 2007
@@ -211,14 +211,14 @@
public void afterRollback() {
if (audit != null) {
- audit.rollbackMessageReference(message);
+ audit.rollback(message);
}
}
};
transaction.addSynchronization(sync);
}
}
- if (audit == null || !audit.isDuplicateMessageReference(message)) {
+ if (audit == null || !audit.isDuplicate(message)) {
context.setTransaction(transaction);
try {
next.send(producerExchange, message);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Fri Oct 19 12:01:10 2007
@@ -23,12 +23,57 @@
public abstract class BaseDestination implements Destination {
private boolean producerFlowControl = true;
-
+ private int maxProducersToAudit=1024;
+ private int maxAuditDepth=1;
+ private boolean enableAudit=true;
+ /**
+ * @return the producerFlowControl
+ */
public boolean isProducerFlowControl() {
- return this.producerFlowControl;
+ return producerFlowControl;
}
-
- public void setProducerFlowControl(boolean value) {
- this.producerFlowControl = value;
+ /**
+ * @param producerFlowControl the producerFlowControl to set
+ */
+ public void setProducerFlowControl(boolean producerFlowControl) {
+ this.producerFlowControl = producerFlowControl;
+ }
+ /**
+ * @return the maxProducersToAudit
+ */
+ public int getMaxProducersToAudit() {
+ return maxProducersToAudit;
+ }
+ /**
+ * @param maxProducersToAudit the maxProducersToAudit to set
+ */
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ }
+ /**
+ * @return the maxAuditDepth
+ */
+ public int getMaxAuditDepth() {
+ return maxAuditDepth;
}
+ /**
+ * @param maxAuditDepth the maxAuditDepth to set
+ */
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
+ }
+ /**
+ * @return the enableAudit
+ */
+ public boolean isEnableAudit() {
+ return enableAudit;
+ }
+ /**
+ * @param enableAudit the enableAudit to set
+ */
+ public void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ }
+
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Oct 19 12:01:10 2007
@@ -142,6 +142,9 @@
if (store != null) {
// Restore the persistent messages.
messages.setSystemUsage(systemUsage);
+ messages.setEnableAudit(isEnableAudit());
+ messages.setMaxAuditDepth(getMaxAuditDepth());
+ messages.setMaxProducersToAudit(getMaxProducersToAudit());
if (messages.isRecoveryRequired()) {
store.recover(new MessageRecoveryListener() {
@@ -442,7 +445,7 @@
}
}
- void doMessageSend(final ProducerBrokerExchange producerExchange, final
Message message) throws IOException, Exception {
+ synchronized void doMessageSend(final ProducerBrokerExchange
producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context =
producerExchange.getConnectionContext();
message.setRegionDestination(this);
if (store != null && message.isPersistent()) {
@@ -567,7 +570,7 @@
doPageIn(false);
}
- public void stop() throws Exception {
+ public void stop() throws Exception{
if (taskRunner != null) {
taskRunner.shutdown();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Fri Oct 19 12:01:10 2007
@@ -17,9 +17,12 @@
package org.apache.activemq.broker.region.cursors;
import java.util.LinkedList;
+
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage;
/**
@@ -32,11 +35,21 @@
protected int memoryUsageHighWaterMark = 90;
protected int maxBatchSize = 100;
protected SystemUsage systemUsage;
-
- public void start() throws Exception {
+ protected int maxProducersToAudit=1024;
+ protected int maxAuditDepth=1;
+ protected boolean enableAudit=true;
+ protected ActiveMQMessageAudit audit;
+ private boolean started=false;
+
+ public synchronized void start() throws Exception {
+ if (!started && enableAudit && audit==null) {
+ audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+ }
+ started=true;
}
- public void stop() throws Exception {
+ public synchronized void stop() throws Exception {
+ started=false;
gc();
}
@@ -168,4 +181,68 @@
public LinkedList pageInList(int maxItems) {
throw new RuntimeException("Not supported");
}
+
+ /**
+ * @return the maxProducersToAudit
+ */
+ public int getMaxProducersToAudit() {
+ return maxProducersToAudit;
+ }
+
+ /**
+ * @param maxProducersToAudit the maxProducersToAudit to set
+ */
+ public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ if (audit != null) {
+ this.audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
+ }
+ }
+
+ /**
+ * @return the maxAuditDepth
+ */
+ public int getMaxAuditDepth() {
+ return this.maxAuditDepth;
+ }
+
+
+ /**
+ * @param maxAuditDepth the maxAuditDepth to set
+ */
+ public synchronized void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
+ if (audit != null) {
+ this.audit.setAuditDepth(maxAuditDepth);
+ }
+ }
+
+
+ /**
+ * @return the enableAudit
+ */
+ public boolean isEnableAudit() {
+ return this.enableAudit;
+ }
+
+ /**
+ * @param enableAudit the enableAudit to set
+ */
+ public synchronized void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ if (this.enableAudit && started && audit==null) {
+ audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+ }
+ }
+
+
+ protected synchronized boolean isDuplicate(MessageId messageId) {
+ if (!this.enableAudit || this.audit==null) {
+ return false;
+ }
+ return this.audit.isDuplicate(messageId);
+ }
+
+
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Fri Oct 19 12:01:10 2007
@@ -63,17 +63,18 @@
this.store = store;
}
- public void start() {
+ public void start() throws Exception {
if (started.compareAndSet(false, true)) {
+ super.start();
if (systemUsage != null) {
systemUsage.getMemoryUsage().addUsageListener(this);
}
}
}
- public void stop() {
+ public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
- gc();
+ super.stop();
if (systemUsage != null) {
systemUsage.getMemoryUsage().removeUsageListener(this);
}
@@ -118,7 +119,7 @@
}
}
- public synchronized void destroy() {
+ public synchronized void destroy() throws Exception {
stop();
for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext();) {
Message node = (Message)i.next();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
Fri Oct 19 12:01:10 2007
@@ -210,5 +210,37 @@
* @return a list of paged in messages
*/
LinkedList pageInList(int maxItems);
+
+ /**
+ * set the maximum number of producers to track at one time
+ * @param value
+ */
+ void setMaxProducersToAudit(int value);
+
+ /**
+ * @return the maximum number of producers to audit
+ */
+ int getMaxProducersToAudit();
+
+ /**
+ * Set the maximum depth of message ids to track
+ * @param depth
+ */
+ void setMaxAuditDepth(int depth);
+
+ /**
+ * @return the audit depth
+ */
+ int getMaxAuditDepth();
+
+ /**
+ * @return the enableAudit
+ */
+ public boolean isEnableAudit();
+ /**
+ * @param enableAudit the enableAudit to set
+ */
+ public void setEnableAudit(boolean enableAudit);
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
Fri Oct 19 12:01:10 2007
@@ -29,7 +29,7 @@
import org.apache.commons.logging.LogFactory;
/**
- * perist pending messages pending message (messages awaiting disptach to a
+ * persist pending messages pending message (messages awaiting dispatch to a
* consumer) cursor
*
* @version $Revision: 474985 $
@@ -42,6 +42,7 @@
private final LinkedList<Message> batchList = new LinkedList<Message>();
private Destination regionDestination;
private int size;
+ private boolean fillBatchDuplicates;
/**
* @param topic
@@ -55,13 +56,14 @@
}
- public void start() throws Exception {
+ public void start() throws Exception{
+ super.start();
store.resetBatching();
}
public void stop() throws Exception {
store.resetBatching();
- gc();
+ super.stop();
}
/**
@@ -127,10 +129,18 @@
public void finished() {
}
- public boolean recoverMessage(Message message) throws Exception {
- message.setRegionDestination(regionDestination);
- message.incrementReferenceCount();
- batchList.addLast(message);
+ public synchronized boolean recoverMessage(Message message)
+ throws Exception {
+ if (!isDuplicate(message.getMessageId())) {
+ message.setRegionDestination(regionDestination);
+ message.incrementReferenceCount();
+ batchList.addLast(message);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring batched duplicated from store: " +
message);
+ }
+ fillBatchDuplicates=true;
+ }
return true;
}
@@ -153,8 +163,13 @@
}
// implementation
- protected void fillBatch() throws Exception {
+ protected synchronized void fillBatch() throws Exception {
store.recoverNextMessages(maxBatchSize, this);
+ while (fillBatchDuplicates && batchList.isEmpty()) {
+ fillBatchDuplicates=false;
+ store.recoverNextMessages(maxBatchSize, this);
+ }
+ fillBatchDuplicates=false;
}
public String toString() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
Fri Oct 19 12:01:10 2007
@@ -69,6 +69,7 @@
public synchronized void start() throws Exception {
if (!started) {
started = true;
+ super.start();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.start();
}
@@ -78,6 +79,7 @@
public synchronized void stop() throws Exception {
if (started) {
started = false;
+ super.stop();
for (PendingMessageCursor tsp : storePrefetches) {
tsp.stop();
}
@@ -96,6 +98,9 @@
TopicStorePrefetch tsp = new
TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription);
tsp.setMaxBatchSize(getMaxBatchSize());
tsp.setSystemUsage(systemUsage);
+ tsp.setEnableAudit(isEnableAudit());
+ tsp.setMaxAuditDepth(getMaxAuditDepth());
+ tsp.setMaxProducersToAudit(getMaxProducersToAudit());
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (started) {
@@ -251,6 +256,36 @@
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator();
i.hasNext();) {
PendingMessageCursor tsp = i.next();
tsp.setSystemUsage(usageManager);
+ }
+ }
+
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ super.setMaxProducersToAudit(maxProducersToAudit);
+ for (PendingMessageCursor cursor : storePrefetches) {
+ cursor.setMaxAuditDepth(maxAuditDepth);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
+ }
+ }
+
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ super.setMaxAuditDepth(maxAuditDepth);
+ for (PendingMessageCursor cursor : storePrefetches) {
+ cursor.setMaxAuditDepth(maxAuditDepth);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setMaxAuditDepth(maxAuditDepth);
+ }
+ }
+
+ public synchronized void setEnableAudit(boolean enableAudit) {
+ super.setEnableAudit(enableAudit);
+ for (PendingMessageCursor cursor : storePrefetches) {
+ cursor.setEnableAudit(enableAudit);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setEnableAudit(enableAudit);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Fri Oct 19 12:01:10 2007
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.cursors;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
@@ -55,10 +56,14 @@
public synchronized void start() throws Exception {
started = true;
+ super.start();
if (nonPersistent == null) {
nonPersistent = new
FilePendingMessageCursor(queue.getDestination(), tmpStore);
nonPersistent.setMaxBatchSize(getMaxBatchSize());
nonPersistent.setSystemUsage(systemUsage);
+ nonPersistent.setEnableAudit(isEnableAudit());
+ nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
+ nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
}
nonPersistent.start();
persistent.start();
@@ -67,6 +72,7 @@
public synchronized void stop() throws Exception {
started = false;
+ super.stop();
if (nonPersistent != null) {
nonPersistent.stop();
nonPersistent.gc();
@@ -191,6 +197,39 @@
}
super.setMaxBatchSize(maxBatchSize);
}
+
+
+ public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
+ super.setMaxProducersToAudit(maxProducersToAudit);
+ if (persistent != null) {
+ persistent.setMaxProducersToAudit(maxProducersToAudit);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
+ }
+ }
+
+ public synchronized void setMaxAuditDepth(int maxAuditDepth) {
+ super.setMaxAuditDepth(maxAuditDepth);
+ if (persistent != null) {
+ persistent.setMaxAuditDepth(maxAuditDepth);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setMaxAuditDepth(maxAuditDepth);
+ }
+ }
+
+ public synchronized void setEnableAudit(boolean enableAudit) {
+ super.setEnableAudit(enableAudit);
+ if (persistent != null) {
+ persistent.setEnableAudit(enableAudit);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setEnableAudit(enableAudit);
+ }
+ }
+
+
public synchronized void gc() {
if (persistent != null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Fri Oct 19 12:01:10 2007
@@ -65,16 +65,18 @@
this.subscriberName = subscriberName;
}
- public synchronized void start() {
+ public synchronized void start() throws Exception {
if (!started) {
started = true;
+ super.start();
safeFillBatch();
}
}
- public synchronized void stop() {
+ public synchronized void stop() throws Exception {
if (started) {
started = false;
+ super.stop();
store.resetBatching(clientId, subscriberName);
gc();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Fri Oct 19 12:01:10 2007
@@ -51,6 +51,9 @@
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private PendingDurableSubscriberMessageStoragePolicy
pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
+ private int maxProducersToAudit=1024;
+ private int maxAuditDepth=1;
+ private boolean enableAudit=true;
private boolean producerFlowControl = true;
public void configure(Queue queue, Store tmpStore) {
@@ -69,6 +72,9 @@
queue.setMessages(messages);
}
queue.setProducerFlowControl(isProducerFlowControl());
+ queue.setEnableAudit(isEnableAudit());
+ queue.setMaxAuditDepth(getMaxAuditDepth());
+ queue.setMaxProducersToAudit(getMaxProducersToAudit());
}
public void configure(Topic topic) {
@@ -86,6 +92,9 @@
topic.getBrokerMemoryUsage().setLimit(memoryLimit);
}
topic.setProducerFlowControl(isProducerFlowControl());
+ topic.setEnableAudit(isEnableAudit());
+ topic.setMaxAuditDepth(getMaxAuditDepth());
+ topic.setMaxProducersToAudit(getMaxProducersToAudit());
}
public void configure(Broker broker, SystemUsage memoryManager,
TopicSubscription subscription) {
@@ -266,12 +275,60 @@
this.pendingSubscriberPolicy = pendingSubscriberPolicy;
}
+ /**
+ * @return true if producer flow control enabled
+ */
public boolean isProducerFlowControl() {
return producerFlowControl;
}
+ /**
+ * @param producerFlowControl
+ */
public void setProducerFlowControl(boolean producerFlowControl) {
this.producerFlowControl = producerFlowControl;
+ }
+
+ /**
+ * @return the maxProducersToAudit
+ */
+ public int getMaxProducersToAudit() {
+ return maxProducersToAudit;
+ }
+
+ /**
+ * @param maxProducersToAudit the maxProducersToAudit to set
+ */
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ }
+
+ /**
+ * @return the maxAuditDepth
+ */
+ public int getMaxAuditDepth() {
+ return maxAuditDepth;
+ }
+
+ /**
+ * @param maxAuditDepth the maxAuditDepth to set
+ */
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
+ }
+
+ /**
+ * @return the enableAudit
+ */
+ public boolean isEnableAudit() {
+ return enableAudit;
+ }
+
+ /**
+ * @param enableAudit the enableAudit to set
+ */
+ public void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
Fri Oct 19 12:01:10 2007
@@ -29,6 +29,7 @@
private int maxNumberOfArrays;
private int firstIndex = -1;
private int firstBin = -1;
+ private long lastBitSet=-1;
/**
* Create a BitArrayBin to a certain window size (number of messages to
@@ -60,8 +61,25 @@
if (offset >= 0) {
answer = ba.set(offset, value);
}
+ if (value) {
+ lastBitSet=index;
+ }else {
+ lastBitSet=-1;
+ }
}
return answer;
+ }
+
+ /**
+ * Test if in order
+ * @param index
+ * @return true if next message is in order
+ */
+ public boolean isInOrder(long index) {
+ if (lastBitSet== -1) {
+ return true;
+ }
+ return lastBitSet+1==index;
}
/**
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java?rev=586580&r1=586579&r2=586580&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
Fri Oct 19 12:01:10 2007
@@ -88,10 +88,32 @@
ActiveMQMessage msg = new ActiveMQMessage();
msg.setMessageId(id);
list.add(msg);
- assertFalse(audit.isDuplicateMessageReference(msg));
+ assertFalse(audit.isDuplicate(msg.getMessageId()));
}
for (MessageReference msg : list) {
- assertTrue(audit.isDuplicateMessageReference(msg));
+ assertTrue(audit.isDuplicate(msg));
+ }
+ }
+
+ public void testIsInOrderString() {
+ int count = 10000;
+ ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
+ IdGenerator idGen = new IdGenerator();
+ // add to a list
+ List<String> list = new ArrayList<String>();
+ for (int i = 0; i < count; i++) {
+ String id = idGen.generateId();
+ if (i==0) {
+ assertFalse(audit.isDuplicate(id));
+ }
+ if (i > 1 && i%2 != 0) {
+ list.add(id);
+ }
+
+ }
+ for (String id : list) {
+ assertFalse(audit.isInOrder(id));
+ assertFalse(audit.isDuplicate(id));
}
}
}