This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-12024 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 118df80ef0f38fe041bf648ea0832e3c5ece580f Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Fri Sep 15 15:42:18 2023 +0200 SLING-12024 - Import package if status message for later package already arrived --- .../impl/precondition/PackageStatusWatcher.java | 17 ++++----- .../precondition/PackageStatusWatcherTest.java | 40 +++++++++++++--------- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java index fa202fe..468444e 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java @@ -40,13 +40,14 @@ public class PackageStatusWatcher implements Closeable { private final Logger log = LoggerFactory.getLogger(this.getClass()); private final Closeable poller; - private final AtomicLong lowestStatusOffset; + private final AtomicLong highestStatusOffset; // subAgentName -> pkgOffset -> Status private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new ConcurrentHashMap<>(); + public PackageStatusWatcher(MessagingProvider messagingProvider, Topics topics) { - this.lowestStatusOffset = new AtomicLong(Long.MAX_VALUE); + this.highestStatusOffset = new AtomicLong(Long.MIN_VALUE); String topicName = topics.getStatusTopic(); poller = messagingProvider.createPoller( @@ -64,15 +65,15 @@ public class PackageStatusWatcher implements Closeable { public PackageStatusMessage.Status getStatus(String subAgentName, long pkgOffset) { Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName); Status status = statusPerAgent.get(pkgOffset); - if (status == null && statusCanNotArriveAnymore(pkgOffset)) { + if (status == null && higherStatusAlreadyArrived(pkgOffset)) { log.info("Considering offset={} imported as status for this package can not arrive anymore.", pkgOffset); return Status.IMPORTED; } return status; } - private boolean statusCanNotArriveAnymore(long pkgOffset) { - return lowestStatusOffset.get()!=Long.MAX_VALUE && pkgOffset < lowestStatusOffset.get(); + private boolean higherStatusAlreadyArrived(long pkgOffset) { + return pkgOffset < highestStatusOffset.get(); } private Map<Long, Status> getAgentStatus(String subAgentName) { @@ -90,9 +91,9 @@ public class PackageStatusWatcher implements Closeable { private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) { long statusOffset = pkgStatusMsg.getOffset(); - long lowest = lowestStatusOffset.get(); - if (statusOffset < lowest) { - lowestStatusOffset.set(statusOffset); + long highest = highestStatusOffset.get(); + if (statusOffset > highest) { + highestStatusOffset.set(statusOffset); } // TODO: check revision Map<Long, Status> agentStatus = getAgentStatus(pkgStatusMsg.getSubAgentName()); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java index cf8473e..1af8b6b 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java @@ -75,42 +75,50 @@ public class PackageStatusWatcherTest { } - @Test - public void testStatusWatcher() { - - generateMessages(10, 50); - - // If offset is lower than lowest offset we received we assume it to be imported - assertPackageStatus(1000, Status.IMPORTED); + public void testStatusWatcherRemoveFailed() { + generateStatusMessagesFromTo(10, 50, Status.REMOVED_FAILED); - assertPackageStatus(1010, Status.REMOVED_FAILED); - assertPackageStatus(1051, null); + assertPackageStatus("Offset is lower than lowest package offset from status messages. So we assume imported.", 1000, Status.IMPORTED); + assertPackageStatus("We should have got explicit status here", 1010, Status.REMOVED_FAILED); + assertPackageStatus("Status should not yet have arrived", 1051, null); + } + + @Test + public void testStatusWatcherStatusMessageMissing() { + generateStatusMessagesFromTo(1, 1, Status.IMPORTED); + + assertPackageStatus("", 1001, Status.IMPORTED); + assertPackageStatus("This package status should be missing. So publish would wait", 1002, null); + + generateStatusMessagesFromTo(3, 3, Status.IMPORTED); + assertPackageStatus("", 1003, Status.IMPORTED); + assertPackageStatus("As we got a status message for a higher package offset this should allow import now", 1002, Status.IMPORTED); } - void generateMessages(int begin, int end) { + void generateStatusMessagesFromTo(int begin, int end, Status status) { MessageHandler<PackageStatusMessage> handler = adapterCaptor.getValue().getHandler(); - for (int i=begin; i<end; i++) { + for (int i=begin; i<=end; i++) { handler.handle(new TestMessageInfo(TOPIC_NAME, 0, i, 0l), - createStatusMessage(i)); + createStatusMessage(i, status)); } } - PackageStatusMessage createStatusMessage(int i) { + PackageStatusMessage createStatusMessage(int i, Status status) { return PackageStatusMessage.builder() .subSlingId(SUB1_SLING_ID) .subAgentName(SUB1_AGENT_NAME) .pubAgentName(PUB1_AGENT_NAME) .offset(1000 + i) - .status(PackageStatusMessage.Status.REMOVED_FAILED) + .status(status) .build(); } - void assertPackageStatus(long pkgOffset, Status expectedStatus) { + void assertPackageStatus(String msg, long pkgOffset, Status expectedStatus) { Status status = statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset); - assertEquals(expectedStatus, status); + assertEquals(msg, expectedStatus, status); } }