This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ddee4a  SLING-12274 - Introduce import pre-processor hook. (#137)
0ddee4a is described below

commit 0ddee4a2afe1b0af77f4afe63780e5a051eca329
Author: Danilo Banjac <53165717+daniloban...@users.noreply.github.com>
AuthorDate: Mon Apr 15 12:47:35 2024 +0200

    SLING-12274 - Introduce import pre-processor hook. (#137)
    
    * SLING-12274 - Introduce import pre-processor hook.
    
    * Use snapshot
    
    ---------
    
    Co-authored-by: Danilo Banjac <dban...@adobe.com>
    Co-authored-by: Christian Schneider <ch...@die-schneider.net>
---
 pom.xml                                            |  2 +-
 .../journal/bookkeeper/BookKeeper.java             | 62 ++++++++++++++++++----
 .../journal/bookkeeper/BookKeeperFactory.java      |  7 ++-
 .../journal/bookkeeper/SubscriberMetrics.java      | 21 ++++++++
 .../journal/shared/NoOpImportPreProcessor.java     | 45 ++++++++++++++++
 .../journal/bookkeeper/BookKeeperTest.java         |  6 ++-
 .../publisher/PackageDistributedNotifierTest.java  |  1 +
 .../journal/impl/subscriber/SubscriberTest.java    | 22 +++++++-
 .../journal/queue/impl/PubQueueCacheTest.java      |  3 ++
 .../journal/queue/impl/PubQueueProviderTest.java   |  5 +-
 10 files changed, 158 insertions(+), 16 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2e1d055..0a86346 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.api</artifactId>
-            <version>0.7.0</version>
+            <version>0.7.1-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 11e5ccd..76697c1 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -42,6 +42,8 @@ import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.ImportPostProcessException;
 import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.ImportPreProcessException;
+import org.apache.sling.distribution.ImportPreProcessor;
 import org.apache.sling.distribution.InvalidationProcessException;
 import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.common.DistributionException;
@@ -96,13 +98,14 @@ public class BookKeeper {
     private final PackageRetries packageRetries = new PackageRetries();
     private final LocalStore statusStore;
     private final LocalStore processedOffsets;
+    private final ImportPreProcessor importPreProcessor;
     private final ImportPostProcessor importPostProcessor;
     private final InvalidationProcessor invalidationProcessor;
     private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
SubscriberMetrics subscriberMetrics,
         PackageHandler packageHandler, EventAdmin eventAdmin, 
Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
-        BookKeeperConfig config, ImportPostProcessor importPostProcessor, 
InvalidationProcessor invalidationProcessor) {
+        BookKeeperConfig config, ImportPreProcessor importPreProcessor, 
ImportPostProcessor importPostProcessor, InvalidationProcessor 
invalidationProcessor) {
         this.packageHandler = packageHandler;
         this.eventAdmin = eventAdmin;
         this.sender = sender;
@@ -117,6 +120,7 @@ public class BookKeeper {
         this.errorQueueEnabled = (config.getMaxRetries() >= 0);
         this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, 
config.getSubAgentName());
         this.processedOffsets = new LocalStore(resolverFactory, 
config.getPackageNodeName(), config.getSubAgentName());
+        this.importPreProcessor = importPreProcessor;
         this.importPostProcessor = importPostProcessor;
         this.invalidationProcessor = invalidationProcessor;
         log.info("Started bookkeeper {}.", config);
@@ -142,6 +146,9 @@ public class BookKeeper {
         log.debug("Importing distribution package {} at offset={}", pkgMsg, 
offset);
         try (Timer.Context context = 
subscriberMetrics.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
+            // Execute the pre-processor
+            preProcess(pkgMsg);
+
             packageHandler.apply(importerResolver, pkgMsg);
             if (config.isEditable()) {
                 storeStatus(importerResolver, new 
PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
@@ -160,7 +167,7 @@ public class BookKeeper {
             eventAdmin.postEvent(event);
             log.info("Imported distribution package {} at offset={}", pkgMsg, 
offset);
             
subscriberMetrics.getPackageStatusCounter(Status.IMPORTED).increment();
-        } catch (DistributionException | LoginException | IOException | 
RuntimeException | ImportPostProcessException e) {
+        } catch (DistributionException | LoginException | IOException | 
RuntimeException | ImportPreProcessException |ImportPostProcessException e) {
             failure(pkgMsg, offset, e);
         }
     }
@@ -168,10 +175,7 @@ public class BookKeeper {
     public void invalidateCache(PackageMessage pkgMsg, long offset) throws 
DistributionException {
         log.debug("Invalidating the cache for the package {} at offset={}", 
pkgMsg, offset);
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
-            Map<String, Object> props = new HashMap<>();
-            props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
-            props.put(DISTRIBUTION_PATHS, pkgMsg.getPaths());
-            props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId());
+            Map<String, Object> props = 
this.buildProcessorPropertiesFromMessage(pkgMsg);
 
             long invalidationStartTime = currentTimeMillis();
             subscriberMetrics.getInvalidationProcessRequest().increment();
@@ -200,13 +204,34 @@ public class BookKeeper {
         }
     }
 
+    /**
+     * Runs the import pre-processor for a given package message.
+     * It builds the processor properties from the message, logs the invocation and counts
+     * the request; duration and success metrics are recorded only if processing succeeds.
+     *
+     * @param packageMessage the message to pre-process
+     * @throws ImportPreProcessException if pre-processing fails
+     */
+    private void preProcess(PackageMessage packageMessage) throws 
ImportPreProcessException {
+        log.debug("Executing import pre processor for package [{}]", 
packageMessage);
+        Map<String, Object> processorProperties = 
this.buildProcessorPropertiesFromMessage(packageMessage);
+
+        long preProcessStartTime = currentTimeMillis();
+        subscriberMetrics.getImportPreProcessRequest().increment();
+
+        this.importPreProcessor.process(processorProperties);
+
+        log.debug("Executed import pre processor for package [{}]", 
packageMessage.getPkgId());
+
+        subscriberMetrics.getImportPreProcessDuration().update(
+                (currentTimeMillis() - preProcessStartTime), 
TimeUnit.MILLISECONDS);
+        subscriberMetrics.getImportPreProcessSuccess().increment();
+    }
+
     private void postProcess(PackageMessage pkgMsg) throws 
ImportPostProcessException {
         log.debug("Executing import post processor for package [{}]", pkgMsg);
 
-        Map<String, Object> props = new HashMap<>();
-        props.put(DISTRIBUTION_TYPE, pkgMsg.getReqType().name());
-        props.put(DISTRIBUTION_PATHS, pkgMsg.getPaths());
-        props.put(DISTRIBUTION_PACKAGE_ID, pkgMsg.getPkgId());
+        Map<String, Object> props = 
this.buildProcessorPropertiesFromMessage(pkgMsg);
 
         long postProcessStartTime = currentTimeMillis();
         subscriberMetrics.getImportPostProcessRequest().increment();
@@ -411,6 +436,23 @@ public class BookKeeper {
         return 
resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, 
subService));
     }
 
+    /**
+     * Constructs processor properties from a {@link PackageMessage}.
+     * Extracts distribution type, paths, and package ID from the message
+     * to create a map used by various processors.
+     *
+     * @param packageMessage the message to extract properties from
+     * @return a map of key properties for processor use
+     */
+    private Map<String, Object> 
buildProcessorPropertiesFromMessage(PackageMessage packageMessage) {
+        Map<String, Object> processorProperties = new HashMap<>();
+        processorProperties.put(DISTRIBUTION_TYPE, 
packageMessage.getReqType().name());
+        processorProperties.put(DISTRIBUTION_PATHS, packageMessage.getPaths());
+        processorProperties.put(DISTRIBUTION_PACKAGE_ID, 
packageMessage.getPkgId());
+
+        return processorProperties;
+    }
+
     static void retryDelay() {
         try {
             Thread.sleep(RETRY_SEND_DELAY);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index f650fc4..584d404 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -23,6 +23,7 @@ import java.util.function.Consumer;
 import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.ImportPreProcessor;
 import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
@@ -48,7 +49,10 @@ public class BookKeeperFactory {
 
     @Reference
     BinaryStore binaryStore;
-    
+
+    @Reference
+    ImportPreProcessor importPreProcessor;
+
     @Reference
     ImportPostProcessor importPostProcessor;
 
@@ -74,6 +78,7 @@ public class BookKeeperFactory {
                 statusSender,
                 logSender,
                 config,
+                importPreProcessor,
                 importPostProcessor,
                 invalidationProcessor);
     }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
index 51ae453..686dd0c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
@@ -58,6 +58,12 @@ public class SubscriberMetrics {
 
     private final  Timer packageJournalDistributionDuration;
 
+    private final  Timer importPreProcessDuration;
+
+    private final  Counter importPreProcessSuccess;
+
+    private final  Counter importPreProcessRequest;
+
     private final  Timer importPostProcessDuration;
     
     private final  Counter importPostProcessSuccess;
@@ -87,6 +93,9 @@ public class SubscriberMetrics {
         processQueueItemDuration = 
metricsService.timer(getMetricName("process_queue_item_duration"));
         packageDistributedDuration = 
metricsService.timer(getMetricName("request_distributed_duration"));
         packageJournalDistributionDuration = 
metricsService.timer(getMetricName("package_journal_distribution_duration"));
+        importPreProcessDuration = 
metricsService.timer(getMetricName("import_pre_process_duration"));
+        importPreProcessSuccess = 
metricsService.counter(getMetricName("import_pre_process_success_count"));
+        importPreProcessRequest = 
metricsService.counter(getMetricName("import_pre_process_request_count"));
         importPostProcessDuration = 
metricsService.timer(getMetricName("import_post_process_duration"));
         importPostProcessSuccess = 
metricsService.counter(getMetricName("import_post_process_success_count"));
         importPostProcessRequest = 
metricsService.counter(getMetricName("import_post_process_request_count"));
@@ -198,6 +207,18 @@ public class SubscriberMetrics {
         return 
metricsService.counter(getNameWithLabel(getMetricName("package_status_count"), 
"status", status.name()));
     }
 
+    public Timer getImportPreProcessDuration() {
+        return importPreProcessDuration;
+    }
+
+    public Counter getImportPreProcessSuccess() {
+        return importPreProcessSuccess;
+    }
+
+    public Counter getImportPreProcessRequest() {
+        return importPreProcessRequest;
+    }
+
     public Timer getImportPostProcessDuration() {
         return importPostProcessDuration;
     }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/shared/NoOpImportPreProcessor.java
 
b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpImportPreProcessor.java
new file mode 100644
index 0000000..b3494ff
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/shared/NoOpImportPreProcessor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+import org.apache.sling.distribution.ImportPreProcessor;
+import org.osgi.service.component.annotations.Component;
+
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+
+/**
+ * A no-operation (no-op) implementation of the {@link ImportPreProcessor} interface.
+ * This class is intended as a placeholder or default implementation that performs
+ * no actions when its {@link #process(Map)} method is invoked. It's useful in contexts
+ * where an {@link ImportPreProcessor} is required but no pre-import processing is necessary.
+ */
+@Component(property = { "type=default" })
+public class NoOpImportPreProcessor implements ImportPreProcessor {
+
+    /**
+     * Does not perform any pre-import processing on the given properties. This method
+     * is intentionally left empty, making this class a no-operation implementation.
+     *
+     * @param props properties defining the content to be imported; not used by this method.
+     */
+    @Override
+    public void process(@Nonnull Map<String, Object> props) {}
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 5c11b2d..bf72a26 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -33,6 +33,7 @@ import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.ImportPreProcessor;
 import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.messages.LogMessage;
@@ -75,6 +76,9 @@ public class BookKeeperTest {
 
     @Mock
     private Consumer<LogMessage> logSender;
+
+    @Mock
+    private ImportPreProcessor importPreProcessor;
     
     @Mock
     private ImportPostProcessor importPostProcessor;
@@ -88,7 +92,7 @@ public class BookKeeperTest {
 
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", 
"subSlingId", true, 10, PackageHandling.Extract, "package", true);
         bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, 
packageHandler, eventAdmin, sender, logSender, bkConfig,
-            importPostProcessor, invalidationProcessor);
+                importPreProcessor, importPostProcessor, 
invalidationProcessor);
     }
 
     @Test
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index d330376..f4bf3c8 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -181,6 +181,7 @@ public class PackageDistributedNotifierTest {
                 .reqType(PackageMessage.ReqType.ADD)
                 .pkgType("journal")
                 .paths(Collections.singletonList("path"))
+                .deepPaths(Collections.singletonList("deep-path"))
                 .build();
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index f1fa3d5..c73b4b5 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -60,6 +60,8 @@ import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.distribution.ImportPostProcessException;
 import org.apache.sling.distribution.ImportPostProcessor;
+import org.apache.sling.distribution.ImportPreProcessException;
+import org.apache.sling.distribution.ImportPreProcessor;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
@@ -73,6 +75,7 @@ import 
org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
 import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
 import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
+import org.apache.sling.distribution.journal.shared.NoOpImportPreProcessor;
 import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
@@ -178,6 +181,9 @@ public class SubscriberTest {
 
     @Spy
     private SubscriberMetrics subscriberMetrics = new 
SubscriberMetrics(MetricsService.NOOP);
+
+    @Spy
+    private ImportPreProcessor importPreProcessor = new 
NoOpImportPreProcessor();
     
     @Spy
     private ImportPostProcessor importPostProcessor = new 
NoOpImportPostProcessor();
@@ -287,7 +293,7 @@ public class SubscriberTest {
     }
 
     @Test
-    public void testImportPostProcessInvoked() throws DistributionException, 
ImportPostProcessException {
+    public void testImportPreAndPostProcessInvoked() throws 
DistributionException, ImportPostProcessException, ImportPreProcessException {
         assumeNoPrecondition();
         initSubscriber();
         assertThat(subscriber.getState(), 
equalTo(DistributionAgentState.IDLE));
@@ -310,9 +316,23 @@ public class SubscriberTest {
         props.put(DISTRIBUTION_PATHS,  message.getPaths());
         props.put(DISTRIBUTION_PACKAGE_ID, message.getPkgId());
         
+        verify(importPreProcessor, times(1)).process(props);
         verify(importPostProcessor, times(1)).process(props);
     }
 
+    @Test
+    public void testImportPreProcessError() throws ImportPreProcessException {
+        assumeNoPrecondition();
+        initSubscriber(Collections.singletonMap("maxRetries", "0"));
+        doThrow(new ImportPreProcessException("Failed pre process")).
+                when(importPreProcessor).process(any());
+
+        MessageInfo info = createInfo(0L);
+        packageHandler.handle(info, BASIC_ADD_PACKAGE);
+
+        verifyStatusMessageSentWithStatus(Status.REMOVED_FAILED);
+    }
+
     @Test
     public void testImportPostProcessError() throws DistributionException, 
ImportPostProcessException {
         assumeNoPrecondition();
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
index 1badd30..bf0ed17 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueCacheTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.when;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -191,6 +192,8 @@ public class PubQueueCacheTest {
                 .pubSlingId("pubSlingId")
                 .reqType(reqType)
                 .pubAgentName(pubAgentName)
+                .paths(List.of("path"))
+                .deepPaths(List.of("deep-path"))
                 .build();
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
index d50a53b..23d66a4 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueProviderTest.java
@@ -28,9 +28,9 @@ import static org.mockito.Mockito.when;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
@@ -270,7 +270,8 @@ public class PubQueueProviderTest {
                 .pkgId(packageId)
                 .reqType(ReqType.ADD)
                 .pkgType("journal")
-                .paths(Arrays.asList("path"))
+                .paths(List.of("path"))
+                .deepPaths(List.of("deep-path"))
                 .build();
     }
 }

Reply via email to