Author: jstrachan
Date: Fri Aug 25 11:41:45 2006
New Revision: 436899
URL: http://svn.apache.org/viewvc?rev=436899&view=rev
Log:
fix for AMQ-896 and AMQ-837. Also tidied up the Queue / QueueView /
QueueViewMBean code a little to make it easier to work with queues via Java /
JMX allowing messages to be copied, moved and removed via a selector or
MessageReferenceFilter
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
(with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
Fri Aug 25 11:41:45 2006
@@ -20,6 +20,7 @@
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
@@ -39,16 +40,56 @@
return OpenTypeSupport.convert(rc);
}
- public boolean removeMessage(String messageId){
- return ((Queue) destination).removeMessage(messageId);
- }
-
public void purge(){
((Queue) destination).purge();
}
+ public boolean removeMessage(String messageId) throws Exception{
+ return ((Queue) destination).removeMessage(messageId);
+ }
+
+ public int removeMatchingMessages(String selector) throws Exception {
+ return ((Queue) destination).removeMatchingMessages(selector);
+ }
+
+ public int removeMatchingMessages(String selector, int maximumMessages)
throws Exception {
+ return ((Queue) destination).removeMatchingMessages(selector,
maximumMessages);
+ }
+
public boolean copyMessageTo(String messageId, String destinationName)
throws Exception {
- return ((Queue)
destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()),
messageId, ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE));
+ ConnectionContext context =
BrokerView.getConnectionContext(broker.getContextBroker());
+ ActiveMQDestination toDestination =
ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+ return ((Queue) destination).copyMessageTo(context, messageId,
toDestination);
+ }
+
+ public int copyMatchingMessagesTo(String selector, String destinationName)
throws Exception {
+ ConnectionContext context =
BrokerView.getConnectionContext(broker.getContextBroker());
+ ActiveMQDestination toDestination =
ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+ return ((Queue) destination).copyMatchingMessagesTo(context, selector,
toDestination);
+ }
+
+ public int copyMatchingMessagesTo(String selector, String destinationName,
int maximumMessages) throws Exception {
+ ConnectionContext context =
BrokerView.getConnectionContext(broker.getContextBroker());
+ ActiveMQDestination toDestination =
ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+ return ((Queue) destination).copyMatchingMessagesTo(context, selector,
toDestination, maximumMessages);
+ }
+
+ public boolean moveMessageTo(String messageId, String destinationName)
throws Exception {
+ ConnectionContext context =
BrokerView.getConnectionContext(broker.getContextBroker());
+ ActiveMQDestination toDestination =
ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+ return ((Queue) destination).moveMessageTo(context, messageId,
toDestination);
+ }
+
+ public int moveMatchingMessagesTo(String selector, String destinationName)
throws Exception {
+ ConnectionContext context =
BrokerView.getConnectionContext(broker.getContextBroker());
+ ActiveMQDestination toDestination =
ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+ return ((Queue) destination).moveMatchingMessagesTo(context, selector,
toDestination);
}
+ public int moveMatchingMessagesTo(String selector, String destinationName,
int maximumMessages) throws Exception {
+ ConnectionContext context =
BrokerView.getConnectionContext(broker.getContextBroker());
+ ActiveMQDestination toDestination =
ActiveMQDestination.createDestination(destinationName,
ActiveMQDestination.QUEUE_TYPE);
+ return ((Queue) destination).moveMatchingMessagesTo(context, selector,
toDestination, maximumMessages);
+ }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
Fri Aug 25 11:41:45 2006
@@ -24,37 +24,92 @@
public interface QueueViewMBean extends DestinationViewMBean {
/**
- * Retrieve a message from the destination's queue.
- *
- * @param messageId the message id of the message to retreive
- * @return A CompositeData object which is a JMX version of the messages
- * @throws OpenDataException
- */
+ * Retrieve a message from the destination's queue.
+ *
+ * @param messageId
+ * the message id of the message to retrieve
+ * @return A CompositeData object which is a JMX version of the messages
+ * @throws OpenDataException
+ */
public CompositeData getMessage(String messageId) throws OpenDataException;
/**
- * Removes a message from the queue. If the message has allready been
dispatched
- * to another consumer, the message cannot be delted and this method will
return
- * false.
+ * Removes a message from the queue. If the message has already been
+ * dispatched to another consumer, the message cannot be deleted and this
+ * method will return false.
*
- * @param messageId
- * @return true if the message was found and could be succesfully deleted.
+ * @param messageId
+ * @return true if the message was found and could be successfully deleted.
+ * @throws Exception
*/
- public boolean removeMessage(String messageId);
-
+ public boolean removeMessage(String messageId) throws Exception;
+
/**
- * Emptys out all the messages in the queue.
+ * Removes the messages matching the given selector
+ *
+ * @return the number of messages removed
+ */
+ public int removeMatchingMessages(String selector) throws Exception;
+
+ /**
+ * Removes the messages matching the given selector up to the maximum
number of matched messages
+ *
+ * @return the number of messages removed
+ */
+ public int removeMatchingMessages(String selector, int maximumMessages)
throws Exception;
+
+
+ /**
+ * Removes all of the messages in the queue.
*/
public void purge();
/**
- * Copys a given message to another destination.
+ * Copies a given message to another destination.
*
* @param messageId
* @param destinationName
- * @return true if the message was found and was successfuly copied to the
other destination.
+ * @return true if the message was found and was successfully copied to the
+ * other destination.
* @throws Exception
*/
public boolean copyMessageTo(String messageId, String destinationName)
throws Exception;
+
+ /**
+ * Copies the messages matching the given selector
+ *
+ * @return the number of messages copied
+ */
+ public int copyMatchingMessagesTo(String selector, String destinationName)
throws Exception;
+
+ /**
+ * Copies the messages matching the given selector up to the maximum
number of matched messages
+ *
+ * @return the number of messages copied
+ */
+ public int copyMatchingMessagesTo(String selector, String destinationName,
int maximumMessages) throws Exception;
+
+ /**
+ * Moves the message to another destination.
+ *
+ * @param messageId
+ * @param destinationName
+ * @return true if the message was found and was successfully copied to the
+ * other destination.
+ * @throws Exception
+ */
+ public boolean moveMessageTo(String messageId, String destinationName)
throws Exception;
+
+ /**
+ * Moves the messages matching the given selector
+ *
+ * @return the number of messages removed
+ */
+ public int moveMatchingMessagesTo(String selector, String destinationName)
throws Exception;
+
+ /**
+ * Moves the messages matching the given selector up to the maximum number
of matched messages
+ */
+ public int moveMatchingMessagesTo(String selector, String destinationName,
int maximumMessages) throws Exception;
}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java?rev=436899&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
Fri Aug 25 11:41:45 2006
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import org.apache.activemq.broker.ConnectionContext;
+
+import javax.jms.JMSException;
+
+/**
+ * Represents a filter on message references
+ *
+ * @version $Revision$
+ */
+public interface MessageReferenceFilter {
+
+ public boolean evaluate(ConnectionContext context, MessageReference
messageReference) throws JMSException;
+}
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/MessageReferenceFilter.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Aug 25 11:41:45 2006
@@ -33,8 +33,10 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -44,6 +46,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -77,20 +82,21 @@
protected int highestSubscriptionPriority;
private DeadLetterStrategy deadLetterStrategy = new
SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new
MessageGroupHashBucketFactory();
-
- public Queue(ActiveMQDestination destination, final UsageManager
memoryManager, MessageStore store,
- DestinationStatistics parentStats, TaskRunnerFactory taskFactory)
throws Exception {
+
+ public Queue(ActiveMQDestination destination, final UsageManager
memoryManager, MessageStore store, DestinationStatistics parentStats,
+ TaskRunnerFactory taskFactory) throws Exception {
this.destination = destination;
this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE);
this.store = store;
- // Let the store know what usage manager we are using so that he can
flush messages to disk
+ // Let the store know what usage manager we are using so that he can
+ // flush messages to disk
// when usage gets high.
- if( store!=null ) {
+ if (store != null) {
store.setUsageManager(usageManager);
}
-
+
destinationStatistics.setParent(parentStats);
this.log = LogFactory.getLog(getClass().getName() + "." +
destination.getPhysicalName());
@@ -110,8 +116,8 @@
public void recoverMessageReference(String messageReference)
throws Exception {
throw new RuntimeException("Should not be called.");
}
-
- public void finished(){
+
+ public void finished() {
}
});
}
@@ -164,13 +170,15 @@
if (sub.matches(node, msgContext)) {
sub.add(node);
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
log.warn("Could not load message: " + e, e);
}
}
}
- } finally {
+ }
+ finally {
msgContext.clear();
dispatchValve.turnOn();
}
@@ -225,8 +233,9 @@
}
}
}
-
- // now lets dispatch from the copy of the collection to
avoid deadlocks
+
+ // now lets dispatch from the copy of the collection to
+ // avoid deadlocks
for (Iterator iter = messagesToDispatch.iterator();
iter.hasNext();) {
IndirectMessageReference node =
(IndirectMessageReference) iter.next();
node.incrementRedeliveryCounter();
@@ -239,7 +248,8 @@
msgContext.clear();
}
}
- } finally {
+ }
+ finally {
dispatchValve.turnOn();
}
@@ -250,9 +260,10 @@
if (context.isProducerFlowControl()) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager
memory limit reached");
- } else {
- usageManager.waitForSpace();
- }
+ }
+ else {
+ usageManager.waitForSpace();
+ }
}
message.setRegionDestination(this);
@@ -269,10 +280,12 @@
dispatch(context, node, message);
}
});
- } else {
+ }
+ else {
dispatch(context, node, message);
}
- } finally {
+ }
+ finally {
node.decrementReferenceCount();
}
}
@@ -315,9 +328,10 @@
public void acknowledge(ConnectionContext context, Subscription sub,
MessageAck ack, MessageReference node) throws IOException {
if (store != null && node.isPersistent()) {
- // the original ack may be a ranged ack, but we are trying to
delete a specific
+ // the original ack may be a ranged ack, but we are trying to
delete
+ // a specific
// message store here so we need to convert to a non ranged ack.
- if( ack.getMessageCount() > 0 ) {
+ if (ack.getMessageCount() > 0) {
// Dup the ack
MessageAck a = new MessageAck();
ack.copy(a);
@@ -344,9 +358,8 @@
synchronized (messages) {
size = messages.size();
}
- return "Queue: destination=" + destination.getPhysicalName() + ",
subscriptions=" + consumers.size()
- + ", memory=" + usageManager.getPercentUsage() + "%, size=" +
size + ", in flight groups="
- + messageGroupOwners;
+ return "Queue: destination=" + destination.getPhysicalName() + ",
subscriptions=" + consumers.size() + ", memory=" +
usageManager.getPercentUsage()
+ + "%, size=" + size + ", in flight groups=" +
messageGroupOwners;
}
public void start() throws Exception {
@@ -364,7 +377,7 @@
public String getDestination() {
return destination.getPhysicalName();
}
-
+
public UsageManager getUsageManager() {
return usageManager;
}
@@ -443,8 +456,7 @@
public void setMemoryLimit(long limit) {
getUsageManager().setLimit(limit);
}
-
-
+
// Implementation methods
//
-------------------------------------------------------------------------
private MessageReference createMessageReference(Message message) {
@@ -472,7 +484,8 @@
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(context, node, msgContext, consumers);
- } finally {
+ }
+ finally {
msgContext.clear();
dispatchValve.decrement();
}
@@ -508,10 +521,12 @@
if (m != null) {
l.add(m);
}
- } finally {
+ }
+ finally {
r.decrementReferenceCount();
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
}
}
}
@@ -519,33 +534,6 @@
return (Message[]) l.toArray(new Message[l.size()]);
}
- public boolean removeMessage(String messageId) {
- synchronized (messages) {
- ConnectionContext c = new ConnectionContext();
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
- try {
- IndirectMessageReference r = (IndirectMessageReference)
iter.next();
- if (messageId.equals(r.getMessageId().toString())) {
-
- // We should only delete messages that can be locked.
- if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) {
- MessageAck ack = new MessageAck();
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
- ack.setDestination(destination);
- ack.setMessageID(r.getMessageId());
- acknowledge(c, null, ack, r);
- r.drop();
- dropEvent();
- return true;
- }
- }
- } catch (IOException e) {
- }
- }
- }
- return false;
- }
-
public Message getMessage(String messageId) {
synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
@@ -558,12 +546,14 @@
if (m != null) {
return m;
}
- } finally {
+ }
+ finally {
r.decrementReferenceCount();
}
break;
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
}
}
}
@@ -572,13 +562,13 @@
public void purge() {
synchronized (messages) {
- ConnectionContext c = new ConnectionContext();
+ ConnectionContext c = createConnectionContext();
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
IndirectMessageReference r = (IndirectMessageReference)
iter.next();
-
+
// We should only delete messages that can be locked.
- if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) ) {
+ if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
@@ -587,7 +577,8 @@
r.drop();
dropEvent(true);
}
- } catch (IOException e) {
+ }
+ catch (IOException e) {
}
}
@@ -596,27 +587,207 @@
gc();
}
}
+
+
+ /**
+ * Removes the message matching the given messageId
+ */
+ public boolean removeMessage(String messageId) throws Exception {
+ return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
+ }
+
+ /**
+ * Removes the messages matching the given selector
+ *
+ * @return the number of messages removed
+ */
+ public int removeMatchingMessages(String selector) throws Exception {
+ return removeMatchingMessages(selector, -1);
+ }
+
+ /**
+ * Removes the messages matching the given selector up to the maximum
number of matched messages
+ *
+ * @return the number of messages removed
+ */
+ public int removeMatchingMessages(String selector, int maximumMessages)
throws Exception {
+ return removeMatchingMessages(createSelectorFilter(selector),
maximumMessages);
+ }
+
+ /**
+ * Removes the messages matching the given filter up to the maximum number
of matched messages
+ *
+ * @return the number of messages removed
+ */
+ public int removeMatchingMessages(MessageReferenceFilter filter, int
maximumMessages) throws Exception {
+ int counter = 0;
+ synchronized (messages) {
+ ConnectionContext c = createConnectionContext();
+ for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ IndirectMessageReference r = (IndirectMessageReference)
iter.next();
+ if (filter.evaluate(c, r)) {
+ // We should only delete messages that can be locked.
+ if (lockMessage(r)) {
+ removeMessage(c, r);
+ if (++counter >= maximumMessages && maximumMessages >
0) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ return counter;
+ }
+ /**
+ * Copies the message matching the given messageId
+ */
public boolean copyMessageTo(ConnectionContext context, String messageId,
ActiveMQDestination dest) throws Exception {
+ return copyMatchingMessages(context, createMessageIdFilter(messageId),
dest, 1) > 0;
+ }
+
+ /**
+ * Copies the messages matching the given selector
+ *
+ * @return the number of messages copied
+ */
+ public int copyMatchingMessagesTo(ConnectionContext context, String
selector, ActiveMQDestination dest) throws Exception {
+ return copyMatchingMessagesTo(context, selector, dest, -1);
+ }
+
+ /**
+ * Copies the messages matching the given selector up to the maximum
number of matched messages
+ *
+ * @return the number of messages copied
+ */
+ public int copyMatchingMessagesTo(ConnectionContext context, String
selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
+ return copyMatchingMessages(context, createSelectorFilter(selector),
dest, maximumMessages);
+ }
+
+ /**
+ * Copies the messages matching the given filter up to the maximum number
of matched messages
+ *
+ * @return the number of messages copied
+ */
+ public int copyMatchingMessages(ConnectionContext context,
MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages)
throws Exception {
+ int counter = 0;
synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
- try {
- MessageReference r = (MessageReference) iter.next();
- if (messageId.equals(r.getMessageId().toString())) {
+ MessageReference r = (MessageReference) iter.next();
+ if (filter.evaluate(context, r)) {
+ r.incrementReferenceCount();
+ try {
+ Message m = r.getMessage();
+ BrokerSupport.resend(context, m, dest);
+ if (++counter >= maximumMessages && maximumMessages >
0) {
+ break;
+ }
+ }
+ finally {
+ r.decrementReferenceCount();
+ }
+ }
+ }
+ }
+ return counter;
+ }
+
+ /**
+ * Moves the message matching the given messageId
+ */
+ public boolean moveMessageTo(ConnectionContext context, String messageId,
ActiveMQDestination dest) throws Exception {
+ return moveMatchingMessagesTo(context,
createMessageIdFilter(messageId), dest, 1) > 0;
+ }
+
+ /**
+ * Moves the messages matching the given selector
+ *
+ * @return the number of messages removed
+ */
+ public int moveMatchingMessagesTo(ConnectionContext context, String
selector, ActiveMQDestination dest) throws Exception {
+ return moveMatchingMessagesTo(context, selector, dest, -1);
+ }
+
+ /**
+ * Moves the messages matching the given selector up to the maximum number
of matched messages
+ */
+ public int moveMatchingMessagesTo(ConnectionContext context, String
selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
+ return moveMatchingMessagesTo(context, createSelectorFilter(selector),
dest, maximumMessages);
+ }
+
+ /**
+ * Moves the messages matching the given filter up to the maximum number
of matched messages
+ */
+ public int moveMatchingMessagesTo(ConnectionContext context,
MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages)
throws Exception {
+ int counter = 0;
+ synchronized (messages) {
+ for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ IndirectMessageReference r = (IndirectMessageReference)
iter.next();
+ if (filter.evaluate(context, r)) {
+ // We should only move messages that can be locked.
+ if (lockMessage(r)) {
r.incrementReferenceCount();
try {
Message m = r.getMessage();
- BrokerSupport.resend(context, m, dest);
- } finally {
+ BrokerSupport.resend(context, m, dest);
+ removeMessage(context, r);
+ if (++counter >= maximumMessages &&
maximumMessages > 0) {
+ break;
+ }
+ }
+ finally {
r.decrementReferenceCount();
- }
- return true;
+ }
}
- } catch (IOException e) {
}
}
}
- return false;
+ return counter;
+ }
+
+ protected MessageReferenceFilter createMessageIdFilter(final String
messageId) {
+ return new MessageReferenceFilter() {
+ public boolean evaluate(ConnectionContext context,
MessageReference r) {
+ return messageId.equals(r.getMessageId().toString());
+ }
+ };
+ }
+
+ protected MessageReferenceFilter createSelectorFilter(String selector)
throws InvalidSelectorException {
+ final BooleanExpression selectorExpression = new
SelectorParser().parse(selector);
+
+ return new MessageReferenceFilter() {
+ public boolean evaluate(ConnectionContext context,
MessageReference r) throws JMSException {
+ MessageEvaluationContext messageEvaluationContext =
context.getMessageEvaluationContext();
+
+ messageEvaluationContext.setMessageReference(r);
+ if (messageEvaluationContext.getDestination() == null) {
+
messageEvaluationContext.setDestination(getActiveMQDestination());
+ }
+
+ return selectorExpression.matches(messageEvaluationContext);
+ }
+ };
+ }
+
+ protected void removeMessage(ConnectionContext c, IndirectMessageReference
r) throws IOException {
+ MessageAck ack = new MessageAck();
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setDestination(destination);
+ ack.setMessageID(r.getMessageId());
+ acknowledge(c, null, ack, r);
+ r.drop();
+ dropEvent();
+ }
+
+ protected boolean lockMessage(IndirectMessageReference r) {
+ return r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
+ }
+
+ protected ConnectionContext createConnectionContext() {
+ ConnectionContext answer = new ConnectionContext();
+
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
+ return answer;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Fri Aug 25 11:41:45 2006
@@ -33,7 +33,6 @@
import javax.jms.JMSException;
import java.io.IOException;
-import java.util.Iterator;
public class QueueSubscription extends PrefetchSubscription implements
LockOwner {
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=436899&r1=436898&r2=436899&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Fri Aug 25 11:41:45 2006
@@ -74,6 +74,58 @@
assertQueueBrowseWorks();
assertCreateAndDestroyDurableSubscriptions();
}
+
+ public void testMoveMessagesBySelector() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain +
":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName,
QueueViewMBean.class, true);
+
+ String newDestination = "test.new.destination." + getClass() + "." +
getName();
+ queue.moveMatchingMessagesTo("counter > 2", newDestination );
+
+ queueViewMBeanName = assertRegisteredObjectName(domain +
":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+ queue = (QueueViewMBean)
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName,
QueueViewMBean.class, true);
+
+ assertTrue("Should have at least one message in the queue: " +
queueViewMBeanName, queue.getQueueSize() > 0);
+
+ // now lets remove them by selector
+ queue.removeMatchingMessages("counter > 2");
+
+ assertEquals("Should have no more messages in the queue: " +
queueViewMBeanName, 0, queue.getQueueSize());
+ }
+
+ public void testCopyMessagesBySelector() throws Exception {
+ connection = connectionFactory.createConnection();
+ useConnection(connection);
+
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain +
":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
+
+ QueueViewMBean queue = (QueueViewMBean)
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName,
QueueViewMBean.class, true);
+
+ String newDestination = "test.new.destination." + getClass() + "." +
getName();
+ long queueSize = queue.getQueueSize();
+ queue.copyMatchingMessagesTo("counter > 2", newDestination);
+
+ assertEquals("Should have same number of messages in the queue: " +
queueViewMBeanName, queueSize, queueSize);
+
+ queueViewMBeanName = assertRegisteredObjectName(domain +
":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
+
+ queue = (QueueViewMBean)
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName,
QueueViewMBean.class, true);
+
+ log.info("Queue: " + queueViewMBeanName + " now has: " +
queue.getQueueSize() + " message(s)");
+
+ assertTrue("Should have at least one message in the queue: " +
queueViewMBeanName, queue.getQueueSize() > 0);
+
+ // now lets remove them by selector
+ queue.removeMatchingMessages("counter > 2");
+
+ assertEquals("Should have no more messages in the queue: " +
queueViewMBeanName, 0, queue.getQueueSize());
+ }
+
protected void assertQueueBrowseWorks() throws Exception {
Integer mbeancnt = mbeanServer.getMBeanCount();
@@ -205,6 +257,7 @@
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < messageCount; i++) {
Message message = session.createTextMessage("Message: " + i);
+ message.setIntProperty("counter", i);
producer.send(message);
}
Thread.sleep(1000);