Author: rajdavies
Date: Thu Apr 20 07:58:50 2006
New Revision: 395611
URL: http://svn.apache.org/viewcvs?rev=395611&view=rev
Log:
finese tuning
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Apr 20 07:58:50 2006
@@ -625,7 +625,7 @@
if(optimizeAcknowledge){
if(deliveryingAcknowledgements.compareAndSet(false,true)){
ackCounter++;
-
if(ackCounter>=(info.getCurrentPrefetchSize()*.50)){
+
if(ackCounter>=(info.getCurrentPrefetchSize()*.65)){
MessageAck ack=new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
ackCounter=0;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQPrefetchPolicy.java
Thu Apr 20 07:58:50 2006
@@ -32,6 +32,7 @@
private int queueBrowserPrefetch;
private int topicPrefetch;
private int durableTopicPrefetch;
+ private int optimizeDurableTopicPrefetch;
private int inputStreamPrefetch;
private int maximumPendingMessageLimit;
@@ -43,6 +44,7 @@
this.queueBrowserPrefetch = 500;
this.topicPrefetch = MAX_PREFETCH_SIZE;
this.durableTopicPrefetch = 100;
+ this.optimizeDurableTopicPrefetch=1000;
this.inputStreamPrefetch = 100;
}
@@ -102,6 +104,20 @@
this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
}
+ /**
+ * @return Returns the optimizeDurableTopicPrefetch.
+ */
+ public int getOptimizeDurableTopicPrefetch(){
+ return optimizeDurableTopicPrefetch;
+ }
+
+ /**
+ * @param optimizeDurableTopicPrefetch The optimizeDurableTopicPrefetch to
set.
+ */
+ public void setOptimizeDurableTopicPrefetch(int
optimizeAcknowledgePrefetch){
+ this.optimizeDurableTopicPrefetch=optimizeAcknowledgePrefetch;
+ }
+
public int getMaximumPendingMessageLimit() {
return maximumPendingMessageLimit;
}
@@ -129,6 +145,7 @@
this.queuePrefetch=i;
this.topicPrefetch=i;
this.inputStreamPrefetch=1;
+ this.optimizeDurableTopicPrefetch=i;
}
public int getInputStreamPrefetch() {
@@ -138,4 +155,6 @@
public void setInputStreamPrefetch(int inputStreamPrefetch) {
this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
}
+
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Thu Apr 20 07:58:50 2006
@@ -1057,14 +1057,17 @@
* if the message selector is invalid.
* @since 1.1
*/
- public TopicSubscriber createDurableSubscriber(Topic topic, String name,
String messageSelector, boolean noLocal)
- throws JMSException {
+ public TopicSubscriber createDurableSubscriber(Topic topic,String
name,String messageSelector,boolean noLocal)
+ throws JMSException{
checkClosed();
connection.checkClientIDWasManuallySpecified();
- ActiveMQPrefetchPolicy prefetchPolicy =
this.connection.getPrefetchPolicy();
- return new ActiveMQTopicSubscriber(this, getNextConsumerId(),
ActiveMQMessageTransformation
- .transformDestination(topic), name, messageSelector,
prefetchPolicy.getDurableTopicPrefetch(),
- prefetchPolicy.getMaximumPendingMessageLimit(), noLocal,
false, asyncDispatch);
+ ActiveMQPrefetchPolicy
prefetchPolicy=this.connection.getPrefetchPolicy();
+ int
prefetch=isAutoAcknowledge()&&connection.isOptimizedMessageDispatch()?prefetchPolicy
+
.getOptimizeDurableTopicPrefetch():prefetchPolicy.getDurableTopicPrefetch();
+ int maxPrendingLimit=prefetchPolicy.getMaximumPendingMessageLimit();
+ return new
ActiveMQTopicSubscriber(this,getNextConsumerId(),ActiveMQMessageTransformation
+
.transformDestination(topic),name,messageSelector,prefetch,maxPrendingLimit,noLocal,false,
+ asyncDispatch);
}
/**
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
Thu Apr 20 07:58:50 2006
@@ -32,7 +32,7 @@
*/
final class DataManager{
private static final Log log=LogFactory.getLog(DataManager.class);
- protected static long MAX_FILE_LENGTH=1024*1024*16;
+ protected static long MAX_FILE_LENGTH=1024*1024*32;
private final File dir;
private final String prefix;
private StoreDataReader reader;
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
Thu Apr 20 07:58:50 2006
@@ -281,7 +281,7 @@
session.createTextMessage("Second Message")
};
- // lets consume any outstanding messages from previous test runs
+ // lets consume any outstanding messages from prev test runs
while (consumer.receive(1000) != null) {
}
session.commit();
@@ -306,7 +306,7 @@
assertEquals(outbound[1], message);
session.rollback();
- // Consume again.. the previous message should
+ // Consume again.. the prev message should
// get redelivered.
message = consumer.receive(5000);
assertNotNull("Should have re-received the message again!", message);
@@ -329,7 +329,7 @@
session.createTextMessage("Second Message")
};
- // lets consume any outstanding messages from previous test runs
+ // lets consume any outstanding messages from prev test runs
while (consumer.receive(1000) != null) {
}
session.commit();
@@ -351,7 +351,7 @@
assertEquals(outbound[1], message);
session.rollback();
- // Consume again.. the previous message should
+ // Consume again.. the prev message should
// get redelivered.
message = consumer.receive(5000);
assertNotNull("Should have re-received the first message again!",
message);
@@ -445,7 +445,7 @@
session.createTextMessage("Second Message")
};
- // lets consume any outstanding messages from previous test runs
+ // lets consume any outstanding messages from prev test runs
while (consumer.receiveNoWait() != null) {
}
@@ -529,7 +529,7 @@
protected void reconnect() throws JMSException {
if (connection != null) {
- // Close the previous connection.
+ // Close the prev connection.
connection.close();
}
session = null;
@@ -562,6 +562,7 @@
prefetchPolicy.setQueuePrefetch(1);
prefetchPolicy.setTopicPrefetch(1);
prefetchPolicy.setDurableTopicPrefetch(1);
+ prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
}
public void testMessageListener() throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java?rev=395611&r1=395610&r2=395611&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/LargeMessageTestSupport.java
Thu Apr 20 07:58:50 2006
@@ -130,6 +130,7 @@
activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue);
+
activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue);
}
public void tearDown() throws Exception {