Author: mpetria
Date: Tue Feb 23 10:14:06 2016
New Revision: 1731829
URL: http://svn.apache.org/viewvc?rev=1731829&view=rev
Log:
SLING-5550: optimize acquiring of distribution packages
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
Tue Feb 23 10:14:06 2016
@@ -276,8 +276,6 @@ public class SimpleDistributionAgent imp
distributionResponses.add(new
SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString()));
}
- log.debug("PACKAGE-QUEUED {}: packageId={}, info={}", requestId,
distributionPackage.getId(), distributionPackage.getInfo());
-
return distributionResponses;
}
@@ -616,7 +614,7 @@ public class SimpleDistributionAgent imp
final long endTime = System.currentTimeMillis();
- log.debug("[{}] ITEM-PROCESSED item={}, status={},
processingTime={}", queueName, queueItem, success, endTime - startTime);
+ log.debug("[{}] ITEM-PROCESSED item={}, status={},
processingTime={}ms", queueName, queueItem, success, endTime - startTime);
return success;
@@ -635,7 +633,7 @@ public class SimpleDistributionAgent imp
private final String callingUser;
private final String requestId;
- private final long startTime;
+ private final long requestStartTime;
private final AtomicInteger packagesCount = new AtomicInteger();
private final AtomicLong packagesSize = new AtomicLong();
private final List<DistributionResponse> allResponses = new
ArrayList<DistributionResponse>();
@@ -652,19 +650,25 @@ public class SimpleDistributionAgent imp
return packagesSize.get();
}
- PackageExporterProcessor(String callingUser, String requestId, long
startTime) {
+ PackageExporterProcessor(String callingUser, String requestId, long
requestStartTime) {
this.callingUser = callingUser;
this.requestId = requestId;
- this.startTime = startTime;
+ this.requestStartTime = requestStartTime;
}
@Override
public void process(DistributionPackage distributionPackage) {
+ final long startTime = System.currentTimeMillis();
+
Collection<SimpleDistributionResponse> responses =
scheduleImportPackage(distributionPackage, callingUser,
- requestId, startTime);
+ requestId, requestStartTime);
packagesCount.incrementAndGet();
packagesSize.addAndGet(distributionPackage.getSize());
allResponses.addAll(responses);
+
+ final long endTime = System.currentTimeMillis();
+
+ log.debug("PACKAGE-QUEUED {}: packageId={}, queueTime={}ms,
responses={}", requestId, distributionPackage.getId(), endTime - startTime,
responses.size());
}
}
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
Tue Feb 23 10:14:06 2016
@@ -56,30 +56,42 @@ public class DistributionPackageUtils {
/**
- * Acquires the package if it's a {@link SharedDistributionPackage}, via
{@link SharedDistributionPackage#acquire(String)}
+ * Acquires the package if it's a {@link SharedDistributionPackage}, via
{@link SharedDistributionPackage#acquire(String[])}
* @param distributionPackage a distribution package
- * @param queueName the name of the queue in which the package should be
acquired
+ * @param queueNames the name of the queue in which the package should be
acquired
*/
- public static void acquire(DistributionPackage distributionPackage, String
queueName) {
+ public static void acquire(DistributionPackage distributionPackage,
String... queueNames) {
if (distributionPackage instanceof SharedDistributionPackage) {
- ((SharedDistributionPackage)
distributionPackage).acquire(queueName);
+ ((SharedDistributionPackage)
distributionPackage).acquire(queueNames);
+ }
+ }
+
+
+ /**
+ * Releases the package if it's a {@link SharedDistributionPackage}, via
{@link SharedDistributionPackage#release(String[])}
+ * @param distributionPackage a distribution package
+ * @param queueNames the name of the queue in which the package should be
released
+ */
+ public static void release(DistributionPackage distributionPackage,
String... queueNames) {
+ if (distributionPackage instanceof SharedDistributionPackage) {
+ ((SharedDistributionPackage)
distributionPackage).release(queueNames);
}
}
/**
* Releases a distribution package if it's a {@link
SharedDistributionPackage}, otherwise deletes it.
* @param distributionPackage a distribution package
- * @param queueName the name of the queue from which it should be
eventually released
+ * @param queueNames the name of the queue from which it should be
eventually released
*/
- public static void releaseOrDelete(DistributionPackage
distributionPackage, String queueName) {
+ public static void releaseOrDelete(DistributionPackage
distributionPackage, String... queueNames) {
if (distributionPackage == null) {
return;
}
try {
if (distributionPackage instanceof SharedDistributionPackage) {
- if (queueName != null) {
- ((SharedDistributionPackage)
distributionPackage).release(queueName);
- log.debug("package {} released from queue {}",
distributionPackage.getId(), queueName);
+ if (queueNames != null) {
+ ((SharedDistributionPackage)
distributionPackage).release(queueNames);
+ log.debug("package {} released from queue {}",
distributionPackage.getId(), queueNames);
} else {
log.error("package {} cannot be released from null queue",
distributionPackage.getId());
}
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/packaging/impl/importer/LocalDistributionPackageImporterFactory.java
Tue Feb 23 10:14:06 2016
@@ -80,7 +80,7 @@ public class LocalDistributionPackageImp
@Activate
public void activate(Map<String, Object> config) {
name = PropertiesUtil.toString(config.get(NAME), null);
- importer = new LocalDistributionPackageImporter(new
DefaultSharedDistributionPackageBuilder(packageBuilder));
+ importer = new LocalDistributionPackageImporter(packageBuilder);
}
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/queue/impl/MultipleQueueDispatchingStrategy.java
Tue Feb 23 10:14:06 2016
@@ -45,44 +45,39 @@ public class MultipleQueueDispatchingStr
private final Logger log = LoggerFactory.getLogger(getClass());
- private final List<String> queueNames;
+ private final String[] queueNames;
public MultipleQueueDispatchingStrategy(String[] queueNames) {
- this.queueNames =
Collections.unmodifiableList(Arrays.asList(queueNames));
+ this.queueNames = Arrays.copyOf(queueNames, queueNames.length);
}
public Iterable<DistributionQueueItemStatus> add(@Nonnull
DistributionPackage distributionPackage, @Nonnull DistributionQueueProvider
queueProvider) throws DistributionException {
- if (!(distributionPackage instanceof SharedDistributionPackage) &&
queueNames.size() > 1) {
+ if (!(distributionPackage instanceof SharedDistributionPackage) &&
queueNames.length > 1) {
throw new DistributionException("distribution package must be a
shared package to be added in multiple queues");
}
DistributionQueueItem queueItem = getItem(distributionPackage);
List<DistributionQueueItemStatus> result = new
ArrayList<DistributionQueueItemStatus>();
- // acquire the package temporarily until all queues are filled
- String tempQueueName = "temp" + UUID.randomUUID();
- DistributionPackageUtils.acquire(distributionPackage, tempQueueName);
-
- try {
- for (String queueName : queueNames) {
- DistributionQueue queue = queueProvider.getQueue(queueName);
- DistributionQueueItemStatus status = new
DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queue.getName());
-
- DistributionPackageUtils.acquire(distributionPackage,
queueName);
- DistributionQueueEntry queueEntry = queue.add(queueItem);
-
- if (queueEntry != null) {
- status = queueEntry.getStatus();
- } else {
-
DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
- log.error("cannot add package {} to queue {}",
distributionPackage.getId(), queueName);
- }
+ // first acquire the package for all queues
+ DistributionPackageUtils.acquire(distributionPackage, queueNames);
- result.add(status);
+ // second add the package to all queues
+ for (String queueName : queueNames) {
+ DistributionQueue queue = queueProvider.getQueue(queueName);
+ DistributionQueueItemStatus status = new
DistributionQueueItemStatus(DistributionQueueItemState.ERROR, queue.getName());
+
+ DistributionQueueEntry queueEntry = queue.add(queueItem);
+
+ if (queueEntry != null) {
+ status = queueEntry.getStatus();
+ } else {
+ DistributionPackageUtils.release(distributionPackage,
queueName);
+ log.error("cannot add package {} to queue {}",
distributionPackage.getId(), queueName);
}
- } finally {
- DistributionPackageUtils.releaseOrDelete(distributionPackage,
tempQueueName);
+
+ result.add(status);
}
return result;
@@ -91,7 +86,7 @@ public class MultipleQueueDispatchingStr
@Nonnull
public List<String> getQueueNames() {
- return queueNames;
+ return Arrays.asList(queueNames);
}
private DistributionQueueItem getItem(DistributionPackage
distributionPackage) {
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/DefaultSharedDistributionPackage.java
Tue Feb 23 10:14:06 2016
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.se
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,29 +51,29 @@ public class DefaultSharedDistributionPa
this.distributionPackage = distributionPackage;
}
- public void acquire(@Nonnull String holderName) {
- if (holderName.length() == 0) {
+ public void acquire(@Nonnull String[] holderNames) {
+ if (holderNames.length == 0) {
throw new IllegalArgumentException("holder name cannot be null or
empty");
}
try {
- createHolderResource(holderName);
+ createHolderResource(holderNames);
- log.debug("acquired package {} for holder {}", new
Object[]{packagePath, holderName});
+ log.debug("acquired package {} for holder {}", new
Object[]{packagePath, Arrays.toString(holderNames)});
} catch (PersistenceException e) {
log.error("cannot acquire package", e);
}
}
- public void release(@Nonnull String holderName) {
+ public void release(@Nonnull String[] holderNames) {
- if (holderName.length() == 0) {
+ if (holderNames.length == 0) {
throw new IllegalArgumentException("holder name cannot be null or
empty");
}
try {
- boolean doPackageDelete = deleteHolderResource(holderName);
+ boolean doPackageDelete = deleteHolderResource(holderNames);
if (doPackageDelete) {
@@ -83,7 +84,7 @@ public class DefaultSharedDistributionPa
resourceResolver.commit();
}
- log.debug("released package {} from holder {} delete {}", new
Object[]{packagePath, holderName, doPackageDelete});
+ log.debug("released package {} from holder {} delete {}", new
Object[]{packagePath, Arrays.toString(holderNames), doPackageDelete});
} catch (PersistenceException e) {
log.error("cannot release package", e);
}
@@ -153,7 +154,7 @@ public class DefaultSharedDistributionPa
return null;
}
- private void createHolderResource(String holderName) throws
PersistenceException {
+ private void createHolderResource(String[] holderNames) throws
PersistenceException {
Resource holderRoot = getHolderRootResource();
@@ -161,25 +162,30 @@ public class DefaultSharedDistributionPa
return;
}
- Resource holder = holderRoot.getChild(holderName);
+ for (String holderName : holderNames) {
+ Resource holder = holderRoot.getChild(holderName);
- if (holder != null) {
- return;
+ if (holder != null) {
+ return;
+ }
+
+ resourceResolver.create(holderRoot, holderName,
Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)
"sling:Folder"));
}
- resourceResolver.create(holderRoot, holderName,
Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object)
"sling:Folder"));
resourceResolver.commit();
}
- private boolean deleteHolderResource(String holderName) throws
PersistenceException {
+ private boolean deleteHolderResource(String[] holderNames) throws
PersistenceException {
boolean doPackageDelete = false;
Resource holderRoot = getHolderRootResource();
if (holderRoot != null) {
- Resource holder = holderRoot.getChild(holderName);
+ for (String holderName : holderNames) {
+ Resource holder = holderRoot.getChild(holderName);
- if (holder != null) {
- resourceResolver.delete(holder);
+ if (holder != null) {
+ resourceResolver.delete(holder);
+ }
}
}
Modified:
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
URL:
http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java?rev=1731829&r1=1731828&r2=1731829&view=diff
==============================================================================
---
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
(original)
+++
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/serialization/impl/SharedDistributionPackage.java
Tue Feb 23 10:14:06 2016
@@ -33,12 +33,12 @@ public interface SharedDistributionPacka
/**
* acquire a reference to this package and increase the reference count.
*/
- void acquire(@Nonnull String holderName);
+ void acquire(@Nonnull String[] holderNames);
/**
* release a reference to this package and decrease the reference count.
* when no more references are hold the package {@code
DistributionPackage#delete} method is called.
*/
- void release(@Nonnull String holderName);
+ void release(@Nonnull String[] holderNames);
}