Author: mpetria
Date: Tue Feb 23 10:14:06 2016
New Revision: 1731829

URL: http://svn.apache.org/viewvc?rev=1731829&view=rev
Log:
SLING-5550: optimize acquiring of distribution packages

Modified:
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
    
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
 Tue Feb 23 10:14:06 2016
@@ -276,8 +276,6 @@ public class SimpleDistributionAgent imp
             distributionResponses.add(new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString()));
         }
 
-        log.debug("PACKAGE-QUEUED {}: packageId={}, info={}", requestId, 
distributionPackage.getId(), distributionPackage.getInfo());
-
         return distributionResponses;
     }
 
@@ -616,7 +614,7 @@ public class SimpleDistributionAgent imp
                 final long endTime = System.currentTimeMillis();
 
 
-                log.debug("[{}] ITEM-PROCESSED item={}, status={}, 
processingTime={}", queueName, queueItem, success, endTime - startTime);
+                log.debug("[{}] ITEM-PROCESSED item={}, status={}, 
processingTime={}ms", queueName, queueItem, success, endTime - startTime);
 
                 return success;
 
@@ -635,7 +633,7 @@ public class SimpleDistributionAgent imp
 
         private final String callingUser;
         private final String requestId;
-        private final long startTime;
+        private final long requestStartTime;
         private final AtomicInteger packagesCount = new AtomicInteger();
         private final AtomicLong packagesSize = new AtomicLong();
         private final List<DistributionResponse> allResponses = new 
ArrayList<DistributionResponse>();
@@ -652,19 +650,25 @@ public class SimpleDistributionAgent imp
             return packagesSize.get();
         }
 
-        PackageExporterProcessor(String callingUser, String requestId, long 
startTime) {
+        PackageExporterProcessor(String callingUser, String requestId, long 
requestStartTime) {
             this.callingUser = callingUser;
             this.requestId = requestId;
-            this.startTime = startTime;
+            this.requestStartTime = requestStartTime;
         }
 
         @Override
         public void process(DistributionPackage distributionPackage) {
+            final long startTime = System.currentTimeMillis();
+
             Collection<SimpleDistributionResponse> responses = 
scheduleImportPackage(distributionPackage, callingUser,
-                    requestId, startTime);
+                    requestId, requestStartTime);
             packagesCount.incrementAndGet();
             packagesSize.addAndGet(distributionPackage.getSize());
             allResponses.addAll(responses);
+
+            final long endTime = System.currentTimeMillis();
+
+            log.debug("PACKAGE-QUEUED {}: packageId={}, queueTime={}ms, 
responses={}", requestId, distributionPackage.getId(), endTime - startTime, 
responses.size());
         }
     }
 

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
 Tue Feb 23 10:14:06 2016
