Author: rajdavies
Date: Tue Apr 11 12:09:11 2006
New Revision: 393294
URL: http://svn.apache.org/viewcvs?rev=393294&view=rev
Log:
extra peformance tuning parameters
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Tue Apr 11 12:09:11 2006
@@ -116,8 +116,10 @@
private boolean copyMessageOnSend = true;
private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false;
- protected boolean asyncDispatch = true;
+ protected boolean asyncDispatch = false;
+ protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false;
+ private boolean optimizeAcknowledge = true;
private boolean useRetroactiveConsumer;
private int closeTimeout = 15000;
@@ -247,15 +249,18 @@
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 1.1
*/
- public Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException {
+ public Session createSession(boolean transacted,int acknowledgeMode)
throws JMSException{
checkClosedOrFailed();
ensureConnectionInfoSent();
- return new ActiveMQSession(this, getNextSessionId(), (transacted ?
Session.SESSION_TRANSACTED
- : (acknowledgeMode == Session.SESSION_TRANSACTED ?
Session.AUTO_ACKNOWLEDGE : acknowledgeMode)), asyncDispatch);
+ boolean
doSessionAsync=alwaysSessionAsync||sessions.size()>0||transacted
+ ||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE;
+ return new
ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED
+
:(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)),
+ asyncDispatch,alwaysSessionAsync);
}
/**
- * @return
+ * @return sessionId
*/
protected SessionId getNextSessionId() {
return new SessionId(info.getConnectionId(),
sessionIdGenerator.getNextSequenceId());
@@ -1325,6 +1330,37 @@
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
this.redeliveryPolicy = redeliveryPolicy;
}
+
+ /**
+ * @return Returns the alwaysSessionAsync.
+ */
+ public boolean isAlwaysSessionAsync(){
+ return alwaysSessionAsync;
+ }
+
+
+ /**
+ * @param alwaysSessionAsync The alwaysSessionAsync to set.
+ */
+ public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
+ this.alwaysSessionAsync=alwaysSessionAsync;
+ }
+
+ /**
+ * @return Returns the optimizeAcknowledge.
+ */
+ public boolean isOptimizeAcknowledge(){
+ return optimizeAcknowledge;
+ }
+
+
+ /**
+ * @param optimizeAcknowledge The optimizeAcknowledge to set.
+ */
+ public void setOptimizeAcknowledge(boolean optimizeAcknowledge){
+ this.optimizeAcknowledge=optimizeAcknowledge;
+ }
+
private void waitForBrokerInfo() throws JMSException {
try {
@@ -1516,7 +1552,7 @@
}
public void setAsyncDispatch(boolean asyncDispatch) {
- this.asyncDispatch = asyncDispatch;
+ //this.asyncDispatch = asyncDispatch;
}
public boolean isObjectMessageSerializationDefered() {
@@ -1702,4 +1738,7 @@
public String toString() {
return "ActiveMQConnection
{id="+info.getConnectionId()+",clientId="+info.getClientId()+",started="+started.get()+"}";
}
+
+
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Tue Apr 11 12:09:11 2006
@@ -79,8 +79,10 @@
private boolean copyMessageOnSend = true;
private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false;
- protected boolean asyncDispatch = true;
+ protected boolean asyncDispatch = false;
+ protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false;
+ private boolean optimizeAcknowledge = true;
private int closeTimeout = 15000;
private boolean useRetroactiveConsumer;
@@ -233,6 +235,8 @@
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
connection.setAsyncDispatch(isAsyncDispatch());
connection.setUseAsyncSend(isUseAsyncSend());
+ connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
+ connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setRedeliveryPolicy(getRedeliveryPolicy());
@@ -417,6 +421,9 @@
props.setProperty("useRetroactiveConsumer",
Boolean.toString(isUseRetroactiveConsumer()));
props.setProperty("userName", getUserName());
props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
+ props.setProperty("alwaysSessionAsync",
Boolean.toString(isAlwaysSessionAsync()));
+ props.setProperty("optimizeAcknowledge",
Boolean.toString(isOptimizeAcknowledge()));
+
}
public boolean isOnSendPrepareMessageBody() {
@@ -463,5 +470,33 @@
*/
public void setCloseTimeout(int closeTimeout){
this.closeTimeout=closeTimeout;
+ }
+
+ /**
+ * @return Returns the alwaysSessionAsync.
+ */
+ public boolean isAlwaysSessionAsync(){
+ return alwaysSessionAsync;
+ }
+
+ /**
+ * @param alwaysSessionAsync The alwaysSessionAsync to set.
+ */
+ public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
+ this.alwaysSessionAsync=alwaysSessionAsync;
+ }
+
+ /**
+ * @return Returns the optimizeAcknowledge.
+ */
+ public boolean isOptimizeAcknowledge(){
+ return optimizeAcknowledge;
+ }
+
+ /**
+ * @param optimizeAcknowledge The optimizeAcknowledge to set.
+ */
+ public void setOptimizeAcknowledge(boolean optimizeAcknowledge){
+ this.optimizeAcknowledge=optimizeAcknowledge;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Tue Apr 11 12:09:11 2006
@@ -100,7 +100,7 @@
private int additionalWindowSize = 0;
private int rollbackCounter = 0;
private long redeliveryDelay = 0;
-
+ private int ackCounter = 0;
private MessageListener messageListener;
private JMSConsumerStatsImpl stats;
@@ -111,6 +111,7 @@
private MessageAvailableListener availableListener;
private RedeliveryPolicy redeliveryPolicy;
+ private boolean optimizeAcknowledge;
/**
* Create a MessageConsumer
@@ -188,7 +189,9 @@
this.session.removeConsumer(this);
throw e;
}
-
+
this.optimizeAcknowledge=session.connection.isOptimizeAcknowledge()&&!info.isDurable()
+ &&!info.getDestination().isQueue()
+ &&session.isAutoAcknowledge();
if (session.connection.isStarted())
start();
}
@@ -540,27 +543,36 @@
deliveredMessages.addFirst(md);
}
- private void afterMessageIsConsumed(MessageDispatch md, boolean
messageExpired) throws JMSException {
- if (unconsumedMessages.isClosed())
+ private void afterMessageIsConsumed(MessageDispatch md,boolean
messageExpired) throws JMSException{
+ if(unconsumedMessages.isClosed())
return;
-
- if (messageExpired) {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
- } else {
+ if(messageExpired){
+ ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+ }else{
stats.onMessage();
- if (session.isTransacted()) {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
- } else if (session.isAutoAcknowledge()) {
- if (!deliveredMessages.isEmpty()) {
- MessageAck ack = new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
- session.asyncSendPacket(ack);
- deliveredMessages.clear();
+ if(session.isTransacted()){
+ ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+ }else if(session.isAutoAcknowledge()){
+ if(!deliveredMessages.isEmpty()){
+ if(this.optimizeAcknowledge){
+ ackCounter++;
+ if(ackCounter>=(info.getPrefetchSize()*.75)){
+ MessageAck ack=new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,ackCounter);
+ session.asyncSendPacket(ack);
+ ackCounter=0;
+ deliveredMessages.clear();
+ }
+ }else{
+ MessageAck ack=new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+ session.asyncSendPacket(ack);
+ deliveredMessages.clear();
+ }
}
- } else if (session.isDupsOkAcknowledge()) {
- ackLater(md, MessageAck.STANDARD_ACK_TYPE);
- } else if (session.isClientAcknowledge()) {
- ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
- } else {
+ }else if(session.isDupsOkAcknowledge()){
+ ackLater(md,MessageAck.STANDARD_ACK_TYPE);
+ }else if(session.isClientAcknowledge()){
+ ackLater(md,MessageAck.DELIVERED_ACK_TYPE);
+ }else{
throw new IllegalStateException("Invalid session state.");
}
}
@@ -645,63 +657,59 @@
redeliveryDelay = 0;
}
- public void rollback() throws JMSException {
- synchronized (unconsumedMessages.getMutex()) {
- if (deliveredMessages.isEmpty())
+ public void rollback() throws JMSException{
+ synchronized(unconsumedMessages.getMutex()){
+ if(optimizeAcknowledge){
+
+ // remove messages read but not acked at the broker yet
through optimizeAcknowledge
+ for(int i=0;(i<deliveredMessages.size())&&(i<ackCounter);i++){
+ deliveredMessages.removeLast();
+ }
+ }
+ if(deliveredMessages.isEmpty())
return;
-
rollbackCounter++;
- if (rollbackCounter > redeliveryPolicy.getMaximumRedeliveries()) {
-
+ if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
// We need to NACK the messages so that they get sent to the
// DLQ.
-
// Acknowledge the last message.
- MessageDispatch lastMd = (MessageDispatch)
deliveredMessages.get(0);
- MessageAck ack = new MessageAck(lastMd,
MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+ MessageDispatch lastMd=(MessageDispatch)
deliveredMessages.get(0);
+ MessageAck ack=new
MessageAck(lastMd,MessageAck.POSION_ACK_TYPE,deliveredMessages.size());
session.asyncSendPacket(ack);
-
// Adjust the window size.
- additionalWindowSize = Math.max(0, additionalWindowSize -
deliveredMessages.size());
- rollbackCounter = 0;
- redeliveryDelay = 0;
-
- } else {
-
+
additionalWindowSize=Math.max(0,additionalWindowSize-deliveredMessages.size());
+ rollbackCounter=0;
+ redeliveryDelay=0;
+ }else{
// stop the delivery of messages.
unconsumedMessages.stop();
-
// Start up the delivery again a little later.
- if (redeliveryDelay == 0) {
- redeliveryDelay =
redeliveryPolicy.getInitialRedeliveryDelay();
- } else {
- if (redeliveryPolicy.isUseExponentialBackOff())
- redeliveryDelay *=
redeliveryPolicy.getBackOffMultiplier();
+ if(redeliveryDelay==0){
+
redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
+ }else{
+ if(redeliveryPolicy.isUseExponentialBackOff())
+
redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
}
-
- Scheduler.executeAfterDelay(new Runnable() {
- public void run() {
- try {
- if (started.get())
+ Scheduler.executeAfterDelay(new Runnable(){
+ public void run(){
+ try{
+ if(started.get())
start();
- } catch (JMSException e) {
+ }catch(JMSException e){
session.connection.onAsyncException(e);
}
}
- }, redeliveryDelay);
-
- for (Iterator iter = deliveredMessages.iterator();
iter.hasNext();) {
- MessageDispatch md = (MessageDispatch) iter.next();
+ },redeliveryDelay);
+ for(Iterator
iter=deliveredMessages.iterator();iter.hasNext();){
+ MessageDispatch md=(MessageDispatch) iter.next();
md.getMessage().onMessageRolledBack();
unconsumedMessages.enqueueFirst(md);
}
}
-
- deliveredCounter -= deliveredMessages.size();
+ deliveredCounter-=deliveredMessages.size();
deliveredMessages.clear();
}
-
- if (messageListener != null) {
+ if(messageListener!=null){
session.redispatch(unconsumedMessages);
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Tue Apr 11 12:09:11 2006
@@ -72,6 +72,7 @@
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.thread.Scheduler;
+import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
@@ -196,24 +197,29 @@
protected boolean closed;
protected boolean asyncDispatch;
+ protected boolean sessionAsyncDispatch;
+ protected TaskRunner taskRunner;
/**
* Construct the Session
*
* @param connection
+ * @param sessionId
* @param acknowledgeMode
* n.b if transacted - the acknowledgeMode ==
* Session.SESSION_TRANSACTED
+ * @param asyncDispatch
+ * @param sessionAsyncDispatch
* @throws JMSException
* on internal error
*/
- protected ActiveMQSession(ActiveMQConnection connection, SessionId
sessionId, int acknowledgeMode, boolean asyncDispatch)
+ protected ActiveMQSession(ActiveMQConnection connection, SessionId
sessionId, int acknowledgeMode, boolean asyncDispatch,boolean
sessionAsyncDispatch)
throws JMSException {
this.connection = connection;
this.acknowledgementMode = acknowledgeMode;
this.asyncDispatch=asyncDispatch;
-
+ this.sessionAsyncDispatch = sessionAsyncDispatch;
this.info = new SessionInfo(connection.getConnectionInfo(),
sessionId.getValue());
setTransactionContext(new TransactionContext(connection));
connection.addSession(this);
@@ -224,6 +230,10 @@
start();
}
+
+ protected ActiveMQSession(ActiveMQConnection connection, SessionId
sessionId, int acknowledgeMode, boolean asyncDispatch)throws JMSException {
+ this(connection,sessionId,acknowledgeMode,asyncDispatch,true);
+ }
/**
* Sets the transaction context of the session.
@@ -1663,6 +1673,20 @@
public void setAsyncDispatch(boolean asyncDispatch) {
this.asyncDispatch = asyncDispatch;
}
+
+ /**
+ * @return Returns the sessionAsyncDispatch.
+ */
+ public boolean isSessionAsyncDispatch(){
+ return sessionAsyncDispatch;
+ }
+
+ /**
+ * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
+ */
+ public void setSessionAsyncDispatch(boolean sessionAsyncDispatch){
+ this.sessionAsyncDispatch=sessionAsyncDispatch;
+ }
public List getUnconsumedMessages() {
return executor.getUnconsumedMessages();
@@ -1683,5 +1707,7 @@
}
}
}
+
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?rev=393294&r1=393293&r2=393294&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
Tue Apr 11 12:09:11 2006
@@ -52,7 +52,7 @@
void execute(MessageDispatch message) throws InterruptedException {
- if (!session.isAsyncDispatch() && !dispatchedBySessionPool){
+ if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool){
dispatch(message);
}else {
messageQueue.enqueue(message);