This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 52ee4fd5eade48b4fd22d841588f3ad23b3e4937 Author: Christian Schneider <[email protected]> AuthorDate: Tue Jun 23 13:10:41 2020 +0200 SLING-9259 - Queue package messages instead of queue items --- .../impl/subscriber/DistributionSubscriber.java | 46 +++++++++------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index 94ed9ff..9a5625b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -19,13 +19,9 @@ package org.apache.sling.distribution.journal.impl.subscriber; import static java.lang.String.format; -import static java.util.Collections.emptyMap; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toSet; import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread; -import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG; -import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET; -import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP; import java.io.Closeable; import java.io.IOException; @@ -54,19 +50,18 @@ import org.apache.sling.commons.osgi.PropertiesUtil; import org.apache.sling.distribution.agent.DistributionAgentState; import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.JournalAvailable; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.precondition.Precondition; -import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; -import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.settings.SlingSettingsService; import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceRegistration; @@ -94,8 +89,6 @@ public class DistributionSubscriber { private static final Logger LOG = LoggerFactory.getLogger(DistributionSubscriber.class); - private static final DistributionQueueItem STOPPED_ITEM = new DistributionQueueItem("stop-item", emptyMap()); - @Reference(name = "packageBuilder") private DistributionPackageBuilder packageBuilder; @@ -141,7 +134,7 @@ public class DistributionSubscriber { // Use a bounded internal buffer to allow reading further packages while working // on one at a time - private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new LinkedBlockingQueue<>(8); + private final BlockingQueue<FullMessage<PackageMessage>> messageBuffer = new LinkedBlockingQueue<>(8); private Set<String> queueNames = Collections.emptySet(); @@ -239,13 +232,12 @@ public class DistributionSubscriber { if (isBlocked) { return DistributionAgentState.BLOCKED; } - return queueItemsBuffer.size() > 0 ? DistributionAgentState.RUNNING : DistributionAgentState.IDLE; + return messageBuffer.size() > 0 ? DistributionAgentState.RUNNING : DistributionAgentState.IDLE; } private void handlePackageMessage(MessageInfo info, PackageMessage message) { if (shouldEnqueue(info, message)) { - DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, message, true); - enqueue(queueItem); + enqueue(new FullMessage<PackageMessage>(info, message)); } else { try { bookKeeper.skipPackage(info.getOffset()); @@ -272,10 +264,10 @@ public class DistributionSubscriber { * packages fetched in memory. Note that each queued item contains the binary * package to be imported. */ - private void enqueue(DistributionQueueItem queueItem) { + private void enqueue(FullMessage<PackageMessage> message) { try { while (running) { - if (queueItemsBuffer.offer(queueItem, 1000, TimeUnit.MILLISECONDS)) { + if (messageBuffer.offer(message, 1000, TimeUnit.MILLISECONDS)) { distributionMetricsService.getItemsBufferSize().increment(); return; } @@ -301,13 +293,13 @@ public class DistributionSubscriber { return; } - DistributionQueueItem item = blockingPeekQueueItem(); - if (STOPPED_ITEM == item) { + Optional<FullMessage<PackageMessage>> item = blockingPeekQueueItem(); + if (!item.isPresent()) { return; } try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) { - processQueueItem(item); + processQueueItem(item.get().getInfo(), item.get().getMessage()); } finally { subscriberIdle.ifPresent(SubscriberIdle::idle); } @@ -340,30 +332,30 @@ public class DistributionSubscriber { return running; } - private DistributionQueueItem blockingPeekQueueItem() throws InterruptedException { + private Optional<FullMessage<PackageMessage>> blockingPeekQueueItem() throws InterruptedException { while (running) { - DistributionQueueItem queueItem = queueItemsBuffer.peek(); - if (queueItem != null) { - return queueItem; + FullMessage<PackageMessage> message = messageBuffer.peek(); + if (message != null) { + return Optional.of(message); } else { Thread.sleep(QUEUE_FETCH_DELAY); } } - return STOPPED_ITEM; + return Optional.empty(); } - private void processQueueItem(DistributionQueueItem queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException { - long offset = queueItem.get(RECORD_OFFSET, Long.class); - PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class); + private void processQueueItem(MessageInfo info, PackageMessage queueItem) throws PersistenceException, LoginException, DistributionException, TimeoutException { + long offset = info.getOffset(); + PackageMessage pkgMsg = queueItem; boolean skip = shouldSkip(offset); subscriberIdle.ifPresent(SubscriberIdle::busy); if (skip) { bookKeeper.removePackage(pkgMsg, offset); } else { - long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class); + long createdTime = info.getCreateTime(); bookKeeper.importPackage(pkgMsg, offset, createdTime); } - queueItemsBuffer.remove(); + messageBuffer.remove(); distributionMetricsService.getItemsBufferSize().decrement(); }
