This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-core.git
The following commit(s) were added to refs/heads/master by this push:
new 1511fb3 SLING-10123 Distribution agent queue processor should
implement a backoff in case of retries for processing an item (#49)
1511fb3 is described below
commit 1511fb3844f8cb699262cb82e5b920095638587f
Author: Mohit Arora <[email protected]>
AuthorDate: Tue Mar 16 15:28:59 2021 +0530
SLING-10123 Distribution agent queue processor should implement a backoff
in case of retries for processing an item (#49)
* Adding linear backoff for a random amount of time between 0 and 30
seconds while retrying to distribute an item in case of recoverable
distribution exception
* Incorporating review feedback
* We want to add the backoff for any retry of distribution item and not
just in case of recoverable exception
---
.../impl/SimpleDistributionAgentQueueProcessor.java | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git
a/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
b/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
index 7ff8fbe..3b7963a 100644
---
a/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
+++
b/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java
@@ -110,6 +110,13 @@ class SimpleDistributionAgentQueueProcessor implements
DistributionQueueProcesso
DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus();
try {
+ int processingAttempt = queueItemStatus.getAttempts();
+ if (processingAttempt > 0) {
+ // since there is a retry, it is possible that the same error
is observed again
+ // we should add a linear backoff using random delay before
re-attempting to distribute the same item.
+ addRandomDelay(queueItemStatus.getAttempts());
+ }
+
String callingUser =
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER,
String.class);
String requestId =
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID,
String.class);
Long globalStartTime =
queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME,
Long.class);
@@ -173,6 +180,18 @@ class SimpleDistributionAgentQueueProcessor implements
DistributionQueueProcesso
return removeItemFromQueue;
}
+ private void addRandomDelay(int retryAttempts){
+ int min = 1;
+ int max = Math.min(retryAttempts, 30);
+ int random = (int)(Math.random() * (max - min + 1) + min);
+ try {
+ Thread.sleep(random * 1000);
+ } catch (InterruptedException ign) {
+ // eat this exception
+ // If there is an exception in Thread.sleep(), we will retry to
distribute queueItem immediately.
+ }
+ }
+
private boolean reEnqueuePackage(DistributionPackage distributionPackage) {
if (errorQueueStrategy == null) {