Author: dejanb
Date: Fri Mar 13 12:33:18 2009
New Revision: 753222
URL: http://svn.apache.org/viewvc?rev=753222&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-2160
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=753222&r1=753221&r2=753222&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Fri Mar 13 12:33:18 2009
@@ -2196,6 +2196,14 @@
this.producerWindowSize = producerWindowSize;
}
+ public void setAuditDepth(int auditDepth) {
+ connectionAudit.setAuditDepth(auditDepth);
+ }
+
+ public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+
connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
+ }
+
protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
connectionAudit.removeDispatcher(dispatcher);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=753222&r1=753221&r2=753222&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Fri Mar 13 12:33:18 2009
@@ -112,6 +112,8 @@
private boolean sendAcksAsync=true;
private TransportListener transportListener;
private ExceptionListener exceptionListener;
+ private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+ private int auditMaximumProducerNumber =
ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
// /////////////////////////////////////////////
//
@@ -310,6 +312,8 @@
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
connection.setSendTimeout(getSendTimeout());
connection.setSendAcksAsync(isSendAcksAsync());
+ connection.setAuditDepth(getAuditDepth());
+
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@@ -669,6 +673,8 @@
props.setProperty("producerWindowSize",
Integer.toString(getProducerWindowSize()));
props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
+ props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
+ props.setProperty("auditMaximumProducerNumber",
Integer.toString(getAuditMaximumProducerNumber()));
}
public boolean isUseCompression() {
@@ -882,4 +888,20 @@
public void setExceptionListener(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
}
+
+ public int getAuditDepth() {
+ return auditDepth;
+ }
+
+ public void setAuditDepth(int auditDepth) {
+ this.auditDepth = auditDepth;
+ }
+
+ public int getAuditMaximumProducerNumber() {
+ return auditMaximumProducerNumber;
+ }
+
+ public void setAuditMaximumProducerNumber(int
auditMaximumProducerNumber) {
+ this.auditMaximumProducerNumber = auditMaximumProducerNumber;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java?rev=753222&r1=753221&r2=753222&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
Fri Mar 13 12:33:18 2009
@@ -33,15 +33,15 @@
*/
public class ActiveMQMessageAudit {
- private static final int DEFAULT_WINDOW_SIZE = 2048;
- private static final int MAXIMUM_PRODUCER_COUNT = 64;
+ public static final int DEFAULT_WINDOW_SIZE = 2048;
+ public static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
/**
- * Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack =
- * 128
+ * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
+ * 64
*/
public ActiveMQMessageAudit() {
this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java?rev=753222&r1=753221&r2=753222&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java
Fri Mar 13 12:33:18 2009
@@ -30,6 +30,11 @@
private LinkedHashMap<ActiveMQDestination, ActiveMQMessageAudit>
destinations = new LRUCache<ActiveMQDestination, ActiveMQMessageAudit>(1000);
private LinkedHashMap<ActiveMQDispatcher, ActiveMQMessageAudit>
dispatchers = new LRUCache<ActiveMQDispatcher, ActiveMQMessageAudit>(1000);
+
+ private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
+ private int auditMaximumProducerNumber =
ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
+
+
synchronized void removeDispatcher(ActiveMQDispatcher dispatcher) {
dispatchers.remove(dispatcher);
}
@@ -41,7 +46,7 @@
if (destination.isQueue()) {
ActiveMQMessageAudit audit = destinations.get(destination);
if (audit == null) {
- audit = new ActiveMQMessageAudit();
+ audit = new ActiveMQMessageAudit(auditDepth,
auditMaximumProducerNumber);
destinations.put(destination, audit);
}
boolean result = audit.isDuplicate(message);
@@ -49,7 +54,7 @@
}
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
if (audit == null) {
- audit = new ActiveMQMessageAudit();
+ audit = new ActiveMQMessageAudit(auditDepth,
auditMaximumProducerNumber);
dispatchers.put(dispatcher, audit);
}
boolean result = audit.isDuplicate(message);
@@ -91,4 +96,21 @@
void setCheckForDuplicates(boolean checkForDuplicates) {
this.checkForDuplicates = checkForDuplicates;
}
+
+ public int getAuditDepth() {
+ return auditDepth;
+ }
+
+ public void setAuditDepth(int auditDepth) {
+ this.auditDepth = auditDepth;
+ }
+
+ public int getAuditMaximumProducerNumber() {
+ return auditMaximumProducerNumber;
+ }
+
+ public void setAuditMaximumProducerNumber(int
auditMaximumProducerNumber) {
+ this.auditMaximumProducerNumber = auditMaximumProducerNumber;
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java?rev=753222&r1=753221&r2=753222&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQConnectionFactoryTest.java
Fri Mar 13 12:33:18 2009
@@ -82,6 +82,9 @@
assertTrue(cf.isUseAsyncSend());
// the broker url have been adjusted.
assertEquals("vm:(broker:()/localhost)", cf.getBrokerURL());
+
+ cf = new
ActiveMQConnectionFactory("vm://localhost?jms.auditDepth=5000");
+ assertEquals(5000, cf.getAuditDepth());
}
public void testUseURIToConfigureRedeliveryPolicy() throws
URISyntaxException, JMSException {