This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push: new 0ddee4a SLING-12274 - Introduce import pre-processor hook. (#137) 0ddee4a is described below commit 0ddee4a2afe1b0af77f4afe63780e5a051eca329 Author: Danilo Banjac <53165717+daniloban...@users.noreply.github.com> AuthorDate: Mon Apr 15 12:47:35 2024 +0200 SLING-12274 - Introduce import pre-processor hook. (#137) * SLING-12274 - Introduce import pre-processor hook. * Use snapshot --------- Co-authored-by: Danilo Banjac <dban...@adobe.com> Co-authored-by: Christian Schneider <ch...@die-schneider.net> --- pom.xml | 2 +- .../journal/bookkeeper/BookKeeper.java | 62 ++++++++++++++++++---- .../journal/bookkeeper/BookKeeperFactory.java | 7 ++- .../journal/bookkeeper/SubscriberMetrics.java | 21 ++++++++ .../journal/shared/NoOpImportPreProcessor.java | 45 ++++++++++++++++ .../journal/bookkeeper/BookKeeperTest.java | 6 ++- .../publisher/PackageDistributedNotifierTest.java | 1 + .../journal/impl/subscriber/SubscriberTest.java | 22 +++++++- .../journal/queue/impl/PubQueueCacheTest.java | 3 ++ .../journal/queue/impl/PubQueueProviderTest.java | 5 +- 10 files changed, 158 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index 2e1d055..0a86346 100644 --- a/pom.xml +++ b/pom.xml @@ -119,7 +119,7 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.api</artifactId> - <version>0.7.0</version> + <version>0.7.1-SNAPSHOT</version> <scope>provided</scope> </dependency> <dependency> diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java index 11e5ccd..76697c1 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java @@ -42,6 +42,8 @@ import org.apache.sling.api.resource.ValueMap; import org.apache.sling.commons.metrics.Timer; import org.apache.sling.distribution.ImportPostProcessException; import org.apache.sling.distribution.ImportPostProcessor; +import org.apache.sling.distribution.ImportPreProcessException; +import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.InvalidationProcessException; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.common.DistributionException; @@ -96,13 +98,14 @@ public class BookKeeper { private final PackageRetries packageRetries = new PackageRetries(); private final LocalStore statusStore; private final LocalStore processedOffsets; + private final ImportPreProcessor importPreProcessor; private final ImportPostProcessor importPostProcessor; private final InvalidationProcessor invalidationProcessor; private int skippedCounter = 0; public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics subscriberMetrics, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender, - BookKeeperConfig config, ImportPostProcessor importPostProcessor, InvalidationProcessor invalidationProcessor) { + BookKeeperConfig config, ImportPreProcessor importPreProcessor, ImportPostProcessor importPostProcessor, InvalidationProcessor invalidationProcessor) { this.packageHandler = packageHandler; this.eventAdmin = eventAdmin; this.sender = sender; @@ -117,6 +120,7 @@ public class BookKeeper { this.errorQueueEnabled = (config.getMaxRetries() >= 0); this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName()); this.processedOffsets = new LocalStore(resolverFactory, config.getPackageNodeName(), config.getSubAgentName()); + this.importPreProcessor = importPreProcessor; this.importPostProcessor = importPostProcessor; this.invalidationProcessor = invalidationProcessor; log.info("Started bookkeeper {}.", config); @@ -142,6 +146,9 @@ public class BookKeeper { log.debug("Importing distribution package {} at offset={}", pkgMsg, offset); try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time(); ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) { + // Execute the pre-processor + preProcess(pkgMsg); + packageHandler.apply(importerResolver, pkgMsg); if (config.isEditable()) { storeStatus(importerResolver, new PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName())); @@ -160,7 +167,7 @@ public class BookKeeper { eventAdmin.postEvent(event); log.info("Imported distribution package {} at offset={}", pkgMsg, offset); subscriberMetrics.getPackageStatusCounter(Status.IMPORTED).increment(); - } catch (DistributionException | LoginException | IOException | RuntimeException | ImportPostProcessException e) { + } catch (DistributionException | LoginException | IOException | RuntimeException | ImportPreProcessException |ImportPostProcessException e) { failure(pkgMsg, offset, e); } } @@ -168,10 +175,7 @@ public class BookKeeper { public void invalidateCache(PackageMessage pkgMsg, long offset) throws DistributionException { log.debug("Invalidating the cache for the package {} at offset={}", pkgMsg, offset); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { - Map<String, Object> props = new HashMap<>(); - props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name()); - props.put(DISTRIBUTION_PATHS, pkgMsg.getPaths()); - props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId()); + Map<String, Object> props = this.buildProcessorPropertiesFromMessage(pkgMsg); long invalidationStartTime = currentTimeMillis(); subscriberMetrics.getInvalidationProcessRequest().increment(); @@ -200,13 +204,34 @@ public class BookKeeper { } } + /** + * Initiates pre-processing for a given package message. + * It constructs properties from the message, logs the event, processes the message, + * and updates relevant metrics. Throws {@link ImportPreProcessException} on failure. + * + * @param packageMessage the message to pre-process + * @throws ImportPreProcessException if pre-processing fails + */ + private void preProcess(PackageMessage packageMessage) throws ImportPreProcessException { + log.debug("Executing import pre processor for package [{}]", packageMessage); + Map<String, Object> processorProperties = this.buildProcessorPropertiesFromMessage(packageMessage); + + long preProcessStartTime = currentTimeMillis(); + subscriberMetrics.getImportPreProcessRequest().increment(); + + this.importPreProcessor.process(processorProperties); + + log.debug("Executed import pre processor for package [{}]", packageMessage.getPkgId()); + + subscriberMetrics.getImportPreProcessDuration().update( + (currentTimeMillis() - preProcessStartTime), TimeUnit.MILLISECONDS); + subscriberMetrics.getImportPreProcessSuccess().increment(); + } + private void postProcess(PackageMessage pkgMsg) throws ImportPostProcessException { log.debug("Executing import post processor for package [{}]", pkgMsg); - Map<String, Object> props = new HashMap<>(); - props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name()); - props.put(DISTRIBUTION_PATHS, pkgMsg.getPaths()); - props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId()); + Map<String, Object> props = this.buildProcessorPropertiesFromMessage(pkgMsg); long postProcessStartTime = currentTimeMillis(); subscriberMetrics.getImportPostProcessRequest().increment(); @@ -411,6 +436,23 @@ public class BookKeeper { return resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, subService)); } + /** + * Constructs processor properties from a {@link PackageMessage}. + * Extracts distribution type, paths, and package ID from the message + * to create a map used by various processors. + * + * @param packageMessage the message to extract properties from + * @return a map of key properties for processor use + */ + private Map<String, Object> buildProcessorPropertiesFromMessage(PackageMessage packageMessage) { + Map<String, Object> processorProperties = new HashMap<>(); + processorProperties.put(DISTRIBUTION_TYPE, packageMessage.getReqType().name()); + processorProperties.put(DISTRIBUTION_PATHS, packageMessage.getPaths()); + processorProperties.put(DISTRIBUTION_PACKAGE_ID, packageMessage.getPkgId()); + + return processorProperties; + } + static void retryDelay() { try { Thread.sleep(RETRY_SEND_DELAY); diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java index f650fc4..584d404 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java @@ -23,6 +23,7 @@ import java.util.function.Consumer; import org.apache.jackrabbit.vault.packaging.Packaging; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.ImportPostProcessor; +import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; @@ -48,7 +49,10 @@ public class BookKeeperFactory { @Reference BinaryStore binaryStore; - + + @Reference + ImportPreProcessor importPreProcessor; + @Reference ImportPostProcessor importPostProcessor; @@ -74,6 +78,7 @@ public class BookKeeperFactory { statusSender, logSender, config, + importPreProcessor, importPostProcessor, invalidationProcessor); } diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java index 51ae453..686dd0c 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java @@ -58,6 +58,12 @@ public class SubscriberMetrics { private final Timer packageJournalDistributionDuration; + private final Timer importPreProcessDuration; + + private final Counter importPreProcessSuccess; + + private final Counter importPreProcessRequest; + private final Timer importPostProcessDuration; private final Counter importPostProcessSuccess; @@ -87,6 +93,9 @@ public class SubscriberMetrics { processQueueItemDuration = metricsService.timer(getMetricName("process_queue_item_duration")); packageDistributedDuration = metricsService.timer(getMetricName("request_distributed_duration")); packageJournalDistributionDuration = metricsService.timer(getMetricName("package_journal_distribution_duration")); + importPreProcessDuration = metricsService.timer(getMetricName("import_pre_process_duration")); + importPreProcessSuccess = metricsService.counter(getMetricName("import_pre_process_success_count")); + importPreProcessRequest = metricsService.counter(getMetricName("import_pre_process_request_count")); importPostProcessDuration = metricsService.timer(getMetricName("import_post_process_duration")); importPostProcessSuccess = metricsService.counter(getMetricName("import_post_process_success_count")); importPostProcessRequest = metricsService.counter(getMetricName("import_post_process_request_count")); @@ -198,6 +207,18 @@ public class SubscriberMetrics { return metricsService.counter(getNameWithLabel(getMetricName("package_status_count"), "status", status.name())); } + public Timer getImportPreProcessDuration() { + return importPreProcessDuration; + } + + public Counter getImportPreProcessSuccess() { + return importPreProcessSuccess; + } + + public Counter getImportPreProcessRequest() { + return importPreProcessRequest; + } + public Timer getImportPostProcessDuration() { return importPostProcessDuration; } diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/NoOpImportPreProcessor.java b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpImportPreProcessor.java new file mode 100644 index 0000000..b3494ff --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpImportPreProcessor.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.shared; + +import org.apache.sling.distribution.ImportPreProcessor; +import org.osgi.service.component.annotations.Component; + +import java.util.Map; + +import javax.annotation.Nonnull; + +/** + * A no-operation (no-op) implementation of the {@link ImportPreProcessor} interface. + * This class is intended as a placeholder or default implementation that performs + * no actions when its {@link #process(Map)} method is invoked. It's useful in contexts + * where an {@link ImportPreProcessor} is required but no pre-import processing is necessary. + */ +@Component(property = { "type=default" }) +public class NoOpImportPreProcessor implements ImportPreProcessor { + + /** + * Does not perform any pre-import processing on the given properties. This method + * is intentionally left empty, making this class a no-operation implementation. + * + * @param props properties defining the content to be imported; not used by this method. + */ + @Override + public void process(@Nonnull Map<String, Object> props) {} +} diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java index 5c11b2d..bf72a26 100644 --- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java @@ -33,6 +33,7 @@ import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.commons.metrics.MetricsService; import org.apache.sling.distribution.ImportPostProcessor; +import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.messages.LogMessage; @@ -75,6 +76,9 @@ public class BookKeeperTest { @Mock private Consumer<LogMessage> logSender; + + @Mock + private ImportPreProcessor importPreProcessor; @Mock private ImportPostProcessor importPostProcessor; @@ -88,7 +92,7 @@ public class BookKeeperTest { BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package", true); bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, eventAdmin, sender, logSender, bkConfig, - importPostProcessor, invalidationProcessor); + importPreProcessor, importPostProcessor, invalidationProcessor); } @Test diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java index d330376..f4bf3c8 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java @@ -181,6 +181,7 @@ public class PackageDistributedNotifierTest { .reqType(PackageMessage.ReqType.ADD) .pkgType("journal") .paths(Collections.singletonList("path")) + .deepPaths(Collections.singletonList("deep-path")) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index f1fa3d5..c73b4b5 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -60,6 +60,8 @@ import org.apache.sling.api.resource.ResourceUtil; import org.apache.sling.commons.metrics.MetricsService; import org.apache.sling.distribution.ImportPostProcessException; import org.apache.sling.distribution.ImportPostProcessor; +import org.apache.sling.distribution.ImportPreProcessException; +import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.agent.DistributionAgentState; import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.common.DistributionException; @@ -73,6 +75,7 @@ import org.apache.sling.distribution.journal.bookkeeper.BookKeeper; import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory; import org.apache.sling.distribution.journal.bookkeeper.LocalStore; import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics; +import org.apache.sling.distribution.journal.shared.NoOpImportPreProcessor; import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor; import org.apache.sling.distribution.journal.impl.precondition.Precondition; import org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision; @@ -178,6 +181,9 @@ public class SubscriberTest { @Spy private SubscriberMetrics subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP); + + @Spy + private ImportPreProcessor importPreProcessor = new NoOpImportPreProcessor(); @Spy private ImportPostProcessor importPostProcessor = new NoOpImportPostProcessor(); @@ -287,7 +293,7 @@ public class SubscriberTest { } @Test - public void testImportPostProcessInvoked() throws DistributionException, ImportPostProcessException { + public void testImportPreAndPostProcessInvoked() throws DistributionException, ImportPostProcessException, ImportPreProcessException { assumeNoPrecondition(); initSubscriber(); assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE)); @@ -310,9 +316,23 @@ public class SubscriberTest { props.put(DISTRIBUTION_PATHS, message.getPaths()); props.put(DISTRIBUTION_PACKAGE_ID, message.getPkgId()); + verify(importPreProcessor, times(1)).process(props); verify(importPostProcessor, times(1)).process(props); } + @Test + public void testImportPreProcessError() throws ImportPreProcessException { + assumeNoPrecondition(); + initSubscriber(Collections.singletonMap("maxRetries", "0")); + doThrow(new ImportPreProcessException("Failed pre process")). + when(importPreProcessor).process(any()); + + MessageInfo info = createInfo(0L); + packageHandler.handle(info, BASIC_ADD_PACKAGE); + + verifyStatusMessageSentWithStatus(Status.REMOVED_FAILED); + } + @Test public void testImportPostProcessError() throws DistributionException, ImportPostProcessException { assumeNoPrecondition(); diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java index 1badd30..bf0ed17 100644 --- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.when; import java.io.Closeable; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; @@ -191,6 +192,8 @@ public class PubQueueCacheTest { .pubSlingId("pubSlingId") .reqType(reqType) .pubAgentName(pubAgentName) + .paths(List.of("path")) + .deepPaths(List.of("deep-path")) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java index d50a53b..23d66a4 100644 --- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java @@ -28,9 +28,9 @@ import static org.mockito.Mockito.when; import java.io.Closeable; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Set; import java.util.UUID; @@ -270,7 +270,8 @@ public class PubQueueProviderTest { .pkgId(packageId) .reqType(ReqType.ADD) .pkgType("journal") - .paths(Arrays.asList("path")) + .paths(List.of("path")) + .deepPaths(List.of("deep-path")) .build(); } }