This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new d7691ac  SLING-12872 - Add metric for distribution readiness check (#176)
d7691ac is described below

commit d7691ac7b7fd67fbeb1c1b72b9a9ecb1f1cd50a4
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Jul 31 09:01:07 2025 +0200

    SLING-12872 - Add metric for distribution readiness check (#176)
    
    * SLING-12872 - Add metric for distribution readiness check
    
    * SLING-12872 - Add tests
    
    * SLING-12872 - Fix issues
---
 .../journal/bookkeeper/SubscriberMetrics.java       | 18 ++++++++++++++++++
 .../impl/subscriber/DistributionSubscriber.java     |  6 +++++-
 .../journal/impl/subscriber/IdleCheck.java          |  1 +
 .../journal/impl/subscriber/SubscriberReady.java    | 21 +++++++++++++--------
 .../distribution/journal/metrics/DefaultTag.java    |  4 ++++
 .../impl/subscriber/SubscriberReadyTest.java        |  6 +++++-
 6 files changed, 46 insertions(+), 10 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
index 3b2cf37..21e4e89 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
@@ -20,8 +20,11 @@ package org.apache.sling.distribution.journal.bookkeeper;
 
 import static 
org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -30,6 +33,7 @@ import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.journal.impl.subscriber.IdleCheck;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.metrics.Tag;
 
@@ -51,6 +55,9 @@ public class SubscriberMetrics {
     // Is the queue editable (true, false)
     private static final String TAG_EDITABLE = "editable";
     
+    // See IdleCheck.ReadyReason
+    private static final String TAG_READY_REASON = "ready_reason";
+    
     public static final String SUB_COMPONENT = 
"distribution.journal.subscriber.";
     
     private static final String PACKAGE_STATUS_COUNT = SUB_COMPONENT + 
"package_status_count";
@@ -93,10 +100,14 @@ public class SubscriberMetrics {
     private static final String IMPORT_POST_PROCESS_DURATION = SUB_COMPONENT + 
"import_post_process_duration";
     private static final String INVALIDATION_PROCESS_DURATION = SUB_COMPONENT 
+ "invalidation_process_duration";
     private static final String CURRENT_IMPORT_DURATION = SUB_COMPONENT + 
"current_import_duration";
+    
+    // in seconds
+    private static final String READINESS_DURATION = SUB_COMPONENT + 
"readiness_duration";
 
        private static final String FV_MESSAGE_COUNT = SUB_COMPONENT + 
"fv_message_count";
        private static final String FV_ERROR_COUNT = SUB_COMPONENT + 
"fv_error_count";
 
+
     private final MetricsService metricsService;
     private final Tag tagSubName;
     private final Tag tagPubName;
@@ -240,6 +251,13 @@ public class SubscriberMetrics {
     public Timer getInvalidationProcessDuration() {
         return 
metricsService.timer(getMetricName(INVALIDATION_PROCESS_DURATION, tags));
     }
+    
+    public void readinessDuration(IdleCheck.ReadyReason readyReason, Duration 
duration) {
+       List<Tag> tags2 = new ArrayList<>(tags);
+       tags2.add(Tag.of(TAG_READY_REASON, readyReason.name()));
+        Timer timer = metricsService.timer(getMetricName(READINESS_DURATION, 
tags2));
+        timer.update(duration.getSeconds(), TimeUnit.SECONDS);
+    }
 
     public Counter getInvalidationProcessSuccess() {
         return 
metricsService.counter(getMetricName(INVALIDATION_PROCESS_SUCCESS_COUNT, tags));
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fad9825..bda2a83 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -31,6 +31,7 @@ import static 
org.apache.sling.distribution.journal.shared.Strings.requireNotBla
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
@@ -39,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
@@ -65,6 +67,7 @@ import 
org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
 import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import 
org.apache.sling.distribution.journal.impl.subscriber.IdleCheck.ReadyReason;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.OffsetMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -150,7 +153,8 @@ public class DistributionSubscriber {
 
         if (config.subscriberIdleCheck()) {
             AtomicBoolean readyHolder = 
subscriberReadyStore.getReadyHolder(subAgentName);
-            idleCheck = new SubscriberReady(subAgentName, 
config.idleMillies(), config.forceReadyMillies(), config.acceptableAgeDiffMs(), 
readyHolder, System::currentTimeMillis);
+            BiConsumer<ReadyReason, Duration> readyCallback = 
subscriberMetrics::readinessDuration;
+                       idleCheck = new SubscriberReady(subAgentName, 
config.idleMillies(), config.forceReadyMillies(), config.acceptableAgeDiffMs(), 
readyHolder, System::currentTimeMillis, readyCallback);
             idleReadyCheck = new SubscriberIdleCheck(context, idleCheck, 
config.subscriberIdleTags());
         } else {
             idleCheck = new NoopIdle();
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
index 4a8b894..36a02b0 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 import java.io.Closeable;
 
 public interface IdleCheck extends Closeable {
+       enum ReadyReason { IDLE, MAX_RETRIES, LATENCY, FORCE}
 
     /**
      * Called when processing of a message starts
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
index 144c216..a466c15 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
@@ -19,13 +19,14 @@
 package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.MINUTES;
 
+import java.time.Duration;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
 import java.util.function.Supplier;
 
 import org.slf4j.Logger;
@@ -58,13 +59,16 @@ public class SubscriberReady implements IdleCheck {
     private ScheduledFuture<?> schedule;
     private final ScheduledFuture<?> forceShedule;
 
-    public SubscriberReady(String subAgentName, long idleMillis, long 
forceIdleMillies, long acceptableAgeDiffMs, AtomicBoolean readyHolder, 
Supplier<Long> timeProvider) {
+       private final BiConsumer<ReadyReason, Duration> readyCallback;
+
+    public SubscriberReady(String subAgentName, long idleMillis, long 
forceIdleMillies, long acceptableAgeDiffMs, AtomicBoolean readyHolder, 
Supplier<Long> timeProvider, BiConsumer<ReadyReason, Duration> readyCallback) {
         this.subAgentName = subAgentName;
         this.idleMillis = idleMillis;
         this.forceIdleMillies = forceIdleMillies;
         this.acceptableAgeDiffMs = acceptableAgeDiffMs;
         this.isReady = readyHolder;
         this.timeProvider = timeProvider;
+               this.readyCallback = readyCallback;
         this.startTime = timeProvider.get();
         executor = Executors.newScheduledThreadPool(2);
         forceShedule = executor.schedule(this::forceIdle, forceIdleMillies, 
TimeUnit.MILLISECONDS);
@@ -87,10 +91,10 @@ public class SubscriberReady implements IdleCheck {
         cancelSchedule();
         long latency = timeProvider.get() - messageCreateTime;
         if (latency < acceptableAgeDiffMs) {
-            ready(String.format("Package message latency %d s < %d s 
acceptable limit", MILLISECONDS.toSeconds(latency), 
MILLISECONDS.toSeconds(acceptableAgeDiffMs)));
+            ready(ReadyReason.LATENCY, String.format("Package message latency 
%d s < %d s acceptable limit", MILLISECONDS.toSeconds(latency), 
MILLISECONDS.toSeconds(acceptableAgeDiffMs)));
         }
         if (retries > MAX_RETRIES) {
-            ready(String.format("Retries %d > %d", retries, MAX_RETRIES));
+            ready(ReadyReason.MAX_RETRIES, String.format("Retries %d > %d", 
retries, MAX_RETRIES));
         }
     }
 
@@ -108,7 +112,7 @@ public class SubscriberReady implements IdleCheck {
     }
     
     private void forceIdle() {
-        ready(String.format("Forcing ready after %d s", 
MILLISECONDS.toSeconds(forceIdleMillies)));
+        ready(ReadyReason.FORCE, String.format("Forcing ready after %d s", 
MILLISECONDS.toSeconds(forceIdleMillies)));
         cancelSchedule();
     }
     
@@ -119,13 +123,14 @@ public class SubscriberReady implements IdleCheck {
     }
 
     private void idleReady() {
-        ready(String.format("%s ready after being idle for > %d s", 
subAgentName, MILLISECONDS.toSeconds(idleMillis)));
+        ready(ReadyReason.IDLE, String.format("%s ready after being idle for > 
%d s", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
     }
     
-    private void ready(String reason) {
+    private void ready(IdleCheck.ReadyReason reason, String reasonSt) {
         long readyTime = timeProvider.get();
         long timeToIdle = MILLISECONDS.toSeconds(readyTime - startTime);
-        log.info("Subscriber becoming ready after timeToIdle={} s. 
Reason='{}'", timeToIdle, reason);
+        readyCallback.accept(reason, Duration.ofMillis(readyTime - startTime)); // timeToIdle is already in seconds; pass the raw millisecond delta
+        log.info("Subscriber becoming ready after timeToIdle={} s. Reason={}, 
Details='{}'.", timeToIdle, reason, reasonSt);
         isReady.set(true);
         cancelSchedule();
         forceShedule.cancel(false);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java 
b/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java
index 7fdb242..1394f13 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java
@@ -55,4 +55,8 @@ public class DefaultTag implements Tag {
         return Objects.equals(key, other.key) && Objects.equals(value, 
other.value);
     }
     
+    @Override
+    public String toString() {
+       return key + ":"  + value;
+    }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
index 5d186d0..0f0104b 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
@@ -22,9 +22,12 @@ import static 
org.apache.sling.distribution.journal.impl.subscriber.SubscriberRe
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 
+import 
org.apache.sling.distribution.journal.impl.subscriber.IdleCheck.ReadyReason;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,7 +46,8 @@ public class SubscriberReadyTest {
 
     @Before
     public void before() {
-        idle = new SubscriberReady("publish_subscriber", IDLE_MILLIES, 
FORCE_IDLE_MILLIS, ACCEPTABLE_AGE_DIFF_MS, new AtomicBoolean(), 
timeProvider::get);
+               BiConsumer<ReadyReason, Duration> readyCallback = (readyReason, 
duration)->{};
+               idle = new SubscriberReady("publish_subscriber", IDLE_MILLIES, 
FORCE_IDLE_MILLIS, ACCEPTABLE_AGE_DIFF_MS, new AtomicBoolean(), 
timeProvider::get, readyCallback);
     }
 
     @After

Reply via email to