This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a0e0ee  SLING-12024 - Import package if status message for later 
package alre… (#122)
5a0e0ee is described below

commit 5a0e0eee55d2879787b17d650a4469c762840c82
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Wed Sep 20 14:20:51 2023 +0200

    SLING-12024 - Import package if status message for later package alre… 
(#122)
    
    * SLING-12024 - Import package if status message for later package already 
arrived
    
    * SLING-12024 - Check for higher status per publish agent
    
    * SLING-12024 - Rename variable
---
 .../impl/precondition/PackageStatusWatcher.java    | 27 +++++++--------
 .../precondition/PackageStatusWatcherTest.java     | 40 +++++++++++++---------
 2 files changed, 36 insertions(+), 31 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index fa202fe..89eee8f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -24,8 +24,9 @@ import static 
org.apache.sling.distribution.journal.HandlerAdapter.create;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -40,13 +41,12 @@ public class PackageStatusWatcher implements Closeable {
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     
     private final Closeable poller;
-    private final AtomicLong lowestStatusOffset;
     
     // subAgentName -> pkgOffset -> Status
-    private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new 
ConcurrentHashMap<>();
+    private final Map<String, NavigableMap<Long, Status>> pkgStatusPerSubAgent 
= new ConcurrentHashMap<>();
+
 
     public PackageStatusWatcher(MessagingProvider messagingProvider, Topics 
topics) {
-        this.lowestStatusOffset = new AtomicLong(Long.MAX_VALUE);
         String topicName = topics.getStatusTopic();
 
         poller = messagingProvider.createPoller(
@@ -64,23 +64,24 @@ public class PackageStatusWatcher implements Closeable {
     public PackageStatusMessage.Status getStatus(String subAgentName, long 
pkgOffset) {
         Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
         Status status = statusPerAgent.get(pkgOffset);
-        if (status == null && statusCanNotArriveAnymore(pkgOffset)) {
+        if (status == null && higherStatusAlreadyArrived(subAgentName, 
pkgOffset)) {
             log.info("Considering offset={} imported as status for this 
package can not arrive anymore.", pkgOffset);
             return Status.IMPORTED;
         }
         return status;
     }
 
-    private boolean statusCanNotArriveAnymore(long pkgOffset) {
-        return lowestStatusOffset.get()!=Long.MAX_VALUE && pkgOffset < 
lowestStatusOffset.get();
+    private boolean higherStatusAlreadyArrived(String subAgentName, long 
pkgOffset) {
+        NavigableMap<Long, Status> pkgStatus = 
pkgStatusPerSubAgent.get(subAgentName);
+        return pkgStatus.higherKey(pkgOffset) != null;
     }
 
     private Map<Long, Status> getAgentStatus(String subAgentName) {
         return pkgStatusPerSubAgent.computeIfAbsent(subAgentName, 
this::newMap);
     }
     
-    private Map<Long, Status> newMap(String subAgentName) {
-        return new ConcurrentHashMap<>();
+    private NavigableMap<Long, Status> newMap(String subAgentName) {
+        return new TreeMap<>();
     }
 
     @Override
@@ -89,13 +90,9 @@ public class PackageStatusWatcher implements Closeable {
     }
 
     private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) {
-        long statusOffset = pkgStatusMsg.getOffset();
-        long lowest = lowestStatusOffset.get();
-        if (statusOffset < lowest) {
-            lowestStatusOffset.set(statusOffset);
-        }
+        long pkgOffset = pkgStatusMsg.getOffset();
         // TODO: check revision
         Map<Long, Status> agentStatus = 
getAgentStatus(pkgStatusMsg.getSubAgentName());
-        agentStatus.put(statusOffset, pkgStatusMsg.getStatus());
+        agentStatus.put(pkgOffset, pkgStatusMsg.getStatus());
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index cf8473e..1af8b6b 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -75,42 +75,50 @@ public class PackageStatusWatcherTest {
 
     }
 
-
     @Test
-    public void testStatusWatcher() {
-
-        generateMessages(10, 50);
-
-        // If offset is lower than lowest offset we received we assume it to 
be imported
-        assertPackageStatus(1000, Status.IMPORTED);
+    public void testStatusWatcherRemoveFailed() {
+        generateStatusMessagesFromTo(10, 50, Status.REMOVED_FAILED);
 
-        assertPackageStatus(1010, Status.REMOVED_FAILED);
-        assertPackageStatus(1051, null);
+        assertPackageStatus("Offset is lower than lowest package offset from 
status messages. So we assume imported.", 1000, Status.IMPORTED);
+        assertPackageStatus("We should have got explicit status here", 1010, 
Status.REMOVED_FAILED);
+        assertPackageStatus("Status should not yet have arrived", 1051, null);
+    }
+    
+    @Test
+    public void testStatusWatcherStatusMessageMissing() {
+        generateStatusMessagesFromTo(1, 1, Status.IMPORTED);
+
+        assertPackageStatus("", 1001, Status.IMPORTED);
+        assertPackageStatus("This package status should be missing. So publish 
would wait", 1002, null);
+        
+        generateStatusMessagesFromTo(3, 3, Status.IMPORTED);
+        assertPackageStatus("", 1003, Status.IMPORTED);
+        assertPackageStatus("As we got a status message for a higher package 
offset this should allow import now", 1002, Status.IMPORTED);
     }
 
 
-    void generateMessages(int begin, int end) {
+    void generateStatusMessagesFromTo(int begin, int end, Status status) {
         MessageHandler<PackageStatusMessage> handler = 
adapterCaptor.getValue().getHandler();
-        for (int i=begin; i<end; i++) {
+        for (int i=begin; i<=end; i++) {
             handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l),
-                    createStatusMessage(i));
+                    createStatusMessage(i, status));
         }
     }
 
-    PackageStatusMessage createStatusMessage(int i) {
+    PackageStatusMessage createStatusMessage(int i, Status status) {
         return PackageStatusMessage.builder()
                 .subSlingId(SUB1_SLING_ID)
                 .subAgentName(SUB1_AGENT_NAME)
                 .pubAgentName(PUB1_AGENT_NAME)
                 .offset(1000 + i)
-                .status(PackageStatusMessage.Status.REMOVED_FAILED)
+                .status(status)
                 .build();
 
     }
 
-    void assertPackageStatus(long pkgOffset, Status expectedStatus) {
+    void assertPackageStatus(String msg, long pkgOffset, Status 
expectedStatus) {
         Status status = statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset);
-        assertEquals(expectedStatus, status);
+        assertEquals(msg, expectedStatus, status);
     }
 
 }

Reply via email to