This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git
The following commit(s) were added to refs/heads/master by this push:
new cac4580 SLING-8086 - Extend distribution queue SPI with the ability
to clear and batch remove items
cac4580 is described below
commit cac45808dd4efbd1089f3a34d8b4bd456ac2dc26
Author: tmaret <[email protected]>
AuthorDate: Fri Nov 9 14:26:56 2018 +0100
SLING-8086 - Extend distribution queue SPI with the ability to clear and
batch remove items
---
...nfo.java => DistributionQueueCapabilities.java} | 24 +++++++++--
.../queue/impl/DistributionQueueWrapper.java | 33 ++++++++++++++
.../jobhandling/JobHandlingDistributionQueue.java | 42 ++++++++++++++++++
.../queue/impl/resource/ResourceQueue.java | 45 +++++++++++++++++++
.../queue/impl/simple/SimpleDistributionQueue.java | 42 ++++++++++++++++++
.../sling/distribution/queue/package-info.java | 2 +-
.../distribution/queue/spi/DistributionQueue.java | 28 ++++++++++++
.../sling/distribution/queue/spi/package-info.java | 2 +-
.../servlet/DistributionAgentQueueServlet.java | 50 ++++++++++++++--------
9 files changed, 244 insertions(+), 24 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/queue/package-info.java
b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java
similarity index 62%
copy from src/main/java/org/apache/sling/distribution/queue/package-info.java
copy to
src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java
index c81b5db..048e815 100644
--- a/src/main/java/org/apache/sling/distribution/queue/package-info.java
+++
b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java
@@ -16,9 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-@Version("0.0.1")
package org.apache.sling.distribution.queue;
-import aQute.bnd.annotation.Version;
+import aQute.bnd.annotation.ProviderType;
+
+@ProviderType
+public final class DistributionQueueCapabilities {
+
+ /**
+ * Indicates that the queue supports removing arbitrary entries.
+ */
+ public static final String REMOVABLE = "removable";
+
+ /**
+ * Indicates that the queue supports clearing entries.
+ */
+ public static final String CLEARABLE = "clearable";
+
+ /**
+ * Indicates that the queue supports adding entries.
+ */
+ public static final String APPENDABLE = "appendable";
+
+}
diff --git
a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
index dc179d3..d7aa307 100644
---
a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
+++
b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
@@ -18,13 +18,29 @@
*/
package org.apache.sling.distribution.queue.impl;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.jetbrains.annotations.NotNull;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
public abstract class DistributionQueueWrapper implements DistributionQueue {
+
+ private static final Set<String> CAPABILITIES =
Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE,
CLEARABLE)));
+
+
final DistributionQueue wrappedQueue;
DistributionQueueWrapper(DistributionQueue wrappedQueue) {
@@ -69,4 +85,21 @@ public abstract class DistributionQueueWrapper implements
DistributionQueue {
public DistributionQueueStatus getStatus() {
return wrappedQueue.getStatus();
}
+
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String>
entryIds) {
+ return wrappedQueue.remove(entryIds);
+ }
+
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ return wrappedQueue.clear(limit);
+ }
+
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
}
diff --git
a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
index d19424f..e8b9081 100644
---
a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
+++
b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
@@ -19,9 +19,14 @@
package org.apache.sling.distribution.queue.impl.jobhandling;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -37,6 +42,10 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
/**
* a {@link DistributionQueue} based on Sling Job Handling facilities
*/
@@ -44,6 +53,9 @@ public class JobHandlingDistributionQueue implements
DistributionQueue {
public final static String DISTRIBUTION_QUEUE_TOPIC =
"org/apache/sling/distribution/queue";
+ private static final Set<String> CAPABILITIES =
Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE,
CLEARABLE)));
+
private final Logger log = LoggerFactory.getLogger(getClass());
private final String name;
@@ -162,6 +174,19 @@ public class JobHandlingDistributionQueue implements
DistributionQueue {
return null;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String>
entryIds) {
+ List<DistributionQueueEntry> removed = new
ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
+
public DistributionQueueEntry remove(@NotNull String id) {
boolean removed = false;
Job job = getJob(id);
@@ -202,4 +227,21 @@ public class JobHandlingDistributionQueue implements
DistributionQueue {
return type;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ final List<DistributionQueueEntry> removedEntries = new
ArrayList<DistributionQueueEntry>();
+ for (DistributionQueueEntry entry : getItems(0, limit)) {
+ DistributionQueueEntry removed = remove(entry.getId());
+ if (removed != null) {
+ removedEntries.add(removed);
+ }
+ }
+ return removedEntries;
+ }
+
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
}
diff --git
a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 28162c3..612211d 100644
---
a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++
b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -36,10 +36,23 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
public class ResourceQueue implements DistributionQueue {
+
+ private static final Set<String> CAPABILITIES =
Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE,
CLEARABLE)));
+
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -160,6 +173,19 @@ public class ResourceQueue implements DistributionQueue {
}
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String>
entryIds) {
+ List<DistributionQueueEntry> removed = new
ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
+
@Nullable
@Override
public DistributionQueueEntry remove(@NotNull String itemId) {
@@ -228,4 +254,23 @@ public class ResourceQueue implements DistributionQueue {
DistributionQueueItem item = entry.getItem();
log.debug("queue[{}] {} entryId={} packageId={}", new Object[] {
queueName, scope, entryId, item.getPackageId() });
}
+
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ final List<DistributionQueueEntry> removedEntries = new
ArrayList<DistributionQueueEntry>();
+ for (DistributionQueueEntry entry : getItems(0, limit)) {
+ DistributionQueueEntry removed = remove(entry.getId());
+ if (removed != null) {
+ removedEntries.add(removed);
+ }
+ }
+ return removedEntries;
+ }
+
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
+
}
diff --git
a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 26e05d4..24b376b 100644
---
a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++
b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -19,12 +19,17 @@
package org.apache.sling.distribution.queue.impl.simple;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -39,6 +44,10 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
/**
* A simple implementation of a {@link DistributionQueue}.
* <p/>
@@ -52,6 +61,9 @@ public class SimpleDistributionQueue implements
DistributionQueue {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Set<String> CAPABILITIES =
Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE,
CLEARABLE)));
+
private final String name;
private final Queue<DistributionQueueItem> queue;
@@ -122,6 +134,11 @@ public class SimpleDistributionQueue implements
DistributionQueue {
return DistributionQueueType.ORDERED;
}
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
+
@NotNull
public Iterable<DistributionQueueEntry> getItems(int skip, int limit) {
@@ -144,6 +161,18 @@ public class SimpleDistributionQueue implements
DistributionQueue {
return null;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String>
entryIds) {
+ List<DistributionQueueEntry> removed = new
ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
@Nullable
public DistributionQueueEntry remove(@NotNull String id) {
@@ -168,4 +197,17 @@ public class SimpleDistributionQueue implements
DistributionQueue {
'}';
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ final List<DistributionQueueEntry> removedEntries = new
ArrayList<DistributionQueueEntry>();
+ for (DistributionQueueEntry entry : getItems(0, limit)) {
+ DistributionQueueEntry removed = remove(entry.getId());
+ if (removed != null) {
+ removedEntries.add(removed);
+ }
+ }
+ return removedEntries;
+ }
+
}
diff --git
a/src/main/java/org/apache/sling/distribution/queue/package-info.java
b/src/main/java/org/apache/sling/distribution/queue/package-info.java
index c81b5db..70795e9 100644
--- a/src/main/java/org/apache/sling/distribution/queue/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/package-info.java
@@ -17,7 +17,7 @@
* under the License.
*/
-@Version("0.0.1")
+@Version("0.1.0")
package org.apache.sling.distribution.queue;
import aQute.bnd.annotation.Version;
diff --git
a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
index 7cff77e..cd052e1 100644
---
a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
+++
b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.queue.spi;
+import java.util.Set;
+
import aQute.bnd.annotation.ConsumerType;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.packaging.DistributionPackage;
@@ -99,6 +101,26 @@ public interface DistributionQueue {
DistributionQueueEntry remove(@NotNull String itemId);
/**
+ * Remove a set of entries from the queue by specifying their identifiers.
+ *
+ * @param entryIds The identifiers of the entries to be removed
+ * @return an iterable over the removed entries
+ */
+ @NotNull
+ Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds);
+
+ /**
+ * Clear a range of entries from the queue. The range starts at the
+ * head entry and includes at most {@code limit} entries.
+ *
+ * @param limit The maximum number of entries to remove. All entries
+ * are removed when the limit is {@code -1}.
+ * @return an iterable over the removed entries
+ */
+ @NotNull
+ Iterable<DistributionQueueEntry> clear(int limit);
+
+ /**
* get the status of the queue
* @return the queue status
*/
@@ -111,4 +133,10 @@ public interface DistributionQueue {
*/
@NotNull
DistributionQueueType getType();
+
+ /**
+ * @return {@code true} if the queue supports the capability;
+ * {@code false} otherwise
+ */
+ boolean hasCapability(@NotNull String capability);
}
diff --git
a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
index 96aa8bd..912273a 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
@@ -17,7 +17,7 @@
* under the License.
*/
-@Version("0.0.1")
+@Version("1.0.0")
package org.apache.sling.distribution.queue.spi;
import aQute.bnd.annotation.Version;
diff --git
a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
index dffc64c..767868b 100644
---
a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
+++
b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
@@ -20,6 +20,9 @@ package org.apache.sling.distribution.servlet;
import javax.servlet.ServletException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.sling.SlingServlet;
@@ -42,6 +45,10 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+import static
org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+
/**
* Servlet to retrieve a {@link DistributionQueue} status.
*/
@@ -69,9 +76,9 @@ public class DistributionAgentQueueServlet extends
SlingAllMethodsServlet {
if ("delete".equals(operation)) {
String limitParam = request.getParameter("limit");
String[] idParam = request.getParameterValues("id");
-
if (idParam != null) {
- deleteItems(resourceResolver, queue, idParam);
+ assertCapability(queue, REMOVABLE);
+ deleteItems(resourceResolver, queue, new
HashSet<String>(Arrays.asList(idParam)));
} else {
int limit = 1;
try {
@@ -79,28 +86,32 @@ public class DistributionAgentQueueServlet extends
SlingAllMethodsServlet {
} catch (NumberFormatException ex) {
log.warn("limit param malformed : "+limitParam, ex);
}
- deleteItems(resourceResolver, queue, limit);
+ assertCapability(queue, CLEARABLE);
+ clearItems(resourceResolver, queue, limit);
}
} else if ("copy".equals(operation)) {
String from = request.getParameter("from");
String[] idParam = request.getParameterValues("id");
if (idParam != null && from != null) {
+ assertCapability(queue, APPENDABLE);
DistributionAgent agent =
request.getResource().getParent().getParent().adaptTo(DistributionAgent.class);
DistributionQueue sourceQueue = getQueueOrThrow(agent,from);
addItems(resourceResolver, queue, sourceQueue, idParam);
+
}
} else if ("move".equals(operation)) {
String from = request.getParameter("from");
String[] idParam = request.getParameterValues("id");
if (idParam != null && from != null) {
+ assertCapability(queue, APPENDABLE);
DistributionAgent agent =
request.getResource().getParent().getParent().adaptTo(DistributionAgent.class);
DistributionQueue sourceQueue = getQueueOrThrow(agent,from);
-
+ assertCapability(sourceQueue, REMOVABLE);
addItems(resourceResolver, queue, sourceQueue, idParam);
- deleteItems(resourceResolver, sourceQueue, idParam);
+ deleteItems(resourceResolver, sourceQueue, new
HashSet<String>(Arrays.asList(idParam)));
}
}
}
@@ -116,26 +127,21 @@ public class DistributionAgentQueueServlet extends
SlingAllMethodsServlet {
}
}
- private void deleteItems(ResourceResolver resourceResolver,
DistributionQueue queue, int limit) {
- for (DistributionQueueEntry item : queue.getItems(0, limit)) {
- deleteItem(resourceResolver, queue, item);
+ private void deleteItems(ResourceResolver resourceResolver,
DistributionQueue queue, Set<String> entryIds) {
+ for (DistributionQueueEntry removed : queue.remove(entryIds)) {
+ releaseOrDeletePackage(resourceResolver, removed.getItem(),
queue.getName());
}
}
- private void deleteItems(ResourceResolver resourceResolver,
DistributionQueue queue, String[] ids) {
- for (String id : ids) {
- DistributionQueueEntry entry = queue.getItem(id);
- deleteItem(resourceResolver, queue, entry);
+ private void clearItems(ResourceResolver resourceResolver,
DistributionQueue queue, int limit) {
+ for (DistributionQueueEntry removed : queue.clear(limit)) {
+ releaseOrDeletePackage(resourceResolver, removed.getItem(),
queue.getName());
}
}
- private void deleteItem(ResourceResolver resourceResolver,
DistributionQueue queue, DistributionQueueEntry entry) {
- DistributionQueueItem item = entry.getItem();
- String id = entry.getId();
- queue.remove(id);
-
- DistributionPackage distributionPackage = getPackage(resourceResolver,
item);
- DistributionPackageUtils.releaseOrDelete(distributionPackage,
queue.getName());
+ private void releaseOrDeletePackage(ResourceResolver resourceResolver,
DistributionQueueItem queueItem, String queueName) {
+ DistributionPackage distributionPackage = getPackage(resourceResolver,
queueItem);
+ DistributionPackageUtils.releaseOrDelete(distributionPackage,
queueName);
}
private DistributionPackage getPackage(ResourceResolver resourceResolver,
DistributionQueueItem item) {
@@ -156,6 +162,12 @@ public class DistributionAgentQueueServlet extends
SlingAllMethodsServlet {
return null;
}
+ private void assertCapability(DistributionQueue queue, String capability) {
+ if (!queue.hasCapability(capability)) {
+ throw new UnsupportedOperationException(String.format("Capability
%s not supported for queue %s", capability, queue.getName()));
+ }
+ }
+
@NotNull
private static DistributionQueue getQueueOrThrow(@NotNull
DistributionAgent agent, @NotNull String queueName) {
DistributionQueue queue = agent.getQueue(queueName);