This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new f3dcade SLING-8415 - throw upon catching interrupted exception
f3dcade is described below
commit f3dcadec8dab17c8d055659dae44705df63dc1c9
Author: tmaret <[email protected]>
AuthorDate: Sun May 12 12:31:57 2019 +0200
SLING-8415 - throw upon catching interrupted exception
---
.../impl/subscriber/DistributionSubscriber.java | 125 ++++++++++-----------
1 file changed, 59 insertions(+), 66 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 12d1eae..676d150 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -58,7 +58,6 @@ import javax.jcr.Session;
import javax.jcr.ValueFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.journal.MessagingException;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -134,8 +133,6 @@ public class DistributionSubscriber implements
DistributionAgent {
private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES =
Collections.emptySet();
- private static final DistributionQueueItem STOPPED_ITEM = new
DistributionQueueItem("stop-item", Collections.emptyMap());
-
@Reference(name = "packageBuilder")
private DistributionPackageBuilder packageBuilder;
@@ -155,10 +152,10 @@ public class DistributionSubscriber implements
DistributionAgent {
private EventAdmin eventAdmin;
@Reference
- JournalAvailable journalAvailable;
+ private JournalAvailable journalAvailable;
@Reference(name = "precondition")
- Precondition precondition;
+ private Precondition precondition;
@Reference
private DistributionMetricsService distributionMetricsService;
@@ -202,7 +199,9 @@ public class DistributionSubscriber implements
DistributionAgent {
private boolean editable;
- volatile boolean running = true;
+ private volatile boolean running = true;
+
+ private volatile Thread queueProcessor;
@Activate
public void activate(SubscriberConfiguration config, BundleContext
context, Map<String, Object> properties) {
@@ -275,8 +274,8 @@ public class DistributionSubscriber implements
DistributionAgent {
}
- startBackgroundThread(this::processQueueItems,
- format("Package Message Processor for Subscriber agent %s",
subAgentName));
+ queueProcessor = startBackgroundThread(this::processQueue,
+ format("Queue Processor for Subscriber agent %s",
subAgentName));
sender = messagingProvider.createSender();
@@ -336,6 +335,10 @@ public class DistributionSubscriber implements
DistributionAgent {
IOUtils.closeQuietly(packagePoller);
IOUtils.closeQuietly(commandPoller);
running = false;
+ Thread interrupter = this.queueProcessor;
+ if (interrupter != null) {
+ interrupter.interrupt();
+ }
String msg = String.format("Stopped Subscriber agent %s, subscribed to
Publisher agent names %s with package builder %s",
subAgentName,
queueNames,
@@ -403,7 +406,11 @@ public class DistributionSubscriber implements
DistributionAgent {
return;
}
DistributionQueueItem queueItem = QueueItemFactory.fromPackage(info,
message, true);
- while (!tryEnqueue(queueItem) && running) {
+ try {
+ enqueue(queueItem);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException();
}
}
@@ -412,58 +419,61 @@ public class DistributionSubscriber implements
DistributionAgent {
* binary packages fetched in memory. Note that each queued item
contains
* the binary package to be imported.
*/
- private boolean tryEnqueue(DistributionQueueItem queueItem) {
- try {
- queueItemsBuffer.put(queueItem);
- distributionMetricsService.getItemsBufferSize().increment();
- return true;
- } catch (InterruptedException ignore) {
- return false;
- }
+ private void enqueue(DistributionQueueItem queueItem) throws
InterruptedException {
+ while (running) {
+ if (queueItemsBuffer.offer(queueItem, 1000,
TimeUnit.MILLISECONDS)) {
+ distributionMetricsService.getItemsBufferSize().increment();
+ return;
+ }
+ }
+ throw new InterruptedException();
}
- private void processQueueItems() {
- for (;running;) {
+ private void processQueue() {
+ LOG.info("Started Queue processor");
+ while (! Thread.interrupted()) {
try {
- // send status stored in a previous run if exists
- sendStoredStatus();
- // We block until an item is available
- // and then process it
- DistributionQueueItem item = blockingPeekQueueItem();
- if (item != STOPPED_ITEM) {
- processQueueItem(item);
- }
- } catch (Throwable t) {
- // Catch all to prevent processing from stopping
- LOG.error("Error processing queue item", t);
- sleep(RETRY_DELAY);
+ processQueueItems();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
+ LOG.info("Stopped Queue processor");
}
+ private void processQueueItems() throws InterruptedException {
+ try {
+ // send status stored in a previous run if exists
+ try (Timer.Context context =
distributionMetricsService.getSendStoredStatusDuration().time()) {
+ sendStoredStatus();
+ }
+ // block until an item is available
+ DistributionQueueItem item = blockingPeekQueueItem();
+ // and then process it
+ try (Timer.Context context =
distributionMetricsService.getProcessQueueItemDuration().time()) {
+ processQueueItem(item);
+ }
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Throwable t) {
+ // Catch all to prevent processing from stopping
+ LOG.error("Error processing queue item", t);
+ Thread.sleep(RETRY_DELAY);
+ }
+ }
- private DistributionQueueItem blockingPeekQueueItem() {
- for (;running;) {
+ private DistributionQueueItem blockingPeekQueueItem() throws
InterruptedException {
+ while (true) {
DistributionQueueItem queueItem = queueItemsBuffer.peek();
if (queueItem != null) {
return queueItem;
} else {
- sleep(QUEUE_FETCH_DELAY);
+ Thread.sleep(QUEUE_FETCH_DELAY);
}
}
- return STOPPED_ITEM;
}
private void processQueueItem(DistributionQueueItem queueItem) throws
Exception {
- Timer.Context context =
distributionMetricsService.getProcessQueueItemDuration().time();
- try {
- timedProcessQueueItem(queueItem);
- } finally {
- context.stop();
- }
- }
-
- private void timedProcessQueueItem(DistributionQueueItem queueItem) throws
Exception {
long offset = queueItem.get(RECORD_OFFSET, Long.class);
boolean skip;
try {
@@ -473,7 +483,7 @@ public class DistributionSubscriber implements
DistributionAgent {
* This will occur when the precondition times out.
*/
LOG.info(e.getMessage());
- sleep(RETRY_DELAY);
+ Thread.sleep(RETRY_DELAY);
return;
}
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG,
PackageMessage.class);
@@ -649,28 +659,19 @@ public class DistributionSubscriber implements
DistributionAgent {
LOG.info("Stored status {}", s);
}
- private void sendStoredStatus() {
- Timer.Context context =
distributionMetricsService.getSendStoredStatusDuration().time();
- try {
- timedSendStoredStatus();
- } finally {
- context.stop();
- }
- }
-
- private void timedSendStoredStatus() {
+ private void sendStoredStatus() throws InterruptedException {
ValueMap status = processedStatuses.load();
boolean sent = status.get("sent", true);
- for (int retry = 0 ; running && !sent ; retry++) {
+ for (int retry = 0 ; !sent ; retry++) {
try {
sendStatusMessage(status);
+ markStatusSent();
sent = true;
} catch (Exception e) {
LOG.warn("Cannot send status (retry {})", retry, e);
- sleep(RETRY_SEND_DELAY);
+ Thread.sleep(RETRY_SEND_DELAY);
}
}
- markStatusSent();
}
private void markStatusSent() {
@@ -696,14 +697,6 @@ public class DistributionSubscriber implements
DistributionAgent {
LOG.info("Sent status message {}", status);
}
- private void sleep(int ms) {
- try {
- Thread.sleep(ms);
- } catch (InterruptedException ignore) {
- // ignore
- }
- }
-
@Nonnull
private InputStream pkgStream(ResourceResolver resolver, PackageMessage
pkgMsg) throws DistributionException {
if (pkgMsg.hasPkgBinary()) {