This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new f4af344  SLING-9259 - DistributionSubscriber does not have to 
implement DistributionAgent
f4af344 is described below

commit f4af344894622eec9cf61aedbfe0efb07a8f3530
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Jun 22 11:43:19 2020 +0200

    SLING-9259 - DistributionSubscriber does not have to implement
    DistributionAgent
---
 .../impl/subscriber/DistributionSubscriber.java    | 88 ++--------------------
 .../journal/impl/subscriber/SubscriberTest.java    | 31 --------
 2 files changed, 7 insertions(+), 112 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index dcd016c..94ed9ff 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -31,9 +31,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -44,7 +41,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 
-import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.commons.io.IOUtils;
@@ -52,14 +48,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
@@ -70,17 +61,12 @@ import 
org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import org.apache.sling.distribution.journal.impl.queue.impl.SubQueue;
-import org.apache.sling.distribution.journal.impl.shared.AgentState;
 import 
org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
-import 
org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
-import org.apache.sling.distribution.log.spi.DistributionLog;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -101,15 +87,13 @@ import org.slf4j.LoggerFactory;
         "announceDelay=10000" }, configurationPid = 
"org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
 @Designate(ocd = SubscriberConfiguration.class, factory = true)
 @ParametersAreNonnullByDefault
-public class DistributionSubscriber implements DistributionAgent {
+public class DistributionSubscriber {
     private static final int PRECONDITION_TIMEOUT = 60;
     static int RETRY_DELAY = 5000;
     static int QUEUE_FETCH_DELAY = 1000;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DistributionSubscriber.class);
 
-    private static final Set<DistributionRequestType> SUPPORTED_REQ_TYPES = 
Collections.emptySet();
-
     private static final DistributionQueueItem STOPPED_ITEM = new 
DistributionQueueItem("stop-item", emptyMap());
 
     @Reference(name = "packageBuilder")
@@ -223,28 +207,12 @@ public class DistributionSubscriber implements 
DistributionAgent {
                 "Started Subscriber agent %s at offset %s, subscribed to agent 
names %s with package builder %s editable %s maxRetries %s errorQueueEnabled 
%s",
                 subAgentName, startOffset, queueNames, pkgType, 
config.editable(), maxRetries, errorQueueEnabled);
         LOG.info(msg);
-        Dictionary<String, Object> props = createServiceProps(config);
-        componentReg = context.registerService(DistributionAgent.class, this, 
props);
     }
 
     private Set<String> getNotEmpty(String[] agentNames) {
         return 
Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(toSet());
     }
 
-    private Dictionary<String, Object> 
createServiceProps(SubscriberConfiguration config) {
-        Dictionary<String, Object> props = new Hashtable<>();
-        props.put("name", config.name());
-        props.put("title", config.name());
-        props.put("details", config.name());
-        props.put("agentNames", config.agentNames());
-        props.put("editable", config.editable());
-        props.put("maxRetries", config.maxRetries());
-        props.put("packageBuilder.target", config.packageBuilder_target());
-        props.put("precondition.target", config.precondition_target());
-        props.put("webconsole.configurationFactory.nameHint", 
config.webconsole_configurationFactory_nameHint());
-        return props;
-    }
-
     @Deactivate
     public void deactivate() {
 
@@ -265,55 +233,13 @@ public class DistributionSubscriber implements 
DistributionAgent {
                 subAgentName, queueNames, pkgType);
         LOG.info(msg);
     }
