This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch SLING-8086-2 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git
commit 8b937d552b633cd6d0bfc5fac0b4951d67947979 Author: tmaret <tma...@adobe.com> AuthorDate: Fri Nov 9 14:26:56 2018 +0100 SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items --- .../queue/impl/DistributionQueueWrapper.java | 19 ++++++++++++++- .../jobhandling/JobHandlingDistributionQueue.java | 16 ++++++++++++- .../queue/impl/resource/ResourceQueue.java | 18 +++++++++++++- .../queue/impl/simple/SimpleDistributionQueue.java | 17 ++++++++++++- .../spi/{package-info.java => Clearable.java} | 28 +++++++++++++++++++--- .../sling/distribution/queue/spi/package-info.java | 2 +- .../servlet/DistributionAgentQueueServlet.java | 14 ++++++++++- 7 files changed, 105 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java index dc179d3..cc9fc45 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java @@ -18,13 +18,17 @@ */ package org.apache.sling.distribution.queue.impl; +import java.util.ArrayList; +import java.util.List; + +import org.apache.sling.distribution.queue.spi.Clearable; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.DistributionQueueStatus; import org.jetbrains.annotations.NotNull; -public abstract class DistributionQueueWrapper implements DistributionQueue { +public abstract class DistributionQueueWrapper implements DistributionQueue, Clearable { final DistributionQueue wrappedQueue; DistributionQueueWrapper(DistributionQueue wrappedQueue) { @@ -69,4 +73,17 @@ public abstract class DistributionQueueWrapper implements DistributionQueue { public DistributionQueueStatus getStatus() { return wrappedQueue.getStatus(); } + + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } } diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java index d19424f..39764d3 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; + +import org.apache.sling.distribution.queue.spi.Clearable; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory; /** * a {@link DistributionQueue} based on Sling Job Handling facilities */ -public class JobHandlingDistributionQueue implements DistributionQueue { +public class JobHandlingDistributionQueue implements DistributionQueue, Clearable { public final static String DISTRIBUTION_QUEUE_TOPIC = "org/apache/sling/distribution/queue"; @@ -202,4 +204,16 @@ public class JobHandlingDistributionQueue implements DistributionQueue { return type; } + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } } diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java index 28162c3..1f6a94e 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java @@ -29,6 +29,7 @@ import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.DistributionQueueState; import org.apache.sling.distribution.queue.DistributionQueueStatus; import org.apache.sling.distribution.queue.DistributionQueueType; +import org.apache.sling.distribution.queue.spi.Clearable; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.util.impl.DistributionUtils; import org.jetbrains.annotations.NotNull; @@ -36,10 +37,11 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; -public class ResourceQueue implements DistributionQueue { +public class ResourceQueue implements DistributionQueue, Clearable { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -228,4 +230,18 @@ public class ResourceQueue implements DistributionQueue { DistributionQueueItem item = entry.getItem(); log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { queueName, scope, entryId, item.getPackageId() }); } + + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } + } diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java index 26e05d4..27eec29 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Queue; import java.util.WeakHashMap; import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.sling.distribution.queue.spi.Clearable; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -48,7 +50,7 @@ import org.slf4j.LoggerFactory; * Note: potentially the Queue could contain the ordered package ids, with a sidecar map id->item; * that way removal could be faster. */ -public class SimpleDistributionQueue implements DistributionQueue { +public class SimpleDistributionQueue implements DistributionQueue, Clearable { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -168,4 +170,17 @@ public class SimpleDistributionQueue implements DistributionQueue { '}'; } + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } + } diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java similarity index 50% copy from src/main/java/org/apache/sling/distribution/queue/spi/package-info.java copy to src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java index 96aa8bd..985811b 100644 --- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java +++ b/src/main/java/org/apache/sling/distribution/queue/spi/Clearable.java @@ -16,9 +16,31 @@ * specific language governing permissions and limitations * under the License. */ - -@Version("0.0.1") package org.apache.sling.distribution.queue.spi; -import aQute.bnd.annotation.Version; +import aQute.bnd.annotation.ConsumerType; +import org.apache.sling.distribution.queue.DistributionQueueEntry; +import org.jetbrains.annotations.NotNull; + +/** + * Trait to be added to a {@link DistributionQueue} distribution + * queue that supports clearing all or a range of entries via the + * {@link Clearable#clear} clearing methods. + * + * @since 0.1.0 + */ +@ConsumerType +public interface Clearable { + + /** + * Clear a range of entries from the queue. The range starts from + * the head entry, includes the specified #limit number of entries. + * + * @param limit The maximum number of entries to remove. All entries + * are removed when the limit is smaller than zero. + * @return an iterable over the removed entries + */ + @NotNull + Iterable<DistributionQueueEntry> clear(int limit); +} diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java index 96aa8bd..3a80790 100644 --- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java +++ b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java @@ -17,7 +17,7 @@ * under the License. */ -@Version("0.0.1") +@Version("0.1.0") package org.apache.sling.distribution.queue.spi; import aQute.bnd.annotation.Version; diff --git a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java index dffc64c..966657d 100644 --- a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java +++ b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java @@ -32,6 +32,7 @@ import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageInfo; import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils; +import org.apache.sling.distribution.queue.spi.Clearable; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -79,7 +80,11 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet { } catch (NumberFormatException ex) { log.warn("limit param malformed : "+limitParam, ex); } - deleteItems(resourceResolver, queue, limit); + if (queue instanceof Clearable) { + clearItems(resourceResolver, queue, limit); + } else { + deleteItems(resourceResolver, queue, limit); + } } } else if ("copy".equals(operation)) { String from = request.getParameter("from"); @@ -138,6 +143,13 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet { DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName()); } + private void clearItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) { + for (DistributionQueueEntry removed : ((Clearable)queue).clear(limit)) { + DistributionPackage distributionPackage = getPackage(resourceResolver, removed.getItem()); + DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName()); + } + } + private DistributionPackage getPackage(ResourceResolver resourceResolver, DistributionQueueItem item) { DistributionPackageInfo info = DistributionPackageUtils.fromQueueItem(item); String type = info.getType();