This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-10112
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit a096cf70c8b835ce29e6b496e200691e21634b72
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Feb 1 16:13:09 2021 +0100

    SLING-10112 - Split Subscriber idle from system ready check
---
 .../impl/subscriber/DistributionSubscriber.java    | 15 ++++--
 .../journal/impl/subscriber/IdleCheck.java         |  2 +
 .../journal/impl/subscriber/NoopIdle.java          |  7 ++-
 .../journal/impl/subscriber/SubscriberIdle.java    | 29 ++---------
 .../impl/subscriber/SubscriberIdleCheck.java       | 58 ++++++++++++++++++++++
 .../impl/subscriber/SubscriberIdleTest.java        | 37 ++++++--------
 6 files changed, 93 insertions(+), 55 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 7d14535..67512b2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -114,6 +114,8 @@ public class DistributionSubscriber {
     @Reference
     private SubscriberReadyStore subscriberReadyStore;
     
+    private volatile Closeable idleReadyCheck; //NOSONAR
+    
     private volatile IdleCheck idleCheck; //NOSONAR
     
     private Closeable packagePoller;
@@ -150,11 +152,17 @@ public class DistributionSubscriber {
         requireNonNull(precondition);
         requireNonNull(bookKeeperFactory);
 
+        if (config.editable()) {
+            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName);
+        }
+
         if (config.subscriberIdleCheck()) {
             // Unofficial config (currently just for test)
             Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
             AtomicBoolean readyHolder = subscriberReadyStore.getReadyHolder(subAgentName);
-            idleCheck = new SubscriberIdle(context, idleMillies, readyHolder);
+            
+            idleCheck = new SubscriberIdle(idleMillies, readyHolder);
+            idleReadyCheck = new SubscriberIdleCheck(context, idleCheck);
         } else {
             idleCheck = new NoopIdle();
         }
@@ -174,9 +182,6 @@ public class DistributionSubscriber {
         packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
                 HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage));
 
-        if (config.editable()) {
-            commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName);
-        }
 
         queueThread = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", subAgentName));
