This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9560
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 1112ce8c336a428d69539188c8110dd55a90fdb4
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 30 17:44:01 2020 +0200

    SLING-9560 - Configure BookKeeper via BookKeeperConfig. Move 
PackageHandling into BookKeeper. Extract editable from CommandPoller.
---
 .../journal/impl/subscriber/BookKeeper.java        | 54 ++++++++++----------
 .../journal/impl/subscriber/BookKeeperConfig.java  | 59 ++++++++++++++++++++++
 .../journal/impl/subscriber/CommandPoller.java     | 18 +------
 .../impl/subscriber/DistributionSubscriber.java    | 30 +++++------
 .../journal/impl/subscriber/BookKeeperTest.java    | 15 ++++--
 .../journal/impl/subscriber/CommandPollerTest.java | 39 +++++---------
 6 files changed, 125 insertions(+), 90 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 18fe950..49e44bb 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -45,6 +46,7 @@ import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -87,43 +89,40 @@ public class BookKeeper implements Closeable {
     private final PackageHandler packageHandler;
     private final EventAdmin eventAdmin;
     private final Consumer<PackageStatusMessage> sender;
-    private final boolean editable;
-    private final int maxRetries;
+    private final BookKeeperConfig config;
     private final boolean errorQueueEnabled;
 
     private final PackageRetries packageRetries = new PackageRetries();
     private final LocalStore statusStore;
     private final LocalStore processedOffsets;
-    private final String subAgentName;
-    private final String subSlingId;
     private final GaugeService<Integer> retriesGauge;
     private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
             DistributionMetricsService distributionMetricsService,
-            PackageHandler packageHandler,
+            Packaging packaging,
+            DistributionPackageBuilder packageBuilder,
             EventAdmin eventAdmin,
             Consumer<PackageStatusMessage> sender,
-            String subAgentName,
-            String subSlingId,
-            boolean editable, 
-            int maxRetries) { 
+            BookKeeperConfig config) { 
+        String pkgType = packageBuilder.getType();
+        ContentPackageExtractor extractor = new 
ContentPackageExtractor(packaging, config.getPackageHandling());
+        PackageHandler packageHandler = new PackageHandler(packageBuilder, 
extractor);
         this.packageHandler = packageHandler;
         this.eventAdmin = eventAdmin;
-        String nameRetries = DistributionMetricsService.SUB_COMPONENT + 
".current_retries;sub_name=" + subAgentName;
+        this.sender = sender;
+        this.config = config;
+        String nameRetries = DistributionMetricsService.SUB_COMPONENT + 
".current_retries;sub_name=" + config.getSubAgentName();
         this.retriesGauge = 
distributionMetricsService.createGauge(nameRetries, "Retries of current 
package", packageRetries::getSum);
         this.resolverFactory = resolverFactory;
         this.distributionMetricsService = distributionMetricsService;
-        this.sender = sender;
-        this.subAgentName = subAgentName;
-        this.subSlingId = subSlingId;
-        this.editable = editable;
-        this.maxRetries = maxRetries;
         // Error queues are enabled when the number
         // of retry attempts is limited ; disabled otherwise
-        this.errorQueueEnabled = (maxRetries >= 0);
-        this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, 
subAgentName);
-        this.processedOffsets = new LocalStore(resolverFactory, 
STORE_TYPE_PACKAGE, subAgentName);
+        this.errorQueueEnabled = (config.getMaxRetries() >= 0);
+        this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, 
config.getSubAgentName());
+        this.processedOffsets = new LocalStore(resolverFactory, 
STORE_TYPE_PACKAGE, config.getSubAgentName());
+        log.info("Started bookkeeper {} with package builder {} editable {} 
maxRetries {}",
+                config.getSubAgentName(), pkgType, config.isEditable(), 
config.getMaxRetries());
     }
     
     /**
@@ -149,7 +148,7 @@ public class BookKeeper implements Closeable {
         try (Timer.Context context = 
distributionMetricsService.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
             packageHandler.apply(importerResolver, pkgMsg);
-            if (editable) {
+            if (config.isEditable()) {
                 storeStatus(importerResolver, new 
PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, 
pkgMsg.getPubAgentName()));
             }
             storeOffset(importerResolver, offset);
@@ -157,7 +156,7 @@ public class BookKeeper implements Closeable {
             
distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
             
distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis()
 - createdTime), TimeUnit.MILLISECONDS);
             packageRetries.clear(pkgMsg.getPubAgentName());
-            Event event = new ImportedEvent(pkgMsg, subAgentName).toEvent();
+            Event event = new ImportedEvent(pkgMsg, 
config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
         } catch (DistributionException | LoginException | IOException | 
RuntimeException e) {
             failure(pkgMsg, offset, e);
@@ -176,8 +175,8 @@ public class BookKeeper implements Closeable {
         MDC.put("pub-agent-name", pubAgentName);
         MDC.put("distribution-message-type", pkgMsg.getReqType().name());
         MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
-        MDC.put("sub-sling-id", subSlingId);
-        MDC.put("sub-agent-name", subAgentName);
+        MDC.put("sub-sling-id", config.getSubSlingId());
+        MDC.put("sub-agent-name", config.getSubAgentName());
     }
     
     /**
@@ -194,13 +193,14 @@ public class BookKeeper implements Closeable {
 
         String pubAgentName = pkgMsg.getPubAgentName();
         int retries = packageRetries.get(pubAgentName);
-        if (errorQueueEnabled && retries >= maxRetries) {
+        if (errorQueueEnabled && retries >= config.getMaxRetries()) {
             log.warn("Failed to import distribution package {} at offset {} 
after {} retries, removing the package.", 
                     pkgMsg.getPkgId(), offset, retries);
             removeFailedPackage(pkgMsg, offset);
         } else {
             packageRetries.increase(pubAgentName);
-            String msg = format("Error processing distribution package %s. 
Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? 
Integer.toString(maxRetries) : "infinite");
+            String retriesSt = errorQueueEnabled ? 
Integer.toString(config.getMaxRetries()) : "infinite";
+            String msg = format("Error processing distribution package %s. 
Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, retriesSt);
             throw new DistributionException(msg, e);
         }
     }
@@ -210,7 +210,7 @@ public class BookKeeper implements Closeable {
                 pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
         Timer.Context context = 
distributionMetricsService.getRemovedPackageDuration().time();
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
-            if (editable) {
+            if (config.isEditable()) {
                 storeStatus(resolver, new PackageStatus(Status.REMOVED, 
offset, pkgMsg.getPubAgentName()));
             }
             storeOffset(resolver, offset);
@@ -263,8 +263,8 @@ public class BookKeeper implements Closeable {
     
     private void sendStatusMessage(PackageStatus status) {
         PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder()
-                .subSlingId(subSlingId)
-                .subAgentName(subAgentName)
+                .subSlingId(config.getSubSlingId())
+                .subAgentName(config.getSubAgentName())
                 .pubAgentName(status.pubAgentName)
                 .offset(status.offset)
                 .status(status.status)
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
new file mode 100644
index 0000000..334f24f
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+public class BookKeeperConfig {
+    private final String subAgentName;
+    private final String subSlingId;
+    private final boolean editable;
+    private final int maxRetries;
+    private final PackageHandling packageHandling;
+
+    public BookKeeperConfig(String subAgentName,
+            String subSlingId,
+            boolean editable, 
+            int maxRetries,
+            PackageHandling packageHandling) {
+                this.subAgentName = subAgentName;
+                this.subSlingId = subSlingId;
+                this.editable = editable;
+                this.maxRetries = maxRetries;
+                this.packageHandling = packageHandling;
+    }
+    
+    public String getSubAgentName() {
+        return subAgentName;
+    }
+    
+    public String getSubSlingId() {
+        return subSlingId;
+    }
+    
+    public boolean isEditable() {
+        return editable;
+    }
+    
+    public int getMaxRetries() {
+        return maxRetries;
+    }
+    
+    public PackageHandling getPackageHandling() {
+        return packageHandling;
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 703a9d1..45011a6 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -40,28 +40,14 @@ public class CommandPoller implements Closeable {
     private final Closeable poller;
     private final AtomicLong clearOffset = new AtomicLong(-1);
 
-    public CommandPoller(MessagingProvider messagingProvider, Topics topics, 
String subSlingId, String subAgentName, boolean editable) {
+    public CommandPoller(MessagingProvider messagingProvider, Topics topics, 
String subSlingId, String subAgentName) {
         this.subSlingId = subSlingId;
         this.subAgentName = subAgentName;
-        if (editable) {
-
-            /*
-             * We currently only support commands requiring editable mode.
-             * As an optimisation, we don't register a poller for non
-             * editable subscribers.
-             *
-             * When supporting commands independent from editable mode,
-             * this optimisation will be removed.
-             */
-
-            poller = messagingProvider.createPoller(
+        this.poller = messagingProvider.createPoller(
                     topics.getCommandTopic(),
                     Reset.earliest,
                     create(ClearCommand.class, this::handleCommandMessage)
                     );
-        } else {
-            poller = null;
-        }
     }
     
     public boolean isCleared(long offset) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index b739dcb..1999a2f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -124,7 +124,7 @@ public class DistributionSubscriber {
     
     private Closeable packagePoller;
 
-    private CommandPoller commandPoller;
+    private Optional<CommandPoller> commandPoller;
 
     private BookKeeper bookKeeper;
 
@@ -167,15 +167,12 @@ public class DistributionSubscriber {
         }
         
         queueNames = getNotEmpty(config.agentNames());
-        int maxRetries = config.maxRetries();
-        boolean editable = config.editable();
         pkgType = requireNonNull(packageBuilder.getType());
 
-        ContentPackageExtractor extractor = new 
ContentPackageExtractor(packaging, config.packageHandling());
-        PackageHandler packageHandler = new PackageHandler(packageBuilder, 
extractor);
         Consumer<PackageStatusMessage> sender = 
messagingProvider.createSender(topics.getStatusTopic());
-        bookKeeper = new BookKeeper(resolverFactory, 
distributionMetricsService, packageHandler, eventAdmin,
-                sender, subAgentName, subSlingId, editable, maxRetries);
+        BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, 
subSlingId, config.editable(), config.maxRetries(), config.packageHandling());
+        bookKeeper = new BookKeeper(resolverFactory, 
distributionMetricsService, packaging, packageBuilder, eventAdmin,
+                sender, bkConfig);
         
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = messagingProvider.assignTo(startOffset);
@@ -183,20 +180,20 @@ public class DistributionSubscriber {
         packagePoller = 
messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
                 HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage));
 
