This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-12429 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit dd537c6fc7f91d2b3a9e500a4e47067f00c02cad
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Sep 17 09:18:08 2024 +0200

    SLING-12429 - Only use editable queues for computing queue size
---
 .../journal/impl/publisher/MessagingCacheCallback.java |  6 ++++++
 .../journal/impl/subscriber/SubscriberReady.java       |  2 +-
 .../sling/distribution/journal/queue/CacheCallback.java |  2 ++
 .../journal/queue/impl/PubQueueProviderImpl.java       | 12 ++++++++++--
 4 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
index 341d553..fca05a0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -103,6 +103,12 @@ public class MessagingCacheCallback implements CacheCallback {
         return new QueueState(curOffset, headRetries, maxRetries, clearCallback);
     }
 
+    @Override
+    public State getState(String pubAgentName, String subAgentId) {
+        TopologyView view = discoveryService.getTopologyView();
+        return view.getState(subAgentId, pubAgentName);
+    }
+
     private void sendClearCommand(String pubAgentName, AgentId subAgentId, long offset) {
         ClearCommand command = ClearCommand.builder()
                 .pubAgentName(pubAgentName)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
index bb376da..58977c1 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberReady.java
@@ -118,7 +118,7 @@ public class SubscriberReady implements IdleCheck {
     }
 
     private void idleReady() {
-        ready(String.format("%s ready after being idle for > %d ms", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
+        ready(String.format("%s ready after being idle for > %d s", subAgentName, MILLISECONDS.toSeconds(idleMillis)));
     }
 
     private void ready(String reason) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
index c42306f..215a843 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/CacheCallback.java
@@ -24,11 +24,13 @@ import java.util.Set;
 
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 
 public interface CacheCallback {
     Closeable createConsumer(MessageHandler<PackageMessage> handler);
     List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException;
     QueueState getQueueState(String pubAgentName, String subAgentId);
+    State getState(String pubAgentName, String subAgentId);
     Set<String> getSubscribedAgentIds(String pubAgentName);
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index c2c95fe..100db0d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -36,6 +36,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.discovery.State;
 import org.apache.sling.distribution.journal.impl.publisher.PackageQueuedNotifier;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
@@ -204,8 +205,9 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
     }
 
 
+    @Override
     public int getMaxQueueSize(String pubAgentName) {
-        Optional<Long> minOffset = getMinOffset(pubAgentName);
+        Optional<Long> minOffset = getMinEditableQueueOffset(pubAgentName);
         if (minOffset.isPresent()) {
             return getOffsetQueue(pubAgentName, minOffset.get()).getMinOffsetQueue(minOffset.get()).getSize();
         } else {
@@ -213,11 +215,17 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
         }
     }
 
-    private Optional<Long> getMinOffset(String pubAgentName) {
+    private Optional<Long> getMinEditableQueueOffset(String pubAgentName) {
         return callback.getSubscribedAgentIds(pubAgentName).stream()
+                .filter(subAgentName -> isEditable(pubAgentName, subAgentName))
                 .map(subAgentName -> lastProcessedOffset(pubAgentName, subAgentName))
                 .min(Long::compare);
     }
+
+    private boolean isEditable(String pubAgentName, String subAgentName) {
+        State state = callback.getState(pubAgentName, subAgentName);
+        return state == null ? false : state.isEditable();
+    }
 
     private long lastProcessedOffset(String pubAgentName, String subAgentName) {
         return callback.getQueueState(pubAgentName, subAgentName).getLastProcessedOffset();
