Author: rgodfrey
Date: Thu Jun 16 13:26:38 2016
New Revision: 1748723
URL: http://svn.apache.org/viewvc?rev=1748723&view=rev
Log:
QPID-7305 : Add optional selector argument to queue operations copy/move/delete
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ClearQueueTransaction.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
Thu Jun 16 13:26:38 2016
@@ -272,15 +272,20 @@ public interface Queue<X extends Queue<X
long getOldestMessageAge();
@ManagedOperation
- List<Long> moveMessages(@Param(name = "destination") Queue<?> destination,
@Param(name = "messageIds") List<Long> messageIds);
+ List<Long> moveMessages(@Param(name = "destination", description = "The
queue to which the messages should be moved") Queue<?> destination,
+ @Param(name = "messageIds", description = "If
provided, only messages in the queue whose (internal) message-id is supplied
will be considered for moving") List<Long> messageIds,
+ @Param(name = "selector", description = "A (JMS)
selector - if provided, only messages which match the selector will be
considered for moving") String selector);
@ManagedOperation
- List<Long> copyMessages(@Param(name = "destination") Queue<?> destination,
@Param(name = "messageIds") List<Long> messageIds);
+ List<Long> copyMessages(@Param(name = "destination", description = "The
queue to which the messages should be copied") Queue<?> destination,
+ @Param(name = "messageIds", description = "If
provided, only messages in the queue whose (internal) message-id is supplied
will be considered for copying") List<Long> messageIds,
+ @Param(name = "selector", description = "A (JMS)
selector - if provided, only messages which match the selector will be
considered for copying") String selector);
@ManagedOperation
- List<Long> deleteMessages(@Param(name = "messageIds") List<Long>
messageIds);
+ List<Long> deleteMessages(@Param(name = "messageIds", description = "If
provided, only messages in the queue whose (internal) message-id is supplied
will be considered for deletion") List<Long> messageIds,
+ @Param(name = "selector", description = "A (JMS)
selector - if provided, only messages which match the selector will be
considered for deletion") String selector);
@ManagedOperation
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
Thu Jun 16 13:26:38 2016
@@ -234,6 +234,8 @@ public interface VirtualHost<X extends V
interface TransactionalOperation
{
void withinTransaction(Transaction txn);
+
+ List<Long> getModifiedMessageIds();
}
void executeTransaction(TransactionalOperation op);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Thu Jun 16 13:26:38 2016
@@ -52,7 +52,11 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.message.MessageInfo;
import org.apache.qpid.server.message.MessageInfoImpl;
import org.apache.qpid.server.model.*;
@@ -3412,48 +3416,55 @@ public abstract class AbstractQueue<X ex
}
@Override
- public List<Long> moveMessages(Queue<?> destination, List<Long> messageIds)
+ public List<Long> moveMessages(Queue<?> destination, List<Long>
messageIds, final String selector)
{
// FIXME: added temporary authorization check until we introduce
management layer
// and review current ACL rules to have common rules for all
management interfaces
authorizeMethod("moveMessages");
- List<Long> copy = new ArrayList<>(messageIds);
- _virtualHost.executeTransaction(new MoveMessagesTransaction(this,
copy, destination));
- List<Long> returnVal = new ArrayList<>(messageIds);
- returnVal.removeAll(copy);
- return returnVal;
+ MoveMessagesTransaction transaction = new
MoveMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+ _virtualHost.executeTransaction(transaction);
+ return transaction.getModifiedMessageIds();
}
+ private JMSSelectorFilter parseSelector(final String selector)
+ {
+ try
+ {
+ return selector == null ? null : new JMSSelectorFilter(selector);
+ }
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
+ {
+ throw new IllegalArgumentException("Cannot parse JMS selector \""
+ selector + "\"", e);
+ }
+ }
+
@Override
- public List<Long> copyMessages(Queue<?> destination, List<Long> messageIds)
+ public List<Long> copyMessages(Queue<?> destination, List<Long>
messageIds, final String selector)
{
// FIXME: added temporary authorization check until we introduce
management layer
// and review current ACL rules to have common rules for all
management interfaces
authorizeMethod("copyMessages");
- List<Long> copy = new ArrayList<>(messageIds);
- _virtualHost.executeTransaction(new CopyMessagesTransaction(this,
copy, destination));
- List<Long> returnVal = new ArrayList<>(messageIds);
- returnVal.removeAll(copy);
- return returnVal;
+ CopyMessagesTransaction transaction = new
CopyMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+ _virtualHost.executeTransaction(transaction);
+ return transaction.getModifiedMessageIds();
}
@Override
- public List<Long> deleteMessages(final List<Long> messageIds)
+ public List<Long> deleteMessages(final List<Long> messageIds, final String
selector)
{
// FIXME: added temporary authorization check until we introduce
management layer
// and review current ACL rules to have common rules for all
management interfaces
authorizeMethod("deleteMessages");
- List<Long> copy = new ArrayList<>(messageIds);
- _virtualHost.executeTransaction(new DeleteMessagesTransaction(this,
copy));
- List<Long> returnVal = new ArrayList<>(messageIds);
- returnVal.removeAll(copy);
- return returnVal;
+ DeleteMessagesTransaction transaction = new
DeleteMessagesTransaction(this, messageIds, parseSelector(selector));
+ _virtualHost.executeTransaction(transaction);
+
+ return transaction.getModifiedMessageIds();
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
Thu Jun 16 13:26:38 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.List;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
@@ -31,9 +32,12 @@ public class CopyMessagesTransaction ext
{
private final Queue _destinationQueue;
- public CopyMessagesTransaction(Queue sourceQueue, List<Long> messageIds,
Queue destinationQueue)
+ public CopyMessagesTransaction(Queue sourceQueue,
+ List<Long> messageIds,
+ Queue destinationQueue,
+ final MessageFilter filter)
{
- super(sourceQueue, messageIds);
+ super(sourceQueue, messageIds, filter);
_destinationQueue = destinationQueue;
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
Thu Jun 16 13:26:38 2016
@@ -22,14 +22,15 @@ package org.apache.qpid.server.queue;
import java.util.List;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
public class DeleteMessagesTransaction extends QueueEntryTransaction
{
- public DeleteMessagesTransaction(Queue sourceQueue, List<Long> messageIds)
+ public DeleteMessagesTransaction(Queue sourceQueue, List<Long> messageIds,
final MessageFilter filter)
{
- super(sourceQueue, messageIds);
+ super(sourceQueue, messageIds, filter);
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
Thu Jun 16 13:26:38 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.List;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
@@ -31,9 +32,12 @@ public class MoveMessagesTransaction ext
{
private final Queue _destinationQueue;
- public MoveMessagesTransaction(Queue sourceQueue, List<Long> messageIds,
Queue destinationQueue)
+ public MoveMessagesTransaction(Queue sourceQueue,
+ List<Long> messageIds,
+ Queue destinationQueue,
+ final MessageFilter filter)
{
- super(sourceQueue, messageIds);
+ super(sourceQueue, messageIds, filter);
_destinationQueue = destinationQueue;
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
Thu Jun 16 13:26:38 2016
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
@@ -29,16 +31,19 @@ import org.apache.qpid.server.model.Virt
abstract class QueueEntryTransaction implements
VirtualHost.TransactionalOperation
{
private final Queue _sourceQueue;
- private final List _messageIds;
+ private final List<Long> _messageIds;
+ private final MessageFilter _filter;
+ private final List<Long> _modifiedMessageIds = new ArrayList<>();
- protected QueueEntryTransaction(Queue sourceQueue, List messageIds)
+ QueueEntryTransaction(Queue sourceQueue, List<Long> messageIds, final
MessageFilter filter)
{
_sourceQueue = sourceQueue;
- _messageIds = messageIds;
+ _messageIds = messageIds == null ? null : new ArrayList<>(messageIds);
+ _filter = filter;
}
@Override
- public void withinTransaction(final VirtualHost.Transaction txn)
+ public final void withinTransaction(final VirtualHost.Transaction txn)
{
_sourceQueue.visit(new QueueEntryVisitor()
@@ -50,17 +55,24 @@ abstract class QueueEntryTransaction imp
if(message != null)
{
final long messageId = message.getMessageNumber();
- if (_messageIds.remove(messageId) || (messageId <= (long)
Integer.MAX_VALUE
- &&
_messageIds.remove(Integer.valueOf((int)messageId))))
+ if ((_messageIds == null || _messageIds.remove(messageId))
+ && (_filter == null ||
_filter.matches(entry.asFilterable())))
{
updateEntry(entry, txn);
+ _modifiedMessageIds.add(messageId);
}
}
- return _messageIds.isEmpty();
+ return _messageIds != null && _messageIds.isEmpty();
}
});
}
protected abstract void updateEntry(QueueEntry entry,
VirtualHost.Transaction txn);
+
+ @Override
+ public final List<Long> getModifiedMessageIds()
+ {
+ return _modifiedMessageIds;
+ }
}
Modified:
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
(original)
+++
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
Thu Jun 16 13:26:38 2016
@@ -42,6 +42,7 @@ import javax.servlet.http.HttpServletReq
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;
+import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -897,8 +898,14 @@ public class RestServlet extends Abstrac
}
else
{
-
- providedObject = mapper.readValue(request.getInputStream(),
LinkedHashMap.class);
+ try
+ {
+ providedObject = mapper.readValue(request.getInputStream(),
LinkedHashMap.class);
+ }
+ catch (JsonParseException e)
+ {
+ throw new IllegalArgumentException("Cannot parse the operation
body as json",e);
+ }
}
return providedObject;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]