This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-12689-4 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 8babadaf66911eddf4ebe016aa112845d7159686 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Thu Apr 10 15:14:37 2025 +0200 SLING-12689 - Use events instead of callback --- .../journal/bookkeeper/BookKeeper.java | 16 ++--- .../journal/bookkeeper/BookKeeperFactory.java | 13 ++-- .../journal/impl/events/DistributionEvents.java | 72 ++++++++++++++++++++++ .../impl/subscriber/DistributionSubscriber.java | 7 +-- .../journal/bookkeeper/BookKeeperTest.java | 13 +++- .../journal/impl/subscriber/SubscriberTest.java | 14 ++++- 6 files changed, 111 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java index a9d6c29..9acd1c6 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java @@ -58,6 +58,7 @@ import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.sling.distribution.journal.impl.events.DistributionEvents; /** * Keeps track of offset and processed status and manages @@ -107,7 +108,7 @@ public class BookKeeper { private final ImportPreProcessor importPreProcessor; private final ImportPostProcessor importPostProcessor; private final InvalidationProcessor invalidationProcessor; - private final DistributionCallback distributionCallback; + private final DistributionEvents distributionEvents; private int skippedCounter = 0; public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics subscriberMetrics, @@ -115,8 +116,7 @@ public class BookKeeper { BookKeeperConfig config, ImportPreProcessor importPreProcessor, ImportPostProcessor importPostProcessor, - InvalidationProcessor invalidationProcessor, - DistributionCallback distributionCallback) { + InvalidationProcessor invalidationProcessor) { this.packageHandler = packageHandler; this.eventAdmin = eventAdmin; @@ -136,7 +136,7 @@ public class BookKeeper { this.importPreProcessor = importPreProcessor; this.importPostProcessor = importPostProcessor; this.invalidationProcessor = invalidationProcessor; - this.distributionCallback = distributionCallback != null ? distributionCallback : new NoopDistributionCallback(); + this.distributionEvents = new DistributionEvents(eventAdmin); log.info("Started bookkeeper {}.", config); } @@ -182,7 +182,7 @@ public class BookKeeper { Duration currentImporturation = Duration.ofMillis(System.currentTimeMillis() - importStartTime.getTime()); log.info("Imported distribution package {} at offset={} took importDurationMs={} created={}", pkgMsg, offset, currentImporturation.toMillis(), createdTime); subscriberMetrics.getPackageStatusCounter(pkgMsg.getPubAgentName(), Status.IMPORTED).increment(); - distributionCallback.success(pkgMsg, offset, createdTime, currentImporturation); + distributionEvents.sendSuccessEvent(pkgMsg, offset, createdTime, currentImporturation); } catch (DistributionException | LoginException | IOException | RuntimeException | ImportPreProcessException |ImportPostProcessException e) { failure(pkgMsg, offset, createdTime, e); } finally { @@ -274,8 +274,9 @@ public class BookKeeper { subscriberMetrics.getFailedPackageImports().mark(); String pubAgentName = pkgMsg.getPubAgentName(); + packageRetries.increase(pubAgentName); int retries = packageRetries.get(pubAgentName); - boolean giveUp = errorQueueEnabled && retries >= config.getMaxRetries(); + boolean giveUp = errorQueueEnabled && retries > config.getMaxRetries(); String retriesSt = errorQueueEnabled ? Integer.toString(config.getMaxRetries()) : "infinite"; String action = giveUp ? "skip the package" : "retry later"; String msg = format("Failed attempt (%s/%s) to import the distribution package %s at offset=%d because of '%s', the importer will %s", retries, retriesSt, pkgMsg.toString(false), offset, e.getMessage(), action); @@ -285,7 +286,7 @@ public class BookKeeper { } catch (Exception e2) { log.warn("Error sending log message", e2); } - distributionCallback.failure(pkgMsg, retries, createdTime, retries, giveUp, e); + distributionEvents.sendFailureEvent(pkgMsg, offset, createdTime, retries, giveUp, e); if (giveUp) { log.warn(msg, e); removeFailedPackage(pkgMsg, offset); @@ -294,7 +295,6 @@ public class BookKeeper { if (retries == NUM_ERRORS_BLOCKING) { // Only count after a few retries to allow transient errors to recover subscriberMetrics.getBlockingImportErrors().increment(); } - packageRetries.increase(pubAgentName); throw new DistributionException(msg, e); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java index 6d5b001..4aab597 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java @@ -26,13 +26,13 @@ import org.apache.sling.distribution.ImportPostProcessor; import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.journal.BinaryStore; -import org.apache.sling.distribution.journal.DistributionCallback; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; import org.osgi.service.event.EventAdmin; +import org.apache.sling.distribution.journal.impl.events.DistributionEvents; @Component(service = BookKeeperFactory.class) public class BookKeeperFactory { @@ -60,11 +60,9 @@ public class BookKeeperFactory { public BookKeeper create( DistributionPackageBuilder packageBuilder, BookKeeperConfig config, - Consumer<PackageStatusMessage> statusSender, + Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender, - SubscriberMetrics subscriberMetrics, - DistributionCallback distributionCallback - ) { + SubscriberMetrics subscriberMetrics) { ContentPackageExtractor extractor = new ContentPackageExtractor( packaging, subscriberMetrics, @@ -76,13 +74,12 @@ public class BookKeeperFactory { subscriberMetrics, packageHandler, eventAdmin, - statusSender, + sender, logSender, config, importPreProcessor, importPostProcessor, - invalidationProcessor, - distributionCallback); + invalidationProcessor); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/events/DistributionEvents.java b/src/main/java/org/apache/sling/distribution/journal/impl/events/DistributionEvents.java new file mode 100644 index 0000000..4cad4ea --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/events/DistributionEvents.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.events; + +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; + +import java.time.Duration; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +public class DistributionEvents { + public static final String TOPIC_PACKAGE_SUCCESS = "org/apache/sling/distribution/journal/PACKAGE_SUCCESS"; + public static final String TOPIC_PACKAGE_FAILURE = "org/apache/sling/distribution/journal/PACKAGE_FAILURE"; + + public static final String PROPERTY_PACKAGE_MESSAGE = "package.message"; + public static final String PROPERTY_OFFSET = "offset"; + public static final String PROPERTY_CREATED_DATE = "created.date"; + public static final String PROPERTY_IMPORT_DURATION = "import.duration"; + public static final String PROPERTY_NUM_RETRIES = "num.retries"; + public static final String PROPERTY_WILL_DISCARD = "will.discard"; + public static final String PROPERTY_EXCEPTION = "exception"; + + private final EventAdmin eventAdmin; + + public DistributionEvents(EventAdmin eventAdmin) { + this.eventAdmin = eventAdmin; + } + + public void sendSuccessEvent(PackageMessage packageMessage, long offset, Date createdDate, Duration importDuration) { + Map<String, Object> properties = new HashMap<>(); + properties.put(PROPERTY_PACKAGE_MESSAGE, packageMessage); + properties.put(PROPERTY_OFFSET, offset); + properties.put(PROPERTY_CREATED_DATE, createdDate); + properties.put(PROPERTY_IMPORT_DURATION, importDuration); + + Event event = new Event(TOPIC_PACKAGE_SUCCESS, properties); + eventAdmin.postEvent(event); + } + + public void sendFailureEvent(PackageMessage packageMessage, long offset, Date createdDate, int numRetries, + boolean willDiscard, Exception ex) { + Map<String, Object> properties = new HashMap<>(); + properties.put(PROPERTY_PACKAGE_MESSAGE, packageMessage); + properties.put(PROPERTY_OFFSET, offset); + properties.put(PROPERTY_CREATED_DATE, createdDate); + properties.put(PROPERTY_NUM_RETRIES, numRetries); + properties.put(PROPERTY_WILL_DISCARD, willDiscard); + properties.put(PROPERTY_EXCEPTION, ex); + + Event event = new Event(TOPIC_PACKAGE_FAILURE, properties); + eventAdmin.postEvent(event); + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index 0e8d389..fcac4f2 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -55,7 +55,6 @@ import org.apache.sling.commons.metrics.Timer; import org.apache.sling.distribution.ImportPostProcessException; import org.apache.sling.distribution.agent.DistributionAgentState; import org.apache.sling.distribution.common.DistributionException; -import org.apache.sling.distribution.journal.DistributionCallback; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessagingProvider; @@ -84,6 +83,7 @@ import org.osgi.service.metatype.annotations.Designate; import org.osgi.util.converter.Converters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.sling.distribution.journal.impl.events.DistributionEvents; /** * A Subscriber SCD agent which consumes messages produced by a @@ -142,8 +142,7 @@ public class DistributionSubscriber { @Reference BookKeeperFactory bookKeeperFactory, @Reference SubscriberReadyStore subscriberReadyStore, @Reference OnlyOnLeader onlyOnLeader, - @Reference DistributionCallback distributionCallback, - SubscriberConfiguration config, BundleContext context, Map<String, Object> properties + SubscriberConfiguration config, BundleContext context, Map<String, Object> properties ) { String subSlingId = requireNonNull(slingSettings.getSlingId()); subAgentName = requireNotBlank(config.name()); @@ -175,7 +174,7 @@ public class DistributionSubscriber { packageNodeName, commandNodeName, config.contentPackageExtractorOverwritePrimaryTypesOfFolders()); - bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender, this.subscriberMetrics, distributionCallback); + bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender, this.subscriberMetrics); if (config.editable()) { Consumer<Long> clearHandler = (Long offset) -> { diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java index 557fb50..5d67939 100644 --- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java @@ -108,8 +108,17 @@ public class BookKeeperTest { BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package", "command", true); subscriberMetrics = new SubscriberMetrics(metricsService, bkConfig.getSubAgentName(), "publish", bkConfig.isEditable()); DistributionCallback distributionCallback = null; - bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, eventAdmin, sender, logSender, bkConfig, - importPreProcessor, importPostProcessor, invalidationProcessor, distributionCallback); + bookKeeper = new BookKeeper( + resolverFactory, + subscriberMetrics, + packageHandler, + eventAdmin, + sender, + logSender, + bkConfig, + importPreProcessor, + importPostProcessor, + invalidationProcessor); } @Test diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 16572fa..1bdbe19 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -446,8 +446,18 @@ public class SubscriberTest { props.putAll(overrides); SubscriberConfiguration config = Converters.standardConverter().convert(props).to(SubscriberConfiguration.class); OnlyOnLeader onlyOnLeader = new OnlyOnLeader(context); - DistributionCallback distributionCallback = null; - subscriber = new DistributionSubscriber(packageBuilder, slingSettings, clientProvider, precondition, metricsService, bookKeeperFactory, subscriberReadyStore, onlyOnLeader, distributionCallback, config, context, props); + this.subscriber = new DistributionSubscriber( + packageBuilder, + slingSettings, + clientProvider, + precondition, + metricsService, + bookKeeperFactory, + subscriberReadyStore, + onlyOnLeader, + config, + context, + props); verify(clientProvider).createPoller( Mockito.eq(Topics.PACKAGE_TOPIC), Mockito.eq(Reset.latest),