Author: gtully
Date: Fri Jul 17 23:18:29 2009
New Revision: 795270
URL: http://svn.apache.org/viewvc?rev=795270&view=rev
Log:
use non compencating schedualler and ensure DLQ copies message early - ensures
accurate processing of expired messages -
https://issues.apache.org/activemq/browse/AMQ-1112
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Jul 17 23:18:29 2009
@@ -187,7 +187,7 @@
}
if (getExpireMessagesPeriod() > 0) {
- scheduler.executePeriodically(expireMessagesTask,
getExpireMessagesPeriod());
+ scheduler.schedualPeriodically(expireMessagesTask,
getExpireMessagesPeriod());
}
super.initialize();
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Fri Jul 17 23:18:29 2009
@@ -45,15 +45,18 @@
* @throws IOException
*/
protected void acknowledge(final ConnectionContext context, final
MessageAck ack, final MessageReference n) throws IOException {
- if (n.isExpired()) {
- if (!broker.isExpired(n)) {
- LOG.info("ignoring ack " + ack + ", for already expired
message: " + n);
- return;
- }
- }
final Destination q = n.getRegionDestination();
final QueueMessageReference node = (QueueMessageReference)n;
final Queue queue = (Queue)q;
+
+ if (n.isExpired()) {
+ if (broker.isExpired(n)) {
+ queue.messageExpired(context, this, node);
+ } else {
+ LOG.debug("ignoring ack " + ack + ", for already expired
message: " + n);
+ }
+ return;
+ }
queue.removeMessage(context, this, node, ack);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Fri Jul 17 23:18:29 2009
@@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
@@ -682,14 +683,14 @@
private boolean stampAsExpired(Message message) throws IOException {
boolean stamped=false;
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
- long expiration=message.getExpiration();
- message.setExpiration(0);
+ long expiration=message.getExpiration();
message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
stamped = true;
}
return stamped;
}
+
public void messageExpired(ConnectionContext context, MessageReference
node) {
if (LOG.isDebugEnabled()) {
LOG.debug("Message expired " + node);
@@ -708,11 +709,10 @@
.getRegionDestination().getDeadLetterStrategy();
if(deadLetterStrategy!=null){
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
- if
(node.getRegionDestination().getActiveMQDestination().isTopic()) {
- // message may be
inflight to other subscriptions so do not modify
- message =
message.copy();
- }
-
if(!message.isPersistent()){
+ // message may be inflight
to other subscriptions so do not modify
+ message = message.copy();
+ message.setExpiration(0);
+ if(!message.isPersistent()){
message.setPersistent(true);
message.setProperty("originalDeliveryMode",
"NON_PERSISTENT");
@@ -727,7 +727,7 @@
if
(context.getBroker()==null) {
context.setBroker(getRoot());
}
-
BrokerSupport.resend(context,message,
+
BrokerSupport.resendNoCopy(context,message,
deadLetterDestination);
}
} else {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Jul 17 23:18:29 2009
@@ -307,8 +307,8 @@
// While waiting for space to free up...
the
// message may have expired.
if (message.isExpired()) {
-
getDestinationStatistics().getExpired().increment();
broker.messageExpired(context,
message);
+
getDestinationStatistics().getExpired().increment();
} else {
doMessageSend(producerExchange,
message);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
Fri Jul 17 23:18:29 2009
@@ -88,7 +88,6 @@
private transient ActiveMQConnection connection;
private transient org.apache.activemq.broker.region.Destination
regionDestination;
private transient MemoryUsage memoryUsage;
- private transient boolean expired;
private BrokerId[] brokerPath;
private BrokerId[] cluster;
@@ -339,9 +338,6 @@
public void setExpiration(long expiration) {
this.expiration = expiration;
- if (this.expiration > 0) {
- expired = false;
- }
}
/**
@@ -439,13 +435,8 @@
}
public boolean isExpired() {
- if (!expired) {
- long expireTime = getExpiration();
- if (expireTime > 0 && System.currentTimeMillis() > expireTime) {
- expired = true;
- }
- }
- return expired;
+ long expireTime = getExpiration();
+ return expireTime > 0 && System.currentTimeMillis() > expireTime;
}
public boolean isAdvisory() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
Fri Jul 17 23:18:29 2009
@@ -47,6 +47,16 @@
TIMER_TASKS.put(task, timerTask);
}
+ /*
+ * execute on rough schedual based on termination of last execution. There
is no
+ * compensation (two runs in quick succession) for delays
+ */
+ public synchronized void schedualPeriodically(final Runnable task, long
period) {
+ TimerTask timerTask = new SchedulerTimerTask(task);
+ CLOCK_DAEMON.schedule(timerTask, period, period);
+ TIMER_TASKS.put(task, timerTask);
+ }
+
public synchronized void cancel(Runnable task) {
TimerTask ticket = TIMER_TASKS.remove(task);
if (ticket != null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
Fri Jul 17 23:18:29 2009
@@ -32,6 +32,10 @@
private BrokerSupport() {
}
+ public static void resendNoCopy(final ConnectionContext context, Message
originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
+ doResend(context, originalMessage, deadLetterDestination, false);
+ }
+
/**
* @param context
* @param originalMessage
@@ -39,7 +43,11 @@
* @throws Exception
*/
public static void resend(final ConnectionContext context, Message
originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
- Message message = originalMessage.copy();
+ doResend(context, originalMessage, deadLetterDestination, true);
+ }
+
+ public static void doResend(final ConnectionContext context, Message
originalMessage, ActiveMQDestination deadLetterDestination, boolean copy)
throws Exception {
+ Message message = copy ? originalMessage.copy() : originalMessage;
message.setOriginalDestination(message.getDestination());
message.setOriginalTransactionId(message.getTransactionId());
message.setDestination(deadLetterDestination);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=795270&r1=795269&r2=795270&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Fri Jul 17 23:18:29 2009
@@ -49,12 +49,13 @@
private static final Log LOG =
LogFactory.getLog(ExpiredMessagesTest.class);
- BrokerService broker;
- Connection connection;
- Session session;
- MessageProducer producer;
- MessageConsumer consumer;
- public ActiveMQDestination destination = new ActiveMQQueue("test");
+ BrokerService broker;
+ Connection connection;
+ Session session;
+ MessageProducer producer;
+ MessageConsumer consumer;
+ public ActiveMQDestination destination = new ActiveMQQueue("test");
+ public ActiveMQDestination dlqDestination = new
ActiveMQQueue("ActiveMQ.DLQ");
public boolean useTextMessage = true;
public boolean useVMCursor = true;
@@ -103,12 +104,12 @@
consumerThread.start();
-
+ final int numMessagesToSend = 10000;
Thread producingThread = new Thread("Producing Thread") {
public void run() {
try {
int i = 0;
- while (i++ < 10000) {
+ while (i++ < numMessagesToSend) {
producer.send(session.createTextMessage("test"));
}
producer.close();
@@ -159,10 +160,36 @@
return view.getQueueSize() == 0;
}
}));
+
+ final long expiredBeforeEnqueue = numMessagesToSend -
view.getEnqueueCount();
+ final long totalExpiredCount = view.getExpiredCount() +
expiredBeforeEnqueue;
+
+ final DestinationViewMBean dlqView = createView(dlqDestination);
+ LOG.info("DLQ stats: size= " + dlqView.getQueueSize() + ", enqueues: "
+ dlqView.getDequeueCount() + ", dequeues: " + dlqView.getDequeueCount()
+ + ", dispatched: " + dlqView.getDispatchCount() + ", inflight:
" + dlqView.getInFlightCount() + ", expiries: " + dlqView.getExpiredCount());
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return totalExpiredCount == dlqView.getQueueSize();
+ }
+ });
+ assertEquals("dlq contains all expired", totalExpiredCount,
dlqView.getQueueSize());
+
+ // verify DQL
+ MessageConsumer dlqConsumer = createDlqConsumer(connection);
+ int count = 0;
+ while (dlqConsumer.receive(4000) != null) {
+ count++;
+ }
+ assertEquals("dlq returned all expired", count, totalExpiredCount);
}
- public void initCombosForTestRecoverExpiredMessages() {
+ private MessageConsumer createDlqConsumer(Connection connection) throws
Exception {
+ return connection.createSession(false,
Session.AUTO_ACKNOWLEDGE).createConsumer(dlqDestination);
+ }
+
+ public void initCombosForTestRecoverExpiredMessages() {
addCombinationValues("useVMCursor", new Object[] {Boolean.TRUE,
Boolean.FALSE});
}
@@ -266,9 +293,9 @@
String domain = "org.apache.activemq";
ObjectName name;
if (destination.isQueue()) {
- name = new ObjectName(domain +
":BrokerName=localhost,Type=Queue,Destination=test");
+ name = new ObjectName(domain +
":BrokerName=localhost,Type=Queue,Destination=" +
destination.getPhysicalName());
} else {
- name = new ObjectName(domain +
":BrokerName=localhost,Type=Topic,Destination=test");
+ name = new ObjectName(domain +
":BrokerName=localhost,Type=Topic,Destination=" +
destination.getPhysicalName());
}
return
(DestinationViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, DestinationViewMBean.class, true);
}