This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12429
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit dd537c6fc7f91d2b3a9e500a4e47067f00c02cad
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Sep 17 09:18:08 2024 +0200

    SLING-12429 - Only use editable queues for computing queue size
---
 .../journal/impl/publisher/MessagingCacheCallback.java       |  6 ++++++
 .../journal/impl/subscriber/SubscriberReady.java             |  2 +-
 .../sling/distribution/journal/queue/CacheCallback.java      |  2 ++
 .../journal/queue/impl/PubQueueProviderImpl.java             | 12 ++++++++++--
 4 files changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
index 341d553..fca05a0 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -103,6 +103,12 @@ public class MessagingCacheCallback implements 
CacheCallback {
         return new QueueState(curOffset, headRetries, maxRetries, 
clearCallback);
     }
     
+    @Override
+    public State getState(String pubAgentName, String subAgentId) {
+        TopologyView view = discoveryService.getTopologyView();
+        return view.getState(subAgentId, pubAgentName);
+    }
+    
     private void sendClearCommand(String pubAgentName, AgentId subAgentId, 
long offset) {
         ClearCommand command = ClearCommand.builder()
                 .pubAgentName(pubAgentName)
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
index bb376da..58977c1 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
@@ -118,7 +118,7 @@ public class SubscriberReady implements IdleCheck {
     }
 
     private void idleReady() {
-        ready(String.format("%s ready after being idle for > %d ms", 
subAgentName, MILLISECONDS.toSeconds(idleMillis)));
+        ready(String.format("%s ready after being idle for > %d s", 
subAgentName, MILLISECONDS.toSeconds(idleMillis)));
     }
     
     private void ready(String reason) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java 
b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
index c42306f..215a843 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
@@ -24,11 +24,13 @@ import java.util.Set;
 
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 
 public interface CacheCallback {
     Closeable createConsumer(MessageHandler<PackageMessage> handler);
     List<FullMessage<PackageMessage>> fetchRange(long minOffset, long 
maxOffset) throws InterruptedException;
     QueueState getQueueState(String pubAgentName, String subAgentId);
+    State getState(String pubAgentName, String subAgentId);
     Set<String> getSubscribedAgentIds(String pubAgentName);
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
 
b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index c2c95fe..100db0d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -36,6 +36,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import 
org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
@@ -204,8 +205,9 @@ public class PubQueueProviderImpl implements 
PubQueueProvider, Runnable {
     }
     
 
+    @Override
     public int getMaxQueueSize(String pubAgentName) {
-        Optional<Long> minOffset = getMinOffset(pubAgentName);
+        Optional<Long> minOffset = getMinEditableQueueOffset(pubAgentName);
         if (minOffset.isPresent()) {
             return getOffsetQueue(pubAgentName, 
minOffset.get()).getMinOffsetQueue(minOffset.get()).getSize();
         } else {
@@ -213,11 +215,17 @@ public class PubQueueProviderImpl implements 
PubQueueProvider, Runnable {
         }
     }
 
-    private Optional<Long> getMinOffset(String pubAgentName) {
+    private Optional<Long> getMinEditableQueueOffset(String pubAgentName) {
         return callback.getSubscribedAgentIds(pubAgentName).stream()
+            .filter(subAgentName -> isEditable(pubAgentName, subAgentName))
             .map(subAgentName -> lastProcessedOffset(pubAgentName, 
subAgentName))
             .min(Long::compare);
     }
+    
+    private boolean isEditable(String pubAgentName, String subAgentName) {
+        State state = callback.getState(pubAgentName, subAgentName);
+        return state == null ? false : state.isEditable();
+    }
 
     private long lastProcessedOffset(String pubAgentName, String subAgentName) 
{
         return callback.getQueueState(pubAgentName, 
subAgentName).getLastProcessedOffset();

Reply via email to