This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new c907c57 SLING-12429 - Only use editable queues for computing queue size (#153)
c907c57 is described below
commit c907c571c307298b459dd274094ee168d6ff2f76
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Nov 26 12:05:45 2024 +0100
SLING-12429 - Only use editable queues for computing queue size (#153)
* SLING-12429 - Only use editable queues for computing queue size
* SLING-12429 - Only use editable queues for computing queue size
---
.../journal/impl/publisher/MessagingCacheCallback.java | 6 ++++++
.../journal/impl/subscriber/SubscriberReady.java | 2 +-
.../sling/distribution/journal/queue/CacheCallback.java | 2 ++
.../journal/queue/impl/PubQueueProviderImpl.java | 12 ++++++++++--
.../journal/queue/impl/PubQueueProviderTest.java | 5 ++++-
5 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
index 341d553..fca05a0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -103,6 +103,12 @@ public class MessagingCacheCallback implements CacheCallback {
return new QueueState(curOffset, headRetries, maxRetries,
clearCallback);
}
+ @Override
+ public State getState(String pubAgentName, String subAgentId) {
+ TopologyView view = discoveryService.getTopologyView();
+ return view.getState(subAgentId, pubAgentName);
+ }
+
private void sendClearCommand(String pubAgentName, AgentId subAgentId,
long offset) {
ClearCommand command = ClearCommand.builder()
.pubAgentName(pubAgentName)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
index bb376da..58977c1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
@@ -118,7 +118,7 @@ public class SubscriberReady implements IdleCheck {
}
private void idleReady() {
- ready(String.format("%s ready after being idle for > %d ms", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
+ ready(String.format("%s ready after being idle for > %d s", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
}
private void ready(String reason) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
index c42306f..215a843 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
@@ -24,11 +24,13 @@ import java.util.Set;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.impl.discovery.State;
import org.apache.sling.distribution.journal.messages.PackageMessage;
public interface CacheCallback {
Closeable createConsumer(MessageHandler<PackageMessage> handler);
List<FullMessage<PackageMessage>> fetchRange(long minOffset, long
maxOffset) throws InterruptedException;
QueueState getQueueState(String pubAgentName, String subAgentId);
+ State getState(String pubAgentName, String subAgentId);
Set<String> getSubscribedAgentIds(String pubAgentName);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index 1747e70..3b2d74c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -37,6 +37,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.discovery.State;
import
org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
@@ -206,8 +207,9 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
}
+ @Override
public int getMaxQueueSize(String pubAgentName) {
- Optional<Long> minOffset = getMinOffset(pubAgentName);
+ Optional<Long> minOffset = getMinEditableQueueOffset(pubAgentName);
if (minOffset.isPresent()) {
return getOffsetQueue(pubAgentName,
minOffset.get()).getMinOffsetQueue(minOffset.get()).getSize();
} else {
@@ -215,11 +217,17 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
}
}
- private Optional<Long> getMinOffset(String pubAgentName) {
+ private Optional<Long> getMinEditableQueueOffset(String pubAgentName) {
return callback.getSubscribedAgentIds(pubAgentName).stream()
+ .filter(subAgentName -> isEditable(pubAgentName, subAgentName))
.map(subAgentName -> lastProcessedOffset(pubAgentName,
subAgentName))
.min(Long::compare);
}
+
+ private boolean isEditable(String pubAgentName, String subAgentName) {
+ State state = callback.getState(pubAgentName, subAgentName);
+ return state == null ? false : state.isEditable();
+ }
private long lastProcessedOffset(String pubAgentName, String subAgentName)
{
return callback.getQueueState(pubAgentName,
subAgentName).getLastProcessedOffset();
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
index 23d66a4..008610e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
@@ -48,6 +48,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.discovery.State;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -172,7 +173,9 @@ public class PubQueueProviderTest {
when(callback.getSubscribedAgentIds(PUB1_AGENT_NAME)).thenReturn(Collections.singleton("sub1"));
when(callback.getQueueState(Mockito.eq(PUB1_AGENT_NAME),
Mockito.any()))
.thenReturn(new QueueState(0, -1, 0, null));
-
+ State state = Mockito.mock(State.class);
+ when(state.isEditable()).thenReturn(true);
+ when(callback.getState(Mockito.eq(PUB1_AGENT_NAME), Mockito.anyString())).thenReturn(state);
int size = queueProvider.getMaxQueueSize(PUB1_AGENT_NAME);
assertThat(size, equalTo(2));
}