This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-11914
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit bc1950a84f6df87c6466a5beeb141d5dc6f2ab92
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 27 11:17:25 2023 +0200

    SLING-11914 - Accept initial offset via PingMessage
---
 .../sling/distribution/journal/bookkeeper/BookKeeper.java    | 12 ++++++++++++
 .../journal/impl/subscriber/DistributionSubscriber.java      |  8 +++++++-
 .../distribution/journal/impl/subscriber/SubscriberTest.java |  7 ++++++-
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 0dd7582..76d7b30 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -379,6 +379,18 @@ public class BookKeeper {
         packageRetries.clear(pubAgentName);
     }
 
+    public void handleInitialOffset(long offset) {
+        try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            long currentOffset = loadOffset();
+            if (currentOffset == -1) {
+                storeOffset(resolver, offset);
+                resolver.commit();
+            }
+        } catch (Exception e) {
+            log.warn("Error storing initial offset={}", offset, e);
+        }
+    }
+
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
         log.info("Removing failed distribution package {} at offset={}", pkgMsg, offset);
         Timer.Context context = distributionMetricsService.getRemovedFailedPackageDuration().time();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index ad2ebdd..2e758a4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -51,6 +51,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.util.Text;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.ImportPostProcessException;
@@ -70,6 +71,7 @@ import org.apache.sling.distribution.journal.impl.precondition.Precondition.Deci
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PingMessage;
 import org.apache.sling.distribution.journal.shared.Delay;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.Topics;
@@ -207,7 +209,7 @@ public class DistributionSubscriber {
         String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;
 
         packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
-                HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage));
+                HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(PingMessage.class, this::handlePingMessage));
 
         queueThread = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", subAgentName));
@@ -281,6 +283,10 @@ public class DistributionSubscriber {
         }
     }
 
+    private <T extends Object> void handlePingMessage(MessageInfo info, PingMessage message) {
+        bookKeeper.handleInitialOffset(info.getOffset());
+    }
+
     private boolean shouldEnqueue(MessageInfo info, PackageMessage message) {
         if (!queueNames.contains(message.getPubAgentName())) {
             LOG.info("Skipping distribution package {} at offset={} (not subscribed)", message, info.getOffset());
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index f200da6..bd19e55 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -87,6 +87,7 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PingMessage;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.shared.Topics;
@@ -199,6 +200,9 @@ public class SubscriberTest {
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> packageCaptor;
 
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PingMessage>> pingCaptor;
+
     @Captor
     private ArgumentCaptor<HandlerAdapter<ClearCommand>> commandCaptor;
 
@@ -239,7 +243,8 @@ public class SubscriberTest {
                 Mockito.eq(topics.getPackageTopic()),
                 Mockito.eq(Reset.latest),
                 Mockito.anyString(),
-                packageCaptor.capture()))
+                packageCaptor.capture(),
+                pingCaptor.capture()))
             .thenReturn(poller);
         when(clientProvider.createPoller(
