This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c5f71  SLING-10200 - Provide the importer errors as queue properties (#67)
51c5f71 is described below

commit 51c5f71ea6941448341f7cd9c9d41b37aeef6487
Author: Timothee Maret <[email protected]>
AuthorDate: Thu Mar 11 22:27:04 2021 +0100

    SLING-10200 - Provide the importer errors as queue properties (#67)
---
 pom.xml                                            |  2 +-
 .../journal/queue/impl/PubErrQueue.java            |  2 +-
 .../distribution/journal/queue/impl/PubQueue.java  | 10 ++-
 .../queue/impl/PubQueueProviderFactoryImpl.java    |  5 +-
 .../journal/queue/impl/PubQueueProviderImpl.java   |  8 ++-
 .../journal/queue/impl/QueueEntryFactory.java      | 13 ++--
 .../journal/queue/impl/QueueErrors.java            | 74 ++++++++++++++++++++
 .../impl/publisher/DistributionPublisherTest.java  |  4 +-
 .../journal/queue/impl/PubQueueProviderTest.java   |  4 +-
 .../journal/queue/impl/PubQueueTest.java           |  2 +-
 .../journal/queue/impl/QueueErrorsTest.java        | 79 ++++++++++++++++++++++
 11 files changed, 189 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7fe140a..587b7fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.core</artifactId>
-            <version>0.4.0</version>
+            <version>0.4.5-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
index 790acae..3ead8ec 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubErrQueue.java
@@ -55,7 +55,7 @@ public class PubErrQueue implements DistributionQueue {
         this.queueName = requireNonNull(queueName);
         this.agentQueue = requireNonNull(agentQueue);
         this.errorQueue = requireNonNull(errorQueue);
-        this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0);
+        this.entryFactory = new QueueEntryFactory(queueName, queueItem -> 0, queueItem -> null);
     }
 
     @Nonnull
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
index f8ed0ff..88c0ccd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
@@ -70,14 +70,18 @@ public class PubQueue implements DistributionQueue {
 
        private final QueueEntryFactory entryFactory;
 
+       private final Throwable error;
+
     public PubQueue(String queueName,
                     OffsetQueue<DistributionQueueItem> offsetQueue,
                     int retries,
+                    @Nullable Throwable error,
                     @Nullable ClearCallback clearCallback) {
         this.queueName = Objects.requireNonNull(queueName);
         this.offsetQueue = Objects.requireNonNull(offsetQueue);
         this.retries = retries;
         this.clearCallback = clearCallback;
+        this.error = error;
         if (clearCallback != null) {
             capabilities.add(CLEARABLE);
             /*
@@ -90,7 +94,7 @@ public class PubQueue implements DistributionQueue {
             capabilities.add(REMOVABLE);
         }
 
-        this.entryFactory = new QueueEntryFactory(queueName, this::attempts);
+        this.entryFactory = new QueueEntryFactory(queueName, this::attempts, this::error);
         this.headItem = offsetQueue.getHeadItem();
     }
     
@@ -222,6 +226,10 @@ public class PubQueue implements DistributionQueue {
         return queueItem.equals(headItem) ? retries : 0;
     }
 
+    private Throwable error(DistributionQueueItem queueItem) {
+        return queueItem.equals(headItem) ? error : null ;
+    }
+
     private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
         log.info("Clearing up to tail queue entry {}", tailEntryId);
         List<DistributionQueueEntry> removed = new ArrayList<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
index 2549d65..671cf77 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderFactoryImpl.java
@@ -31,6 +31,9 @@ public class PubQueueProviderFactoryImpl implements PubQueueProviderFactory {
     
     @Reference
     private EventAdmin eventAdmin;
+
+    @Reference
+    private QueueErrors queueErrors;
     
     private BundleContext context;
 
@@ -40,7 +43,7 @@ public class PubQueueProviderFactoryImpl implements PubQueueProviderFactory {
 
     @Override
     public PubQueueProvider create(CacheCallback callback) {
-        return new PubQueueProviderImpl(eventAdmin, callback, context);
+        return new PubQueueProviderImpl(eventAdmin, queueErrors, callback, context);
     }
     
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
index a8c400f..20e11f0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderImpl.java
@@ -65,6 +65,8 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
     
     private volatile PubQueueCache cache; //NOSONAR
 
+    private final QueueErrors queueErrors;
+
     /*
      * (pubAgentName#subAgentId x OffsetQueue)
      */
@@ -72,8 +74,9 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
 
     private ServiceRegistration<?> reg;
 
-    public PubQueueProviderImpl(EventAdmin eventAdmin, CacheCallback callback, BundleContext context) {
+    public PubQueueProviderImpl(EventAdmin eventAdmin, QueueErrors queueErrors, CacheCallback callback, BundleContext context) {
         this.eventAdmin = eventAdmin;
+        this.queueErrors = queueErrors;
         this.callback = callback;
         cache = newCache();
         startCleanupTask(context);
@@ -158,7 +161,8 @@ public class PubQueueProviderImpl implements PubQueueProvider, Runnable {
             }
             long minOffset = state.getLastProcessedOffset() + 1; // Start from offset after last processed
             OffsetQueue<DistributionQueueItem> agentQueue = getOffsetQueue(pubAgentName, minOffset);
-            return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(), state.getClearCallback());
+            Throwable error = queueErrors.getError(pubAgentName, queueName);
+            return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), state.getHeadRetries(), error, state.getClearCallback());
         }
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
index a29ef93..5017238 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueEntryFactory.java
@@ -22,22 +22,24 @@ import static org.apache.sling.distribution.queue.DistributionQueueItemState.ERR
 import static org.apache.sling.distribution.queue.DistributionQueueItemState.QUEUED;
 
 import java.util.Calendar;
+import java.util.function.Function;
 import java.util.function.ToIntFunction;
 
 import org.apache.sling.distribution.journal.queue.QueueItemFactory;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 
 public class QueueEntryFactory {
 
     private final String queueName;
     private final ToIntFunction<DistributionQueueItem> attemptsCallback;
+    private final Function<DistributionQueueItem, Throwable> errorCallback;
 
-    public QueueEntryFactory(String queueName, ToIntFunction<DistributionQueueItem> attemptsCallback) {
+    public QueueEntryFactory(String queueName, ToIntFunction<DistributionQueueItem> attemptsCallback, Function<DistributionQueueItem, Throwable> errorCallback) {
         this.queueName = queueName;
         this.attemptsCallback = attemptsCallback;
+        this.errorCallback = errorCallback;
     }
 
     public DistributionQueueEntry create(DistributionQueueItem queueItem) {
@@ -51,8 +53,11 @@ public class QueueEntryFactory {
 
     private DistributionQueueItemStatus buildQueueItemStatus(DistributionQueueItem queueItem) {
         int attempts = attemptsCallback.applyAsInt(queueItem);
-        DistributionQueueItemState state = (attempts > 0) ? ERROR : QUEUED;
-        return new DistributionQueueItemStatus(itemCalendar(queueItem), state, attempts, queueName);
+        Throwable error = errorCallback.apply(queueItem);
+        Calendar entered = itemCalendar(queueItem);
+        return (attempts > 0) ?
+                new DistributionQueueItemStatus(entered, ERROR,  attempts, queueName, error) :
+                new DistributionQueueItemStatus(entered, QUEUED, attempts, queueName);
     }
 
     private Calendar itemCalendar(DistributionQueueItem queueItem) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java
new file mode 100644
index 0000000..437ee8f
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/QueueErrors.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.sling.distribution.journal.messages.LogMessage;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+import static org.apache.sling.distribution.journal.impl.discovery.DiscoveryService.KEY_MESSAGE;
+import static org.apache.sling.distribution.journal.impl.discovery.DiscoveryService.TOPIC_DISTRIBUTION_LOG;
+import static org.osgi.service.event.EventConstants.EVENT_TOPIC;
+
+/**
+ * Keeps track of the processing errors per sub agent.
+ */
+@Component(service = { QueueErrors.class, EventHandler.class },
+        property = EVENT_TOPIC + "=" + TOPIC_DISTRIBUTION_LOG)
+public class QueueErrors implements EventHandler {
+
+    /**
+     * pubAgentName -> subAgentId -> Throwable
+     */
+    private final Map<String, Map<String, Throwable>> errors = new ConcurrentHashMap<>();
+
+    /**
+     * Return the error raised during the last processing attempt
+     *
+     * @return a {@code Throwable} or {@code null} if the last processing attempt did not fail
+     */
+    public Throwable getError(String pubAgentName, String subAgentId) {
+        return errors.computeIfAbsent(pubAgentName, this::newPubAgent)
+                .get(subAgentId);
+    }
+
+    @Override
+    public void handleEvent(Event event) {
+        LogMessage msg = (LogMessage) event.getProperty(KEY_MESSAGE);
+        if (msg != null) {
+            String subAgentId = new AgentId(msg.getSubSlingId(), msg.getSubAgentName()).getAgentId();
+            errors.computeIfAbsent(msg.getPubAgentName(), this::newPubAgent)
+                    .put(subAgentId, toThrowable(msg));
+        }
+    }
+
+    private Throwable toThrowable(LogMessage msg) {
+        return new Throwable(msg.getMessage(), new Throwable(msg.getStacktrace()));
+    }
+
+    private Map<String, Throwable> newPubAgent(String pubAgentName) {
+        return new ConcurrentHashMap<>();
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index 180b8d8..faed812 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -221,7 +221,7 @@ public class DistributionPublisherTest {
     @Test
     public void testGetQueue() throws DistributionException, IOException {
         when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME))
-            .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null));
+            .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null,null));
         DistributionQueue queue = publisher.getQueue(QUEUE_NAME);
         assertThat(queue, notNullValue());
     }
@@ -229,7 +229,7 @@ public class DistributionPublisherTest {
     @Test
     public void testGetErrorQueue() throws DistributionException, IOException {
         when(pubQueueProvider.getQueue(PUB1AGENT1, QUEUE_NAME + "-error"))
-            .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null));
+            .thenReturn(new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, null,null));
         DistributionQueue queue = publisher.getQueue(QUEUE_NAME + "-error");
         assertThat(queue, notNullValue());
     }
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
index f462f56..85e6d24 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.queue.impl;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -119,7 +120,8 @@ public class PubQueueProviderTest {
                 Mockito.any(Reset.class),
                 statHandlerCaptor.capture()))
         .thenReturn(statPoller);
