This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-11914
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit bc1950a84f6df87c6466a5beeb141d5dc6f2ab92
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 27 11:17:25 2023 +0200

    SLING-11914 - Accept initial offset via PingMessage
---
 .../sling/distribution/journal/bookkeeper/BookKeeper.java    | 12 ++++++++++++
 .../journal/impl/subscriber/DistributionSubscriber.java      |  8 +++++++-
 .../distribution/journal/impl/subscriber/SubscriberTest.java |  7 ++++++-
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 0dd7582..76d7b30 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -379,6 +379,18 @@ public class BookKeeper {
         packageRetries.clear(pubAgentName);
     }
 
+    public void handleInitialOffset(long offset) {
+        try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            long currentOffset = loadOffset();
+            if (currentOffset == -1) {
+                storeOffset(resolver, offset);
+                resolver.commit();
+            }
+        } catch (Exception e) {
+            log.warn("Error storing initial offset={}", offset, e);
+        }
+    }
+
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) 
throws DistributionException {
         log.info("Removing failed distribution package {} at offset={}", 
pkgMsg, offset);
         Timer.Context context = 
distributionMetricsService.getRemovedFailedPackageDuration().time();
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index ad2ebdd..2e758a4 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -51,6 +51,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.util.Text;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.ImportPostProcessException;
@@ -70,6 +71,7 @@ import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Deci
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.journal.messages.PingMessage;
 import org.apache.sling.distribution.journal.shared.Delay;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.Topics;
@@ -207,7 +209,7 @@ public class DistributionSubscriber {
         String assign = startOffset > 0 ? 
messagingProvider.assignTo(startOffset) : null;
 
         packagePoller = 
messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
-                HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage));
+                HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage), HandlerAdapter.create(PingMessage.class, 
this::handlePingMessage));
 
         queueThread = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", 
subAgentName));
@@ -281,6 +283,10 @@ public class DistributionSubscriber {
         }
     }
 
+    private <T extends Object> void handlePingMessage(MessageInfo info, 
PingMessage message) {
+        bookKeeper.handleInitialOffset(info.getOffset());
+    }
+
     private boolean shouldEnqueue(MessageInfo info, PackageMessage message) {
         if (!queueNames.contains(message.getPubAgentName())) {
             LOG.info("Skipping distribution package {} at offset={} (not 
subscribed)", message, info.getOffset());
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index f200da6..bd19e55 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -87,6 +87,7 @@ import 
org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.journal.messages.PingMessage;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.shared.Topics;
@@ -199,6 +200,9 @@ public class SubscriberTest {
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> packageCaptor;
     
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PingMessage>> pingCaptor;
+    
     @Captor
     private ArgumentCaptor<HandlerAdapter<ClearCommand>> commandCaptor;
     
@@ -239,7 +243,8 @@ public class SubscriberTest {
                 Mockito.eq(topics.getPackageTopic()),
                 Mockito.eq(Reset.latest), 
                 Mockito.anyString(),
-                packageCaptor.capture()))
+                packageCaptor.capture(),
+                pingCaptor.capture()))
             .thenReturn(poller);
         
         when(clientProvider.createPoller(

Reply via email to