This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9560 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 1112ce8c336a428d69539188c8110dd55a90fdb4 Author: Christian Schneider <[email protected]> AuthorDate: Tue Jun 30 17:44:01 2020 +0200 SLING-9560 - Configure BookKeeper via BookKeeperConfig. Move PackageHandling into BookKeeper. Extract editable from CommandPoller. --- .../journal/impl/subscriber/BookKeeper.java | 54 ++++++++++---------- .../journal/impl/subscriber/BookKeeperConfig.java | 59 ++++++++++++++++++++++ .../journal/impl/subscriber/CommandPoller.java | 18 +------ .../impl/subscriber/DistributionSubscriber.java | 30 +++++------ .../journal/impl/subscriber/BookKeeperTest.java | 15 ++++-- .../journal/impl/subscriber/CommandPollerTest.java | 39 +++++--------- 6 files changed, 125 insertions(+), 90 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java index 18fe950..49e44bb 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.commons.io.IOUtils; +import org.apache.jackrabbit.vault.packaging.Packaging; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolver; @@ -45,6 +46,7 @@ import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; +import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; @@ -87,43 +89,40 @@ public class BookKeeper implements Closeable { private final PackageHandler packageHandler; private final EventAdmin eventAdmin; private final Consumer<PackageStatusMessage> sender; - private final boolean editable; - private final int maxRetries; + private final BookKeeperConfig config; private final boolean errorQueueEnabled; private final PackageRetries packageRetries = new PackageRetries(); private final LocalStore statusStore; private final LocalStore processedOffsets; - private final String subAgentName; - private final String subSlingId; private final GaugeService<Integer> retriesGauge; private int skippedCounter = 0; public BookKeeper(ResourceResolverFactory resolverFactory, DistributionMetricsService distributionMetricsService, - PackageHandler packageHandler, + Packaging packaging, + DistributionPackageBuilder packageBuilder, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, - String subAgentName, - String subSlingId, - boolean editable, - int maxRetries) { + BookKeeperConfig config) { + String pkgType = packageBuilder.getType(); + ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.getPackageHandling()); + PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor); this.packageHandler = packageHandler; this.eventAdmin = eventAdmin; - String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName; + this.sender = sender; + this.config = config; + String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.getSubAgentName(); this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum); this.resolverFactory = resolverFactory; this.distributionMetricsService = distributionMetricsService; - this.sender = sender; - this.subAgentName = subAgentName; - this.subSlingId = subSlingId; - this.editable = editable; - this.maxRetries = maxRetries; // Error queues are enabled when the number // of retry attempts is limited ; disabled otherwise - this.errorQueueEnabled = (maxRetries >= 0); - this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, subAgentName); - this.processedOffsets = new LocalStore(resolverFactory, STORE_TYPE_PACKAGE, subAgentName); + this.errorQueueEnabled = (config.getMaxRetries() >= 0); + this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, config.getSubAgentName()); + this.processedOffsets = new LocalStore(resolverFactory, STORE_TYPE_PACKAGE, config.getSubAgentName()); + log.info("Started bookkeeper {} with package builder {} editable {} maxRetries {}", + config.getSubAgentName(), pkgType, config.isEditable(), config.getMaxRetries()); } /** @@ -149,7 +148,7 @@ public class BookKeeper implements Closeable { try (Timer.Context context = distributionMetricsService.getImportedPackageDuration().time(); ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) { packageHandler.apply(importerResolver, pkgMsg); - if (editable) { + if (config.isEditable()) { storeStatus(importerResolver, new PackageStatus(PackageStatusMessage.Status.IMPORTED, offset, pkgMsg.getPubAgentName())); } storeOffset(importerResolver, offset); @@ -157,7 +156,7 @@ public class BookKeeper implements Closeable { distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength()); distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis() - createdTime), TimeUnit.MILLISECONDS); packageRetries.clear(pkgMsg.getPubAgentName()); - Event event = new ImportedEvent(pkgMsg, subAgentName).toEvent(); + Event event = new ImportedEvent(pkgMsg, config.getSubAgentName()).toEvent(); eventAdmin.postEvent(event); } catch (DistributionException | LoginException | IOException | RuntimeException e) { failure(pkgMsg, offset, e); @@ -176,8 +175,8 @@ public class BookKeeper implements Closeable { MDC.put("pub-agent-name", pubAgentName); MDC.put("distribution-message-type", pkgMsg.getReqType().name()); MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName))); - MDC.put("sub-sling-id", subSlingId); - MDC.put("sub-agent-name", subAgentName); + MDC.put("sub-sling-id", config.getSubSlingId()); + MDC.put("sub-agent-name", config.getSubAgentName()); } /** @@ -194,13 +193,14 @@ public class BookKeeper implements Closeable { String pubAgentName = pkgMsg.getPubAgentName(); int retries = packageRetries.get(pubAgentName); - if (errorQueueEnabled && retries >= maxRetries) { + if (errorQueueEnabled && retries >= config.getMaxRetries()) { log.warn("Failed to import distribution package {} at offset {} after {} retries, removing the package.", pkgMsg.getPkgId(), offset, retries); removeFailedPackage(pkgMsg, offset); } else { packageRetries.increase(pubAgentName); - String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ? Integer.toString(maxRetries) : "infinite"); + String retriesSt = errorQueueEnabled ? Integer.toString(config.getMaxRetries()) : "infinite"; + String msg = format("Error processing distribution package %s. Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, retriesSt); throw new DistributionException(msg, e); } } @@ -210,7 +210,7 @@ public class BookKeeper implements Closeable { pkgMsg.getPkgId(), pkgMsg.getReqType(), offset); Timer.Context context = distributionMetricsService.getRemovedPackageDuration().time(); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { - if (editable) { + if (config.isEditable()) { storeStatus(resolver, new PackageStatus(Status.REMOVED, offset, pkgMsg.getPubAgentName())); } storeOffset(resolver, offset); @@ -263,8 +263,8 @@ public class BookKeeper implements Closeable { private void sendStatusMessage(PackageStatus status) { PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder() - .subSlingId(subSlingId) - .subAgentName(subAgentName) + .subSlingId(config.getSubSlingId()) + .subAgentName(config.getSubAgentName()) .pubAgentName(status.pubAgentName) .offset(status.offset) .status(status.status) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java new file mode 100644 index 0000000..334f24f --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.subscriber; + +public class BookKeeperConfig { + private final String subAgentName; + private final String subSlingId; + private final boolean editable; + private final int maxRetries; + private final PackageHandling packageHandling; + + public BookKeeperConfig(String subAgentName, + String subSlingId, + boolean editable, + int maxRetries, + PackageHandling packageHandling) { + this.subAgentName = subAgentName; + this.subSlingId = subSlingId; + this.editable = editable; + this.maxRetries = maxRetries; + this.packageHandling = packageHandling; + } + + public String getSubAgentName() { + return subAgentName; + } + + public String getSubSlingId() { + return subSlingId; + } + + public boolean isEditable() { + return editable; + } + + public int getMaxRetries() { + return maxRetries; + } + + public PackageHandling getPackageHandling() { + return packageHandling; + } +} diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java index 703a9d1..45011a6 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java @@ -40,28 +40,14 @@ public class CommandPoller implements Closeable { private final Closeable poller; private final AtomicLong clearOffset = new AtomicLong(-1); - public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName, boolean editable) { + public CommandPoller(MessagingProvider messagingProvider, Topics topics, String subSlingId, String subAgentName) { this.subSlingId = subSlingId; this.subAgentName = subAgentName; - if (editable) { - - /* - * We currently only support commands requiring editable mode. - * As an optimisation, we don't register a poller for non - * editable subscribers. - * - * When supporting commands independent from editable mode, - * this optimisation will be removed. - */ - - poller = messagingProvider.createPoller( + this.poller = messagingProvider.createPoller( topics.getCommandTopic(), Reset.earliest, create(ClearCommand.class, this::handleCommandMessage) ); - } else { - poller = null; - } } public boolean isCleared(long offset) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index b739dcb..1999a2f 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -124,7 +124,7 @@ public class DistributionSubscriber { private Closeable packagePoller; - private CommandPoller commandPoller; + private Optional<CommandPoller> commandPoller; private BookKeeper bookKeeper; @@ -167,15 +167,12 @@ public class DistributionSubscriber { } queueNames = getNotEmpty(config.agentNames()); - int maxRetries = config.maxRetries(); - boolean editable = config.editable(); pkgType = requireNonNull(packageBuilder.getType()); - ContentPackageExtractor extractor = new ContentPackageExtractor(packaging, config.packageHandling()); - PackageHandler packageHandler = new PackageHandler(packageBuilder, extractor); Consumer<PackageStatusMessage> sender = messagingProvider.createSender(topics.getStatusTopic()); - bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, - sender, subAgentName, subSlingId, editable, maxRetries); + BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, subSlingId, config.editable(), config.maxRetries(), config.packageHandling()); + bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging, packageBuilder, eventAdmin, + sender, bkConfig); long startOffset = bookKeeper.loadOffset() + 1; String assign = messagingProvider.assignTo(startOffset); @@ -183,20 +180,20 @@ public class DistributionSubscriber { packagePoller = messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign, HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage)); - commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, editable); + if (config.editable()) { + commandPoller = Optional.of(new CommandPoller(messagingProvider, topics, subSlingId, subAgentName)); + } else { + commandPoller = Optional.empty(); + } queueThread = startBackgroundThread(this::processQueue, format("Queue Processor for Subscriber agent %s", subAgentName)); int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000); announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper, - maxRetries, config.editable(), announceDelay); + config.maxRetries(), config.editable(), announceDelay); - boolean errorQueueEnabled = (maxRetries >= 0); - String msg = format( - "Started Subscriber agent %s at offset %s, subscribed to agent names %s with package builder %s editable %s maxRetries %s errorQueueEnabled %s", - subAgentName, startOffset, queueNames, pkgType, config.editable(), maxRetries, errorQueueEnabled); - LOG.info(msg); + LOG.info("Started Subscriber agent {} at offset {}, subscribed to agent names {}", subAgentName, startOffset, queueNames); } private Set<String> getNotEmpty(String[] agentNames) { @@ -214,7 +211,8 @@ public class DistributionSubscriber { */ IOUtils.closeQuietly(announcer, bookKeeper, - packagePoller, commandPoller); + packagePoller); + commandPoller.ifPresent(IOUtils::closeQuietly); subscriberIdle.ifPresent(IOUtils::closeQuietly); running = false; try { @@ -363,7 +361,7 @@ public class DistributionSubscriber { } private boolean shouldSkip(long offset) { - boolean cleared = commandPoller.isCleared(offset); + boolean cleared = commandPoller.isPresent() ? commandPoller.get().isCleared(offset) : false; Decision decision = waitPrecondition(offset); return cleared || decision == Decision.SKIP; } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java index 3afa938..e2688a7 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java @@ -23,11 +23,13 @@ import static org.junit.Assert.assertThat; import java.util.function.Consumer; +import org.apache.jackrabbit.vault.packaging.Packaging; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.junit.Before; import org.junit.Test; @@ -47,9 +49,6 @@ public class BookKeeperTest { private DistributionMetricsService distributionMetricsService; @Mock - private PackageHandler packageHandler; - - @Mock private EventAdmin eventAdmin; @Mock @@ -57,10 +56,16 @@ public class BookKeeperTest { private BookKeeper bookKeeper; + @Mock + private Packaging packaging; + + @Mock + private DistributionPackageBuilder packageBuilder; + @Before public void before() { - bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packageHandler, eventAdmin, sender, - "subAgentName", "subSlingId", true, 10); + BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract); + bookKeeper = new BookKeeper(resolverFactory, distributionMetricsService, packaging, packageBuilder, eventAdmin, sender, bkConfig); } @Test diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java index 9cc3503..a596dcf 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java @@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.subscriber; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -83,7 +82,7 @@ public class CommandPollerTest { @Test public void testSkipped() throws DistributionException, InterruptedException, IOException { - createCommandPoller(true); + createCommandPoller(); commandHandler.handle(info, commandMessage(SUBSLING_ID_OTHER, SUB_AGENT_OTHER, 1L)); assertSkipped(); @@ -93,11 +92,15 @@ public class CommandPollerTest { commandHandler.handle(info, commandMessage(SUB_SLING_ID, SUB_AGENT_OTHER, 1L)); assertSkipped(); + + commandPoller.close(); + + verify(poller).close(); } @Test public void testClearOffsets() throws DistributionException, InterruptedException, IOException { - createCommandPoller(true); + createCommandPoller(); commandHandler.handle(info, commandMessage(10L)); assertClearedUpTo(10); @@ -108,6 +111,10 @@ public class CommandPollerTest { // Clearing lower offset should not change cleared offset commandHandler.handle(info, commandMessage(1L)); assertClearedUpTo(11); + + commandPoller.close(); + + verify(poller).close(); } private void assertClearedUpTo(int max) { @@ -118,24 +125,6 @@ public class CommandPollerTest { } - @Test - public void testEditable() throws DistributionException, InterruptedException, IOException { - createCommandPoller(true); - - commandPoller.close(); - - verify(poller).close(); - } - - @Test - public void testNotEditable() throws DistributionException, InterruptedException, IOException { - createCommandPoller(false); - - commandPoller.close(); - - verify(poller, never()).close(); - } - private void assertSkipped() { assertThat(commandPoller.isCleared(1), equalTo(false)); } @@ -152,16 +141,14 @@ public class CommandPollerTest { .build(); } - private void createCommandPoller(boolean editable) { + private void createCommandPoller() { when(clientProvider.createPoller( Mockito.anyString(), Mockito.eq(Reset.earliest), handlerCaptor.capture())) .thenReturn(poller); - commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME, editable); - if (editable) { - commandHandler = handlerCaptor.getValue().getHandler(); - } + commandPoller = new CommandPoller(clientProvider, topics, SUB_SLING_ID, SUB_AGENT_NAME); + commandHandler = handlerCaptor.getValue().getHandler(); } }
