This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-10112 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit a096cf70c8b835ce29e6b496e200691e21634b72 Author: Christian Schneider <[email protected]> AuthorDate: Mon Feb 1 16:13:09 2021 +0100 SLING-10112 - Split Subscriber idle from system ready check --- .../impl/subscriber/DistributionSubscriber.java | 15 ++++-- .../journal/impl/subscriber/IdleCheck.java | 2 + .../journal/impl/subscriber/NoopIdle.java | 7 ++- .../journal/impl/subscriber/SubscriberIdle.java | 29 ++--------- .../impl/subscriber/SubscriberIdleCheck.java | 58 ++++++++++++++++++++++ .../impl/subscriber/SubscriberIdleTest.java | 37 ++++++-------- 6 files changed, 93 insertions(+), 55 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index 7d14535..67512b2 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -114,6 +114,8 @@ public class DistributionSubscriber { @Reference private SubscriberReadyStore subscriberReadyStore; + private volatile Closeable idleReadyCheck; //NOSONAR + private volatile IdleCheck idleCheck; //NOSONAR private Closeable packagePoller; @@ -150,11 +152,17 @@ public class DistributionSubscriber { requireNonNull(precondition); requireNonNull(bookKeeperFactory); + if (config.editable()) { + commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName); + } + if (config.subscriberIdleCheck()) { // Unofficial config (currently just for test) Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS); AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName); - idleCheck = new SubscriberIdle(context, idleMillies, readyHolder); + + idleCheck = new SubscriberIdle(idleMillies, readyHolder); + idleReadyCheck = new SubscriberIdleCheck(context, idleCheck); } else { idleCheck = new NoopIdle(); } @@ -174,9 +182,6 @@ public class DistributionSubscriber { packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign, HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage)); - if (config.editable()) { - commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName); - } queueThread = startBackgroundThread(this::processQueue, format("Queue Processor for Subscriber agent %s", subAgentName)); @@ -203,7 +208,7 @@ public class DistributionSubscriber { */ IOUtils.closeQuietly(announcer, bookKeeper, - packagePoller, idleCheck, commandPoller); + packagePoller, idleReadyCheck, idleCheck, commandPoller); running = false; try { queueThread.join(); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java index ace8a03..5640a87 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java @@ -33,4 +33,6 @@ public interface IdleCheck extends Closeable { * Called when processing of a message has finished */ void idle(); + + boolean isIdle(); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java index 07fe70e..6c82601 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java @@ -18,8 +18,6 @@ */ package org.apache.sling.distribution.journal.impl.subscriber; -import java.io.IOException; - public class NoopIdle implements IdleCheck { @Override @@ -36,4 +34,9 @@ public class NoopIdle implements IdleCheck { public void close() { } + + @Override + public boolean isIdle() { + return true; + } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java index eb65339..fd67ab7 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java @@ -18,27 +18,18 @@ */ package org.apache.sling.distribution.journal.impl.subscriber; -import java.io.Closeable; -import java.util.Hashtable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.felix.systemready.CheckStatus; -import org.apache.felix.systemready.CheckStatus.State; -import org.apache.felix.systemready.StateType; -import org.apache.felix.systemready.SystemReadyCheck; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceRegistration; - /** * A DistributionSubscriber is considered ready when it is idle for more than * the READY_IDLE_TIME_SECONDS at least once ; or when it is busy processing * the same package for more than MAX_RETRIES times. */ -public class SubscriberIdle implements IdleCheck, SystemReadyCheck { +public class SubscriberIdle implements IdleCheck { public static final int DEFAULT_IDLE_TIME_MILLIS = 10000; public static final int MAX_RETRIES = 10; @@ -48,25 +39,16 @@ public class SubscriberIdle implements IdleCheck, SystemReadyCheck { private final ScheduledExecutorService executor; private ScheduledFuture<?> schedule; - private final ServiceRegistration<SystemReadyCheck> reg; - - public SubscriberIdle(BundleContext context, int idleMillis, AtomicBoolean readyHolder) { + public SubscriberIdle(int idleMillis, AtomicBoolean readyHolder) { this.idleMillis = idleMillis; this.isReady = readyHolder; executor = Executors.newScheduledThreadPool(1); idle(); - this.reg = context.registerService(SystemReadyCheck.class, this, new Hashtable<>()); } @Override - public String getName() { - return "DistributionSubscriber idle"; - } - - @Override - public CheckStatus getStatus() { - State state = isReady.get() ? State.GREEN : State.RED; - return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle"); + public boolean isIdle() { + return isReady.get(); } /** @@ -104,9 +86,6 @@ public class SubscriberIdle implements IdleCheck, SystemReadyCheck { @Override public void close() { executor.shutdownNow(); - if (reg != null) { - reg.unregister(); - } } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleCheck.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleCheck.java new file mode 100644 index 0000000..238f777 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleCheck.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.subscriber; + +import java.io.Closeable; +import java.util.Hashtable; + +import org.apache.felix.systemready.CheckStatus; +import org.apache.felix.systemready.StateType; +import org.apache.felix.systemready.SystemReadyCheck; +import org.apache.felix.systemready.CheckStatus.State; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; + +public class SubscriberIdleCheck implements SystemReadyCheck, Closeable { + private final ServiceRegistration<SystemReadyCheck> reg; + private final IdleCheck idleCheck; + + public SubscriberIdleCheck(BundleContext context, IdleCheck idleCheck) { + this.idleCheck = idleCheck; + this.reg = context.registerService(SystemReadyCheck.class, this, new Hashtable<>()); + } + + @Override + public String getName() { + return "DistributionSubscriber idle"; + } + + @Override + public CheckStatus getStatus() { + State state = idleCheck.isIdle() ? State.GREEN : State.RED; + return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle"); + } + + @Override + public void close() { + if (reg != null) { + reg.unregister(); + } + } + +} diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java index ed481c7..5540532 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java @@ -24,12 +24,9 @@ import static org.junit.Assert.assertThat; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.felix.systemready.CheckStatus.State; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; -import org.osgi.framework.BundleContext; public class SubscriberIdleTest { @@ -39,9 +36,8 @@ public class SubscriberIdleTest { @Before public void before() { - BundleContext context = Mockito.mock(BundleContext.class); readyHolder = new AtomicBoolean(); - idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder); + idle = new SubscriberIdle(IDLE_MILLIES, readyHolder); } @After @@ -51,50 +47,45 @@ public class SubscriberIdleTest { @Test public void testIdle() throws InterruptedException { - assertState("Initial state", State.RED); + assertThat("Initial state", idle.isIdle(), equalTo(false)); idle.busy(0); idle.idle(); - assertState("State after reset", State.RED); + assertThat("State after reset", idle.isIdle(), equalTo(false)); Thread.sleep(30); - assertState("State after time below idle limit", State.RED); + assertThat("State after time below idle limit", idle.isIdle(), equalTo(false)); idle.busy(0); Thread.sleep(80); idle.idle(); - assertState("State after long processing", State.RED); + assertThat("State after long processing", idle.isIdle(), equalTo(false)); Thread.sleep(80); - assertState("State after time over idle limit", State.GREEN); + assertThat("State after time over idle limit", idle.isIdle(), equalTo(true)); idle.busy(0); - assertState("State should not be reset once it reached GREEN", State.GREEN); + assertThat("State should not be reset once it reached GREEN", idle.isIdle(), equalTo(true)); } @Test public void testMaxRetries() { idle.busy(0); idle.idle(); - assertState("State with no retries", State.RED); + assertThat("State with no retries", idle.isIdle(), equalTo(false)); idle.busy(MAX_RETRIES); idle.idle(); - assertState("State with retries <= MAX_RETRIES", State.RED); + assertThat("State with retries <= MAX_RETRIES", idle.isIdle(), equalTo(false)); idle.busy(MAX_RETRIES + 1); idle.idle(); - assertState("State with retries > MAX_RETRIES", State.GREEN); + assertThat("State with retries > MAX_RETRIES", idle.isIdle(), equalTo(true)); idle.busy(0); - assertState("State should not be reset once it reached GREEN", State.GREEN); + assertThat("State should not be reset once it reached idle", idle.isIdle(), equalTo(true)); } @Test public void testStartIdle() throws InterruptedException { - BundleContext context = Mockito.mock(BundleContext.class); readyHolder = new AtomicBoolean(); - idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder); - assertState("Initial state", State.RED); + idle = new SubscriberIdle(IDLE_MILLIES, readyHolder); + assertThat("Initial state", idle.isIdle(), equalTo(false)); Thread.sleep(IDLE_MILLIES * 2); - assertState("State after time over idle limit", State.GREEN); + assertThat("State after time over idle limit", idle.isIdle(), equalTo(true)); idle.close(); } - - private void assertState(String message, State expectedState) { - assertThat(message, idle.getStatus().getState(), equalTo(expectedState)); - } }
