Author: rgodfrey
Date: Thu Jun 16 13:26:38 2016
New Revision: 1748723

URL: http://svn.apache.org/viewvc?rev=1748723&view=rev
Log:
QPID-7305 : Add optional selector argument to queue operations copy/move/delete

Removed:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ClearQueueTransaction.java
Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
    
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
 Thu Jun 16 13:26:38 2016
@@ -272,15 +272,20 @@ public interface Queue<X extends Queue<X
     long getOldestMessageAge();
 
     @ManagedOperation
-    List<Long> moveMessages(@Param(name = "destination") Queue<?> destination, 
@Param(name = "messageIds") List<Long> messageIds);
+    List<Long> moveMessages(@Param(name = "destination", description = "The 
queue to which the messages should be moved") Queue<?> destination,
+                            @Param(name = "messageIds", description = "If 
provided, only messages in the queue whose (internal) message-id is supplied 
will be considered for moving") List<Long> messageIds,
+                            @Param(name = "selector", description = "A (JMS) 
selector - if provided, only messages which match the selector will be 
considered for moving") String selector);
 
 
     @ManagedOperation
-    List<Long> copyMessages(@Param(name = "destination") Queue<?> destination, 
@Param(name = "messageIds") List<Long> messageIds);
+    List<Long> copyMessages(@Param(name = "destination", description = "The 
queue to which the messages should be copied") Queue<?> destination,
+                            @Param(name = "messageIds", description = "If 
provided, only messages in the queue whose (internal) message-id is supplied 
will be considered for copying") List<Long> messageIds,
+                            @Param(name = "selector", description = "A (JMS) 
selector - if provided, only messages which match the selector will be 
considered for copying")  String selector);
 
 
     @ManagedOperation
-    List<Long> deleteMessages(@Param(name = "messageIds") List<Long> 
messageIds);
+    List<Long> deleteMessages(@Param(name = "messageIds", description = "If 
provided, only messages in the queue whose (internal) message-id is supplied 
will be considered for deletion") List<Long> messageIds,
+                              @Param(name = "selector", description = "A (JMS) 
selector - if provided, only messages which match the selector will be 
considered for deletion") String selector);
 
 
     @ManagedOperation

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
 Thu Jun 16 13:26:38 2016
@@ -234,6 +234,8 @@ public interface VirtualHost<X extends V
     interface TransactionalOperation
     {
         void withinTransaction(Transaction txn);
+
+        List<Long> getModifiedMessageIds();
     }
 
     void executeTransaction(TransactionalOperation op);

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Thu Jun 16 13:26:38 2016
@@ -52,7 +52,11 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.filter.SelectorParsingException;
+import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.filter.selector.TokenMgrError;
 import org.apache.qpid.server.configuration.updater.Task;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
 import org.apache.qpid.server.message.MessageInfo;
 import org.apache.qpid.server.message.MessageInfoImpl;
 import org.apache.qpid.server.model.*;
@@ -3412,48 +3416,55 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public List<Long> moveMessages(Queue<?> destination, List<Long> messageIds)
+    public List<Long> moveMessages(Queue<?> destination, List<Long> 
messageIds, final String selector)
     {
         // FIXME: added temporary authorization check until we introduce 
management layer
         // and review current ACL rules to have common rules for all 
management interfaces
         authorizeMethod("moveMessages");
 
-        List<Long> copy = new ArrayList<>(messageIds);
-        _virtualHost.executeTransaction(new MoveMessagesTransaction(this, 
copy, destination));
-        List<Long> returnVal = new ArrayList<>(messageIds);
-        returnVal.removeAll(copy);
-        return returnVal;
+        MoveMessagesTransaction transaction = new 
MoveMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+        _virtualHost.executeTransaction(transaction);
+        return transaction.getModifiedMessageIds();
 
     }
 
+    private JMSSelectorFilter parseSelector(final String selector)
+    {
+        try
+        {
+            return selector == null ? null : new JMSSelectorFilter(selector);
+        }
+        catch (ParseException | SelectorParsingException | TokenMgrError e)
+        {
+            throw new IllegalArgumentException("Cannot parse JMS selector \"" 
+ selector + "\"", e);
+        }
+    }
+
     @Override
-    public List<Long> copyMessages(Queue<?> destination, List<Long> messageIds)
+    public List<Long> copyMessages(Queue<?> destination, List<Long> 
messageIds, final String selector)
     {
         // FIXME: added temporary authorization check until we introduce 
management layer
         // and review current ACL rules to have common rules for all 
management interfaces
         authorizeMethod("copyMessages");
 
-        List<Long> copy = new ArrayList<>(messageIds);
-        _virtualHost.executeTransaction(new CopyMessagesTransaction(this, 
copy, destination));
-        List<Long> returnVal = new ArrayList<>(messageIds);
-        returnVal.removeAll(copy);
-        return returnVal;
+        CopyMessagesTransaction transaction = new 
CopyMessagesTransaction(this, messageIds, destination, parseSelector(selector));
+        _virtualHost.executeTransaction(transaction);
+        return transaction.getModifiedMessageIds();
 
     }
 
     @Override
-    public List<Long> deleteMessages(final List<Long> messageIds)
+    public List<Long> deleteMessages(final List<Long> messageIds, final String 
selector)
     {
 
         // FIXME: added temporary authorization check until we introduce 
management layer
         // and review current ACL rules to have common rules for all 
management interfaces
         authorizeMethod("deleteMessages");
 
-        List<Long> copy = new ArrayList<>(messageIds);
-        _virtualHost.executeTransaction(new DeleteMessagesTransaction(this, 
copy));
-        List<Long> returnVal = new ArrayList<>(messageIds);
-        returnVal.removeAll(copy);
-        return returnVal;
+        DeleteMessagesTransaction transaction = new 
DeleteMessagesTransaction(this, messageIds, parseSelector(selector));
+        _virtualHost.executeTransaction(transaction);
+
+        return transaction.getModifiedMessageIds();
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/CopyMessagesTransaction.java
 Thu Jun 16 13:26:38 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
 
 import java.util.List;
 
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
@@ -31,9 +32,12 @@ public class CopyMessagesTransaction ext
 {
     private final Queue _destinationQueue;
 
-    public CopyMessagesTransaction(Queue sourceQueue, List<Long> messageIds, 
Queue destinationQueue)
+    public CopyMessagesTransaction(Queue sourceQueue,
+                                   List<Long> messageIds,
+                                   Queue destinationQueue,
+                                   final MessageFilter filter)
     {
-        super(sourceQueue, messageIds);
+        super(sourceQueue, messageIds, filter);
         _destinationQueue = destinationQueue;
     }
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/DeleteMessagesTransaction.java
 Thu Jun 16 13:26:38 2016
@@ -22,14 +22,15 @@ package org.apache.qpid.server.queue;
 
 import java.util.List;
 
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
 
 public class DeleteMessagesTransaction extends QueueEntryTransaction
 {
-    public DeleteMessagesTransaction(Queue sourceQueue, List<Long> messageIds)
+    public DeleteMessagesTransaction(Queue sourceQueue, List<Long> messageIds, 
final MessageFilter filter)
     {
-        super(sourceQueue, messageIds);
+        super(sourceQueue, messageIds, filter);
     }
 
     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/MoveMessagesTransaction.java
 Thu Jun 16 13:26:38 2016
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
 
 import java.util.List;
 
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
@@ -31,9 +32,12 @@ public class MoveMessagesTransaction ext
 {
     private final Queue _destinationQueue;
 
-    public MoveMessagesTransaction(Queue sourceQueue, List<Long> messageIds, 
Queue destinationQueue)
+    public MoveMessagesTransaction(Queue sourceQueue,
+                                   List<Long> messageIds,
+                                   Queue destinationQueue,
+                                   final MessageFilter filter)
     {
-        super(sourceQueue, messageIds);
+        super(sourceQueue, messageIds, filter);
         _destinationQueue = destinationQueue;
     }
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryTransaction.java
 Thu Jun 16 13:26:38 2016
@@ -20,8 +20,10 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.qpid.server.filter.MessageFilter;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.VirtualHost;
@@ -29,16 +31,19 @@ import org.apache.qpid.server.model.Virt
 abstract class QueueEntryTransaction implements 
VirtualHost.TransactionalOperation
 {
     private final Queue _sourceQueue;
-    private final List _messageIds;
+    private final List<Long> _messageIds;
+    private final MessageFilter _filter;
+    private final List<Long> _modifiedMessageIds = new ArrayList<>();
 
-    protected QueueEntryTransaction(Queue sourceQueue, List messageIds)
+    QueueEntryTransaction(Queue sourceQueue, List<Long> messageIds, final 
MessageFilter filter)
     {
         _sourceQueue = sourceQueue;
-        _messageIds = messageIds;
+        _messageIds = messageIds == null ? null : new ArrayList<>(messageIds);
+        _filter = filter;
     }
 
     @Override
-    public void withinTransaction(final VirtualHost.Transaction txn)
+    public final void withinTransaction(final VirtualHost.Transaction txn)
     {
 
         _sourceQueue.visit(new QueueEntryVisitor()
@@ -50,17 +55,24 @@ abstract class QueueEntryTransaction imp
                 if(message != null)
                 {
                     final long messageId = message.getMessageNumber();
-                    if (_messageIds.remove(messageId) || (messageId <= (long) 
Integer.MAX_VALUE
-                                                          && 
_messageIds.remove(Integer.valueOf((int)messageId))))
+                    if ((_messageIds == null || _messageIds.remove(messageId))
+                        && (_filter == null || 
_filter.matches(entry.asFilterable())))
                     {
                         updateEntry(entry, txn);
+                        _modifiedMessageIds.add(messageId);
                     }
                 }
-                return _messageIds.isEmpty();
+                return _messageIds != null && _messageIds.isEmpty();
             }
         });
     }
 
 
     protected abstract void updateEntry(QueueEntry entry, 
VirtualHost.Transaction txn);
+
+    @Override
+    public final List<Long> getModifiedMessageIds()
+    {
+        return _modifiedMessageIds;
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java?rev=1748723&r1=1748722&r2=1748723&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
 Thu Jun 16 13:26:38 2016
@@ -42,6 +42,7 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.Part;
 
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -897,8 +898,14 @@ public class RestServlet extends Abstrac
         }
         else
         {
-
-            providedObject = mapper.readValue(request.getInputStream(), 
LinkedHashMap.class);
+            try
+            {
+                providedObject = mapper.readValue(request.getInputStream(), 
LinkedHashMap.class);
+            }
+            catch (JsonParseException e)
+            {
+                throw new IllegalArgumentException("Cannot parse the operation 
body as json",e);
+            }
         }
         return providedObject;
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to