@@ -203,7 +208,7 @@ public class DistributionSubscriber {
          */
 
         IOUtils.closeQuietly(announcer, bookKeeper, 
-                packagePoller, idleCheck, commandPoller);
+                packagePoller, idleReadyCheck, idleCheck, commandPoller);
         running = false;
         try {
             queueThread.join();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
index ace8a03..5640a87 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
@@ -33,4 +33,6 @@ public interface IdleCheck extends Closeable {
      * Called when processing of a message has finished
      */
     void idle();
+
+    boolean isIdle();
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java
index 07fe70e..6c82601 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopIdle.java
@@ -18,8 +18,6 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import java.io.IOException;
-
 public class NoopIdle implements IdleCheck {
 
     @Override
@@ -36,4 +34,9 @@ public class NoopIdle implements IdleCheck {
     public void close() {
 
     }
+
+    @Override
+    public boolean isIdle() {
+        return true;
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index eb65339..fd67ab7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -18,27 +18,18 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
-import java.io.Closeable;
-import java.util.Hashtable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.felix.systemready.CheckStatus;
-import org.apache.felix.systemready.CheckStatus.State;
-import org.apache.felix.systemready.StateType;
-import org.apache.felix.systemready.SystemReadyCheck;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-
 /**
  * A DistributionSubscriber is considered ready when it is idle for more than
  * the READY_IDLE_TIME_SECONDS at least once ; or when it is busy processing
  * the same package for more than MAX_RETRIES times.
  */
-public class SubscriberIdle implements IdleCheck, SystemReadyCheck {
+public class SubscriberIdle implements IdleCheck {
     public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
 
     public static final int MAX_RETRIES = 10;
@@ -48,25 +39,16 @@ public class SubscriberIdle implements IdleCheck, SystemReadyCheck {
     private final ScheduledExecutorService executor;
     private ScheduledFuture<?> schedule;
 
-    private final ServiceRegistration<SystemReadyCheck> reg;
-    
-    public SubscriberIdle(BundleContext context, int idleMillis, AtomicBoolean readyHolder) {
+    public SubscriberIdle(int idleMillis, AtomicBoolean readyHolder) {
         this.idleMillis = idleMillis;
         this.isReady = readyHolder;
         executor = Executors.newScheduledThreadPool(1);
         idle();
-        this.reg = context.registerService(SystemReadyCheck.class, this, new Hashtable<>());
     }
     
     @Override
-    public String getName() {
-        return "DistributionSubscriber idle";
-    }
-
-    @Override
-    public CheckStatus getStatus() {
-        State state = isReady.get() ? State.GREEN : State.RED; 
-        return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+    public boolean isIdle() {
+        return isReady.get();
     }
     
     /**
@@ -104,9 +86,6 @@ public class SubscriberIdle implements IdleCheck, SystemReadyCheck {
     @Override
     public void close() {
         executor.shutdownNow();
-        if (reg != null) {
-            reg.unregister();
-        }
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleCheck.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleCheck.java
new file mode 100644
index 0000000..238f777
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleCheck.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+import java.util.Hashtable;
+
+import org.apache.felix.systemready.CheckStatus;
+import org.apache.felix.systemready.StateType;
+import org.apache.felix.systemready.SystemReadyCheck;
+import org.apache.felix.systemready.CheckStatus.State;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+public class SubscriberIdleCheck implements SystemReadyCheck, Closeable {
+    private final ServiceRegistration<SystemReadyCheck> reg;
+    private final IdleCheck idleCheck;
+    
+    public SubscriberIdleCheck(BundleContext context, IdleCheck idleCheck) {
+        this.idleCheck = idleCheck;
+        this.reg = context.registerService(SystemReadyCheck.class, this, new Hashtable<>());
+    }
+
+    @Override
+    public String getName() {
+        return "DistributionSubscriber idle";
+    }
+
+    @Override
+    public CheckStatus getStatus() {
+        State state = idleCheck.isIdle() ? State.GREEN : State.RED; 
+        return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+    }
+
+    @Override
+    public void close() {
+        if (reg != null) {
+            reg.unregister();
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index ed481c7..5540532 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -24,12 +24,9 @@ import static org.junit.Assert.assertThat;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.felix.systemready.CheckStatus.State;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
-import org.osgi.framework.BundleContext;
 
 public class SubscriberIdleTest {
 
@@ -39,9 +36,8 @@ public class SubscriberIdleTest {
 
     @Before
     public void before() {
-        BundleContext context = Mockito.mock(BundleContext.class);
         readyHolder = new AtomicBoolean();
-        idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
+        idle = new SubscriberIdle(IDLE_MILLIES, readyHolder);
     }
 
     @After
@@ -51,50 +47,45 @@ public class SubscriberIdleTest {
     
     @Test
     public void testIdle() throws InterruptedException {
-        assertState("Initial state", State.RED);
+        assertThat("Initial state", idle.isIdle(), equalTo(false));
         idle.busy(0);
         idle.idle();
-        assertState("State after reset", State.RED);
+        assertThat("State after reset", idle.isIdle(), equalTo(false));
         Thread.sleep(30);
-        assertState("State after time below idle limit", State.RED);
+        assertThat("State after time below idle limit", idle.isIdle(), equalTo(false));
         idle.busy(0);
         Thread.sleep(80);
         idle.idle();
-        assertState("State after long processing", State.RED);
+        assertThat("State after long processing", idle.isIdle(), equalTo(false));
         Thread.sleep(80);
-        assertState("State after time over idle limit", State.GREEN);
+        assertThat("State after time over idle limit", idle.isIdle(), equalTo(true));
         idle.busy(0);
-        assertState("State should not be reset once it reached GREEN", State.GREEN);
+        assertThat("State should not be reset once it reached GREEN", idle.isIdle(), equalTo(true));
     }
 
     @Test
     public void testMaxRetries() {
         idle.busy(0);
         idle.idle();
-        assertState("State with no retries", State.RED);
+        assertThat("State with no retries", idle.isIdle(), equalTo(false));
         idle.busy(MAX_RETRIES);
         idle.idle();
-        assertState("State with retries <= MAX_RETRIES", State.RED);
+        assertThat("State with retries <= MAX_RETRIES", idle.isIdle(), equalTo(false));
         idle.busy(MAX_RETRIES + 1);
         idle.idle();
-        assertState("State with retries > MAX_RETRIES", State.GREEN);
+        assertThat("State with retries > MAX_RETRIES", idle.isIdle(), equalTo(true));
         idle.busy(0);
-        assertState("State should not be reset once it reached GREEN", State.GREEN);
+        assertThat("State should not be reset once it reached idle", idle.isIdle(), equalTo(true));
     }
     
     @Test
     public void testStartIdle() throws InterruptedException {
-        BundleContext context = Mockito.mock(BundleContext.class);
         readyHolder = new AtomicBoolean();
-        idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
-        assertState("Initial state", State.RED);
+        idle = new SubscriberIdle(IDLE_MILLIES, readyHolder);
+        assertThat("Initial state", idle.isIdle(), equalTo(false));
         Thread.sleep(IDLE_MILLIES * 2);
-        assertState("State after time over idle limit", State.GREEN);
+        assertThat("State after time over idle limit", idle.isIdle(), equalTo(true));
         idle.close();
     }
-
-    private void assertState(String message, State expectedState) {
-        assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
-    }
     
 }

Reply via email to