This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch SLING-10200 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 4659682ddb944789df3c448d494db96b0572a13d Author: tmaret <[email protected]> AuthorDate: Thu Mar 11 18:17:37 2021 +0100 SLING-10200 - Provide the importer errors as queue properties --- pom.xml | 2 +- .../journal/queue/impl/PubErrQueue.java | 2 +- .../distribution/journal/queue/impl/PubQueue.java | 10 ++- .../queue/impl/PubQueueProviderFactoryImpl.java | 5 +- .../journal/queue/impl/PubQueueProviderImpl.java | 8 ++- .../journal/queue/impl/QueueEntryFactory.java | 13 ++-- .../journal/queue/impl/QueueErrors.java | 71 ++++++++++++++++++++++ .../impl/publisher/DistributionPublisherTest.java | 4 +- .../journal/queue/impl/PubQueueProviderTest.java | 4 +- .../journal/queue/impl/PubQueueTest.java | 2 +- 10 files changed, 107 insertions(+), 14 deletions(-) diff --git a/pom.xml b/pom.xml index 7fe140a..587b7fb 100644 --- a/pom.xml +++ b/pom.xml @@ -123,7 +123,7 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.core</artifactId> - <version>0.4.0</version> + <version>0.4.5-SNAPSHOT</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java index 790acae..3ead8ec 100644 --- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java +++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java @@ -55,7 +55,7 @@ public class PubErrQueue implements DistributionQueue { this.queueName = requireNonNull(queueName); this.agentQueue = requireNonNull(agentQueue); this.errorQueue = requireNonNull(errorQueue); - this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0); + this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0, queueItem -> null); } @Nonnull diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java index f8ed0ff..88c0ccd 100644 --- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java +++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java @@ -70,14 +70,18 @@ public class PubQueue implements DistributionQueue { private final QueueEntryFactory entryFactory; + private final Throwable error; + public PubQueue(String queueName, OffsetQueue<DistributionQueueItem> offsetQueue, int retries, + @Nullable Throwable error, @Nullable ClearCallback clearCallback) { this.queueName = Objects.requireNonNull(queueName); this.offsetQueue = Objects.requireNonNull(offsetQueue); this.retries = retries; this.clearCallback = clearCallback; + this.error = error; if (clearCallback != null) { capabilities.add(CLEARABLE); /* @@ -90,7 +94,7 @@ public class PubQueue implements DistributionQueue { capabilities.add(REMOVABLE); } - this.entryFactory = new QueueEntryFactory(queueName, this::attempts); + this.entryFactory = new QueueEntryFactory(queueName, this::attempts, this::error); this.headItem = offsetQueue.getHeadItem(); } @@ -222,6 +226,10 @@ public class PubQueue implements DistributionQueue { return queueItem.equals(headItem) ? retries : 0; } + private Throwable error(DistributionQueueItem queueItem) { + return queueItem.equals(headItem) ? error : null ; + } + private Iterable<DistributionQueueEntry> clear(String tailEntryId) { log.info("Clearing up to tail queue entry {}", tailEntryId); List<DistributionQueueEntry> removed = new ArrayList<>(); diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java index 2549d65..671cf77 100644 --- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java +++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java @@ -31,6 +31,9 @@ public class PubQueueProviderFactoryImpl implements PubQueueProviderFactory { @Reference private EventAdmin eventAdmin; + + @Reference + private QueueErrors queueErrors; private BundleContext context; @@ -40,7 +43,7 @@ public class PubQueueProviderFactoryImpl implements PubQueueProviderFactory { @Override public PubQueueProvider create(CacheCallback callback) { - return new PubQueueProviderImpl(eventAdmin, callback, context); + return new PubQueueProviderImpl(eventAdmin, queueErrors, callback, context); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java index a8c400f..20e11f0 100644 --- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java +++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java @@ -65,6 +65,8 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable { private volatile PubQueueCache cache; //NOSONAR + private final QueueErrors queueErrors; + /* * (pubAgentName#subAgentId x OffsetQueue) */ @@ -72,8 +74,9 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable { private ServiceRegistration<?> reg; - public PubQueueProviderImpl(EventAdmin eventAdmin, CacheCallback callback, BundleContext context) { + public PubQueueProviderImpl(EventAdmin eventAdmin, QueueErrors queueErrors, CacheCallback callback, BundleContext context) { this.eventAdmin = eventAdmin; + this.queueErrors = queueErrors; this.callback = callback; cache = newCache(); startCleanupTask(context); @@ -158,7 +161,8 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable { } long minOffset = state.getLastProcessedOffset() + 1; // Start from offset after last processed OffsetQueue<DistributionQueueItem> agentQueue = getOffsetQueue(pubAgentName, minOffset); - return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(), state.getClearCallback()); + Throwable error = queueErrors.getError(pubAgentName, queueName); + return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(), error, state.getClearCallback()); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java index a29ef93..5017238 100644 --- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java +++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java @@ -22,22 +22,24 @@ import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERR import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED; import java.util.Calendar; +import java.util.function.Function; import java.util.function.ToIntFunction; import org.apache.sling.distribution.journal.queue.QueueItemFactory; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; -import org.apache.sling.distribution.queue.DistributionQueueItemState; import org.apache.sling.distribution.queue.DistributionQueueItemStatus; public class QueueEntryFactory { private final String queueName; private final ToIntFunction<DistributionQueueItem> attemptsCallback; + private final Function<DistributionQueueItem, Throwable> errorCallback; - public QueueEntryFactory(String queueName, ToIntFunction<DistributionQueueItem> attemptsCallback) { + public QueueEntryFactory(String queueName, ToIntFunction<DistributionQueueItem> attemptsCallback, Function<DistributionQueueItem, Throwable> errorCallback) { this.queueName = queueName; this.attemptsCallback = attemptsCallback; + this.errorCallback = errorCallback; } public DistributionQueueEntry create(DistributionQueueItem queueItem) { @@ -51,8 +53,11 @@ public class QueueEntryFactory { private DistributionQueueItemStatus buildQueueItemStatus(DistributionQueueItem queueItem) { int attempts = attemptsCallback.applyAsInt(queueItem); - DistributionQueueItemState state = (attempts > 0) ? ERROR : QUEUED; - return new DistributionQueueItemStatus(itemCalendar(queueItem), state, attempts, queueName); + Throwable error = errorCallback.apply(queueItem); + Calendar entered = itemCalendar(queueItem); + return (attempts > 0) ? + new DistributionQueueItemStatus(entered, ERROR, attempts, queueName, error) : + new DistributionQueueItemStatus(entered, QUEUED, attempts, queueName); } private Calendar itemCalendar(DistributionQueueItem queueItem) { diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java new file mode 100644 index 0000000..d66e9a9 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.queue.impl; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.sling.distribution.journal.messages.LogMessage; +import org.apache.sling.distribution.journal.shared.AgentId; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventHandler; + +import static org.apache.sling.distribution.journal.impl.discovery.DiscoveryService.KEY_MESSAGE; + +/** + * Keeps track of the processing errors per sub agent. + */ +@Component(service = { QueueErrors.class, EventHandler.class }) +public class QueueErrors implements EventHandler { + + /** + * pubAgentName -> subAgentId -> Throwable + */ + private final Map<String, Map<String, Throwable>> errors = new ConcurrentHashMap<>(); + + /** + * Return the error raised during the last processing attempt + * + * @return a {@code Throwable} or {@code null} if the last processing attempt did not fail + */ + public Throwable getError(String pubAgentName, String subAgentId) { + return errors.computeIfAbsent(pubAgentName, this::newPubAgent) + .get(subAgentId); + } + + @Override + public void handleEvent(Event event) { + LogMessage msg = (LogMessage) event.getProperty(KEY_MESSAGE); + if (msg != null) { + String subAgentId = new AgentId(msg.getSubSlingId(), msg.getSubAgentName()).getAgentId(); + errors.computeIfAbsent(msg.getPubAgentName(), this::newPubAgent) + .put(subAgentId, toThrowable(msg)); + } + } + + private Throwable toThrowable(LogMessage msg) { + return new Throwable(msg.getMessage(), new Throwable(msg.getStacktrace())); + } + + private Map<String, Throwable> newPubAgent(String pubAgentName) { + return new ConcurrentHashMap<>(); + } + +} diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 180b8d8..faed812 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -221,7 +221,7 @@ public class DistributionPublisherTest { @Test public void testGetQueue() throws DistributionException, IOException { when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME)) - .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null)); + .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null,null)); DistributionQueue queue = publisher.getQueue(QUEUE_NAME); assertThat(queue, notNullValue()); } @@ -229,7 +229,7 @@ public class DistributionPublisherTest { @Test public void testGetErrorQueue() throws DistributionException, IOException { when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME + "-error")) - .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null)); + .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null,null)); DistributionQueue queue = publisher.getQueue(QUEUE_NAME + "-error"); assertThat(queue, notNullValue()); } diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java index f462f56..85e6d24 100644 --- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java @@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.queue.impl; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -119,7 +120,8 @@ public class PubQueueProviderTest { Mockito.any(Reset.class), statHandlerCaptor.capture())) .thenReturn(statPoller); - queueProvider = new PubQueueProviderImpl(eventAdmin, callback, context); + QueueErrors queueErrors = mock(QueueErrors.class); + queueProvider = new PubQueueProviderImpl(eventAdmin, queueErrors, callback, context); handler = handlerCaptor.getValue(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java index e540502..820bfff 100644 --- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java @@ -60,7 +60,7 @@ public class PubQueueTest { @Before public void before () { offsetQueue = new OffsetQueueImpl<>(); - queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, this::clearCallback); + queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); } @Test
