This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch SLING-10592
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit c47fafd8e190b04d36714e73cb58e4b7b6b466e9
Author: tmaret <[email protected]>
AuthorDate: Sat Jul 10 00:18:26 2021 +0200

    SLING-10592 - Follow an exponential delay between import attempts
---
 .../impl/subscriber/DistributionSubscriber.java    | 10 +++-
 .../sling/distribution/journal/shared/Delays.java  | 32 +++++++++++
 .../distribution/journal/shared/DelaysTest.java    | 62 ++++++++++++++++++++++
 3 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index d37d1c2..e227286 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -22,6 +22,7 @@ import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static org.apache.sling.distribution.journal.shared.Delays.exponential;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -35,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
@@ -85,8 +88,10 @@ import org.slf4j.LoggerFactory;
 public class DistributionSubscriber {
     private static final int PRECONDITION_TIMEOUT = 60;
     static int RETRY_DELAY = 5000;
+    static int MAX_RETRY_DELAY = 300000; // 5 minutes
     static int QUEUE_FETCH_DELAY = 1000;
     private static final long COMMAND_NOT_IDLE_DELAY_MS = 200;
+    private static final Supplier<LongSupplier> catchAllDelays = () -> 
exponential(RETRY_DELAY, MAX_RETRY_DELAY);
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DistributionSubscriber.class);
 
@@ -142,6 +147,8 @@ public class DistributionSubscriber {
     private volatile boolean running = true;
     private Thread queueThread;
 
+    private LongSupplier catchAllDelay = catchAllDelays.get();
+
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext 
context, Map<String, Object> properties) {
         String subSlingId = requireNonNull(slingSettings.getSlingId());
@@ -305,7 +312,7 @@ public class DistributionSubscriber {
             } catch (Exception e) {
                 // Catch all to prevent processing from stopping
                 LOG.error("Error processing queue item", e);
-                delay(RETRY_DELAY);
+                delay(catchAllDelay.getAsLong());
             }
         }
         LOG.info("Stopped Queue processor");
@@ -318,6 +325,7 @@ public class DistributionSubscriber {
             processQueueItem(item);
             messageBuffer.remove();
             distributionMetricsService.getItemsBufferSize().decrement();
+            catchAllDelay = catchAllDelays.get();
         }
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java b/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java
new file mode 100644
index 0000000..34f7cbb
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.function.LongSupplier;
+
+import static java.lang.Math.min;
+import static java.util.stream.LongStream.iterate;
+
+/**
+ * Static factory for suppliers of delay values.
+ */
+public final class Delays {
+
+    private Delays() {
+        // static utility class, not meant to be instantiated
+    }
+
+    /**
+     * Creates a supplier of exponentially growing delays.
+     * <p>
+     * The first value supplied is {@code startDelay} as given (it is not capped).
+     * Every following value is twice the previous one, capped at {@code maxDelay}.
+     * The doubling saturates at the bounds of {@code long} instead of overflowing,
+     * so the sequence does not wrap around when {@code maxDelay} is close to
+     * {@link Long#MAX_VALUE}.
+     * <p>
+     * Each call returns an independent, stateful supplier that starts again at
+     * {@code startDelay}.
+     * NOTE(review): the supplier is backed by a stream iterator; presumably it
+     * must not be shared across threads without external synchronization - confirm.
+     *
+     * @param startDelay the first delay supplied
+     * @param maxDelay the upper bound applied to every delay after the first
+     * @return a supplier yielding the next delay of the sequence on each call
+     */
+    public static LongSupplier exponential(long startDelay, long maxDelay) {
+        return iterate(startDelay, delay -> min(doubleSaturated(delay), maxDelay)).iterator()::next;
+    }
+
+    /**
+     * Doubles the given value, clamping the result to the {@code long} range
+     * rather than letting the multiplication overflow.
+     */
+    private static long doubleSaturated(long value) {
+        if (value > Long.MAX_VALUE / 2) {
+            return Long.MAX_VALUE;
+        }
+        if (value < Long.MIN_VALUE / 2) {
+            return Long.MIN_VALUE;
+        }
+        return 2 * value;
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java
new file mode 100644
index 0000000..335960f
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import java.util.function.LongSupplier;
+import java.util.stream.LongStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for the exponential delay sequence produced by {@link Delays#exponential(long, long)}.
+ */
+public class DelaysTest {
+
+    private static final long START_DELAY = 1L;
+
+    private static final long MAX_DELAY = 1000L;
+
+    /** The very first value supplied is the start delay. */
+    @Test
+    public void testExponentialStartDelay() {
+        LongSupplier supplier = Delays.exponential(START_DELAY, MAX_DELAY);
+        long first = supplier.getAsLong();
+        assertEquals(START_DELAY, first);
+    }
+
+    /** The second value supplied is strictly greater than the first. */
+    @Test
+    public void testExponentialIncreasingDelay() {
+        LongSupplier supplier = Delays.exponential(START_DELAY, MAX_DELAY);
+        long first = supplier.getAsLong();
+        long second = supplier.getAsLong();
+        assertTrue(first < second);
+    }
+
+    /** Consecutive values double: 1, 2, 4. */
+    @Test
+    public void testExponentialIncreasingRateDelay() {
+        LongSupplier supplier = Delays.exponential(START_DELAY, MAX_DELAY);
+        for (long expected : new long[] {1, 2, 4}) {
+            assertEquals(expected, supplier.getAsLong());
+        }
+    }
+
+    /** The largest of the first hundred values equals the configured maximum. */
+    @Test
+    public void testExponentialMaxDelay() {
+        LongSupplier supplier = Delays.exponential(START_DELAY, MAX_DELAY);
+        // The generated stream is limited to exactly 100 elements, so it is never empty.
+        long largest = LongStream.generate(supplier).limit(100).reduce(Long.MIN_VALUE, Math::max);
+        assertEquals(MAX_DELAY, largest);
+    }
+
+}
\ No newline at end of file

Reply via email to