This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch SLING-10592 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit c47fafd8e190b04d36714e73cb58e4b7b6b466e9 Author: tmaret <[email protected]> AuthorDate: Sat Jul 10 00:18:26 2021 +0200 SLING-10592 - Follow an exponential delay between import attempts --- .../impl/subscriber/DistributionSubscriber.java | 10 +++- .../sling/distribution/journal/shared/Delays.java | 32 +++++++++++ .../distribution/journal/shared/DelaysTest.java | 62 ++++++++++++++++++++++ 3 files changed, 103 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index d37d1c2..e227286 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -22,6 +22,7 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toSet; import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread; +import static org.apache.sling.distribution.journal.shared.Delays.exponential; import java.io.Closeable; import java.io.IOException; @@ -35,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.function.LongSupplier; +import java.util.function.Supplier; import javax.annotation.ParametersAreNonnullByDefault; @@ -85,8 +88,10 @@ import org.slf4j.LoggerFactory; public class DistributionSubscriber { private static final int PRECONDITION_TIMEOUT = 60; static int RETRY_DELAY = 5000; + static int MAX_RETRY_DELAY = 300000; // 5 minutes static int QUEUE_FETCH_DELAY = 1000; private static final long COMMAND_NOT_IDLE_DELAY_MS = 200; + private static final Supplier<LongSupplier> catchAllDelays = () -> exponential(RETRY_DELAY, MAX_RETRY_DELAY); private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class); @@ -142,6 +147,8 @@ public class DistributionSubscriber { private volatile boolean running = true; private Thread queueThread; + private LongSupplier catchAllDelay = catchAllDelays.get(); + @Activate public void activate(SubscriberConfiguration config, BundleContext context, Map<String, Object> properties) { String subSlingId = requireNonNull(slingSettings.getSlingId()); @@ -305,7 +312,7 @@ public class DistributionSubscriber { } catch (Exception e) { // Catch all to prevent processing from stopping LOG.error("Error processing queue item", e); - delay(RETRY_DELAY); + delay(catchAllDelay.getAsLong()); } } LOG.info("Stopped Queue processor"); @@ -318,6 +325,7 @@ public class DistributionSubscriber { processQueueItem(item); messageBuffer.remove(); distributionMetricsService.getItemsBufferSize().decrement(); + catchAllDelay = catchAllDelays.get(); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java b/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java new file mode 100644 index 0000000..34f7cbb --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/shared/Delays.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.shared; + +import java.util.function.LongSupplier; + +import static java.lang.Math.min; +import static java.util.stream.LongStream.iterate; + +public final class Delays { + + public static LongSupplier exponential(long startDelay, long maxDelay) { + return iterate(startDelay, delay -> min(2 * delay, maxDelay)).iterator()::next; + } + +} diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java new file mode 100644 index 0000000..335960f --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/shared/DelaysTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.shared; + +import java.util.function.LongSupplier; +import java.util.stream.LongStream; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class DelaysTest { + + private static final long START_DELAY = 1L; + + private static final long MAX_DELAY = 1000L; + + @Test + public void testExponentialStartDelay() { + LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY); + assertEquals(START_DELAY, delay.getAsLong()); + } + + @Test + public void testExponentialIncreasingDelay() { + LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY); + assertTrue(delay.getAsLong() < delay.getAsLong()); + } + + @Test + public void testExponentialIncreasingRateDelay() { + LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY); + assertEquals(1, delay.getAsLong()); + assertEquals(2, delay.getAsLong()); + assertEquals(4, delay.getAsLong()); + } + + @Test + public void testExponentialMaxDelay() { + LongSupplier delay = Delays.exponential(START_DELAY, MAX_DELAY); + long maxAfterHundredDelays = LongStream.generate(delay).limit(100).max().orElseThrow(IllegalStateException::new); + assertEquals(MAX_DELAY, maxAfterHundredDelays); + } + +} \ No newline at end of file