-
-    @Nonnull
-    @Override
-    public Iterable<String> getQueueNames() {
-        return queueNames;
-    }
-
-    @Override
-    public DistributionQueue getQueue(@Nonnull String queueName) {
-        DistributionQueueItem head = queueItemsBuffer.stream()
-                .filter(item -> isIn(queueName, item))
-                .findFirst()
-                .orElse(null);
-        return new SubQueue(queueName, head, bookKeeper.getPackageRetries());
-    }
-
-    private boolean isIn(String queueName, DistributionQueueItem queueItem) {
-        PackageMessage packageMsg = 
queueItem.get(QueueItemFactory.PACKAGE_MSG, PackageMessage.class);
-        return queueName.equals(packageMsg.getPubAgentName());
-    }
-
-    @Nonnull
-    @Override
-    public DistributionLog getLog() {
-        return this::emptyDistributionLog;
-    }
-
-    private List<String> emptyDistributionLog() {
-        return Collections.emptyList();
-    }
-
-    @Nonnull
-    @Override
+    
     public DistributionAgentState getState() {
-        return AgentState.getState(this);
-    }
-
-    @Nonnull
-    @Override
-    public DistributionResponse execute(ResourceResolver resourceResolver, 
DistributionRequest request) {
-        return executeUnsupported(request);
-    }
-
-    @Nonnull
-    private DistributionResponse executeUnsupported(DistributionRequest 
request) {
-        String msg = format("Request type %s is not supported by this agent, 
expected one of %s",
-                request.getRequestType(), SUPPORTED_REQ_TYPES);
-        LOG.info(msg);
-        return new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
+        boolean isBlocked = bookKeeper.getPackageRetries().getSum() > 0;
+        if (isBlocked) {
+            return DistributionAgentState.BLOCKED;
+        }
+        return queueItemsBuffer.size() > 0 ? DistributionAgentState.RUNNING : 
DistributionAgentState.IDLE;
     }
 
     private void handlePackageMessage(MessageInfo info, PackageMessage 
message) {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index a54a26b..d21c41a 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -21,7 +21,6 @@ package org.apache.sling.distribution.journal.impl.subscriber;
 import static org.apache.sling.distribution.agent.DistributionAgentState.IDLE;
 import static 
org.apache.sling.distribution.agent.DistributionAgentState.RUNNING;
 import static org.awaitility.Awaitility.await;
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -42,7 +41,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -66,20 +64,11 @@ import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.Timer;
-import org.apache.sling.distribution.DistributionRequest;
-import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
-import org.apache.sling.distribution.DistributionResponse;
-import org.apache.sling.distribution.SimpleDistributionRequest;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
-import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
-import org.apache.sling.distribution.queue.DistributionQueueState;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.settings.SlingSettingsService;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
@@ -229,8 +218,6 @@ public class SubscriberTest {
         assumeNoPrecondition();
         initSubscriber();
 
-        assertThat(subscriber.getQueueNames(), contains(PUB1_AGENT_NAME));
-        
assertThat(subscriber.getQueue(PUB1_AGENT_NAME).getStatus().getState(), 
equalTo(DistributionQueueState.IDLE));
         assertThat(subscriber.getState(), 
equalTo(DistributionAgentState.IDLE));
         
         MessageInfo info = new TestMessageInfo("", 1, 0, 0);
@@ -244,16 +231,9 @@ public class SubscriberTest {
         packageHandler.handle(info, message);
         
         waitSubscriber(RUNNING);
-        DistributionQueue queue = subscriber.getQueue(PUB1_AGENT_NAME);
-        DistributionQueueEntry item = queue.getHead();
-        assertThat(item.getStatus().getItemState(), 
equalTo(DistributionQueueItemState.QUEUED));
-        
         sem.release();
         waitSubscriber(IDLE);
         verify(statusSender, times(0)).accept(anyObject());
-        List<String> log = subscriber.getLog().getLines();
-        // We do not use the DistributionLog anymore
-        assertThat(log.size(), equalTo(0));
     }
 
        @Test
@@ -277,17 +257,6 @@ public class SubscriberTest {
     }
 
     @Test
-    public void testExecuteNotSupported() throws DistributionException {
-        assumeNoPrecondition();
-        initSubscriber();
-
-        DistributionRequest request = new 
SimpleDistributionRequest(DistributionRequestType.ADD, "test");
-        DistributionResponse response = subscriber.execute(resourceResolver, 
request);
-        assertThat(response.getState(), 
equalTo(DistributionRequestState.DROPPED));
-    }
-
-
-    @Test
     public void testSendFailedStatus() throws DistributionException {
         assumeNoPrecondition();
         initSubscriber(ImmutableMap.of("maxRetries", "1"));

Reply via email to