This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new f4af344 SLING-9259 - DistributionSubscriber does not have to
implement DistributionAgent
f4af344 is described below
commit f4af344894622eec9cf61aedbfe0efb07a8f3530
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Jun 22 11:43:19 2020 +0200
SLING-9259 - DistributionSubscriber does not have to implement
DistributionAgent
---
.../impl/subscriber/DistributionSubscriber.java | 88 ++--------------------
.../journal/impl/subscriber/SubscriberTest.java | 31 --------
2 files changed, 7 insertions(+), 112 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index dcd016c..94ed9ff 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -31,9 +31,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -44,7 +41,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
-import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
@@ -52,14 +48,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.vault.packaging.Packaging;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
@@ -70,17 +61,12 @@ import
org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
import
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
-import org.apache.sling.distribution.log.spi.DistributionLog;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
@@ -101,15 +87,13 @@ import org.slf4j.LoggerFactory;
"announceDelay=10000" }, configurationPid =
"org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
@Designate(ocd = SubscriberConfiguration.class, factory = true)
@ParametersAreNonnullByDefault
-public class DistributionSubscriber implements DistributionAgent {
+public class DistributionSubscriber {
private static final int PRECONDITION_TIMEOUT = 60;
static int RETRY_DELAY = 5000;
static int QUEUE_FETCH_DELAY = 1000;
private static final Logger LOG =
LoggerFactory.getLogger(DistributionSubscriber.class);
- private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES =
Collections.emptySet();
-
private static final DistributionQueueItem STOPPED_ITEM = new
DistributionQueueItem("stop-item", emptyMap());
@Reference(name = "packageBuilder")
@@ -223,28 +207,12 @@ public class DistributionSubscriber implements
DistributionAgent {
"Started Subscriber agent %s at offset %s, subscribed to agent
names %s with package builder %s editable %s maxRetries %s errorQueueEnabled
%s",
subAgentName, startOffset, queueNames, pkgType,
config.editable(), maxRetries, errorQueueEnabled);
LOG.info(msg);
- Dictionary<String, Object> props = createServiceProps(config);
- componentReg = context.registerService(DistributionAgent.class, this,
props);
}
private Set<String> getNotEmpty(String[] agentNames) {
return
Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(toSet());
}
- private Dictionary<String, Object>
createServiceProps(SubscriberConfiguration config) {
- Dictionary<String, Object> props = new Hashtable<>();
- props.put("name", config.name());
- props.put("title", config.name());
- props.put("details", config.name());
- props.put("agentNames", config.agentNames());
- props.put("editable", config.editable());
- props.put("maxRetries", config.maxRetries());
- props.put("packageBuilder.target", config.packageBuilder_target());
- props.put("precondition.target", config.precondition_target());
- props.put("webconsole.configurationFactory.nameHint",
config.webconsole_configurationFactory_nameHint());
- return props;
- }
-
@Deactivate
public void deactivate() {
@@ -265,55 +233,13 @@ public class DistributionSubscriber implements
DistributionAgent {
subAgentName, queueNames, pkgType);
LOG.info(msg);
}
-
- @Nonnull
- @Override
- public Iterable<String> getQueueNames() {
- return queueNames;
- }
-
- @Override
- public DistributionQueue getQueue(@Nonnull String queueName) {
- DistributionQueueItem head = queueItemsBuffer.stream()
- .filter(item -> isIn(queueName, item))
- .findFirst()
- .orElse(null);
- return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
- }
-
- private boolean isIn(String queueName, DistributionQueueItem queueItem) {
- PackageMessage packageMsg =
queueItem.get(QueueItemFactory.PACKAGE_MSG, PackageMessage.class);
- return queueName.equals(packageMsg.getPubAgentName());
- }
-
- @Nonnull
- @Override
- public DistributionLog getLog() {
- return this::emptyDistributionLog;
- }
-
- private List<String> emptyDistributionLog() {
- return Collections.emptyList();
- }
-
- @Nonnull
- @Override
+
public DistributionAgentState getState() {
- return AgentState.getState(this);
- }
-
- @Nonnull
- @Override
- public DistributionResponse execute(ResourceResolver resourceResolver,
DistributionRequest request) {
- return executeUnsupported(request);
- }
-
- @Nonnull
- private DistributionResponse executeUnsupported(DistributionRequest
request) {
- String msg = format("Request type %s is not supported by this agent,
expected one of %s",
- request.getRequestType(), SUPPORTED_REQ_TYPES);
- LOG.info(msg);
- return new
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
+ boolean isBlocked = bookKeeper.getPackageRetries().getSum() > 0;
+ if (isBlocked) {
+ return DistributionAgentState.BLOCKED;
+ }
+ return queueItemsBuffer.size() > 0 ? DistributionAgentState.RUNNING :
DistributionAgentState.IDLE;
}
private void handlePackageMessage(MessageInfo info, PackageMessage
message) {
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index a54a26b..d21c41a 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,7 +21,6 @@ package org.apache.sling.distribution.journal.impl.subscriber;
import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
import static
org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
@@ -42,7 +41,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -66,20 +64,11 @@ import org.apache.sling.commons.metrics.Counter;
import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.Timer;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
-import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.distribution.agent.DistributionAgentState;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
@@ -229,8 +218,6 @@ public class SubscriberTest {
assumeNoPrecondition();
initSubscriber();
- assertThat(subscriber.getQueueNames(), contains(PUB1_AGENT_NAME));
-
assertThat(subscriber.getQueue(PUB1_AGENT_NAME).getStatus().getState(),
equalTo(DistributionQueueState.IDLE));
assertThat(subscriber.getState(),
equalTo(DistributionAgentState.IDLE));
MessageInfo info = new TestMessageInfo("", 1, 0, 0);
@@ -244,16 +231,9 @@ public class SubscriberTest {
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
- DistributionQueue queue = subscriber.getQueue(PUB1_AGENT_NAME);
- DistributionQueueEntry item = queue.getHead();
- assertThat(item.getStatus().getItemState(),
equalTo(DistributionQueueItemState.QUEUED));
-
sem.release();
waitSubscriber(IDLE);
verify(statusSender, times(0)).accept(anyObject());
- List<String> log = subscriber.getLog().getLines();
- // We do not use the DistributionLog anymore
- assertThat(log.size(), equalTo(0));
}
@Test
@@ -277,17 +257,6 @@ public class SubscriberTest {
}
@Test
- public void testExecuteNotSupported() throws DistributionException {
- assumeNoPrecondition();
- initSubscriber();
-
- DistributionRequest request = new
SimpleDistributionRequest(DistributionRequestType.ADD, "test");
- DistributionResponse response = subscriber.execute(resourceResolver,
request);
- assertThat(response.getState(),
equalTo(DistributionRequestState.DROPPED));
- }
-
-
- @Test
public void testSendFailedStatus() throws DistributionException {
assumeNoPrecondition();
initSubscriber(ImmutableMap.of("maxRetries", "1"));