This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch SLING-8086-2
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git

commit 8b937d552b633cd6d0bfc5fac0b4951d67947979
Author: tmaret <tma...@adobe.com>
AuthorDate: Fri Nov 9 14:26:56 2018 +0100

    SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items
---
 .../queue/impl/DistributionQueueWrapper.java       | 19 ++++++++++++++-
 .../jobhandling/JobHandlingDistributionQueue.java  | 16 ++++++++++++-
 .../queue/impl/resource/ResourceQueue.java         | 18 +++++++++++++-
 .../queue/impl/simple/SimpleDistributionQueue.java | 17 ++++++++++++-
 .../spi/{package-info.java => Clearable.java}      | 28 +++++++++++++++++++---
 .../sling/distribution/queue/spi/package-info.java |  2 +-
 .../servlet/DistributionAgentQueueServlet.java     | 14 ++++++++++-
 7 files changed, 105 insertions(+), 9 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
 
b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
index dc179d3..cc9fc45 100644
--- 
a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
+++ 
b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
@@ -18,13 +18,17 @@
  */
 package org.apache.sling.distribution.queue.impl;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.jetbrains.annotations.NotNull;
 
-public abstract class DistributionQueueWrapper implements DistributionQueue {
+public abstract class DistributionQueueWrapper implements DistributionQueue, 
Clearable {
     final DistributionQueue wrappedQueue;
 
     DistributionQueueWrapper(DistributionQueue wrappedQueue) {
@@ -69,4 +73,17 @@ public abstract class DistributionQueueWrapper implements 
DistributionQueue {
     public DistributionQueueStatus getStatus() {
         return wrappedQueue.getStatus();
     }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new 
ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
 
b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
index d19424f..39764d3 100644
--- 
a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
+++ 
b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
 /**
  * a {@link DistributionQueue} based on Sling Job Handling facilities
  */
-public class JobHandlingDistributionQueue implements DistributionQueue {
+public class JobHandlingDistributionQueue implements DistributionQueue, 
Clearable {
 
     public final static String DISTRIBUTION_QUEUE_TOPIC = 
"org/apache/sling/distribution/queue";
 
@@ -202,4 +204,16 @@ public class JobHandlingDistributionQueue implements 
DistributionQueue {
         return type;
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new 
ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
 
b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 28162c3..1f6a94e 100644
--- 
a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ 
b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -29,6 +29,7 @@ import 
org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueState;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.util.impl.DistributionUtils;
 import org.jetbrains.annotations.NotNull;
@@ -36,10 +37,11 @@ import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 
 
-public class ResourceQueue implements DistributionQueue {
+public class ResourceQueue implements DistributionQueue, Clearable {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
 
@@ -228,4 +230,18 @@ public class ResourceQueue implements DistributionQueue {
         DistributionQueueItem item = entry.getItem();
         log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { 
queueName, scope, entryId, item.getPackageId() });
     }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new 
ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
+
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
 
b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 26e05d4..27eec29 100644
--- 
a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ 
b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -48,7 +50,7 @@ import org.slf4j.LoggerFactory;
  * Note: potentially the Queue could contain the ordered package ids, with a 
sidecar map id->item;
  * that way removal could be faster.
  */
-public class SimpleDistributionQueue implements DistributionQueue {
+public class SimpleDistributionQueue implements DistributionQueue, Clearable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -168,4 +170,17 @@ public class SimpleDistributionQueue implements 
DistributionQueue {
                 '}';
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new 
ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
+
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java 
b/src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java
similarity index 50%
copy from 
src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
copy to src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java
index 96aa8bd..985811b 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java
@@ -16,9 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-@Version("0.0.1")
 package org.apache.sling.distribution.queue.spi;
 
-import aQute.bnd.annotation.Version;
+import aQute.bnd.annotation.ConsumerType;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Trait implemented by a {@link DistributionQueue} that supports
+ * clearing all of its entries, or a range of them starting at the
+ * head, via the {@link Clearable#clear(int)} method.
+ *
+ * @since 0.1.0
+ */
+@ConsumerType
+public interface Clearable {
+
+    /**
+     * Clear a range of entries from the queue. The range starts at the
+     * head entry and includes at most {@code limit} entries.
+     *
+     * @param limit The maximum number of entries to remove. All entries
+     *              are removed when the limit is smaller than zero.
+     * @return an iterable over the removed entries
+     */
+    @NotNull
+    Iterable<DistributionQueueEntry> clear(int limit);
 
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java 
b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
index 96aa8bd..3a80790 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-@Version("0.0.1")
+@Version("0.1.0")
 package org.apache.sling.distribution.queue.spi;
 
 import aQute.bnd.annotation.Version;
diff --git 
a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
 
b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
index dffc64c..966657d 100644
--- 
a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
+++ 
b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
@@ -32,6 +32,7 @@ import 
org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
+import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -79,7 +80,11 @@ public class DistributionAgentQueueServlet extends 
SlingAllMethodsServlet {
                 } catch (NumberFormatException ex) {
                     log.warn("limit param malformed : "+limitParam, ex);
                 }
-                deleteItems(resourceResolver, queue, limit);
+                if (queue instanceof Clearable) {
+                    clearItems(resourceResolver, queue, limit);
+                } else {
+                    deleteItems(resourceResolver, queue, limit);
+                }
             }
         } else if ("copy".equals(operation)) {
             String from = request.getParameter("from");
@@ -138,6 +143,13 @@ public class DistributionAgentQueueServlet extends 
SlingAllMethodsServlet {
         DistributionPackageUtils.releaseOrDelete(distributionPackage, 
queue.getName());
     }
 
+    private void clearItems(ResourceResolver resourceResolver, 
DistributionQueue queue, int limit) {
+        for (DistributionQueueEntry removed : ((Clearable)queue).clear(limit)) 
{
+            DistributionPackage distributionPackage = 
getPackage(resourceResolver, removed.getItem());
+            DistributionPackageUtils.releaseOrDelete(distributionPackage, 
queue.getName());
+        }
+    }
+
     private DistributionPackage getPackage(ResourceResolver resourceResolver, 
DistributionQueueItem item) {
         DistributionPackageInfo info = 
DistributionPackageUtils.fromQueueItem(item);
         String type = info.getType();

Reply via email to