This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new c907c57  SLING-12429 - Only use editable queues for computing queue size (#153)
c907c57 is described below

commit c907c571c307298b459dd274094ee168d6ff2f76
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Nov 26 12:05:45 2024 +0100

    SLING-12429 - Only use editable queues for computing queue size (#153)
    
    * SLING-12429 - Only use editable queues for computing queue size
    
    * SLING-12429 - Only use editable queues for computing queue size
---
 .../journal/impl/publisher/MessagingCacheCallback.java       |  6 ++++++
 .../journal/impl/subscriber/SubscriberReady.java             |  2 +-
 .../sling/distribution/journal/queue/CacheCallback.java      |  2 ++
 .../journal/queue/impl/PubQueueProviderImpl.java             | 12 ++++++++++--
 .../journal/queue/impl/PubQueueProviderTest.java             |  5 ++++-
 5 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
index 341d553..fca05a0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -103,6 +103,12 @@ public class MessagingCacheCallback implements CacheCallback {
         return new QueueState(curOffset, headRetries, maxRetries, clearCallback);
     }
     
+    @Override
+    public State getState(String pubAgentName, String subAgentId) {
+        TopologyView view = discoveryService.getTopologyView();
+        return view.getState(subAgentId, pubAgentName);
+    }
+    
     private void sendClearCommand(String pubAgentName, AgentId subAgentId, long offset) {
         ClearCommand command = ClearCommand.builder()
                 .pubAgentName(pubAgentName)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
index bb376da..58977c1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
@@ -118,7 +118,7 @@ public class SubscriberReady implements IdleCheck {
     }
 
     private void idleReady() {
-        ready(String.format("%s ready after being idle for > %d ms", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
+        ready(String.format("%s ready after being idle for > %d s", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
     }
     
     private void ready(String reason) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
index c42306f..215a843 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
@@ -24,11 +24,13 @@ import java.util.Set;
 
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 
 public interface CacheCallback {
     Closeable createConsumer(MessageHandler<PackageMessage> handler);
     List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException;
     QueueState getQueueState(String pubAgentName, String subAgentId);
+    State getState(String pubAgentName, String subAgentId);
     Set<String> getSubscribedAgentIds(String pubAgentName);
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index 1747e70..3b2d74c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -37,6 +37,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
@@ -206,8 +207,9 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
     }
     
 
+    @Override
     public int getMaxQueueSize(String pubAgentName) {
-        Optional<Long> minOffset = getMinOffset(pubAgentName);
+        Optional<Long> minOffset = getMinEditableQueueOffset(pubAgentName);
         if (minOffset.isPresent()) {
             return getOffsetQueue(pubAgentName, minOffset.get()).getMinOffsetQueue(minOffset.get()).getSize();
         } else {
@@ -215,11 +217,17 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
         }
     }
 
-    private Optional<Long> getMinOffset(String pubAgentName) {
+    private Optional<Long> getMinEditableQueueOffset(String pubAgentName) {
         return callback.getSubscribedAgentIds(pubAgentName).stream()
+            .filter(subAgentName -> isEditable(pubAgentName, subAgentName))
             .map(subAgentName -> lastProcessedOffset(pubAgentName, subAgentName))
             .min(Long::compare);
     }
+    
+    private boolean isEditable(String pubAgentName, String subAgentName) {
+        State state = callback.getState(pubAgentName, subAgentName);
+        return state == null ? false : state.isEditable();
+    }
 
     private long lastProcessedOffset(String pubAgentName, String subAgentName) {
         return callback.getQueueState(pubAgentName, subAgentName).getLastProcessedOffset();
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
index 23d66a4..008610e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
@@ -48,6 +48,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -172,7 +173,9 @@ public class PubQueueProviderTest {
         when(callback.getSubscribedAgentIds(PUB1_AGENT_NAME)).thenReturn(Collections.singleton("sub1"));
         when(callback.getQueueState(Mockito.eq(PUB1_AGENT_NAME), Mockito.any()))
             .thenReturn(new QueueState(0, -1, 0, null));
-
+        State state = Mockito.mock(State.class);
+        when(state.isEditable()).thenReturn(true);
+        when(callback.getState(Mockito.eq(PUB1_AGENT_NAME), Mockito.anyString())).thenReturn(state);
         int size = queueProvider.getMaxQueueSize(PUB1_AGENT_NAME);
         assertThat(size, equalTo(2));
     }

Reply via email to