-        commandPoller = new CommandPoller(messagingProvider, topics, 
subSlingId, subAgentName, editable);
+        if (config.editable()) {
+            commandPoller = Optional.of(new CommandPoller(messagingProvider, 
topics, subSlingId, subAgentName));
+        } else {
+            commandPoller = Optional.empty();
+        }
 
         queueThread = startBackgroundThread(this::processQueue,
                 format("Queue Processor for Subscriber agent %s", 
subAgentName));
 
         int announceDelay = 
PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
         announcer = new Announcer(subSlingId, subAgentName, queueNames, 
messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
-                maxRetries, config.editable(), announceDelay);
+                config.maxRetries(), config.editable(), announceDelay);
 
-        boolean errorQueueEnabled = (maxRetries >= 0);
-        String msg = format(
-                "Started Subscriber agent %s at offset %s, subscribed to agent 
names %s with package builder %s editable %s maxRetries %s errorQueueEnabled 
%s",
-                subAgentName, startOffset, queueNames, pkgType, 
config.editable(), maxRetries, errorQueueEnabled);
-        LOG.info(msg);
+        LOG.info("Started Subscriber agent {} at offset {}, subscribed to 
agent names {}", subAgentName, startOffset, queueNames);
     }
 
     private Set<String> getNotEmpty(String[] agentNames) {
@@ -214,7 +211,8 @@ public class DistributionSubscriber {
          */
 
         IOUtils.closeQuietly(announcer, bookKeeper, 
-                packagePoller, commandPoller);
+                packagePoller);
+        commandPoller.ifPresent(IOUtils::closeQuietly);
         subscriberIdle.ifPresent(IOUtils::closeQuietly);
         running = false;
         try {
@@ -363,7 +361,7 @@ public class DistributionSubscriber {
     }
 
     private boolean shouldSkip(long offset) {
-        boolean cleared = commandPoller.isCleared(offset);
+        boolean cleared = commandPoller.isPresent() ? 
commandPoller.get().isCleared(offset) : false;
         Decision decision = waitPrecondition(offset);
         return cleared || decision == Decision.SKIP;
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index 3afa938..e2688a7 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -23,11 +23,13 @@ import static org.junit.Assert.assertThat;
 
 import java.util.function.Consumer;
 
+import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,9 +49,6 @@ public class BookKeeperTest {
     private DistributionMetricsService distributionMetricsService;
 
     @Mock
-    private PackageHandler packageHandler;
-
-    @Mock
     private EventAdmin eventAdmin;
 
     @Mock
@@ -57,10 +56,16 @@ public class BookKeeperTest {
 
     private BookKeeper bookKeeper;
 
+    @Mock
+    private Packaging packaging;
+
+    @Mock
+    private DistributionPackageBuilder packageBuilder;
+
     @Before
     public void before() {
-        bookKeeper = new BookKeeper(resolverFactory, 
distributionMetricsService, packageHandler, eventAdmin, sender,
-                "subAgentName", "subSlingId", true, 10);
+        BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", 
"subSlingId", true, 10, PackageHandling.Extract);
+        bookKeeper = new BookKeeper(resolverFactory, 
distributionMetricsService, packaging, packageBuilder, eventAdmin, sender, 
bkConfig);
     }
 
     @Test
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 9cc3503..a596dcf 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -83,7 +82,7 @@ public class CommandPollerTest {
 
     @Test
     public void testSkipped() throws DistributionException, 
InterruptedException, IOException {
-        createCommandPoller(true);
+        createCommandPoller();
         
         commandHandler.handle(info, commandMessage(SUBSLING_ID_OTHER, 
SUB_AGENT_OTHER, 1L));
         assertSkipped();
@@ -93,11 +92,15 @@ public class CommandPollerTest {
         
         commandHandler.handle(info, commandMessage(SUB_SLING_ID, 
SUB_AGENT_OTHER, 1L));
         assertSkipped();
+        
+        commandPoller.close();
+        
+        verify(poller).close();
     }
     
     @Test
     public void testClearOffsets() throws DistributionException, 
InterruptedException, IOException {
-        createCommandPoller(true);
+        createCommandPoller();
 
         commandHandler.handle(info, commandMessage(10L));
         assertClearedUpTo(10);
@@ -108,6 +111,10 @@ public class CommandPollerTest {
         // Clearing lower offset should not change cleared offset
         commandHandler.handle(info, commandMessage(1L));
         assertClearedUpTo(11);
+        
+        commandPoller.close();
+        
+        verify(poller).close();
     }
 
     private void assertClearedUpTo(int max) {
@@ -118,24 +125,6 @@ public class CommandPollerTest {
 
     }
 
-    @Test
-    public void testEditable() throws DistributionException, 
InterruptedException, IOException {
-        createCommandPoller(true);
-        
-        commandPoller.close();
-        
-        verify(poller).close();
-    }
-    
-    @Test
-    public void testNotEditable() throws DistributionException, 
InterruptedException, IOException {
-        createCommandPoller(false);
-        
-        commandPoller.close();
-        
-        verify(poller, never()).close();
-    }
-
     private void assertSkipped() {
         assertThat(commandPoller.isCleared(1), equalTo(false));
     }
@@ -152,16 +141,14 @@ public class CommandPollerTest {
                 .build();
     }
 
-    private void createCommandPoller(boolean editable) {
+    private void createCommandPoller() {
         when(clientProvider.createPoller(
                 Mockito.anyString(),
                 Mockito.eq(Reset.earliest), 
                 handlerCaptor.capture()))
             .thenReturn(poller);
-        commandPoller = new CommandPoller(clientProvider, topics, 
SUB_SLING_ID, SUB_AGENT_NAME, editable);
-        if (editable) {
-            commandHandler = handlerCaptor.getValue().getHandler();
-        }
+        commandPoller = new CommandPoller(clientProvider, topics, 
SUB_SLING_ID, SUB_AGENT_NAME);
+        commandHandler = handlerCaptor.getValue().getHandler();
     }
 
 }

Reply via email to