This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12024
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 118df80ef0f38fe041bf648ea0832e3c5ece580f
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Fri Sep 15 15:42:18 2023 +0200

    SLING-12024 - Import package if status message for later package already arrived
---
 .../impl/precondition/PackageStatusWatcher.java    | 17 ++++-----
 .../precondition/PackageStatusWatcherTest.java     | 40 +++++++++++++---------
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index fa202fe..468444e 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -40,13 +40,14 @@ public class PackageStatusWatcher implements Closeable {
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     
     private final Closeable poller;
-    private final AtomicLong lowestStatusOffset;
+    private final AtomicLong highestStatusOffset;
     
     // subAgentName -> pkgOffset -> Status
     private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new 
ConcurrentHashMap<>();
 
+
     public PackageStatusWatcher(MessagingProvider messagingProvider, Topics 
topics) {
-        this.lowestStatusOffset = new AtomicLong(Long.MAX_VALUE);
+        this.highestStatusOffset = new AtomicLong(Long.MIN_VALUE);
         String topicName = topics.getStatusTopic();
 
         poller = messagingProvider.createPoller(
@@ -64,15 +65,15 @@ public class PackageStatusWatcher implements Closeable {
     public PackageStatusMessage.Status getStatus(String subAgentName, long 
pkgOffset) {
         Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
         Status status = statusPerAgent.get(pkgOffset);
-        if (status == null && statusCanNotArriveAnymore(pkgOffset)) {
+        if (status == null && higherStatusAlreadyArrived(pkgOffset)) {
             log.info("Considering offset={} imported as status for this 
package can not arrive anymore.", pkgOffset);
             return Status.IMPORTED;
         }
         return status;
     }
 
-    private boolean statusCanNotArriveAnymore(long pkgOffset) {
-        return lowestStatusOffset.get()!=Long.MAX_VALUE && pkgOffset < 
lowestStatusOffset.get();
+    private boolean higherStatusAlreadyArrived(long pkgOffset) {
+        return pkgOffset < highestStatusOffset.get();
     }
 
     private Map<Long, Status> getAgentStatus(String subAgentName) {
@@ -90,9 +91,9 @@ public class PackageStatusWatcher implements Closeable {
 
     private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) {
         long statusOffset = pkgStatusMsg.getOffset();
-        long lowest = lowestStatusOffset.get();
-        if (statusOffset < lowest) {
-            lowestStatusOffset.set(statusOffset);
+        long highest = highestStatusOffset.get();
+        if (statusOffset > highest) {
+            highestStatusOffset.set(statusOffset);
         }
         // TODO: check revision
         Map<Long, Status> agentStatus = 
getAgentStatus(pkgStatusMsg.getSubAgentName());
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index cf8473e..1af8b6b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -75,42 +75,50 @@ public class PackageStatusWatcherTest {
 
     }
 
-
     @Test
-    public void testStatusWatcher() {
-
-        generateMessages(10, 50);
-
-        // If offset is lower than lowest offset we received we assume it to 
be imported
-        assertPackageStatus(1000, Status.IMPORTED);
+    public void testStatusWatcherRemoveFailed() {
+        generateStatusMessagesFromTo(10, 50, Status.REMOVED_FAILED);
 
-        assertPackageStatus(1010, Status.REMOVED_FAILED);
-        assertPackageStatus(1051, null);
+        assertPackageStatus("Offset is lower than lowest package offset from 
status messages. So we assume imported.", 1000, Status.IMPORTED);
+        assertPackageStatus("We should have got explicit status here", 1010, 
Status.REMOVED_FAILED);
+        assertPackageStatus("Status should not yet have arrived", 1051, null);
+    }
+    
+    @Test
+    public void testStatusWatcherStatusMessageMissing() {
+        generateStatusMessagesFromTo(1, 1, Status.IMPORTED);
+
+        assertPackageStatus("", 1001, Status.IMPORTED);
+        assertPackageStatus("This package status should be missing. So publish 
would wait", 1002, null);
+        
+        generateStatusMessagesFromTo(3, 3, Status.IMPORTED);
+        assertPackageStatus("", 1003, Status.IMPORTED);
+        assertPackageStatus("As we got a status message for a higher package 
offset this should allow import now", 1002, Status.IMPORTED);
     }
 
 
-    void generateMessages(int begin, int end) {
+    void generateStatusMessagesFromTo(int begin, int end, Status status) {
         MessageHandler<PackageStatusMessage> handler = 
adapterCaptor.getValue().getHandler();
-        for (int i=begin; i<end; i++) {
+        for (int i=begin; i<=end; i++) {
             handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l),
-                    createStatusMessage(i));
+                    createStatusMessage(i, status));
         }
     }
 
-    PackageStatusMessage createStatusMessage(int i) {
+    PackageStatusMessage createStatusMessage(int i, Status status) {
         return PackageStatusMessage.builder()
                 .subSlingId(SUB1_SLING_ID)
                 .subAgentName(SUB1_AGENT_NAME)
                 .pubAgentName(PUB1_AGENT_NAME)
                 .offset(1000 + i)
-                .status(PackageStatusMessage.Status.REMOVED_FAILED)
+                .status(status)
                 .build();
 
     }
 
-    void assertPackageStatus(long pkgOffset, Status expectedStatus) {
+    void assertPackageStatus(String msg, long pkgOffset, Status 
expectedStatus) {
         Status status = statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset);
-        assertEquals(expectedStatus, status);
+        assertEquals(msg, expectedStatus, status);
     }
 
 }

Reply via email to