-        queueProvider = new PubQueueProviderImpl(eventAdmin, callback, context);
+        QueueErrors queueErrors = mock(QueueErrors.class);
+        queueProvider = new PubQueueProviderImpl(eventAdmin, queueErrors,  callback, context);
         handler = handlerCaptor.getValue();
     }
 
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
index e540502..820bfff 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
@@ -60,7 +60,7 @@ public class PubQueueTest {
     @Before
     public void before () {
         offsetQueue = new OffsetQueueImpl<>();
-        queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, this::clearCallback);
+        queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback);
     }
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/QueueErrorsTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/QueueErrorsTest.java
new file mode 100644
index 0000000..69afd9c
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/QueueErrorsTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.queue.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.sling.distribution.journal.messages.LogMessage;
+import org.apache.sling.distribution.journal.shared.AgentId;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import static org.apache.sling.distribution.journal.impl.discovery.DiscoveryService.TOPIC_DISTRIBUTION_LOG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class QueueErrorsTest {
+
+    private QueueErrors queueErrors;
+
+    @Before
+    public void before () {
+        queueErrors = new QueueErrors();
+    }
+
+    @Test
+    public void testEmptyQueueError() {
+        assertNull(queueErrors.getError("anyPubAgentName", "anySubAgentId"));
+    }
+
+    @Test
+    public void testKeepTrackPerPubAgent() {
+        String
+                pubAgentName = "pubName",
+                subAgentName = "subName",
+                subSlingId = UUID.randomUUID().toString(),
+                subAgentId = new AgentId(subSlingId, subAgentName).getAgentId(),
+                message = "Failed to process the item xyz",
+                trace = "Failed to process\nRaised here\nfrom here";
+
+        queueErrors.handleEvent(newEvent(pubAgentName, subSlingId, subAgentName, message, trace));
+
+        assertNull(queueErrors.getError(pubAgentName + "another-pub-agent", subAgentId));
+        assertNull(queueErrors.getError(pubAgentName, subAgentId + "another-sub-agent"));
+        Throwable error = queueErrors.getError(pubAgentName, subAgentId);
+        assertNotNull(error);
+        assertEquals(message, error.getMessage());
+    }
+
+    private Event newEvent(String pubAgentName, String subSlingId, String subAgentName, String message, String trace) {
+        return newEvent(new LogMessage(pubAgentName, subSlingId, subAgentName, message, trace));
+    }
+
+    private Event newEvent(LogMessage msg) {
+        Map<String, Object> props = new HashMap<>();
+        props.put("message", msg);
+        return new Event(TOPIC_DISTRIBUTION_LOG, props);
+    }
+
+}
\ No newline at end of file

Reply via email to