Author: dejanb
Date: Thu Nov 26 17:00:49 2009
New Revision: 884644
URL: http://svn.apache.org/viewvc?rev=884644&view=rev
Log:
merging 884633 - https://issues.apache.org/activemq/browse/AMQ-2515 - Optimized
Acknowledgements and interrupted transport
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=884644&r1=884643&r2=884644&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Thu Nov 26 17:00:49 2009
@@ -134,6 +134,9 @@
private long lastDeliveredSequenceId;
private IOException failureError;
+
+ private long optimizeAckTimestamp = System.currentTimeMillis();
+ private long optimizeAckTimeout = 300;
/**
* Create a MessageConsumer
@@ -788,7 +791,7 @@
}
}
}
-
+
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
if (unconsumedMessages.isClosed()) {
return;
@@ -809,12 +812,13 @@
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
- if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
+ if (ackCounter >= (info.getPrefetchSize() * .65) || System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAckTimeout)) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
+ optimizeAckTimestamp = System.currentTimeMillis();
}
}
} else {
@@ -1074,14 +1078,13 @@
session.connection.rollbackDuplicate(this, old.getMessage());
}
}
- if (pendingAck != null && pendingAck.isDeliveredAck()) {
- // on resumption a pending delivered ack will be out of sync with
- // re deliveries.
- if (LOG.isDebugEnabled()) {
- LOG.debug("removing pending delivered ack on transport interupt: " + pendingAck);
- }
- pendingAck = null;
+ if (!session.isTransacted()) {
+ // clean, so we don't have duplicates with optimizeAcknowledge
+ synchronized (deliveredMessages) {
+ deliveredMessages.clear();
+ }
}
+ pendingAck = null;
}
if (!unconsumedMessages.isClosed()) {
if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {