This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push: new 5a0e0ee SLING-12024 - Import package if status message for later package alre… (#122) 5a0e0ee is described below commit 5a0e0eee55d2879787b17d650a4469c762840c82 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Wed Sep 20 14:20:51 2023 +0200 SLING-12024 - Import package if status message for later package alre… (#122) * SLING-12024 - Import package if status message for later package already arrived * SLING-12024 - Check for higher status per publish agent * SLING-12024 - Rename variable --- .../impl/precondition/PackageStatusWatcher.java | 27 +++++++-------- .../precondition/PackageStatusWatcherTest.java | 40 +++++++++++++--------- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java index fa202fe..89eee8f 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java @@ -24,8 +24,9 @@ import static org.apache.sling.distribution.journal.HandlerAdapter.create; import java.io.Closeable; import java.io.IOException; import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; @@ -40,13 +41,12 @@ public class PackageStatusWatcher implements Closeable { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final Closeable poller; - private final AtomicLong lowestStatusOffset; // subAgentName -> pkgOffset -> Status - private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new ConcurrentHashMap<>(); + private final Map<String, NavigableMap<Long, Status>> pkgStatusPerSubAgent = new ConcurrentHashMap<>(); + public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics) { - this.lowestStatusOffset = new AtomicLong(Long.MAX_VALUE); String topicName = topics.getStatusTopic(); poller = messagingProvider.createPoller( @@ -64,23 +64,24 @@ public class PackageStatusWatcher implements Closeable { public PackageStatusMessage.Status getStatus(String subAgentName, long pkgOffset) { Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName); Status status = statusPerAgent.get(pkgOffset); - if (status == null && statusCanNotArriveAnymore(pkgOffset)) { + if (status == null && higherStatusAlreadyArrived(subAgentName, pkgOffset)) { log.info("Considering offset={} imported as status for this package can not arrive anymore.", pkgOffset); return Status.IMPORTED; } return status; } - private boolean statusCanNotArriveAnymore(long pkgOffset) { - return lowestStatusOffset.get()!=Long.MAX_VALUE && pkgOffset < lowestStatusOffset.get(); + private boolean higherStatusAlreadyArrived(String subAgentName, long pkgOffset) { + NavigableMap<Long, Status> pkgStatus = pkgStatusPerSubAgent.get(subAgentName); + return pkgStatus.higherKey(pkgOffset) != null; } private Map<Long, Status> getAgentStatus(String subAgentName) { return pkgStatusPerSubAgent.computeIfAbsent(subAgentName, this::newMap); } - private Map<Long, Status> newMap(String subAgentName) { - return new ConcurrentHashMap<>(); + private NavigableMap<Long, Status> newMap(String subAgentName) { + return new TreeMap<>(); } @Override @@ -89,13 +90,9 @@ public class PackageStatusWatcher implements Closeable { } private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) { - long statusOffset = pkgStatusMsg.getOffset(); - long lowest = lowestStatusOffset.get(); - if (statusOffset < lowest) { - lowestStatusOffset.set(statusOffset); - } + long pkgOffset = pkgStatusMsg.getOffset(); // TODO: check revision Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName()); - agentStatus.put(statusOffset, pkgStatusMsg.getStatus()); + agentStatus.put(pkgOffset, pkgStatusMsg.getStatus()); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java index cf8473e..1af8b6b 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java @@ -75,42 +75,50 @@ public class PackageStatusWatcherTest { } - @Test - public void testStatusWatcher() { - - generateMessages(10, 50); - - // If offset is lower than lowest offset we received we assume it to be imported - assertPackageStatus(1000, Status.IMPORTED); + public void testStatusWatcherRemoveFailed() { + generateStatusMessagesFromTo(10, 50, Status.REMOVED_FAILED); - assertPackageStatus(1010, Status.REMOVED_FAILED); - assertPackageStatus(1051, null); + assertPackageStatus("Offset is lower than lowest package offset from status messages. So we assume imported.", 1000, Status.IMPORTED); + assertPackageStatus("We should have got explicit status here", 1010, Status.REMOVED_FAILED); + assertPackageStatus("Status should not yet have arrived", 1051, null); + } + + @Test + public void testStatusWatcherStatusMessageMissing() { + generateStatusMessagesFromTo(1, 1, Status.IMPORTED); + + assertPackageStatus("", 1001, Status.IMPORTED); + assertPackageStatus("This package status should be missing. So publish would wait", 1002, null); + + generateStatusMessagesFromTo(3, 3, Status.IMPORTED); + assertPackageStatus("", 1003, Status.IMPORTED); + assertPackageStatus("As we got a status message for a higher package offset this should allow import now", 1002, Status.IMPORTED); } - void generateMessages(int begin, int end) { + void generateStatusMessagesFromTo(int begin, int end, Status status) { MessageHandler<PackageStatusMessage> handler = adapterCaptor.getValue().getHandler(); - for (int i=begin; i<end; i++) { + for (int i=begin; i<=end; i++) { handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l), - createStatusMessage(i)); + createStatusMessage(i, status)); } } - PackageStatusMessage createStatusMessage(int i) { + PackageStatusMessage createStatusMessage(int i, Status status) { return PackageStatusMessage.builder() .subSlingId(SUB1_SLING_ID) .subAgentName(SUB1_AGENT_NAME) .pubAgentName(PUB1_AGENT_NAME) .offset(1000 + i) - .status(PackageStatusMessage.Status.REMOVED_FAILED) + .status(status) .build(); } - void assertPackageStatus(long pkgOffset, Status expectedStatus) { + void assertPackageStatus(String msg, long pkgOffset, Status expectedStatus) { Status status = statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset); - assertEquals(expectedStatus, status); + assertEquals(msg, expectedStatus, status); } }