This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 757422e SLING-9560 - Configure BookKeeper via BookKeeperConfig. Move PackageHandling into BookKeeper. Extract editable from CommandPoller. (#47)
757422e is described below
commit 757422e40ea79fa05140af991c1f627cd0c96784
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Jun 30 18:53:38 2020 +0200
SLING-9560 - Configure BookKeeper via BookKeeperConfig. Move
PackageHandling into BookKeeper. Extract editable from CommandPoller. (#47)
---
.../journal/impl/subscriber/BookKeeper.java | 54 ++++++++++----------
.../journal/impl/subscriber/BookKeeperConfig.java | 59 ++++++++++++++++++++++
.../journal/impl/subscriber/CommandPoller.java | 18 +------
.../impl/subscriber/DistributionSubscriber.java | 30 +++++------
.../journal/impl/subscriber/BookKeeperTest.java | 15 ++++--
.../journal/impl/subscriber/CommandPollerTest.java | 39 +++++---------
6 files changed, 125 insertions(+), 90 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
index 18fe950..49e44bb 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeper.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
@@ -45,6 +46,7 @@ import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsServ
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -87,43 +89,40 @@ public class BookKeeper implements Closeable {
private final PackageHandler packageHandler;
private final EventAdmin eventAdmin;
private final Consumer<PackageStatusMessage> sender;
- private final boolean editable;
- private final int maxRetries;
+ private final BookKeeperConfig config;
private final boolean errorQueueEnabled;
private final PackageRetries packageRetries = new PackageRetries();
private final LocalStore statusStore;
private final LocalStore processedOffsets;
- private final String subAgentName;
- private final String subSlingId;
private final GaugeService<Integer> retriesGauge;
private int skippedCounter = 0;
public BookKeeper(ResourceResolverFactory resolverFactory,
DistributionMetricsService distributionMetricsService,
- PackageHandler packageHandler,
+ Packaging packaging,
+ DistributionPackageBuilder packageBuilder,
EventAdmin eventAdmin,
Consumer<PackageStatusMessage> sender,
- String subAgentName,
- String subSlingId,
- boolean editable,
- int maxRetries) {
+ BookKeeperConfig config) {
+ String pkgType = packageBuilder.getType();
+ ContentPackageExtractor extractor = new
ContentPackageExtractor(packaging, config.getPackageHandling());
+ PackageHandler packageHandler = new PackageHandler(packageBuilder,
extractor);
this.packageHandler = packageHandler;
this.eventAdmin = eventAdmin;
- String nameRetries = DistributionMetricsService.SUB_COMPONENT +
".current_retries;sub_name=" + subAgentName;
+ this.sender = sender;
+ this.config = config;
+ String nameRetries = DistributionMetricsService.SUB_COMPONENT +
".current_retries;sub_name=" + config.getSubAgentName();
this.retriesGauge =
distributionMetricsService.createGauge(nameRetries, "Retries of current
package", packageRetries::getSum);
this.resolverFactory = resolverFactory;
this.distributionMetricsService = distributionMetricsService;
- this.sender = sender;
- this.subAgentName = subAgentName;
- this.subSlingId = subSlingId;
- this.editable = editable;
- this.maxRetries = maxRetries;
// Error queues are enabled when the number
// of retry attempts is limited ; disabled otherwise
- this.errorQueueEnabled = (maxRetries >= 0);
- this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS,
subAgentName);
- this.processedOffsets = new LocalStore(resolverFactory,
STORE_TYPE_PACKAGE, subAgentName);
+ this.errorQueueEnabled = (config.getMaxRetries() >= 0);
+ this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS,
config.getSubAgentName());
+ this.processedOffsets = new LocalStore(resolverFactory,
STORE_TYPE_PACKAGE, config.getSubAgentName());
+ log.info("Started bookkeeper {} with package builder {} editable {}
maxRetries {}",
+ config.getSubAgentName(), pkgType, config.isEditable(),
config.getMaxRetries());
}
/**
@@ -149,7 +148,7 @@ public class BookKeeper implements Closeable {
try (Timer.Context context =
distributionMetricsService.getImportedPackageDuration().time();
ResourceResolver importerResolver =
getServiceResolver(SUBSERVICE_IMPORTER)) {
packageHandler.apply(importerResolver, pkgMsg);
- if (editable) {
+ if (config.isEditable()) {
storeStatus(importerResolver, new
PackageStatus(PackageStatusMessage.Status.IMPORTED, offset,
pkgMsg.getPubAgentName()));
}
storeOffset(importerResolver, offset);
@@ -157,7 +156,7 @@ public class BookKeeper implements Closeable {
distributionMetricsService.getImportedPackageSize().update(pkgMsg.getPkgLength());
distributionMetricsService.getPackageDistributedDuration().update((currentTimeMillis()
- createdTime), TimeUnit.MILLISECONDS);
packageRetries.clear(pkgMsg.getPubAgentName());
- Event event = new ImportedEvent(pkgMsg, subAgentName).toEvent();
+ Event event = new ImportedEvent(pkgMsg,
config.getSubAgentName()).toEvent();
eventAdmin.postEvent(event);
} catch (DistributionException | LoginException | IOException |
RuntimeException e) {
failure(pkgMsg, offset, e);
@@ -176,8 +175,8 @@ public class BookKeeper implements Closeable {
MDC.put("pub-agent-name", pubAgentName);
MDC.put("distribution-message-type", pkgMsg.getReqType().name());
MDC.put("retries", Integer.toString(packageRetries.get(pubAgentName)));
- MDC.put("sub-sling-id", subSlingId);
- MDC.put("sub-agent-name", subAgentName);
+ MDC.put("sub-sling-id", config.getSubSlingId());
+ MDC.put("sub-agent-name", config.getSubAgentName());
}
/**
@@ -194,13 +193,14 @@ public class BookKeeper implements Closeable {
String pubAgentName = pkgMsg.getPubAgentName();
int retries = packageRetries.get(pubAgentName);
- if (errorQueueEnabled && retries >= maxRetries) {
+ if (errorQueueEnabled && retries >= config.getMaxRetries()) {
log.warn("Failed to import distribution package {} at offset {}
after {} retries, removing the package.",
pkgMsg.getPkgId(), offset, retries);
removeFailedPackage(pkgMsg, offset);
} else {
packageRetries.increase(pubAgentName);
- String msg = format("Error processing distribution package %s.
Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, errorQueueEnabled ?
Integer.toString(maxRetries) : "infinite");
+ String retriesSt = errorQueueEnabled ?
Integer.toString(config.getMaxRetries()) : "infinite";
+ String msg = format("Error processing distribution package %s.
Retry attempts %s/%s.", pkgMsg.getPkgId(), retries, retriesSt);
throw new DistributionException(msg, e);
}
}
@@ -210,7 +210,7 @@ public class BookKeeper implements Closeable {
pkgMsg.getPkgId(), pkgMsg.getReqType(), offset);
Timer.Context context =
distributionMetricsService.getRemovedPackageDuration().time();
try (ResourceResolver resolver =
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
- if (editable) {
+ if (config.isEditable()) {
storeStatus(resolver, new PackageStatus(Status.REMOVED,
offset, pkgMsg.getPubAgentName()));
}
storeOffset(resolver, offset);
@@ -263,8 +263,8 @@ public class BookKeeper implements Closeable {
private void sendStatusMessage(PackageStatus status) {
PackageStatusMessage pkgStatMsg = PackageStatusMessage.builder()
- .subSlingId(subSlingId)
- .subAgentName(subAgentName)
+ .subSlingId(config.getSubSlingId())
+ .subAgentName(config.getSubAgentName())
.pubAgentName(status.pubAgentName)
.offset(status.offset)
.status(status.status)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
new file mode 100644
index 0000000..334f24f
--- /dev/null
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+public class BookKeeperConfig {
+ private final String subAgentName;
+ private final String subSlingId;
+ private final boolean editable;
+ private final int maxRetries;
+ private final PackageHandling packageHandling;
+
+ public BookKeeperConfig(String subAgentName,
+ String subSlingId,
+ boolean editable,
+ int maxRetries,
+ PackageHandling packageHandling) {
+ this.subAgentName = subAgentName;
+ this.subSlingId = subSlingId;
+ this.editable = editable;
+ this.maxRetries = maxRetries;
+ this.packageHandling = packageHandling;
+ }
+
+ public String getSubAgentName() {
+ return subAgentName;
+ }
+
+ public String getSubSlingId() {
+ return subSlingId;
+ }
+
+ public boolean isEditable() {
+ return editable;
+ }
+
+ public int getMaxRetries() {
+ return maxRetries;
+ }
+
+ public PackageHandling getPackageHandling() {
+ return packageHandling;
+ }
+}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index 703a9d1..45011a6 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -40,28 +40,14 @@ public class CommandPoller implements Closeable {
private final Closeable poller;
private final AtomicLong clearOffset = new AtomicLong(-1);
- public CommandPoller(MessagingProvider messagingProvider, Topics topics,
String subSlingId, String subAgentName, boolean editable) {
+ public CommandPoller(MessagingProvider messagingProvider, Topics topics,
String subSlingId, String subAgentName) {
this.subSlingId = subSlingId;
this.subAgentName = subAgentName;
- if (editable) {
-
- /*
- * We currently only support commands requiring editable mode.
- * As an optimisation, we don't register a poller for non
- * editable subscribers.
- *
- * When supporting commands independent from editable mode,
- * this optimisation will be removed.
- */
-
- poller = messagingProvider.createPoller(
+ this.poller = messagingProvider.createPoller(
topics.getCommandTopic(),
Reset.earliest,
create(ClearCommand.class, this::handleCommandMessage)
);
- } else {
- poller = null;
- }
}
public boolean isCleared(long offset) {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index b739dcb..1999a2f 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -124,7 +124,7 @@ public class DistributionSubscriber {
private Closeable packagePoller;
- private CommandPoller commandPoller;
+ private Optional<CommandPoller> commandPoller;
private BookKeeper bookKeeper;
@@ -167,15 +167,12 @@ public class DistributionSubscriber {
}
queueNames = getNotEmpty(config.agentNames());
- int maxRetries = config.maxRetries();
- boolean editable = config.editable();
pkgType = requireNonNull(packageBuilder.getType());
- ContentPackageExtractor extractor = new
ContentPackageExtractor(packaging, config.packageHandling());
- PackageHandler packageHandler = new PackageHandler(packageBuilder,
extractor);
Consumer<PackageStatusMessage> sender =
messagingProvider.createSender(topics.getStatusTopic());
- bookKeeper = new BookKeeper(resolverFactory,
distributionMetricsService, packageHandler, eventAdmin,
- sender, subAgentName, subSlingId, editable, maxRetries);
+ BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName,
subSlingId, config.editable(), config.maxRetries(), config.packageHandling());
+ bookKeeper = new BookKeeper(resolverFactory,
distributionMetricsService, packaging, packageBuilder, eventAdmin,
+ sender, bkConfig);
long startOffset = bookKeeper.loadOffset() + 1;
String assign = messagingProvider.assignTo(startOffset);
@@ -183,20 +180,20 @@ public class DistributionSubscriber {
packagePoller =
messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
HandlerAdapter.create(PackageMessage.class,
this::handlePackageMessage));
- commandPoller = new CommandPoller(messagingProvider, topics,
subSlingId, subAgentName, editable);
+ if (config.editable()) {
+ commandPoller = Optional.of(new CommandPoller(messagingProvider,
topics, subSlingId, subAgentName));
+ } else {
+ commandPoller = Optional.empty();
+ }
queueThread = startBackgroundThread(this::processQueue,
format("Queue Processor for Subscriber agent %s",
subAgentName));
int announceDelay =
PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
announcer = new Announcer(subSlingId, subAgentName, queueNames,
messagingProvider.createSender(topics.getDiscoveryTopic()), bookKeeper,
- maxRetries, config.editable(), announceDelay);
+ config.maxRetries(), config.editable(), announceDelay);
- boolean errorQueueEnabled = (maxRetries >= 0);
- String msg = format(
- "Started Subscriber agent %s at offset %s, subscribed to agent
names %s with package builder %s editable %s maxRetries %s errorQueueEnabled
%s",
- subAgentName, startOffset, queueNames, pkgType,
config.editable(), maxRetries, errorQueueEnabled);
- LOG.info(msg);
+ LOG.info("Started Subscriber agent {} at offset {}, subscribed to
agent names {}", subAgentName, startOffset, queueNames);
}
private Set<String> getNotEmpty(String[] agentNames) {
@@ -214,7 +211,8 @@ public class DistributionSubscriber {
*/
IOUtils.closeQuietly(announcer, bookKeeper,
- packagePoller, commandPoller);
+ packagePoller);
+ commandPoller.ifPresent(IOUtils::closeQuietly);
subscriberIdle.ifPresent(IOUtils::closeQuietly);
running = false;
try {
@@ -363,7 +361,7 @@ public class DistributionSubscriber {
}
private boolean shouldSkip(long offset) {
- boolean cleared = commandPoller.isCleared(offset);
+ boolean cleared = commandPoller.isPresent() ?
commandPoller.get().isCleared(offset) : false;
Decision decision = waitPrecondition(offset);
return cleared || decision == Decision.SKIP;
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
index 3afa938..e2688a7 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/BookKeeperTest.java
@@ -23,11 +23,13 @@ import static org.junit.Assert.assertThat;
import java.util.function.Consumer;
+import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.Before;
import org.junit.Test;
@@ -47,9 +49,6 @@ public class BookKeeperTest {
private DistributionMetricsService distributionMetricsService;
@Mock
- private PackageHandler packageHandler;
-
- @Mock
private EventAdmin eventAdmin;
@Mock
@@ -57,10 +56,16 @@ public class BookKeeperTest {
private BookKeeper bookKeeper;
+ @Mock
+ private Packaging packaging;
+
+ @Mock
+ private DistributionPackageBuilder packageBuilder;
+
@Before
public void before() {
- bookKeeper = new BookKeeper(resolverFactory,
distributionMetricsService, packageHandler, eventAdmin, sender,
- "subAgentName", "subSlingId", true, 10);
+ BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName",
"subSlingId", true, 10, PackageHandling.Extract);
+ bookKeeper = new BookKeeper(resolverFactory,
distributionMetricsService, packaging, packageBuilder, eventAdmin, sender,
bkConfig);
}
@Test
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
index 9cc3503..a596dcf 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPollerTest.java
@@ -20,7 +20,6 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -83,7 +82,7 @@ public class CommandPollerTest {
@Test
public void testSkipped() throws DistributionException,
InterruptedException, IOException {
- createCommandPoller(true);
+ createCommandPoller();
commandHandler.handle(info, commandMessage(SUBSLING_ID_OTHER,
SUB_AGENT_OTHER, 1L));
assertSkipped();
@@ -93,11 +92,15 @@ public class CommandPollerTest {
commandHandler.handle(info, commandMessage(SUB_SLING_ID,
SUB_AGENT_OTHER, 1L));
assertSkipped();
+
+ commandPoller.close();
+
+ verify(poller).close();
}
@Test
public void testClearOffsets() throws DistributionException,
InterruptedException, IOException {
- createCommandPoller(true);
+ createCommandPoller();
commandHandler.handle(info, commandMessage(10L));
assertClearedUpTo(10);
@@ -108,6 +111,10 @@ public class CommandPollerTest {
// Clearing lower offset should not change cleared offset
commandHandler.handle(info, commandMessage(1L));
assertClearedUpTo(11);
+
+ commandPoller.close();
+
+ verify(poller).close();
}
private void assertClearedUpTo(int max) {
@@ -118,24 +125,6 @@ public class CommandPollerTest {
}
- @Test
- public void testEditable() throws DistributionException,
InterruptedException, IOException {
- createCommandPoller(true);
-
- commandPoller.close();
-
- verify(poller).close();
- }
-
- @Test
- public void testNotEditable() throws DistributionException,
InterruptedException, IOException {
- createCommandPoller(false);
-
- commandPoller.close();
-
- verify(poller, never()).close();
- }
-
private void assertSkipped() {
assertThat(commandPoller.isCleared(1), equalTo(false));
}
@@ -152,16 +141,14 @@ public class CommandPollerTest {
.build();
}
- private void createCommandPoller(boolean editable) {
+ private void createCommandPoller() {
when(clientProvider.createPoller(
Mockito.anyString(),
Mockito.eq(Reset.earliest),
handlerCaptor.capture()))
.thenReturn(poller);
- commandPoller = new CommandPoller(clientProvider, topics,
SUB_SLING_ID, SUB_AGENT_NAME, editable);
- if (editable) {
- commandHandler = handlerCaptor.getValue().getHandler();
- }
+ commandPoller = new CommandPoller(clientProvider, topics,
SUB_SLING_ID, SUB_AGENT_NAME);
+ commandHandler = handlerCaptor.getValue().getHandler();
}
}