This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new d7691ac SLING-12872 - Add metric for distribution readiness check
(#176)
d7691ac is described below
commit d7691ac7b7fd67fbeb1c1b72b9a9ecb1f1cd50a4
Author: Christian Schneider <[email protected]>
AuthorDate: Thu Jul 31 09:01:07 2025 +0200
SLING-12872 - Add metric for distribution readiness check (#176)
* SLING-12872 - Add metric for distribution readiness check
* SLING-12872 - Add tests
* SLING-12872 - Fix issues
---
.../journal/bookkeeper/SubscriberMetrics.java | 18 ++++++++++++++++++
.../impl/subscriber/DistributionSubscriber.java | 6 +++++-
.../journal/impl/subscriber/IdleCheck.java | 1 +
.../journal/impl/subscriber/SubscriberReady.java | 21 +++++++++++++--------
.../distribution/journal/metrics/DefaultTag.java | 4 ++++
.../impl/subscriber/SubscriberReadyTest.java | 6 +++++-
6 files changed, 46 insertions(+), 10 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
index 3b2cf37..21e4e89 100644
---
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
+++
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
@@ -20,8 +20,11 @@ package org.apache.sling.distribution.journal.bookkeeper;
import static
org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -30,6 +33,7 @@ import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.distribution.journal.impl.subscriber.IdleCheck;
import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.metrics.Tag;
@@ -51,6 +55,9 @@ public class SubscriberMetrics {
// Is the queue editable (true, false)
private static final String TAG_EDITABLE = "editable";
+ // See IdleCheck.ReadyReason
+ private static final String TAG_READY_REASON = "ready_reason";
+
public static final String SUB_COMPONENT =
"distribution.journal.subscriber.";
private static final String PACKAGE_STATUS_COUNT = SUB_COMPONENT +
"package_status_count";
@@ -93,10 +100,14 @@ public class SubscriberMetrics {
private static final String IMPORT_POST_PROCESS_DURATION = SUB_COMPONENT +
"import_post_process_duration";
private static final String INVALIDATION_PROCESS_DURATION = SUB_COMPONENT
+ "invalidation_process_duration";
private static final String CURRENT_IMPORT_DURATION = SUB_COMPONENT +
"current_import_duration";
+
+ // in seconds
+ private static final String READINESS_DURATION = SUB_COMPONENT +
"readiness_duration";
private static final String FV_MESSAGE_COUNT = SUB_COMPONENT +
"fv_message_count";
private static final String FV_ERROR_COUNT = SUB_COMPONENT +
"fv_error_count";
+
private final MetricsService metricsService;
private final Tag tagSubName;
private final Tag tagPubName;
@@ -240,6 +251,13 @@ public class SubscriberMetrics {
public Timer getInvalidationProcessDuration() {
return
metricsService.timer(getMetricName(INVALIDATION_PROCESS_DURATION, tags));
}
+
+ /**
+ * Records how long the subscriber took to become ready, tagged with the
+ * reason that triggered readiness.
+ *
+ * @param readyReason why the subscriber became ready; its name is used as the {@code ready_reason} tag value
+ * @param duration how long it took until the subscriber became ready
+ */
+ public void readinessDuration(IdleCheck.ReadyReason readyReason, Duration duration) {
+ List<Tag> readyTags = new ArrayList<>(tags);
+ readyTags.add(Tag.of(TAG_READY_REASON, readyReason.name()));
+ Timer timer = metricsService.timer(getMetricName(READINESS_DURATION, readyTags));
+ // Record with millisecond precision: Duration.getSeconds() would silently drop the sub-second part.
+ timer.update(duration.toMillis(), TimeUnit.MILLISECONDS);
+ }
public Counter getInvalidationProcessSuccess() {
return
metricsService.counter(getMetricName(INVALIDATION_PROCESS_SUCCESS_COUNT, tags));
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index fad9825..bda2a83 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -31,6 +31,7 @@ import static
org.apache.sling.distribution.journal.shared.Strings.requireNotBla
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@@ -39,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@@ -65,6 +67,7 @@ import
org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
+import
org.apache.sling.distribution.journal.impl.subscriber.IdleCheck.ReadyReason;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.OffsetMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -150,7 +153,8 @@ public class DistributionSubscriber {
if (config.subscriberIdleCheck()) {
AtomicBoolean readyHolder =
subscriberReadyStore.getReadyHolder(subAgentName);
- idleCheck = new SubscriberReady(subAgentName,
config.idleMillies(), config.forceReadyMillies(), config.acceptableAgeDiffMs(),
readyHolder, System::currentTimeMillis);
+ BiConsumer<ReadyReason, Duration> readyCallback =
subscriberMetrics::readinessDuration;
+ idleCheck = new SubscriberReady(subAgentName,
config.idleMillies(), config.forceReadyMillies(), config.acceptableAgeDiffMs(),
readyHolder, System::currentTimeMillis, readyCallback);
idleReadyCheck = new SubscriberIdleCheck(context, idleCheck,
config.subscriberIdleTags());
} else {
idleCheck = new NoopIdle();
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
index 4a8b894..36a02b0 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/IdleCheck.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import java.io.Closeable;
public interface IdleCheck extends Closeable {
+ enum ReadyReason { IDLE, MAX_RETRIES, LATENCY, FORCE}
/**
* Called when processing of a message starts
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
index 144c216..a466c15 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
@@ -19,13 +19,14 @@
package org.apache.sling.distribution.journal.impl.subscriber;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.MINUTES;
+import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
@@ -58,13 +59,16 @@ public class SubscriberReady implements IdleCheck {
private ScheduledFuture<?> schedule;
private final ScheduledFuture<?> forceShedule;
- public SubscriberReady(String subAgentName, long idleMillis, long
forceIdleMillies, long acceptableAgeDiffMs, AtomicBoolean readyHolder,
Supplier<Long> timeProvider) {
+ private final BiConsumer<ReadyReason, Duration> readyCallback;
+
+ public SubscriberReady(String subAgentName, long idleMillis, long
forceIdleMillies, long acceptableAgeDiffMs, AtomicBoolean readyHolder,
Supplier<Long> timeProvider, BiConsumer<ReadyReason, Duration> readyCallback) {
this.subAgentName = subAgentName;
this.idleMillis = idleMillis;
this.forceIdleMillies = forceIdleMillies;
this.acceptableAgeDiffMs = acceptableAgeDiffMs;
this.isReady = readyHolder;
this.timeProvider = timeProvider;
+ this.readyCallback = readyCallback;
this.startTime = timeProvider.get();
executor = Executors.newScheduledThreadPool(2);
forceShedule = executor.schedule(this::forceIdle, forceIdleMillies,
TimeUnit.MILLISECONDS);
@@ -87,10 +91,10 @@ public class SubscriberReady implements IdleCheck {
cancelSchedule();
long latency = timeProvider.get() - messageCreateTime;
if (latency < acceptableAgeDiffMs) {
- ready(String.format("Package message latency %d s < %d s
acceptable limit", MILLISECONDS.toSeconds(latency),
MILLISECONDS.toSeconds(acceptableAgeDiffMs)));
+ ready(ReadyReason.LATENCY, String.format("Package message latency
%d s < %d s acceptable limit", MILLISECONDS.toSeconds(latency),
MILLISECONDS.toSeconds(acceptableAgeDiffMs)));
}
if (retries > MAX_RETRIES) {
- ready(String.format("Retries %d > %d", retries, MAX_RETRIES));
+ ready(ReadyReason.MAX_RETRIES, String.format("Retries %d > %d",
retries, MAX_RETRIES));
}
}
@@ -108,7 +112,7 @@ public class SubscriberReady implements IdleCheck {
}
+ // Scheduled once from the constructor (forceShedule) to fire forceIdleMillies
+ // after start; marks the subscriber ready with reason FORCE regardless of activity.
private void forceIdle() {
- ready(String.format("Forcing ready after %d s",
MILLISECONDS.toSeconds(forceIdleMillies)));
+ ready(ReadyReason.FORCE, String.format("Forcing ready after %d s",
MILLISECONDS.toSeconds(forceIdleMillies)));
cancelSchedule();
}
@@ -119,13 +123,14 @@ public class SubscriberReady implements IdleCheck {
}
+ // Marks the subscriber ready with reason IDLE. NOTE(review): presumably run from
+ // the idle `schedule` after the configured idleMillis; the scheduling code is not in this hunk.
private void idleReady() {
- ready(String.format("%s ready after being idle for > %d s",
subAgentName, MILLISECONDS.toSeconds(idleMillis)));
+ ready(ReadyReason.IDLE, String.format("%s ready after being idle for >
%d s", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
}
- private void ready(String reason) {
+ /**
+ * Marks the subscriber as ready: reports the elapsed time since start to the
+ * ready callback, logs the reason and cancels the pending idle and force-ready schedules.
+ *
+ * @param reason categorised trigger, passed to the ready callback
+ * @param reasonSt human-readable details, only used for logging
+ */
+ private void ready(IdleCheck.ReadyReason reason, String reasonSt) {
long readyTime = timeProvider.get();
long timeToIdle = MILLISECONDS.toSeconds(readyTime - startTime);
- log.info("Subscriber becoming ready after timeToIdle={} s.
Reason='{}'", timeToIdle, reason);
+ // timeToIdle is already in seconds; build the Duration from the raw millisecond
+ // difference so the reported metric is not scaled down by a factor of 1000.
+ // NOTE(review): checkAcceptableAge may call ready() for both LATENCY and MAX_RETRIES
+ // on the same message, which would report the metric twice; confirm whether a guard is wanted.
+ readyCallback.accept(reason, Duration.ofMillis(readyTime - startTime));
+ log.info("Subscriber becoming ready after timeToIdle={} s. Reason={},
Details='{}'.", timeToIdle, reason, reasonSt);
isReady.set(true);
cancelSchedule();
forceShedule.cancel(false);
diff --git
a/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java
b/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java
index 7fdb242..1394f13 100644
---
a/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java
+++
b/src/main/java/org/apache/sling/distribution/journal/metrics/DefaultTag.java
@@ -55,4 +55,8 @@ public class DefaultTag implements Tag {
return Objects.equals(key, other.key) && Objects.equals(value,
other.value);
}
+ /**
+ * Renders the tag as {@code key:value}.
+ */
+ @Override
+ public String toString() {
+ return new StringBuilder().append(key).append(':').append(value).toString();
+ }
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
index 5d186d0..0f0104b 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReadyTest.java
@@ -22,9 +22,12 @@ import static
org.apache.sling.distribution.journal.impl.subscriber.SubscriberRe
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import
org.apache.sling.distribution.journal.impl.subscriber.IdleCheck.ReadyReason;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +46,8 @@ public class SubscriberReadyTest {
@Before
public void before() {
- idle = new SubscriberReady("publish_subscriber", IDLE_MILLIES,
FORCE_IDLE_MILLIS, ACCEPTABLE_AGE_DIFF_MS, new AtomicBoolean(),
timeProvider::get);
+ // No-op ready callback: the reported ReadyReason and Duration are ignored in this setup.
+ // NOTE(review): consider capturing the callback arguments to assert the reported reason/duration.
+ BiConsumer<ReadyReason, Duration> readyCallback = (readyReason,
duration)->{};
+ idle = new SubscriberReady("publish_subscriber", IDLE_MILLIES,
FORCE_IDLE_MILLIS, ACCEPTABLE_AGE_DIFF_MS, new AtomicBoolean(),
timeProvider::get, readyCallback);
}
@After