This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch SLING-8086-4 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git
commit 070f3a479e837250d6fcdbc251baa54e9fc11cfb Author: tmaret <[email protected]> AuthorDate: Fri Nov 9 14:26:56 2018 +0100 SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items --- ...nfo.java => DistributionQueueCapabilities.java} | 24 +++++++- .../queue/impl/DistributionQueueWrapper.java | 47 ++++++++++++++ .../jobhandling/JobHandlingDistributionQueue.java | 42 +++++++++++++ .../queue/impl/resource/ResourceQueue.java | 45 ++++++++++++++ .../queue/impl/simple/SimpleDistributionQueue.java | 42 +++++++++++++ .../distribution/queue/spi/DistributionQueue.java | 28 +++++++++ .../sling/distribution/queue/spi/package-info.java | 2 +- .../servlet/DistributionAgentQueueServlet.java | 72 ++++++++++++++-------- 8 files changed, 273 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java similarity index 59% copy from src/main/java/org/apache/sling/distribution/queue/spi/package-info.java copy to src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java index 96aa8bd..048e815 100644 --- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java +++ b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java @@ -16,9 +16,27 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.sling.distribution.queue; -@Version("0.0.1") -package org.apache.sling.distribution.queue.spi; +import aQute.bnd.annotation.ProviderType; -import aQute.bnd.annotation.Version; +@ProviderType +public final class DistributionQueueCapabilities { + /** + * Indicates that the queue supports removing random entries. + */ + public static final String REMOVABLE = "removable"; + + /** + * Indicates that the queue supports clearing entries. 
+ */ + public static final String CLEARABLE = "clearable"; + + /** + * Indicates that the queue supports adding entries. + */ + public static final String APPENDABLE = "appendable"; + + +} diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java index dc179d3..4522325 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java @@ -18,13 +18,29 @@ */ package org.apache.sling.distribution.queue.impl; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.DistributionQueueStatus; import org.jetbrains.annotations.NotNull; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE; + public abstract class DistributionQueueWrapper implements DistributionQueue { + + private static final Set<String> CAPABILITIES = Collections.unmodifiableSet( + new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE))); + + final DistributionQueue wrappedQueue; DistributionQueueWrapper(DistributionQueue wrappedQueue) { @@ -69,4 +85,35 @@ public abstract class DistributionQueueWrapper implements DistributionQueue { public DistributionQueueStatus getStatus() { return wrappedQueue.getStatus(); } + + @NotNull + @Override + public Iterable<DistributionQueueEntry> 
remove(@NotNull Set<String> entryIds) { + List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>(); + for (String entryId : entryIds) { + DistributionQueueEntry entry = remove(entryId); + if (entry != null) { + removed.add(entry); + } + } + return removed; + } + + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } + + @Override + public boolean hasCapability(@NotNull String capability) { + return CAPABILITIES.contains(capability); + } } diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java index d19424f..e8b9081 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java @@ -19,9 +19,14 @@ package org.apache.sling.distribution.queue.impl.jobhandling; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; + import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -37,6 +42,10 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE; +import static 
org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE; + /** * a {@link DistributionQueue} based on Sling Job Handling facilities */ @@ -44,6 +53,9 @@ public class JobHandlingDistributionQueue implements DistributionQueue { public final static String DISTRIBUTION_QUEUE_TOPIC = "org/apache/sling/distribution/queue"; + private static final Set<String> CAPABILITIES = Collections.unmodifiableSet( + new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE))); + private final Logger log = LoggerFactory.getLogger(getClass()); private final String name; @@ -162,6 +174,19 @@ public class JobHandlingDistributionQueue implements DistributionQueue { return null; } + @NotNull + @Override + public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) { + List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>(); + for (String entryId : entryIds) { + DistributionQueueEntry entry = remove(entryId); + if (entry != null) { + removed.add(entry); + } + } + return removed; + } + public DistributionQueueEntry remove(@NotNull String id) { boolean removed = false; Job job = getJob(id); @@ -202,4 +227,21 @@ public class JobHandlingDistributionQueue implements DistributionQueue { return type; } + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } + + @Override + public boolean hasCapability(@NotNull String capability) { + return CAPABILITIES.contains(capability); + } } diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java 
b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java index 28162c3..612211d 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java @@ -36,10 +36,23 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; + +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE; public class ResourceQueue implements DistributionQueue { + + private static final Set<String> CAPABILITIES = Collections.unmodifiableSet( + new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE))); + private final Logger log = LoggerFactory.getLogger(getClass()); @@ -160,6 +173,19 @@ public class ResourceQueue implements DistributionQueue { } } + @NotNull + @Override + public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) { + List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>(); + for (String entryId : entryIds) { + DistributionQueueEntry entry = remove(entryId); + if (entry != null) { + removed.add(entry); + } + } + return removed; + } + @Nullable @Override public DistributionQueueEntry remove(@NotNull String itemId) { @@ -228,4 +254,23 @@ public class ResourceQueue implements DistributionQueue { DistributionQueueItem item = entry.getItem(); log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { queueName, scope, entryId, item.getPackageId() }); } + + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final 
List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } + + @Override + public boolean hasCapability(@NotNull String capability) { + return CAPABILITIES.contains(capability); + } + } diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java index 26e05d4..24b376b 100644 --- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java +++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java @@ -19,12 +19,17 @@ package org.apache.sling.distribution.queue.impl.simple; import java.util.ArrayList; +import java.util.Arrays; import java.util.Calendar; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.WeakHashMap; import java.util.concurrent.LinkedBlockingQueue; + import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -39,6 +44,10 @@ import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE; + /** * A simple implementation of a {@link DistributionQueue}. 
* <p/> @@ -52,6 +61,9 @@ public class SimpleDistributionQueue implements DistributionQueue { private final Logger log = LoggerFactory.getLogger(getClass()); + private static final Set<String> CAPABILITIES = Collections.unmodifiableSet( + new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE))); + private final String name; private final Queue<DistributionQueueItem> queue; @@ -122,6 +134,11 @@ public class SimpleDistributionQueue implements DistributionQueue { return DistributionQueueType.ORDERED; } + @Override + public boolean hasCapability(@NotNull String capability) { + return CAPABILITIES.contains(capability); + } + @NotNull public Iterable<DistributionQueueEntry> getItems(int skip, int limit) { @@ -144,6 +161,18 @@ public class SimpleDistributionQueue implements DistributionQueue { return null; } + @NotNull + @Override + public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) { + List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>(); + for (String entryId : entryIds) { + DistributionQueueEntry entry = remove(entryId); + if (entry != null) { + removed.add(entry); + } + } + return removed; + } @Nullable public DistributionQueueEntry remove(@NotNull String id) { @@ -168,4 +197,17 @@ public class SimpleDistributionQueue implements DistributionQueue { '}'; } + @NotNull + @Override + public Iterable<DistributionQueueEntry> clear(int limit) { + final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>(); + for (DistributionQueueEntry entry : getItems(0, limit)) { + DistributionQueueEntry removed = remove(entry.getId()); + if (removed != null) { + removedEntries.add(removed); + } + } + return removedEntries; + } + } diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java index 7cff77e..cd052e1 100644 --- 
a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java +++ b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java @@ -18,6 +18,8 @@ */ package org.apache.sling.distribution.queue.spi; +import java.util.Set; + import aQute.bnd.annotation.ConsumerType; import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.packaging.DistributionPackage; @@ -99,6 +101,26 @@ public interface DistributionQueue { DistributionQueueEntry remove(@NotNull String itemId); /** + * Remove a set of entries from the queue by specifying their identifiers. + * + * @param entryIds The identifiers of the entries to be removed + * @return an iterable over the removed entries + */ + @NotNull + Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds); + + /** + * Clear a range of entries from the queue. The range starts from + * the head entry and includes at most {@code limit} entries. + * + * @param limit The maximum number of entries to remove. All entries + * are removed when the limit is {@code -1}. + * @return an iterable over the removed entries + */ + @NotNull + Iterable<DistributionQueueEntry> clear(int limit); + + /** * get the status of the queue * @return the queue status */ @@ -111,4 +133,10 @@ public interface DistributionQueue { */ @NotNull DistributionQueueType getType(); + + /** + * @return {@code true} if the queue supports the capability; + * {@code false} otherwise + */ + boolean hasCapability(@NotNull String capability); } diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java index 96aa8bd..912273a 100644 --- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java +++ b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java @@ -17,7 +17,7 @@ * under the License. 
*/ -@Version("0.0.1") +@Version("1.0.0") package org.apache.sling.distribution.queue.spi; import aQute.bnd.annotation.Version; diff --git a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java index dffc64c..9a0ee8c 100644 --- a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java +++ b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java @@ -20,6 +20,9 @@ package org.apache.sling.distribution.servlet; import javax.servlet.ServletException; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import org.apache.felix.scr.annotations.Reference; import org.apache.felix.scr.annotations.sling.SlingServlet; @@ -42,6 +45,10 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE; +import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE; + /** * Servlet to retrieve a {@link DistributionQueue} status. 
*/ @@ -69,9 +76,12 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet { if ("delete".equals(operation)) { String limitParam = request.getParameter("limit"); String[] idParam = request.getParameterValues("id"); - if (idParam != null) { - deleteItems(resourceResolver, queue, idParam); + if (queue.hasCapability(REMOVABLE)) { + deleteItems(resourceResolver, queue, new HashSet<String>(Arrays.asList(idParam))); + } else { + unsupported(REMOVABLE); + } } else { int limit = 1; try { @@ -79,28 +89,41 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet { } catch (NumberFormatException ex) { log.warn("limit param malformed : "+limitParam, ex); } - deleteItems(resourceResolver, queue, limit); + if (queue.hasCapability(CLEARABLE)) { + clearItems(resourceResolver, queue, limit); + } else { + unsupported(CLEARABLE); + } } } else if ("copy".equals(operation)) { String from = request.getParameter("from"); String[] idParam = request.getParameterValues("id"); if (idParam != null && from != null) { - DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class); - DistributionQueue sourceQueue = getQueueOrThrow(agent,from); + if (queue.hasCapability(APPENDABLE)) { + DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class); + DistributionQueue sourceQueue = getQueueOrThrow(agent,from); + + addItems(resourceResolver, queue, sourceQueue, idParam); + } else { + unsupported(APPENDABLE); + } - addItems(resourceResolver, queue, sourceQueue, idParam); } } else if ("move".equals(operation)) { String from = request.getParameter("from"); String[] idParam = request.getParameterValues("id"); if (idParam != null && from != null) { - DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class); - DistributionQueue sourceQueue = getQueueOrThrow(agent,from); - - addItems(resourceResolver, queue, sourceQueue, idParam); 
- deleteItems(resourceResolver, sourceQueue, idParam); + if (queue.hasCapability(APPENDABLE) && queue.hasCapability(REMOVABLE)) { + DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class); + DistributionQueue sourceQueue = getQueueOrThrow(agent,from); + + addItems(resourceResolver, queue, sourceQueue, idParam); + deleteItems(resourceResolver, queue, new HashSet<String>(Arrays.asList(idParam))); + } else { + unsupported(APPENDABLE, REMOVABLE); + } } } } @@ -116,26 +139,21 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet { } } - private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) { - for (DistributionQueueEntry item : queue.getItems(0, limit)) { - deleteItem(resourceResolver, queue, item); + private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, Set<String> entryIds) { + for (DistributionQueueEntry removed : queue.remove(entryIds)) { + releaseOrDeletePackage(resourceResolver, removed.getItem(), queue.getName()); } } - private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, String[] ids) { - for (String id : ids) { - DistributionQueueEntry entry = queue.getItem(id); - deleteItem(resourceResolver, queue, entry); + private void clearItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) { + for (DistributionQueueEntry removed : queue.clear(limit)) { + releaseOrDeletePackage(resourceResolver, removed.getItem(), queue.getName()); } } - private void deleteItem(ResourceResolver resourceResolver, DistributionQueue queue, DistributionQueueEntry entry) { - DistributionQueueItem item = entry.getItem(); - String id = entry.getId(); - queue.remove(id); - - DistributionPackage distributionPackage = getPackage(resourceResolver, item); - DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName()); + private void releaseOrDeletePackage(ResourceResolver 
resourceResolver, DistributionQueueItem queueItem, String queueName) { + DistributionPackage distributionPackage = getPackage(resourceResolver, queueItem); + DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName); } private DistributionPackage getPackage(ResourceResolver resourceResolver, DistributionQueueItem item) { @@ -156,6 +174,10 @@ public class DistributionAgentQueueServlet extends SlingAllMethodsServlet { return null; } + private void unsupported(String ... capability) { + throw new UnsupportedOperationException(String.format("Capabilities %s not supported", capability)); + } + @NotNull private static DistributionQueue getQueueOrThrow(@NotNull DistributionAgent agent, @NotNull String queueName) { DistributionQueue queue = agent.getQueue(queueName);
