This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-8531 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit d95a933c0d62374b2c2f912f4751b90f63be8a84 Author: Christian Schneider <[email protected]> AuthorDate: Mon Jul 8 16:20:16 2019 +0200 SLING-8531 - Exponential backoff for retries --- pom.xml | 8 ++- .../journal/impl/shared/ExponentialBackOff.java | 75 ++++++++++++++++++++++ .../impl/shared/JournalAvailableChecker.java | 74 ++++++++++++++------- .../impl/shared/ExponentialBackoffTest.java | 52 +++++++++++++++ .../impl/shared/JournalAvailableCheckerTest.java | 53 +++++++++++++-- 5 files changed, 230 insertions(+), 32 deletions(-) diff --git a/pom.xml b/pom.xml index 60a109c..e204997 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ <parent> <groupId>org.apache.sling</groupId> <artifactId>sling</artifactId> - <version>34</version> + <version>35</version> <relativePath /> </parent> @@ -141,12 +141,14 @@ <!-- OSGi --> <dependency> <groupId>org.osgi</groupId> - <artifactId>org.osgi.service.component.annotations</artifactId> + <artifactId>osgi.core</artifactId> + <version>6.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.osgi</groupId> - <artifactId>org.osgi.service.metatype.annotations</artifactId> + <artifactId>osgi.cmpn</artifactId> + <version>6.0.0</version> <scope>provided</scope> </dependency> diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java new file mode 100644 index 0000000..148bd2a --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.shared; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retry with exponential backoff. + * + * Calls the checkCallback until it does not throw an Exception + */ +public class ExponentialBackOff implements Closeable { + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private final ScheduledExecutorService executor; + private final Runnable checkCallback; + + private Random random; + private int maxDelay; + + private int currentMaxDelay; + + public ExponentialBackOff(int maxDelay, Runnable checkCallback) { + this.currentMaxDelay = 128; + this.maxDelay = maxDelay; + this.checkCallback = checkCallback; + this.executor = Executors.newScheduledThreadPool(1); + this.random = new Random(); + scheduleCheck(); + } + + @Override + public void close() throws IOException { + this.executor.shutdown(); + } + + private void scheduleCheck() { + this.currentMaxDelay = Math.min(this.currentMaxDelay *2, maxDelay); + int delay = this.random.nextInt(currentMaxDelay) + 1; + log.info("Scheduling next check in {} ms with maximum {} ms.", delay, currentMaxDelay); + this.executor.schedule(this::check, delay, TimeUnit.MILLISECONDS); + } + + private void check() { + try { + this.checkCallback.run(); + } catch (RuntimeException e) { + scheduleCheck(); + } + } +} diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java index 0d51329..dcb5fd9 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java @@ -18,40 +18,40 @@ */ package org.apache.sling.distribution.journal.impl.shared; -import javax.annotation.ParametersAreNonnullByDefault; - import static java.util.Objects.requireNonNull; -import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT; -import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE; -import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.io.IOUtils; +import org.apache.sling.distribution.journal.ExceptionEventSender; +import org.apache.sling.distribution.journal.JournalAvailable; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventConstants; +import org.osgi.service.event.EventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService; -import org.apache.sling.distribution.journal.JournalAvailable; - -@Component( - service = {Runnable.class}, - immediate = true, - property = { - PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=true", - PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false", - PROPERTY_SCHEDULER_PERIOD + ":Long=" + 90 // 90 seconds - } +@Component( + property = EventConstants.EVENT_TOPIC + "=" + ExceptionEventSender.ERROR_TOPIC ) -@ParametersAreNonnullByDefault -public class JournalAvailableChecker implements JournalAvailable, Runnable { +public class JournalAvailableChecker implements JournalAvailable, EventHandler { + + private static final int MAX_RETGRY_DELAY_MS = 10000; + + // Minimal number of errors before journal is considered unavailable + public static final int MIN_ERRORS = 2; private static final Logger LOG = LoggerFactory.getLogger(JournalAvailableChecker.class); + @Reference Topics topics; @@ -60,26 +60,33 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable { @Reference DistributionMetricsService metrics; - + private BundleContext context; private volatile ServiceRegistration<JournalAvailable> reg; private GaugeService<Boolean> gauge; + private ExponentialBackOff backoffRetry; + + private AtomicInteger numErrors; + @Activate public void activate(BundleContext context) { requireNonNull(provider); requireNonNull(topics); this.context = context; + this.numErrors = new AtomicInteger(); gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable); LOG.info("Started Journal availability checker service"); + startChecks(); } @Deactivate public void deactivate() { gauge.close(); unRegister(); + IOUtils.closeQuietly(this.backoffRetry); LOG.info("Stopped Journal availability checker service"); } @@ -91,17 +98,20 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable { } private void available() { + this.backoffRetry = null; if (this.reg == null) { + IOUtils.closeQuietly(this.backoffRetry); LOG.info("Journal is available"); this.reg = context.registerService(JournalAvailable.class, this, null); } } private void unAvailable(Exception e) { + String msg = "Journal is still unavailable: " + e.getMessage(); if (LOG.isDebugEnabled()) { - LOG.warn("Journal is unavailable " + e.getMessage(), e); + LOG.warn(msg, e); } else { - LOG.warn("Journal is unavailable " + e.getMessage()); + LOG.warn(msg); } unRegister(); } @@ -110,7 +120,6 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable { return reg != null; } - @Override public void run() { try { LOG.debug("Journal checker is running"); @@ -118,6 +127,7 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable { available(); } catch (Exception e) { unAvailable(e); + throw e; } } @@ -127,4 +137,22 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable { this.reg = null; } } + + @Override + public synchronized void handleEvent(Event event) { + String type = (String) event.getProperty(ExceptionEventSender.KEY_TYPE); + int curNumErrors = this.numErrors.incrementAndGet(); + if (this.backoffRetry == null && curNumErrors >= MIN_ERRORS) { + LOG.warn("Received exception event {}. Journal is considered unavailable.", type); + unRegister(); + startChecks(); + } else { + LOG.info("Received exception event {}. {} of {} errors occured.", type, this.numErrors.get(), MIN_ERRORS); + } + } + + private void startChecks() { + this.backoffRetry = new ExponentialBackOff(MAX_RETGRY_DELAY_MS, this::run); + this.numErrors.set(0); + } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java new file mode 100644 index 0000000..01bcec5 --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.shared; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ExponentialBackoffTest { + + private static final int RETRIES = 10; + private static final int MAX_DELAY_MS = 256; + private CountDownLatch countDown = new CountDownLatch(RETRIES); + + @Test + public void testIsAvailable() throws Exception { + ExponentialBackOff backOff = new ExponentialBackOff(MAX_DELAY_MS, this::checkCallback); + boolean finished = this.countDown.await(MAX_DELAY_MS * RETRIES, TimeUnit.MILLISECONDS); + backOff.close(); + assertThat("Should finish before the timeout", finished, equalTo(true)); + } + + private void checkCallback() { + this.countDown.countDown(); + if (countDown.getCount() > 0) { + throw new RuntimeException("Failing num: " + this.countDown.getCount()); + } + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java index 622013a..fb407a2 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java @@ -20,19 +20,24 @@ package org.apache.sling.distribution.journal.impl.shared; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; import static org.osgi.util.converter.Converters.standardConverter; +import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import org.apache.sling.distribution.journal.ExceptionEventSender; import org.apache.sling.distribution.journal.JournalAvailable; import org.apache.sling.distribution.journal.MessagingException; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,6 +48,7 @@ import org.mockito.Spy; import org.mockito.runners.MockitoJUnitRunner; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; +import org.osgi.service.event.Event; @RunWith(MockitoJUnitRunner.class) public class JournalAvailableCheckerTest { @@ -51,7 +57,7 @@ public class JournalAvailableCheckerTest { @InjectMocks private JournalAvailableChecker checker; - + @Spy private Topics topics = new Topics(); @@ -65,7 +71,7 @@ public class JournalAvailableCheckerTest { private BundleContext context; @Mock - private ServiceRegistration<JournalAvailable> reg; + private ServiceRegistration<JournalAvailable> sreg; @SuppressWarnings("rawtypes") @Mock @@ -78,7 +84,7 @@ public class JournalAvailableCheckerTest { doThrow(new MessagingException("topic is invalid")) .when(provider).assertTopic(INVALID_TOPIC); when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any())) - .thenReturn(reg); + .thenReturn(sreg); checker.activate(context); } @@ -89,18 +95,53 @@ public class JournalAvailableCheckerTest { @Test public void testIsAvailable() throws Exception { - topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC))); - checker.run(); + makeCheckFail(); + try { + checker.run(); + Assert.fail("Should throw exception"); + } catch (Exception e) { + } assertFalse(checker.isAvailable()); - topics.activate(topicsConfiguration(emptyMap())); + makeCheckSucceed(); checker.run(); assertTrue(checker.isAvailable()); } + @Test + public void testActivateChecksOnEvent() { + await("At the start checks are triggers and should set the state available") + .until(checker::isAvailable); + + makeCheckFail(); + Event event = createErrorEvent(new IOException("Expected")); + checker.handleEvent(event); + await().until(checker::isAvailable); + // Signal second exception to checker to start the checks. Now we should see not available + checker.handleEvent(event); + await().until(() -> !checker.isAvailable()); + + makeCheckSucceed(); + await().until(checker::isAvailable); + } + + private void makeCheckSucceed() { + topics.activate(topicsConfiguration(emptyMap())); + } + + private void makeCheckFail() { + topics.activate(topicsConfiguration(singletonMap("packageTopic", INVALID_TOPIC))); + } + private Topics.TopicsConfiguration topicsConfiguration(Map<String,String> props) { return standardConverter() .convert(props) .to(Topics.TopicsConfiguration.class); } + private static Event createErrorEvent(Exception e) { + Map<String, String> props = new HashMap<>(); + props.put(ExceptionEventSender.KEY_TYPE, e.getClass().getName()); + props.put(ExceptionEventSender.KEY_MESSAGE, e.getMessage()); + return new Event(ExceptionEventSender.ERROR_TOPIC, props); + } } \ No newline at end of file
