Author: tommaso
Date: Fri Jun 30 09:05:00 2017
New Revision: 1800364

URL: http://svn.apache.org/viewvc?rev=1800364&view=rev
Log:
SLING-6988 - async delivery should use an unordered queue

Modified:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java?rev=1800364&r1=1800363&r2=1800364&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/AsyncDeliveryDispatchingStrategy.java
 Fri Jun 30 09:05:00 2017
@@ -28,12 +28,7 @@ import org.apache.sling.distribution.pac
 import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils;
 import org.apache.sling.distribution.packaging.impl.ReferencePackage;
 import org.apache.sling.distribution.packaging.impl.SharedDistributionPackage;
-import org.apache.sling.distribution.queue.DistributionQueue;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
-import org.apache.sling.distribution.queue.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +87,7 @@ public class AsyncDeliveryDispatchingStr
                 DistributionPackageUtils.acquire(distributionPackage, 
deliveryQueueName);
 
                 // add the actual package to the delivery queue
-                DistributionQueue deliveryQueue = 
queueProvider.getQueue(deliveryQueueName);
+                DistributionQueue deliveryQueue = 
queueProvider.getQueue(deliveryQueueName, DistributionQueueType.PARALLEL);
                 DistributionQueueEntry deliveryQueueEntry = 
deliveryQueue.add(item);
                 if (deliveryQueueEntry != null) {
                     DistributionQueueItemStatus status = 
deliveryQueueEntry.getStatus();

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java?rev=1800364&r1=1800363&r2=1800364&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueueProvider.java
 Fri Jun 30 09:05:00 2017
@@ -101,12 +101,14 @@ public class JobHandlingDistributionQueu
                         QueueConfiguration.class.getName(), null);
                 Dictionary<String, Object> props = new Hashtable<String, 
Object>();
                 props.put(ConfigurationConstants.PROP_NAME, queueName);
-                props.put(ConfigurationConstants.PROP_TYPE, 
QueueConfiguration.Type.UNORDERED.name());
+                props.put(ConfigurationConstants.PROP_TYPE, 
DistributionQueueType.PARALLEL.equals(type) ?
+                        QueueConfiguration.Type.UNORDERED.name() : 
QueueConfiguration.Type.ORDERED.name());
                 props.put(ConfigurationConstants.PROP_TOPICS, new 
String[]{topic});
                 props.put(ConfigurationConstants.PROP_RETRIES, -1);
                 props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
                 props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
                 props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
+                props.put(ConfigurationConstants.PROP_MAX_PARALLEL, 
ConfigurationConstants.DEFAULT_MAX_PARALLEL);
                 config.update(props);
             }
         } catch (IOException e) {


Reply via email to