This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 51c5f71 SLING-10200 - Provide the importer errors as queue properties (#67)
51c5f71 is described below
commit 51c5f71ea6941448341f7cd9c9d41b37aeef6487
Author: Timothee Maret <[email protected]>
AuthorDate: Thu Mar 11 22:27:04 2021 +0100
SLING-10200 - Provide the importer errors as queue properties (#67)
---
pom.xml | 2 +-
.../journal/queue/impl/PubErrQueue.java | 2 +-
.../distribution/journal/queue/impl/PubQueue.java | 10 ++-
.../queue/impl/PubQueueProviderFactoryImpl.java | 5 +-
.../journal/queue/impl/PubQueueProviderImpl.java | 8 ++-
.../journal/queue/impl/QueueEntryFactory.java | 13 ++--
.../journal/queue/impl/QueueErrors.java | 74 ++++++++++++++++++++
.../impl/publisher/DistributionPublisherTest.java | 4 +-
.../journal/queue/impl/PubQueueProviderTest.java | 4 +-
.../journal/queue/impl/PubQueueTest.java | 2 +-
.../journal/queue/impl/QueueErrorsTest.java | 79 ++++++++++++++++++++++
11 files changed, 189 insertions(+), 14 deletions(-)
diff --git a/pom.xml b/pom.xml
index 7fe140a..587b7fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.core</artifactId>
- <version>0.4.0</version>
+ <version>0.4.5-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
index 790acae..3ead8ec 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
@@ -55,7 +55,7 @@ public class PubErrQueue implements DistributionQueue {
this.queueName = requireNonNull(queueName);
this.agentQueue = requireNonNull(agentQueue);
this.errorQueue = requireNonNull(errorQueue);
- this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0);
+ this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0,
queueItem -> null);
}
@Nonnull
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
index f8ed0ff..88c0ccd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
@@ -70,14 +70,18 @@ public class PubQueue implements DistributionQueue {
private final QueueEntryFactory entryFactory;
+ private final Throwable error;
+
public PubQueue(String queueName,
OffsetQueue<DistributionQueueItem> offsetQueue,
int retries,
+ @Nullable Throwable error,
@Nullable ClearCallback clearCallback) {
this.queueName = Objects.requireNonNull(queueName);
this.offsetQueue = Objects.requireNonNull(offsetQueue);
this.retries = retries;
this.clearCallback = clearCallback;
+ this.error = error;
if (clearCallback != null) {
capabilities.add(CLEARABLE);
/*
@@ -90,7 +94,7 @@ public class PubQueue implements DistributionQueue {
capabilities.add(REMOVABLE);
}
- this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
+ this.entryFactory = new QueueEntryFactory(queueName, this::attempts,
this::error);
this.headItem = offsetQueue.getHeadItem();
}
@@ -222,6 +226,10 @@ public class PubQueue implements DistributionQueue {
return queueItem.equals(headItem) ? retries : 0;
}
+ private Throwable error(DistributionQueueItem queueItem) {
+ return queueItem.equals(headItem) ? error : null ;
+ }
+
private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
log.info("Clearing up to tail queue entry {}", tailEntryId);
List<DistributionQueueEntry> removed = new ArrayList<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
index 2549d65..671cf77 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
@@ -31,6 +31,9 @@ public class PubQueueProviderFactoryImpl implements
PubQueueProviderFactory {
@Reference
private EventAdmin eventAdmin;
+
+ @Reference
+ private QueueErrors queueErrors;
private BundleContext context;
@@ -40,7 +43,7 @@ public class PubQueueProviderFactoryImpl implements
PubQueueProviderFactory {
@Override
public PubQueueProvider create(CacheCallback callback) {
- return new PubQueueProviderImpl(eventAdmin, callback, context);
+ return new PubQueueProviderImpl(eventAdmin, queueErrors, callback,
context);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index a8c400f..20e11f0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -65,6 +65,8 @@ public class PubQueueProviderImpl implements
PubQueueProvider, Runnable {
private volatile PubQueueCache cache; //NOSONAR
+ private final QueueErrors queueErrors;
+
/*
* (pubAgentName#subAgentId x OffsetQueue)
*/
@@ -72,8 +74,9 @@ public class PubQueueProviderImpl implements
PubQueueProvider, Runnable {
private ServiceRegistration<?> reg;
- public PubQueueProviderImpl(EventAdmin eventAdmin, CacheCallback callback,
BundleContext context) {
+ public PubQueueProviderImpl(EventAdmin eventAdmin, QueueErrors
queueErrors, CacheCallback callback, BundleContext context) {
this.eventAdmin = eventAdmin;
+ this.queueErrors = queueErrors;
this.callback = callback;
cache = newCache();
startCleanupTask(context);
@@ -158,7 +161,8 @@ public class PubQueueProviderImpl implements
PubQueueProvider, Runnable {
}
long minOffset = state.getLastProcessedOffset() + 1; // Start from
offset after last processed
OffsetQueue<DistributionQueueItem> agentQueue =
getOffsetQueue(pubAgentName, minOffset);
- return new PubQueue(queueName,
agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(),
state.getClearCallback());
+ Throwable error = queueErrors.getError(pubAgentName, queueName);
+ return new PubQueue(queueName,
agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(), error,
state.getClearCallback());
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
index a29ef93..5017238 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
@@ -22,22 +22,24 @@ import static
org.apache.sling.distribution.queue.DistributionQueueItemState.ERR
import static
org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
import java.util.Calendar;
+import java.util.function.Function;
import java.util.function.ToIntFunction;
import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
public class QueueEntryFactory {
private final String queueName;
private final ToIntFunction<DistributionQueueItem> attemptsCallback;
+ private final Function<DistributionQueueItem, Throwable> errorCallback;
- public QueueEntryFactory(String queueName,
ToIntFunction<DistributionQueueItem> attemptsCallback) {
+ public QueueEntryFactory(String queueName,
ToIntFunction<DistributionQueueItem> attemptsCallback,
Function<DistributionQueueItem, Throwable> errorCallback) {
this.queueName = queueName;
this.attemptsCallback = attemptsCallback;
+ this.errorCallback = errorCallback;
}
public DistributionQueueEntry create(DistributionQueueItem queueItem) {
@@ -51,8 +53,11 @@ public class QueueEntryFactory {
private DistributionQueueItemStatus
buildQueueItemStatus(DistributionQueueItem queueItem) {
int attempts = attemptsCallback.applyAsInt(queueItem);
- DistributionQueueItemState state = (attempts > 0) ? ERROR : QUEUED;
- return new DistributionQueueItemStatus(itemCalendar(queueItem), state,
attempts, queueName);
+ Throwable error = errorCallback.apply(queueItem);
+ Calendar entered = itemCalendar(queueItem);
+ return (attempts > 0) ?
+ new DistributionQueueItemStatus(entered, ERROR, attempts,
queueName, error) :
+ new DistributionQueueItemStatus(entered, QUEUED, attempts,
queueName);
}
private Calendar itemCalendar(DistributionQueueItem queueItem) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java
new file mode 100644
index 0000000..437ee8f
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.sling.distribution.journal.messages.LogMessage;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+import static
org.apache.sling.distribution.journal.impl.discovery.DiscoveryService.KEY_MESSAGE;
+import static
org.apache.sling.distribution.journal.impl.discovery.DiscoveryService.TOPIC_DISTRIBUTION_LOG;
+import static org.osgi.service.event.EventConstants.EVENT_TOPIC;
+
+/**
+ * Keeps track of the processing errors per sub agent.
+ */
+@Component(service = { QueueErrors.class, EventHandler.class },
+ property = EVENT_TOPIC + "=" + TOPIC_DISTRIBUTION_LOG)
+public class QueueErrors implements EventHandler {
+
+ /**
+ * pubAgentName -> subAgentId -> Throwable
+ */
+ private final Map<String, Map<String, Throwable>> errors = new
ConcurrentHashMap<>();
+
+ /**
+ * Return the error raised during the last processing attempt
+ *
+ * @return a {@code Throwable} or {@code null} if the last processing
attempt did not fail
+ */
+ public Throwable getError(String pubAgentName, String subAgentId) {
+ return errors.computeIfAbsent(pubAgentName, this::newPubAgent)
+ .get(subAgentId);
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ LogMessage msg = (LogMessage) event.getProperty(KEY_MESSAGE);
+ if (msg != null) {
+ String subAgentId = new AgentId(msg.getSubSlingId(),
msg.getSubAgentName()).getAgentId();
+ errors.computeIfAbsent(msg.getPubAgentName(), this::newPubAgent)
+ .put(subAgentId, toThrowable(msg));
+ }
+ }
+
+ private Throwable toThrowable(LogMessage msg) {
+ return new Throwable(msg.getMessage(), new
Throwable(msg.getStacktrace()));
+ }
+
+ private Map<String, Throwable> newPubAgent(String pubAgentName) {
+ return new ConcurrentHashMap<>();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 180b8d8..faed812 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -221,7 +221,7 @@ public class DistributionPublisherTest {
@Test
public void testGetQueue() throws DistributionException, IOException {
when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME))
- .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0,
null));
+ .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0,
null,null));
DistributionQueue queue = publisher.getQueue(QUEUE_NAME);
assertThat(queue, notNullValue());
}
@@ -229,7 +229,7 @@ public class DistributionPublisherTest {
@Test
public void testGetErrorQueue() throws DistributionException, IOException {
when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME + "-error"))
- .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0,
null));
+ .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0,
null,null));
DistributionQueue queue = publisher.getQueue(QUEUE_NAME + "-error");
assertThat(queue, notNullValue());
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
index f462f56..85e6d24 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.queue.impl;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -119,7 +120,8 @@ public class PubQueueProviderTest {
Mockito.any(Reset.class),
statHandlerCaptor.capture()))
.thenReturn(statPoller);
- queueProvider = new PubQueueProviderImpl(eventAdmin, callback,
context);
+ QueueErrors queueErrors = mock(QueueErrors.class);
+ queueProvider = new PubQueueProviderImpl(eventAdmin, queueErrors,
callback, context);
handler = handlerCaptor.getValue();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
index e540502..820bfff 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
@@ -60,7 +60,7 @@ public class PubQueueTest {
@Before
public void before () {
offsetQueue = new OffsetQueueImpl<>();
- queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, this::clearCallback);
+ queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
}
@Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/QueueErrorsTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/QueueErrorsTest.java
new file mode 100644
index 0000000..69afd9c
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/QueueErrorsTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.sling.distribution.journal.messages.LogMessage;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import static
org.apache.sling.distribution.journal.impl.discovery.DiscoveryService.TOPIC_DISTRIBUTION_LOG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class QueueErrorsTest {
+
+ private QueueErrors queueErrors;
+
+ @Before
+ public void before () {
+ queueErrors = new QueueErrors();
+ }
+
+ @Test
+ public void testEmptyQueueError() {
+ assertNull(queueErrors.getError("anyPubAgentName", "anySubAgentId"));
+ }
+
+ @Test
+ public void testKeepTrackPerPubAgent() {
+ String
+ pubAgentName = "pubName",
+ subAgentName = "subName",
+ subSlingId = UUID.randomUUID().toString(),
+ subAgentId = new AgentId(subSlingId,
subAgentName).getAgentId(),
+ message = "Failed to process the item xyz",
+ trace = "Failed to process\nRaised here\nfrom here";
+
+ queueErrors.handleEvent(newEvent(pubAgentName, subSlingId,
subAgentName, message, trace));
+
+ assertNull(queueErrors.getError(pubAgentName + "another-pub-agent",
subAgentId));
+ assertNull(queueErrors.getError(pubAgentName, subAgentId +
"another-sub-agent"));
+ Throwable error = queueErrors.getError(pubAgentName, subAgentId);
+ assertNotNull(error);
+ assertEquals(message, error.getMessage());
+ }
+
+ private Event newEvent(String pubAgentName, String subSlingId, String
subAgentName, String message, String trace) {
+ return newEvent(new LogMessage(pubAgentName, subSlingId, subAgentName,
message, trace));
+ }
+
+ private Event newEvent(LogMessage msg) {
+ Map<String, Object> props = new HashMap<>();
+ props.put("message", msg);
+ return new Event(TOPIC_DISTRIBUTION_LOG, props);
+ }
+
+}
\ No newline at end of file