This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 52ee4fd5eade48b4fd22d841588f3ad23b3e4937
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 23 13:10:41 2020 +0200

    SLING-9259 - Queue package messages instead of queue items
---
 .../impl/subscriber/DistributionSubscriber.java    | 46 +++++++++-------------
 1 file changed, 19 insertions(+), 27 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 94ed9ff..9a5625b 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -19,13 +19,9 @@
 package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static java.lang.String.format;
-import static java.util.Collections.emptyMap;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static 
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.PACKAGE_MSG;
-import static 
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
-import static 
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -54,19 +50,18 @@ import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
-import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
-import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -94,8 +89,6 @@ public class DistributionSubscriber {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DistributionSubscriber.class);
 
-    private static final DistributionQueueItem STOPPED_ITEM = new 
DistributionQueueItem("stop-item", emptyMap());
-
     @Reference(name = "packageBuilder")
     private DistributionPackageBuilder packageBuilder;
 
@@ -141,7 +134,7 @@ public class DistributionSubscriber {
 
     // Use a bounded internal buffer to allow reading further packages while 
working
     // on one at a time
-    private final BlockingQueue<DistributionQueueItem> queueItemsBuffer = new 
LinkedBlockingQueue<>(8);
+    private final BlockingQueue<FullMessage<PackageMessage>> messageBuffer = 
new LinkedBlockingQueue<>(8);
 
     private Set<String> queueNames = Collections.emptySet();
 
@@ -239,13 +232,12 @@ public class DistributionSubscriber {
         if (isBlocked) {
             return DistributionAgentState.BLOCKED;
         }
-        return queueItemsBuffer.size() > 0 ? DistributionAgentState.RUNNING : 
DistributionAgentState.IDLE;
+        return messageBuffer.size() > 0 ? DistributionAgentState.RUNNING : 
DistributionAgentState.IDLE;
     }
 
     private void handlePackageMessage(MessageInfo info, PackageMessage 
message) {
         if (shouldEnqueue(info, message)) {
-            DistributionQueueItem queueItem = 
QueueItemFactory.fromPackage(info, message, true);
-            enqueue(queueItem);
+            enqueue(new FullMessage<PackageMessage>(info, message));
         } else {
             try {
                 bookKeeper.skipPackage(info.getOffset());
@@ -272,10 +264,10 @@ public class DistributionSubscriber {
      * packages fetched in memory. Note that each queued item contains the 
binary
      * package to be imported.
      */
-    private void enqueue(DistributionQueueItem queueItem) {
+    private void enqueue(FullMessage<PackageMessage> message) {
         try {
             while (running) {
-                if (queueItemsBuffer.offer(queueItem, 1000, 
TimeUnit.MILLISECONDS)) {
+                if (messageBuffer.offer(message, 1000, TimeUnit.MILLISECONDS)) 
{
                     
distributionMetricsService.getItemsBufferSize().increment();
                     return;
                 }
@@ -301,13 +293,13 @@ public class DistributionSubscriber {
                 return;
             }
 
-            DistributionQueueItem item = blockingPeekQueueItem();
-            if (STOPPED_ITEM == item) {
+            Optional<FullMessage<PackageMessage>> item = 
blockingPeekQueueItem();
+            if (!item.isPresent()) {
                 return;
             }
 
             try (Timer.Context context = 
distributionMetricsService.getProcessQueueItemDuration().time()) {
-                processQueueItem(item);
+                processQueueItem(item.get().getInfo(), 
item.get().getMessage());
             } finally {
                 subscriberIdle.ifPresent(SubscriberIdle::idle);
             }
@@ -340,30 +332,30 @@ public class DistributionSubscriber {
         return running;
     }
 
-    private DistributionQueueItem blockingPeekQueueItem() throws 
InterruptedException {
+    private Optional<FullMessage<PackageMessage>> blockingPeekQueueItem() 
throws InterruptedException {
         while (running) {
-            DistributionQueueItem queueItem = queueItemsBuffer.peek();
-            if (queueItem != null) {
-                return queueItem;
+            FullMessage<PackageMessage> message = messageBuffer.peek();
+            if (message != null) {
+                return Optional.of(message);
             } else {
                 Thread.sleep(QUEUE_FETCH_DELAY);
             }
         }
-        return STOPPED_ITEM;
+        return Optional.empty();
     }
 
-    private void processQueueItem(DistributionQueueItem queueItem) throws 
PersistenceException, LoginException, DistributionException, TimeoutException {
-        long offset = queueItem.get(RECORD_OFFSET, Long.class);
-        PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, 
PackageMessage.class);
+    private void processQueueItem(MessageInfo info, PackageMessage queueItem) 
throws PersistenceException, LoginException, DistributionException, 
TimeoutException {
+        long offset = info.getOffset();
+        PackageMessage pkgMsg = queueItem;
         boolean skip = shouldSkip(offset);
         subscriberIdle.ifPresent(SubscriberIdle::busy);
         if (skip) {
             bookKeeper.removePackage(pkgMsg, offset);
         } else {
-            long createdTime = queueItem.get(RECORD_TIMESTAMP, Long.class);
+            long createdTime = info.getCreateTime();
             bookKeeper.importPackage(pkgMsg, offset, createdTime);
         }
-        queueItemsBuffer.remove();
+        messageBuffer.remove();
         distributionMetricsService.getItemsBufferSize().decrement();
     }
 

Reply via email to