This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12689-4
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 8babadaf66911eddf4ebe016aa112845d7159686
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Thu Apr 10 15:14:37 2025 +0200

    SLING-12689 - Use events instead of callback
---
 .../journal/bookkeeper/BookKeeper.java             | 16 ++---
 .../journal/bookkeeper/BookKeeperFactory.java      | 13 ++--
 .../journal/impl/events/DistributionEvents.java    | 72 ++++++++++++++++++++++
 .../impl/subscriber/DistributionSubscriber.java    |  7 +--
 .../journal/bookkeeper/BookKeeperTest.java         | 13 +++-
 .../journal/impl/subscriber/SubscriberTest.java    | 14 ++++-
 6 files changed, 111 insertions(+), 24 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index a9d6c29..9acd1c6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -58,6 +58,7 @@ import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.sling.distribution.journal.impl.events.DistributionEvents;
 
 /**
  * Keeps track of offset and processed status and manages 
@@ -107,7 +108,7 @@ public class BookKeeper {
     private final ImportPreProcessor importPreProcessor;
     private final ImportPostProcessor importPostProcessor;
     private final InvalidationProcessor invalidationProcessor;
-    private final DistributionCallback distributionCallback;
+    private final DistributionEvents distributionEvents;
     private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
SubscriberMetrics subscriberMetrics,
@@ -115,8 +116,7 @@ public class BookKeeper {
         BookKeeperConfig config, 
         ImportPreProcessor importPreProcessor, 
         ImportPostProcessor importPostProcessor, 
-        InvalidationProcessor invalidationProcessor,
-        DistributionCallback distributionCallback) {
+        InvalidationProcessor invalidationProcessor) {
        
         this.packageHandler = packageHandler;
         this.eventAdmin = eventAdmin;
@@ -136,7 +136,7 @@ public class BookKeeper {
         this.importPreProcessor = importPreProcessor;
         this.importPostProcessor = importPostProcessor;
         this.invalidationProcessor = invalidationProcessor;
-        this.distributionCallback = distributionCallback != null ? 
distributionCallback : new NoopDistributionCallback();
+        this.distributionEvents = new DistributionEvents(eventAdmin);
         log.info("Started bookkeeper {}.", config);
     }
     
@@ -182,7 +182,7 @@ public class BookKeeper {
             Duration currentImporturation = 
Duration.ofMillis(System.currentTimeMillis() - importStartTime.getTime());
             log.info("Imported distribution package {} at offset={} took 
importDurationMs={} created={}", pkgMsg, offset, 
currentImporturation.toMillis(), createdTime);
             
subscriberMetrics.getPackageStatusCounter(pkgMsg.getPubAgentName(), 
Status.IMPORTED).increment();
-            distributionCallback.success(pkgMsg, offset, createdTime, 
currentImporturation);
+            distributionEvents.sendSuccessEvent(pkgMsg, offset, createdTime, 
currentImporturation);
         } catch (DistributionException | LoginException | IOException | 
RuntimeException | ImportPreProcessException |ImportPostProcessException e) {
             failure(pkgMsg, offset, createdTime, e);
         } finally {
@@ -274,8 +274,9 @@ public class BookKeeper {
         subscriberMetrics.getFailedPackageImports().mark();
 
         String pubAgentName = pkgMsg.getPubAgentName();
+        packageRetries.increase(pubAgentName);
         int retries = packageRetries.get(pubAgentName);
-        boolean giveUp = errorQueueEnabled && retries >= 
config.getMaxRetries();
+        boolean giveUp = errorQueueEnabled && retries > config.getMaxRetries();
         String retriesSt = errorQueueEnabled ? 
Integer.toString(config.getMaxRetries()) : "infinite";
         String action = giveUp ? "skip the package" : "retry later";
         String msg = format("Failed attempt (%s/%s) to import the distribution 
package %s at offset=%d because of '%s', the importer will %s", retries, 
retriesSt, pkgMsg.toString(false), offset, e.getMessage(), action);
@@ -285,7 +286,7 @@ public class BookKeeper {
         } catch (Exception e2) {
             log.warn("Error sending log message", e2);
         }
-        distributionCallback.failure(pkgMsg, retries, createdTime, retries, 
giveUp, e);
+        distributionEvents.sendFailureEvent(pkgMsg, offset, createdTime, 
retries, giveUp, e);
         if (giveUp) {
             log.warn(msg, e);
             removeFailedPackage(pkgMsg, offset);
@@ -294,7 +295,6 @@ public class BookKeeper {
             if (retries == NUM_ERRORS_BLOCKING) { // Only count after a few 
retries to allow transient errors to recover
                 subscriberMetrics.getBlockingImportErrors().increment();
             }
-            packageRetries.increase(pubAgentName);
             throw new DistributionException(msg, e);
         }
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index 6d5b001..4aab597 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -26,13 +26,13 @@ import org.apache.sling.distribution.ImportPostProcessor;
 import org.apache.sling.distribution.ImportPreProcessor;
 import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.journal.BinaryStore;
-import org.apache.sling.distribution.journal.DistributionCallback;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.EventAdmin;
+import org.apache.sling.distribution.journal.impl.events.DistributionEvents;
 
 @Component(service = BookKeeperFactory.class)
 public class BookKeeperFactory {
@@ -60,11 +60,9 @@ public class BookKeeperFactory {
     public BookKeeper create(
             DistributionPackageBuilder packageBuilder,
             BookKeeperConfig config,
-            Consumer<PackageStatusMessage> statusSender,
+            Consumer<PackageStatusMessage> sender,
             Consumer<LogMessage> logSender,
-            SubscriberMetrics subscriberMetrics,
-            DistributionCallback distributionCallback
-            ) {
+            SubscriberMetrics subscriberMetrics) {
         ContentPackageExtractor extractor = new ContentPackageExtractor(
                 packaging,
                 subscriberMetrics,
@@ -76,13 +74,12 @@ public class BookKeeperFactory {
                 subscriberMetrics,
                 packageHandler,
                 eventAdmin,
-                statusSender,
+                sender,
                 logSender,
                 config,
                 importPreProcessor,
                 importPostProcessor,
-                invalidationProcessor, 
-                distributionCallback);
+                invalidationProcessor);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/events/DistributionEvents.java b/src/main/java/org/apache/sling/distribution/journal/impl/events/DistributionEvents.java
new file mode 100644
index 0000000..4cad4ea
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/events/DistributionEvents.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.events;
+
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.time.Duration;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DistributionEvents {
+    public static final String TOPIC_PACKAGE_SUCCESS = 
"org/apache/sling/distribution/journal/PACKAGE_SUCCESS";
+    public static final String TOPIC_PACKAGE_FAILURE = 
"org/apache/sling/distribution/journal/PACKAGE_FAILURE";
+
+    public static final String PROPERTY_PACKAGE_MESSAGE = "package.message";
+    public static final String PROPERTY_OFFSET = "offset";
+    public static final String PROPERTY_CREATED_DATE = "created.date";
+    public static final String PROPERTY_IMPORT_DURATION = "import.duration";
+    public static final String PROPERTY_NUM_RETRIES = "num.retries";
+    public static final String PROPERTY_WILL_DISCARD = "will.discard";
+    public static final String PROPERTY_EXCEPTION = "exception";
+
+    private final EventAdmin eventAdmin;
+
+    public DistributionEvents(EventAdmin eventAdmin) {
+        this.eventAdmin = eventAdmin;
+    }
+
+    public void sendSuccessEvent(PackageMessage packageMessage, long offset, 
Date createdDate, Duration importDuration) {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(PROPERTY_PACKAGE_MESSAGE, packageMessage);
+        properties.put(PROPERTY_OFFSET, offset);
+        properties.put(PROPERTY_CREATED_DATE, createdDate);
+        properties.put(PROPERTY_IMPORT_DURATION, importDuration);
+        
+        Event event = new Event(TOPIC_PACKAGE_SUCCESS, properties);
+        eventAdmin.postEvent(event);
+    }
+
+    public void sendFailureEvent(PackageMessage packageMessage, long offset, 
Date createdDate, int numRetries, 
+            boolean willDiscard, Exception ex) {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put(PROPERTY_PACKAGE_MESSAGE, packageMessage);
+        properties.put(PROPERTY_OFFSET, offset);
+        properties.put(PROPERTY_CREATED_DATE, createdDate);
+        properties.put(PROPERTY_NUM_RETRIES, numRetries);
+        properties.put(PROPERTY_WILL_DISCARD, willDiscard);
+        properties.put(PROPERTY_EXCEPTION, ex);
+        
+        Event event = new Event(TOPIC_PACKAGE_FAILURE, properties);
+        eventAdmin.postEvent(event);
+    }
+} 
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 0e8d389..fcac4f2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -55,7 +55,6 @@ import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.ImportPostProcessException;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.DistributionCallback;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -84,6 +83,7 @@ import org.osgi.service.metatype.annotations.Designate;
 import org.osgi.util.converter.Converters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.sling.distribution.journal.impl.events.DistributionEvents;
 
 /**
  * A Subscriber SCD agent which consumes messages produced by a
@@ -142,8 +142,7 @@ public class DistributionSubscriber {
                    @Reference BookKeeperFactory bookKeeperFactory,
                    @Reference SubscriberReadyStore subscriberReadyStore,
                    @Reference OnlyOnLeader onlyOnLeader,
-                   @Reference DistributionCallback distributionCallback,
-                       SubscriberConfiguration config, BundleContext context, 
Map<String, Object> properties
+                   SubscriberConfiguration config, BundleContext context, 
Map<String, Object> properties
                        ) {
                String subSlingId = requireNonNull(slingSettings.getSlingId());
         subAgentName = requireNotBlank(config.name());
@@ -175,7 +174,7 @@ public class DistributionSubscriber {
                 packageNodeName,
                 commandNodeName,
                 
config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
-        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, 
statusSender, logSender, this.subscriberMetrics, distributionCallback);
+        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, 
statusSender, logSender, this.subscriberMetrics);
         
         if (config.editable()) {
                Consumer<Long> clearHandler = (Long offset) -> {
diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 557fb50..5d67939 100644
--- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -108,8 +108,17 @@ public class BookKeeperTest {
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", 
"subSlingId", true, 10, PackageHandling.Extract, "package", "command", true);
         subscriberMetrics = new SubscriberMetrics(metricsService, 
bkConfig.getSubAgentName(), "publish", bkConfig.isEditable());
         DistributionCallback distributionCallback = null;
-               bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, 
packageHandler, eventAdmin, sender, logSender, bkConfig,
-                importPreProcessor, importPostProcessor, 
invalidationProcessor, distributionCallback);
+               bookKeeper = new BookKeeper(
+                resolverFactory,
+                subscriberMetrics,
+                packageHandler,
+                eventAdmin,
+                sender,
+                logSender,
+                bkConfig,
+                importPreProcessor,
+                importPostProcessor,
+                invalidationProcessor);
     }
 
     @Test
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 16572fa..1bdbe19 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -446,8 +446,18 @@ public class SubscriberTest {
         props.putAll(overrides);
         SubscriberConfiguration config = 
Converters.standardConverter().convert(props).to(SubscriberConfiguration.class);
         OnlyOnLeader onlyOnLeader = new OnlyOnLeader(context);
-               DistributionCallback distributionCallback = null;
-               subscriber = new DistributionSubscriber(packageBuilder, 
slingSettings, clientProvider, precondition, metricsService, bookKeeperFactory, 
subscriberReadyStore, onlyOnLeader, distributionCallback, config, context, 
props);
+        this.subscriber = new DistributionSubscriber(
+                packageBuilder,
+                slingSettings,
+                clientProvider,
+                precondition,
+                metricsService,
+                bookKeeperFactory,
+                subscriberReadyStore,
+                onlyOnLeader,
+                config,
+                context,
+                props);
         verify(clientProvider).createPoller(
                 Mockito.eq(Topics.PACKAGE_TOPIC),
                 Mockito.eq(Reset.latest),

Reply via email to