This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new f3dcade  SLING-8415 - throw upon catching interrupted exception
f3dcade is described below

commit f3dcadec8dab17c8d055659dae44705df63dc1c9
Author: tmaret <[email protected]>
AuthorDate: Sun May 12 12:31:57 2019 +0200

    SLING-8415 - throw upon catching interrupted exception
---
 .../impl/subscriber/DistributionSubscriber.java    | 125 ++++++++++-----------
 1 file changed, 59 insertions(+), 66 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 12d1eae..676d150 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -58,7 +58,6 @@ import javax.jcr.Session;
 import javax.jcr.ValueFactory;
 
 import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.journal.MessagingException;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import 
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -134,8 +133,6 @@ public class DistributionSubscriber implements 
DistributionAgent {
     
     private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = 
Collections.emptySet();
 
-    private static final DistributionQueueItem STOPPED_ITEM = new 
DistributionQueueItem("stop-item", Collections.emptyMap());
-
     @Reference(name = "packageBuilder")
     private DistributionPackageBuilder packageBuilder;
 
@@ -155,10 +152,10 @@ public class DistributionSubscriber implements 
DistributionAgent {
     private EventAdmin eventAdmin;
     
     @Reference
-    JournalAvailable journalAvailable;
+    private JournalAvailable journalAvailable;
 
     @Reference(name = "precondition")
-    Precondition precondition;
+    private Precondition precondition;
 
     @Reference
     private DistributionMetricsService distributionMetricsService;
@@ -202,7 +199,9 @@ public class DistributionSubscriber implements 
DistributionAgent {
 
     private boolean editable;
 
-    volatile boolean running = true;
+    private volatile boolean running = true;
+
+    private volatile Thread queueProcessor;
     
     @Activate
     public void activate(SubscriberConfiguration config, BundleContext 
context, Map<String, Object> properties) {
@@ -275,8 +274,8 @@ public class DistributionSubscriber implements 
DistributionAgent {
         }
 
 
-        startBackgroundThread(this::processQueueItems,
-                format("Package Message Processor for Subscriber agent %s", 
subAgentName));
+        queueProcessor = startBackgroundThread(this::processQueue,
+                format("Queue Processor for Subscriber agent %s", 
subAgentName));
 
         sender = messagingProvider.createSender();
 
@@ -336,6 +335,10 @@ public class DistributionSubscriber implements 
DistributionAgent {
         IOUtils.closeQuietly(packagePoller);
         IOUtils.closeQuietly(commandPoller);
         running = false;
+        Thread interrupter = this.queueProcessor;
+        if (interrupter != null) {
+            interrupter.interrupt();
+        }
         String msg = String.format("Stopped Subscriber agent %s, subscribed to 
Publisher agent names %s with package builder %s",
                 subAgentName,
                 queueNames,
@@ -403,7 +406,11 @@ public class DistributionSubscriber implements 
DistributionAgent {
             return;
         }
         DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info, 
message, true);
-        while (!tryEnqueue(queueItem) && running) {
+        try {
+            enqueue(queueItem);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException();
         }
     }
 
@@ -412,58 +419,61 @@ public class DistributionSubscriber implements 
DistributionAgent {
         * binary packages fetched in memory. Note that each queued item 
contains
         * the binary package to be imported.
      */
-       private boolean tryEnqueue(DistributionQueueItem queueItem) {
-               try {
-                   queueItemsBuffer.put(queueItem);
-            distributionMetricsService.getItemsBufferSize().increment();
-                   return true;
-               } catch (InterruptedException ignore) {
-                       return false;
-               }
+       private void enqueue(DistributionQueueItem queueItem) throws 
InterruptedException {
+        while (running) {
+            if (queueItemsBuffer.offer(queueItem, 1000, 
TimeUnit.MILLISECONDS)) {
+                distributionMetricsService.getItemsBufferSize().increment();
+                return;
+            }
+        }
+        throw new InterruptedException();
        }
 
-    private void processQueueItems() {
-        for (;running;) {
+    private void processQueue() {
+        LOG.info("Started Queue processor");
+        while (! Thread.interrupted()) {
             try {
-                // send status stored in a previous run if exists
-                sendStoredStatus();
-                // We block until an item is available
-                // and then process it
-                DistributionQueueItem item = blockingPeekQueueItem();
-                if (item != STOPPED_ITEM) {
-                    processQueueItem(item);
-                }
-            } catch (Throwable t) {
-                // Catch all to prevent processing from stopping
-                LOG.error("Error processing queue item", t);
-                sleep(RETRY_DELAY);
+                processQueueItems();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
             }
         }
+        LOG.info("Stopped Queue processor");
     }
 
+    private void processQueueItems() throws InterruptedException {
+        try {
+            // send status stored in a previous run if exists
+            try (Timer.Context context = 
distributionMetricsService.getSendStoredStatusDuration().time()) {
+                sendStoredStatus();
+            }
+            // block until an item is available
+            DistributionQueueItem item = blockingPeekQueueItem();
+            // and then process it
+            try (Timer.Context context = 
distributionMetricsService.getProcessQueueItemDuration().time()) {
+                processQueueItem(item);
+            }
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (Throwable t) {
+            // Catch all to prevent processing from stopping
+            LOG.error("Error processing queue item", t);
+            Thread.sleep(RETRY_DELAY);
+        }
+    }
 
-    private DistributionQueueItem blockingPeekQueueItem() {
-        for (;running;) {
+    private DistributionQueueItem blockingPeekQueueItem() throws 
InterruptedException {
+        while (true) {
             DistributionQueueItem queueItem = queueItemsBuffer.peek();
             if (queueItem != null) {
                 return queueItem;
             } else {
-                sleep(QUEUE_FETCH_DELAY);
+                Thread.sleep(QUEUE_FETCH_DELAY);
             }
         }
-        return STOPPED_ITEM;
     }
 
     private void processQueueItem(DistributionQueueItem queueItem) throws 
Exception {
-        Timer.Context context = 
distributionMetricsService.getProcessQueueItemDuration().time();
-        try {
-            timedProcessQueueItem(queueItem);
-        } finally {
-            context.stop();
-        }
-    }
-
-    private void timedProcessQueueItem(DistributionQueueItem queueItem) throws 
Exception {
         long offset = queueItem.get(RECORD_OFFSET, Long.class);
         boolean skip;
         try {
@@ -473,7 +483,7 @@ public class DistributionSubscriber implements 
DistributionAgent {
              * This will occur when the precondition times out.
              */
             LOG.info(e.getMessage());
-            sleep(RETRY_DELAY);
+            Thread.sleep(RETRY_DELAY);
             return;
         }
         PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, 
PackageMessage.class);
@@ -649,28 +659,19 @@ public class DistributionSubscriber implements 
DistributionAgent {
         LOG.info("Stored status {}", s);
     }
 
-    private void sendStoredStatus() {
-        Timer.Context context = 
distributionMetricsService.getSendStoredStatusDuration().time();
-        try {
-            timedSendStoredStatus();
-        } finally {
-            context.stop();
-        }
-    }
-
-    private void timedSendStoredStatus() {
+    private void sendStoredStatus() throws InterruptedException {
         ValueMap status = processedStatuses.load();
         boolean sent = status.get("sent", true);
-        for (int retry = 0 ; running && !sent ; retry++) {
+        for (int retry = 0 ; !sent ; retry++) {
             try {
                 sendStatusMessage(status);
+                markStatusSent();
                 sent = true;
             } catch (Exception e) {
                 LOG.warn("Cannot send status (retry {})", retry, e);
-                sleep(RETRY_SEND_DELAY);
+                Thread.sleep(RETRY_SEND_DELAY);
             }
         }
-        markStatusSent();
     }
 
     private void markStatusSent() {
@@ -696,14 +697,6 @@ public class DistributionSubscriber implements 
DistributionAgent {
         LOG.info("Sent status message {}", status);
     }
 
-    private void sleep(int ms) {
-        try {
-            Thread.sleep(ms);
-        } catch (InterruptedException ignore) {
-            // ignore
-        }
-    }
-
     @Nonnull
     private InputStream pkgStream(ResourceResolver resolver, PackageMessage 
pkgMsg) throws DistributionException {
         if (pkgMsg.hasPkgBinary()) {

Reply via email to