This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9583 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 5df16b72a9c4b8dc86f1d176878dd6ce11d804e3 Author: Christian Schneider <[email protected]> AuthorDate: Sun Jul 12 11:27:42 2020 +0200 SLING-9583 - Extract messaging code from PubQueueProviderImpl --- .../impl/publisher/DistributionPublisher.java | 32 +++++++++++++- .../impl/queue/{impl => }/ClearCallback.java | 2 +- .../journal/impl/queue/PubQueueProvider.java | 7 +++- .../journal/impl/queue/impl/PubQueue.java | 1 + .../impl/queue/impl/PubQueueProviderImpl.java | 49 ++-------------------- .../impl/publisher/DistributionPublisherTest.java | 3 +- .../impl/queue/impl/PubQueueProviderTest.java | 11 ++--- 7 files changed, 49 insertions(+), 56 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index 2c01d7b..a317290 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -27,6 +27,7 @@ import static org.apache.sling.distribution.DistributionRequestType.DELETE; import static org.apache.sling.distribution.DistributionRequestType.TEST; import static org.apache.sling.distribution.journal.shared.DistributionMetricsService.timed; +import java.io.Closeable; import java.util.Arrays; import java.util.Collections; import java.util.Dictionary; @@ -46,9 +47,12 @@ import javax.management.NotCompliantMBeanException; import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; +import org.apache.sling.distribution.journal.impl.queue.ClearCallback; import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; import org.apache.sling.distribution.journal.impl.queue.QueueId; +import org.apache.sling.distribution.journal.messages.ClearCommand; import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.shared.AgentState; import org.apache.sling.distribution.journal.shared.DefaultDistributionLog; import org.apache.sling.distribution.journal.shared.DistributionMetricsService; @@ -77,6 +81,8 @@ import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; import org.osgi.service.metatype.annotations.Designate; import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.JournalAvailable; /** @@ -136,11 +142,14 @@ public class DistributionPublisher implements DistributionAgent { private ServiceRegistration<DistributionAgent> componentReg; private Consumer<PackageMessage> sender; + private Consumer<ClearCommand> commandSender; private JMXRegistration reg; private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge; + private Closeable statusPoller; + public DistributionPublisher() { log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO); REQ_TYPES.put(ADD, this::sendAndWait); @@ -159,6 +168,7 @@ public class DistributionPublisher implements DistributionAgent { pkgType = packageBuilder.getType(); this.sender = messagingProvider.createSender(topics.getPackageTopic()); + this.commandSender = messagingProvider.createSender(topics.getCommandTopic()); Dictionary<String, Object> props = createServiceProps(config); componentReg = requireNonNull(context.registerService(DistributionAgent.class, this, props)); @@ -178,11 +188,19 @@ public class DistributionPublisher implements DistributionAgent { "Current number of publish subscribers", () -> discoveryService.getTopologyView().getSubscribedAgentIds().size() ); + + statusPoller = messagingProvider.createPoller( + topics.getStatusTopic(), + Reset.earliest, + HandlerAdapter.create(PackageStatusMessage.class, pubQueueProvider::handleStatus) + ); + log.info(msg); } @Deactivate public void deactivate() { + IOUtils.closeQuietly(statusPoller); reg.close(); componentReg.unregister(); String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s", @@ -262,10 +280,22 @@ public class DistributionPublisher implements DistributionAgent { State state = view.getState(subAgentId.getAgentId(), pubAgentName); if (state != null) { QueueId queueId = new QueueId(pubAgentName, subAgentId.getSlingId(), subAgentId.getAgentName(), queueName); - return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), state.isEditable()); + ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset); + ClearCallback clearCallback = state.isEditable() ? editableCallback : null; + return pubQueueProvider.getQueue(queueId, state.getOffset() + 1, state.getRetries(), clearCallback); } return null; } + + private void sendClearCommand(String subSlingId, String subAgentName, long offset) { + ClearCommand commandMessage = ClearCommand.builder() + .subSlingId(subSlingId) + .subAgentName(subAgentName) + .offset(offset) + .build(); + log.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset); + commandSender.accept(commandMessage); + } @Nonnull @Override diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java similarity index 93% rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java rename to src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java index 0396b6e..ec10967 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/ClearCallback.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/ClearCallback.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.queue.impl; +package org.apache.sling.distribution.journal.impl.queue; import javax.annotation.ParametersAreNonnullByDefault; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java index f3bac6b..bc349b9 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java @@ -19,17 +19,22 @@ package org.apache.sling.distribution.journal.impl.queue; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; +import org.apache.sling.distribution.journal.MessageInfo; +import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.queue.spi.DistributionQueue; @ParametersAreNonnullByDefault public interface PubQueueProvider { @Nonnull - DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable); + DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, @Nullable ClearCallback clearCallback); @Nonnull DistributionQueue getErrorQueue(QueueId queueId); + void handleStatus(MessageInfo info, PackageStatusMessage message); + } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java index 23aaf15..8689a39 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java @@ -39,6 +39,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; +import org.apache.sling.distribution.journal.impl.queue.ClearCallback; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java index f9dcc45..f866d36 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java @@ -18,27 +18,19 @@ */ package org.apache.sling.distribution.journal.impl.queue.impl; -import static org.apache.sling.distribution.journal.HandlerAdapter.create; - -import java.io.Closeable; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.ParametersAreNonnullByDefault; -import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.impl.queue.ClearCallback; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; import org.apache.sling.distribution.journal.impl.queue.QueueId; -import org.apache.sling.distribution.journal.messages.ClearCommand; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; -import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.spi.DistributionQueue; import org.osgi.service.component.annotations.Activate; @@ -63,54 +55,31 @@ public class PubQueueProviderImpl implements PubQueueProvider { private final Map<String, OffsetQueue<Long>> errorQueues = new ConcurrentHashMap<>(); @Reference - private MessagingProvider messagingProvider; - - @Reference - private Topics topics; - - @Reference private PubQueueCacheService pubQueueCacheService; - private Closeable statusPoller; - - private Consumer<ClearCommand> sender; - public PubQueueProviderImpl() { } public PubQueueProviderImpl( - PubQueueCacheService pubQueueCacheService, - MessagingProvider messagingProvider, - Topics topics) { + PubQueueCacheService pubQueueCacheService) { this.pubQueueCacheService = pubQueueCacheService; - this.messagingProvider = messagingProvider; - this.topics = topics; } @Activate public void activate() { - statusPoller = messagingProvider.createPoller( - topics.getStatusTopic(), - Reset.earliest, - create(PackageStatusMessage.class, this::handleStatus) - ); - sender = messagingProvider.createSender(topics.getCommandTopic()); LOG.info("Started Publisher queue provider service"); } @Deactivate public void deactivate() { - IOUtils.closeQuietly(statusPoller); LOG.info("Stopped Publisher queue provider service"); } @Nonnull @Override - public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, boolean editable) { + public DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, ClearCallback clearCallback) { OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(queueId.getPubAgentName(), minOffset); - ClearCallback editableCallback = offset -> sendClearCommand(queueId.getSubSlingId(), queueId.getSubAgentName(), offset); - ClearCallback callback = editable ? editableCallback : null; - return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, callback); + return new PubQueue(queueId.getQueueName(), agentQueue.getMinOffsetQueue(minOffset), headRetries, clearCallback); } @Nonnull @@ -139,14 +108,4 @@ public class PubQueueProviderImpl implements PubQueueProvider { } } - private void sendClearCommand(String subSlingId, String subAgentName, long offset) { - ClearCommand commandMessage = ClearCommand.builder() - .subSlingId(subSlingId) - .subAgentName(subAgentName) - .offset(offset) - .build(); - LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset); - sender.accept(commandMessage); - } - } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java index 0a66ad7..fe8268a 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java @@ -54,6 +54,7 @@ import org.apache.sling.distribution.agent.spi.DistributionAgent; import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.impl.queue.ClearCallback; import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.shared.DistributionMetricsService; @@ -262,7 +263,7 @@ public class DistributionPublisherTest { when(topology.getSubscribedAgentIds(PUB1AGENT1)).thenReturn(Collections.singleton(QUEUE_NAME)); State state = stateWithMaxRetries(1); when(topology.getState(QUEUE_NAME, PUB1AGENT1)).thenReturn(state); - when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.eq(false))) + when(pubQueueProvider.getQueue(Mockito.any(), Mockito.eq(2l), Mockito.eq(0), Mockito.isNull(ClearCallback.class))) .thenThrow(new RuntimeException("Error")); Counter counter = new TestCounter(); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java index 00eae27..ea71824 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java @@ -97,7 +97,6 @@ public class PubQueueProviderTest { private PubQueueCacheService pubQueueCacheService; private MessageHandler<PackageMessage> handler; - private MessageHandler<PackageStatusMessage> statHandler; private PubQueueProviderImpl queueProvider; private MBeanServer mbeanServer; @@ -120,10 +119,9 @@ public class PubQueueProviderTest { Topics topics = new Topics(); pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin); pubQueueCacheService.activate(); - queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics); + queueProvider = new PubQueueProviderImpl(pubQueueCacheService); queueProvider.activate(); handler = handlerCaptor.getValue().getHandler(); - statHandler = statHandlerCaptor.getValue().getHandler(); } @After @@ -131,7 +129,6 @@ public class PubQueueProviderTest { pubQueueCacheService.deactivate(); queueProvider.deactivate(); verify(poller).close(); - verify(statPoller).close(); } @Test @@ -142,13 +139,13 @@ public class PubQueueProviderTest { // Full pub1 queue contains all packages from pub1 QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID); - DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, false); + DistributionQueue queue = queueProvider.getQueue(queueId, 0, -1, null); Iterator<DistributionQueueEntry> it1 = queue.getEntries(0, -1).iterator(); assertThat(it1.next().getItem().getPackageId(), equalTo("packageid1")); assertThat(it1.next().getItem().getPackageId(), equalTo("packageid3")); // With offset 1 first package is removed - DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, false); + DistributionQueue queue2 = queueProvider.getQueue(queueId, 1, -1, null); Iterator<DistributionQueueEntry> it2 = queue2.getEntries(0, 20).iterator(); assertThat(it2.next().getItem().getPackageId(), equalTo("packageid3")); assertThat(it2.hasNext(), equalTo(false)); @@ -177,7 +174,7 @@ public class PubQueueProviderTest { MessageInfo info = info(1L); handler.handle(info, pkgMsg1); PackageStatusMessage statusMsg1 = statusMessage(info.getOffset(), pkgMsg1); - statHandler.handle(info, statusMsg1); + queueProvider.handleStatus(info, statusMsg1); QueueId queueId = new QueueId(PUB1_AGENT_NAME, SUB_SLING_ID, SUB_AGENT_NAME, SUB_AGENT_ID); DistributionQueue queue = queueProvider.getErrorQueue(queueId);