@@ -56,30 +56,42 @@ public class DistributionPackageUtils {
 
 
     /**
-     * Acquires the package if it's a {@link SharedDistributionPackage}, via 
{@link SharedDistributionPackage#acquire(String)}
+     * Acquires the package if it's a {@link SharedDistributionPackage}, via 
{@link SharedDistributionPackage#acquire(String[])}
      * @param distributionPackage a distribution package
-     * @param queueName the name of the queue in which the package should be 
acquired
+     * @param queueNames the names of the queues in which the package should be 
acquired
      */
-    public static void acquire(DistributionPackage distributionPackage, String 
queueName) {
+    public static void acquire(DistributionPackage distributionPackage, 
String... queueNames) {
         if (distributionPackage instanceof SharedDistributionPackage) {
-            ((SharedDistributionPackage) 
distributionPackage).acquire(queueName);
+            ((SharedDistributionPackage) 
distributionPackage).acquire(queueNames);
+        }
+    }
+
+
+    /**
+     * Releases the package if it's a {@link SharedDistributionPackage}, via 
{@link SharedDistributionPackage#release(String[])}
+     * @param distributionPackage a distribution package
+     * @param queueNames the names of the queues from which the package should be 
released
+     */
+    public static void release(DistributionPackage distributionPackage, 
String... queueNames) {
+        if (distributionPackage instanceof SharedDistributionPackage) {
+            ((SharedDistributionPackage) 
distributionPackage).release(queueNames);
         }
     }
 
     /**
      * Releases a distribution package if it's a {@link 
SharedDistributionPackage}, otherwise deletes it.
      * @param distributionPackage a distribution package
-     * @param queueName the name of the queue from which it should be 
eventually released
+     * @param queueNames the names of the queues from which it should be 
eventually released
      */
-    public static void releaseOrDelete(DistributionPackage 
distributionPackage, String queueName) {
+    public static void releaseOrDelete(DistributionPackage 
distributionPackage, String... queueNames) {
         if (distributionPackage == null) {
             return;
         }
         try {
             if (distributionPackage instanceof SharedDistributionPackage) {
-                if (queueName != null) {
-                    ((SharedDistributionPackage) 
distributionPackage).release(queueName);
-                    log.debug("package {} released from queue {}", 
distributionPackage.getId(), queueName);
+                if (queueNames != null) {
+                    ((SharedDistributionPackage) 
distributionPackage).release(queueNames);
+                    log.debug("package {} released from queue {}", 
distributionPackage.getId(), queueNames);
                 } else {
                     log.error("package {} cannot be released from null queue", 
distributionPackage.getId());
                 }

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
 Tue Feb 23 10:14:06 2016
@@ -80,7 +80,7 @@ public class LocalDistributionPackageImp
     @Activate
     public void activate(Map<String, Object> config) {
         name = PropertiesUtil.toString(config.get(NAME), null);
-        importer = new LocalDistributionPackageImporter(new 
DefaultSharedDistributionPackageBuilder(packageBuilder));
+        importer = new LocalDistributionPackageImporter(packageBuilder);
     }
 
 

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
 Tue Feb 23 10:14:06 2016
@@ -45,44 +45,39 @@ public class MultipleQueueDispatchingStr
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private final List<String> queueNames;
+    private final String[] queueNames;
 
     public MultipleQueueDispatchingStrategy(String[] queueNames) {
-        this.queueNames = 
Collections.unmodifiableList(Arrays.asList(queueNames));
+        this.queueNames = Arrays.copyOf(queueNames, queueNames.length);
     }
 
     public Iterable<DistributionQueueItemStatus> add(@Nonnull 
DistributionPackage distributionPackage, @Nonnull DistributionQueueProvider 
queueProvider) throws DistributionException {
 
-        if (!(distributionPackage instanceof SharedDistributionPackage) && 
queueNames.size() > 1) {
+        if (!(distributionPackage instanceof SharedDistributionPackage) && 
queueNames.length > 1) {
             throw new DistributionException("distribution package must be a 
shared package to be added in multiple queues");
         }
 
         DistributionQueueItem queueItem = getItem(distributionPackage);
         List<DistributionQueueItemStatus> result = new 
ArrayList<DistributionQueueItemStatus>();
 
-        // acquire the package temporarily until all queues are filled
-        String tempQueueName = "temp" + UUID.randomUUID();
-        DistributionPackageUtils.acquire(distributionPackage, tempQueueName);
-
-        try {
-            for (String queueName : queueNames) {
-                DistributionQueue queue = queueProvider.getQueue(queueName);
-                DistributionQueueItemStatus status = new 
DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queue.getName());
-
-                DistributionPackageUtils.acquire(distributionPackage, 
queueName);
-                DistributionQueueEntry queueEntry = queue.add(queueItem);
-
-                if (queueEntry != null) {
-                    status = queueEntry.getStatus();
-                } else {
-                    
DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
-                    log.error("cannot add package {} to queue {}", 
distributionPackage.getId(), queueName);
-                }
+        // first acquire the package for all queues
+        DistributionPackageUtils.acquire(distributionPackage, queueNames);
 
-                result.add(status);
+        // second add the package to all queues
+        for (String queueName : queueNames) {
+            DistributionQueue queue = queueProvider.getQueue(queueName);
+            DistributionQueueItemStatus status = new 
DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queue.getName());
+
+            DistributionQueueEntry queueEntry = queue.add(queueItem);
+
+            if (queueEntry != null) {
+                status = queueEntry.getStatus();
+            } else {
+                DistributionPackageUtils.release(distributionPackage, 
queueName);
+                log.error("cannot add package {} to queue {}", 
distributionPackage.getId(), queueName);
             }
-        } finally {
-            DistributionPackageUtils.releaseOrDelete(distributionPackage, 
tempQueueName);
+
+            result.add(status);
         }
 
         return result;
@@ -91,7 +86,7 @@ public class MultipleQueueDispatchingStr
 
     @Nonnull
     public List<String> getQueueNames() {
-        return queueNames;
+        return Arrays.asList(queueNames);
     }
 
     private DistributionQueueItem getItem(DistributionPackage 
distributionPackage) {

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
 Tue Feb 23 10:14:06 2016
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.se
 import javax.annotation.Nonnull;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,29 +51,29 @@ public class DefaultSharedDistributionPa
         this.distributionPackage = distributionPackage;
     }
 
-    public void acquire(@Nonnull String holderName) {
-        if (holderName.length() == 0) {
+    public void acquire(@Nonnull String[] holderNames) {
+        if (holderNames.length == 0) {
             throw new IllegalArgumentException("holder name cannot be null or 
empty");
         }
 
         try {
-            createHolderResource(holderName);
+            createHolderResource(holderNames);
 
-            log.debug("acquired package {} for holder {}", new 
Object[]{packagePath, holderName});
+            log.debug("acquired package {} for holder {}", new 
Object[]{packagePath, Arrays.toString(holderNames)});
 
         } catch (PersistenceException e) {
             log.error("cannot acquire package", e);
         }
     }
 
-    public void release(@Nonnull String holderName) {
+    public void release(@Nonnull String[] holderNames) {
 
-        if (holderName.length() == 0) {
+        if (holderNames.length == 0) {
             throw new IllegalArgumentException("holder name cannot be null or 
empty");
         }
 
         try {
-            boolean doPackageDelete = deleteHolderResource(holderName);
+            boolean doPackageDelete = deleteHolderResource(holderNames);
 
 
             if (doPackageDelete) {
@@ -83,7 +84,7 @@ public class DefaultSharedDistributionPa
                 resourceResolver.commit();
             }
 
-            log.debug("released package {} from holder {} delete {}", new 
Object[]{packagePath, holderName, doPackageDelete});
+            log.debug("released package {} from holder {} delete {}", new 
Object[]{packagePath, Arrays.toString(holderNames), doPackageDelete});
         } catch (PersistenceException e) {
             log.error("cannot release package", e);
         }
@@ -153,7 +154,7 @@ public class DefaultSharedDistributionPa
         return null;
     }
 
-    private void createHolderResource(String holderName) throws 
PersistenceException {
+    private void createHolderResource(String[] holderNames) throws 
PersistenceException {
 
         Resource holderRoot = getHolderRootResource();
 
@@ -161,25 +162,30 @@ public class DefaultSharedDistributionPa
             return;
         }
 
-        Resource holder = holderRoot.getChild(holderName);
+        for (String holderName : holderNames) {
+            Resource holder = holderRoot.getChild(holderName);
 
-        if (holder != null) {
-            return;
+            if (holder != null) {
+                return;
+            }
+
+            resourceResolver.create(holderRoot, holderName, 
Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) 
"sling:Folder"));
         }
 
-        resourceResolver.create(holderRoot, holderName, 
Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) 
"sling:Folder"));
         resourceResolver.commit();
     }
 
-    private boolean deleteHolderResource(String holderName) throws 
PersistenceException {
+    private boolean deleteHolderResource(String[] holderNames) throws 
PersistenceException {
         boolean doPackageDelete = false;
         Resource holderRoot = getHolderRootResource();
 
         if (holderRoot != null) {
-            Resource holder = holderRoot.getChild(holderName);
+            for (String holderName : holderNames) {
+                Resource holder = holderRoot.getChild(holderName);
 
-            if (holder != null) {
-                resourceResolver.delete(holder);
+                if (holder != null) {
+                    resourceResolver.delete(holder);
+                }
             }
         }
 

Modified: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
URL: 
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
--- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
 (original)
+++ 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
 Tue Feb 23 10:14:06 2016
@@ -33,12 +33,12 @@ public interface SharedDistributionPacka
     /**
      * acquire a reference to this package and increase the reference count.
      */
-    void acquire(@Nonnull String holderName);
+    void acquire(@Nonnull String[] holderNames);
 
     /**
      * release a reference to this package and decrease the reference count.
      * when no more references are hold the package {@code 
DistributionPackage#delete} method is called.
      */
-    void release(@Nonnull String holderName);
+    void release(@Nonnull String[] holderNames);
 
 }


Reply via email to