This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9583 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit bb6d005ccd289d75fc92a4d01287b230b1f728ec Author: Christian Schneider <[email protected]> AuthorDate: Mon Jul 13 14:52:16 2020 +0200 SLING-9583 - Extract messaging code from PubQueueCache --- .../impl/publisher/MessagingCacheCallback.java | 84 ++++++++++++++ .../impl/publisher/PackageDistributedNotifier.java | 4 +- .../impl => publisher}/QueueCacheSeeder.java | 2 +- .../{queue/impl => publisher}/RangePoller.java | 8 +- .../{PubQueueProvider.java => CacheCallback.java} | 25 ++--- .../journal/impl/queue/PubQueueProvider.java | 4 + .../journal/impl/queue/impl/PubQueueCache.java | 43 ++------ .../impl/queue/impl/PubQueueCacheService.java | 42 +------ .../impl/queue/impl/PubQueueProviderImpl.java | 5 + .../impl/publisher/MessagingCacheCallbackTest.java | 122 +++++++++++++++++++++ .../publisher/PackageDistributedNotifierTest.java | 4 +- .../impl => publisher}/QueueCacheSeederTest.java | 2 +- .../{queue/impl => publisher}/RangePollerTest.java | 4 +- .../journal/impl/queue/impl/PubQueueCacheTest.java | 115 ++++++------------- .../impl/queue/impl/PubQueueProviderTest.java | 20 ++-- 15 files changed, 293 insertions(+), 191 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java new file mode 100644 index 0000000..01bf428 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.publisher; + +import static org.apache.sling.distribution.journal.HandlerAdapter.create; + +import java.io.Closeable; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.sling.distribution.journal.FullMessage; +import org.apache.sling.distribution.journal.JournalAvailable; +import org.apache.sling.distribution.journal.MessageHandler; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.impl.queue.CacheCallback; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable; +import org.apache.sling.distribution.journal.shared.Topics; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component(property = "type=messaging") +public class MessagingCacheCallback implements CacheCallback { + private Logger log = LoggerFactory.getLogger(this.getClass()); + + @Reference + private MessagingProvider messagingProvider; + + @Reference + private Topics topics; + + @Reference + private JournalAvailable journalAvailable; + + @Reference + private DistributionMetricsService distributionMetricsService; + + /** + * The cache is active only when at least one DistributionSubscriber agent is configured. + */ + @Reference + private PublisherConfigurationAvailable publisherConfigurationAvailable; + + @Override + public Closeable createConsumer(MessageHandler<PackageMessage> handler) { + log.info("Starting consumer"); + QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider.createSender(topics.getPackageTopic())); + Closeable poller = messagingProvider.createPoller( + topics.getPackageTopic(), + Reset.latest, + create(PackageMessage.class, (info, message) -> { seeder.close(); handler.handle(info, message); }) + ); + seeder.startSeeding(); + return () -> IOUtils.closeQuietly(seeder, poller); + } + + @Override + public List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException { + distributionMetricsService.getQueueCacheFetchCount().increment(); + return new RangePoller(messagingProvider, topics.getPackageTopic(), minOffset, maxOffset) + .fetchRange(); + } + +} diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java index ea3adc5..01f4ef0 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java @@ -28,8 +28,8 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; +import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory; -import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService; import org.apache.sling.distribution.journal.messages.PackageDistributedMessage; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.journal.MessagingProvider; @@ -57,7 +57,7 @@ public class PackageDistributedNotifier implements TopologyChangeHandler { private EventAdmin eventAdmin; @Reference - private PubQueueCacheService pubQueueCacheService; + private PubQueueProvider pubQueueCacheService; @Reference private MessagingProvider messagingProvider; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java similarity index 98% rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java index 26a6eaa..bbf27b6 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.queue.impl; +package org.apache.sling.distribution.journal.impl.publisher; import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java similarity index 94% rename from src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java rename to src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java index 72e7c4a..040db85 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.queue.impl; +package org.apache.sling.distribution.journal.impl.publisher; import static org.apache.sling.distribution.journal.HandlerAdapter.create; @@ -55,12 +55,12 @@ public class RangePoller { public RangePoller(MessagingProvider messagingProvider, String packageTopic, long minOffset, - long maxOffset) { - this.maxOffset = maxOffset; + long maxOffsetExclusive) { + this.maxOffset = maxOffsetExclusive; this.minOffset = minOffset; this.messages = new ArrayList<>(); String assign = messagingProvider.assignTo(minOffset); - LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset); + LOG.info("Fetching offsets [{},{}[", minOffset, maxOffsetExclusive); headPoller = messagingProvider.createPoller( packageTopic, Reset.earliest, assign, create(PackageMessage.class, this::handlePackage) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/CacheCallback.java similarity index 56% copy from src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java copy to src/main/java/org/apache/sling/distribution/journal/impl/queue/CacheCallback.java index bc349b9..43bfeb9 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/CacheCallback.java @@ -18,23 +18,14 @@ */ package org.apache.sling.distribution.journal.impl.queue; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.ParametersAreNonnullByDefault; +import java.io.Closeable; +import java.util.List; -import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.messages.PackageStatusMessage; -import org.apache.sling.distribution.queue.spi.DistributionQueue; - -@ParametersAreNonnullByDefault -public interface PubQueueProvider { - - @Nonnull - DistributionQueue getQueue(QueueId queueId, long minOffset, int headRetries, @Nullable ClearCallback clearCallback); - - @Nonnull - DistributionQueue getErrorQueue(QueueId queueId); - - void handleStatus(MessageInfo info, PackageStatusMessage message); +import org.apache.sling.distribution.journal.FullMessage; +import org.apache.sling.distribution.journal.MessageHandler; +import org.apache.sling.distribution.journal.messages.PackageMessage; +public interface CacheCallback { + Closeable createConsumer(MessageHandler<PackageMessage> handler); + List<FullMessage<PackageMessage>> fetchRange(long minOffset, long maxOffset) throws InterruptedException; } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java index bc349b9..de57e77 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java @@ -24,6 +24,7 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; +import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.spi.DistributionQueue; @ParametersAreNonnullByDefault @@ -34,6 +35,9 @@ public interface PubQueueProvider { @Nonnull DistributionQueue getErrorQueue(QueueId queueId); + + @Nonnull + OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset); void handleStatus(MessageInfo info, PackageStatusMessage message); diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java index fdd6ff7..11f410a 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java @@ -22,7 +22,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.stream.Collectors.groupingBy; -import static org.apache.sling.distribution.journal.HandlerAdapter.create; import java.io.Closeable; import java.util.HashSet; @@ -40,20 +39,17 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.messages.PackageMessage; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.shared.JMXRegistration; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.sling.distribution.journal.impl.queue.CacheCallback; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; /** * Cache the distribution packages fetched from the package topic. @@ -102,35 +98,16 @@ public class PubQueueCache { private final Set<JMXRegistration> jmxRegs = new HashSet<>(); - private final MessagingProvider messagingProvider; - private final EventAdmin eventAdmin; private volatile Closeable tailPoller; - private final QueueCacheSeeder seeder; - - private final String topic; - - private final DistributionMetricsService distributionMetricsService; + private final CacheCallback callback; - public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, QueueCacheSeeder seeder) { - this.messagingProvider = messagingProvider; + public PubQueueCache(EventAdmin eventAdmin, CacheCallback callback) { this.eventAdmin = eventAdmin; - this.distributionMetricsService = distributionMetricsService; - this.topic = topic; - this.seeder = seeder; - startPoller(); - this.seeder.startSeeding(); - } - - private void startPoller() { - LOG.info("Starting consumer"); - tailPoller = messagingProvider.createPoller( - this.topic, - Reset.latest, - create(PackageMessage.class, this::handlePackage) - ); + this.callback = callback; + tailPoller = callback.createConsumer(this::handlePackage); } @Nonnull @@ -148,7 +125,6 @@ public class PubQueueCache { public void close() { IOUtils.closeQuietly(tailPoller); - IOUtils.closeQuietly(seeder); jmxRegs.forEach(IOUtils::closeQuietly); } @@ -201,12 +177,8 @@ public class PubQueueCache { * cache. */ private void fetch(long requestedMinOffset, long cachedMinOffset) throws InterruptedException { - distributionMetricsService.getQueueCacheFetchCount().increment(); - RangePoller headPoller = new RangePoller(messagingProvider, - topic, - requestedMinOffset, - cachedMinOffset); - merge(headPoller.fetchRange()); + List<FullMessage<PackageMessage>> messages = callback.fetchRange(requestedMinOffset, cachedMinOffset); + merge(messages); updateMinOffset(requestedMinOffset); } @@ -274,7 +246,6 @@ public class PubQueueCache { } private void handlePackage(final MessageInfo info, final PackageMessage message) { - seeder.close(); merge(singletonList(new FullMessage<>(info, message))); updateMaxOffset(info.getOffset()); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java index b8d6692..cc9789b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java @@ -21,14 +21,8 @@ package org.apache.sling.distribution.journal.impl.queue.impl; import javax.annotation.Nonnull; import javax.annotation.ParametersAreNonnullByDefault; +import org.apache.sling.distribution.journal.impl.queue.CacheCallback; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; -import org.apache.sling.distribution.journal.messages.PackageMessage; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; -import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable; -import org.apache.sling.distribution.journal.shared.Topics; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.JournalAvailable; -import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -38,7 +32,7 @@ import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Component(immediate = true, service = PubQueueCacheService.class) +@Component(service = PubQueueCacheService.class) @ParametersAreNonnullByDefault public class PubQueueCacheService { @@ -50,40 +44,19 @@ public class PubQueueCacheService { */ private static final int CLEANUP_THRESHOLD = 10_000; - /** - * Will cause the cache to be cleared when we loose the journal - */ - @Reference - private JournalAvailable journalAvailable; - - /** - * The cache is active only when at least one DistributionSubscriber agent is configured. - */ - @Reference - private PublisherConfigurationAvailable publisherConfigurationAvailable; - - @Reference - private MessagingProvider messagingProvider; - - @Reference - private Topics topics; - @Reference private EventAdmin eventAdmin; @Reference - private DistributionMetricsService distributionMetricsService; + private CacheCallback callback; private volatile PubQueueCache cache; public PubQueueCacheService() {} - public PubQueueCacheService(MessagingProvider messagingProvider, - Topics topics, - EventAdmin eventAdmin) { - this.messagingProvider = messagingProvider; - this.topics = topics; + public PubQueueCacheService(EventAdmin eventAdmin, CacheCallback callback) { this.eventAdmin = eventAdmin; + this.callback = callback; } @Activate @@ -130,9 +103,6 @@ public class PubQueueCacheService { } private PubQueueCache newCache() { - String topic = topics.getPackageTopic(); - MessageSender<PackageMessage> sender = messagingProvider.createSender(topic); - QueueCacheSeeder seeder = new QueueCacheSeeder(sender); - return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seeder); + return new PubQueueCache(eventAdmin, callback); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java index f866d36..b34436f 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java @@ -99,6 +99,11 @@ public class PubQueueProviderImpl implements PubQueueProvider { return new PubErrQueue(queueId.getQueueName(), agentQueue, errorQueue); } + @Override + public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) { + return pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset); + } + public void handleStatus(MessageInfo info, PackageStatusMessage message) { if (message.getStatus() == Status.REMOVED_FAILED) { QueueId queueId = new QueueId(message.getPubAgentName(), message.getSubSlingId(), message.getSubAgentName(), ""); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java new file mode 100644 index 0000000..8c66c85 --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.publisher; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.sling.commons.metrics.Counter; +import org.apache.sling.distribution.journal.FullMessage; +import org.apache.sling.distribution.journal.HandlerAdapter; +import org.apache.sling.distribution.journal.JournalAvailable; +import org.apache.sling.distribution.journal.MessageHandler; +import org.apache.sling.distribution.journal.MessageSender; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; +import org.apache.sling.distribution.journal.shared.DistributionMetricsService; +import org.apache.sling.distribution.journal.shared.Topics; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class MessagingCacheCallbackTest { + @Mock + private MessagingProvider messagingProvider; + + @Spy + private Topics topics; + + @Mock + private JournalAvailable journalAvailable; + + @Mock + private DistributionMetricsService distributionMetricsService; + + @Mock + private MessageHandler<PackageMessage> handler; + + @Mock + private MessageSender<Object> sender; + + @Mock + private Counter counter; + + @InjectMocks + private MessagingCacheCallback callback; + + @Captor + private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; + + @Test + public void testCreateConsumer() throws Exception { + when(messagingProvider.createSender(Mockito.any())).thenReturn(sender); + Closeable poller = callback.createConsumer(handler); + assertThat(poller, notNullValue()); + + poller.close(); + } + + @Test + public void testFetchRange() throws Exception { + when(distributionMetricsService.getQueueCacheFetchCount()).thenReturn(counter); + when(messagingProvider.assignTo(Mockito.eq(10l))).thenReturn("0:10"); + CompletableFuture<List<FullMessage<PackageMessage>>> result = CompletableFuture.supplyAsync(this::fetch); + verify(messagingProvider, timeout(100000)).createPoller( + Mockito.anyString(), + Mockito.eq(Reset.earliest), + Mockito.eq("0:10"), + handlerCaptor.capture()); + simulateMessage(19); + simulateMessage(20); + List<FullMessage<PackageMessage>> messages = result.get(100, TimeUnit.SECONDS); + assertThat(messages.size(), equalTo(1)); + } + + private void simulateMessage(int offset) { + FullMessage<PackageMessage> message = RangePollerTest.createMessage(ReqType.ADD, offset); + handlerCaptor.getValue().getHandler().handle(message.getInfo(), message.getMessage()); + } + + List<FullMessage<PackageMessage>> fetch() { + try { + return callback.fetchRange(10l, 20l); + } catch (InterruptedException e) { + throw new IllegalStateException(); + } + } + +} diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java index b41029c..e256c63 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java @@ -33,7 +33,7 @@ import java.util.HashSet; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; -import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService; +import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.junit.Before; @@ -45,7 +45,7 @@ import org.osgi.service.event.EventAdmin; public class PackageDistributedNotifierTest { @Mock - private PubQueueCacheService pubQueueCacheService; + private PubQueueProvider pubQueueCacheService; @Mock private OffsetQueue<DistributionQueueItem> offsetQueue; diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java similarity index 97% rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java index 29a5133..8a3cf76 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.queue.impl; +package org.apache.sling.distribution.journal.impl.publisher; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.timeout; diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java similarity index 96% rename from src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java rename to src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java index 6d82121..217bd59 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.sling.distribution.journal.impl.queue.impl; +package org.apache.sling.distribution.journal.impl.publisher; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.contains; @@ -107,7 +107,7 @@ public class RangePollerTest { } } - private FullMessage<PackageMessage> createMessage(ReqType reqType, int offset) { + public static FullMessage<PackageMessage> createMessage(ReqType reqType, long offset) { MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, System.currentTimeMillis()); PackageMessage message = PackageMessage.builder() .pubAgentName("agent1") diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java index aca7d01..a1db345 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java @@ -20,39 +20,32 @@ package org.apache.sling.distribution.journal.impl.queue.impl; import static java.lang.System.currentTimeMillis; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.stream.LongStream; import org.apache.sling.commons.metrics.Counter; -import org.apache.sling.distribution.journal.HandlerAdapter; +import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageHandler; -import org.apache.sling.distribution.journal.MessageSender; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.MessageInfo; +import org.apache.sling.distribution.journal.impl.queue.CacheCallback; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; -import org.apache.sling.distribution.journal.shared.DistributionMetricsService; -import org.apache.sling.distribution.journal.shared.LocalStore; import org.apache.sling.distribution.journal.shared.TestMessageInfo; import org.apache.sling.distribution.queue.DistributionQueueItem; -import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -82,35 +75,20 @@ public class PubQueueCacheTest { private static final Random RAND = new Random(); @Captor - private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; + private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor; @Mock private EventAdmin eventAdmin; @Mock - private MessagingProvider clientProvider; - - @Mock - private QueueCacheSeeder cacheSeeder; - - @Mock - private DistributionMetricsService distributionMetricsService; + private CacheCallback callback; @Mock private Counter counter; @Mock - private LocalStore seedStore; - - @Mock private Closeable poller; - @Mock - private MessageSender<Object> sender; - - @Mock - private QueueCacheSeeder seeder; - private PubQueueCache cache; private ExecutorService executor; @@ -120,29 +98,12 @@ public class PubQueueCacheTest { @Before public void before() { - when(clientProvider.createPoller( - eq(TOPIC), - eq(Reset.latest), - handlerCaptor.capture())) + when(callback.createConsumer(handlerCaptor.capture())) .thenReturn(poller); - when(clientProvider.createPoller( - eq(TOPIC), - eq(Reset.earliest), - Mockito.anyString(), - handlerCaptor.capture())) - .thenReturn(poller); - when(clientProvider.createSender(Mockito.anyString())) - .thenReturn(sender); - - when(distributionMetricsService.getQueueCacheFetchCount()) - .thenReturn(counter); - when(seedStore.load(anyString(), any())).thenReturn(0L); - - cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seeder); - verify(seeder).startSeeding(); + cache = new PubQueueCache(eventAdmin, callback); executor = Executors.newFixedThreadPool(10); - tailHandler = handlerCaptor.getValue().getHandler(); + tailHandler = handlerCaptor.getValue(); } @After @@ -165,21 +126,13 @@ public class PubQueueCacheTest { @Test public void testFetchWithSingleConsumer() throws Exception { - simulateMessage(tailHandler, 200); + simulateMessage(tailHandler, 200l); + when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l))) + .thenReturn(Arrays.asList(createTestMessage(100, PUB_AGENT_NAME_1, ReqType.ADD))); Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100); - // seeding the cache with a message at offset 200 - // wait that the consumer has started fetching the offsets from 100 to 200 - MessageHandler<PackageMessage> headHandler = awaitHeadHandler(); - // simulate messages for the fetched offsets - simulateMessages(headHandler, 100, cache.getMinOffset()); // the consumer returns the offset queue consumer.get(15, SECONDS); - assertEquals(100, cache.getMinOffset()); - } - - private MessageHandler<PackageMessage> awaitHeadHandler() { - return Awaitility.await().ignoreExceptions() - .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue()); + assertEquals(100l, cache.getMinOffset()); } @Test @@ -188,18 +141,15 @@ public class PubQueueCacheTest { // build two consumers for same agent queue, from offset 100 Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100); Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100); - // seeding the cache with a message at offset 200 - // wait that one consumer has started fetching the offsets from 100 to 200 - MessageHandler<PackageMessage> headHandler = awaitHeadHandler(); - // simulate messages for the fetched offsets - simulateMessages(headHandler, 100, cache.getMinOffset()); - // both consumers returns the offset queue + when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l))) + .thenReturn(Arrays.asList(createTestMessage(100, PUB_AGENT_NAME_1, ReqType.ADD))); OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS); OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS); assertEquals(q1.getSize(), q2.getSize()); assertEquals(100, cache.getMinOffset()); - // the offsets have been fetched only once - assertEquals(2, handlerCaptor.getAllValues().size()); + + // Fetch should only happen once + verify(callback, times(1)).fetchRange(Mockito.anyLong(), Mockito.anyLong()); } @Test @@ -213,11 +163,6 @@ public class PubQueueCacheTest { assertEquals(4, cache.size()); } - private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) { - LongStream.rangeClosed(fromOffset, toOffset) - .forEach(offset -> simulateMessage(handler, offset)); - } - private void simulateMessage(MessageHandler<PackageMessage> handler, long offset) { simulateMessage(handler, pickAny(PUB_AGENT_NAME_1, PUB_AGENT_NAME_2, PUB_AGENT_NAME_3), @@ -225,19 +170,31 @@ public class PubQueueCacheTest { } private void simulateMessage(MessageHandler<PackageMessage> handler, String pubAgentName, ReqType reqType, long offset) { - PackageMessage msg = PackageMessage.builder() + PackageMessage msg = createTestMessage(pubAgentName, reqType); + simulateMessage(handler, msg, offset); + } + + private void simulateMessage(MessageHandler<PackageMessage> handler, PackageMessage msg, long offset) { + log.info("Simulate msg @ offset {}", offset); + handler.handle(createInfo(offset), msg); + } + + private FullMessage<PackageMessage> createTestMessage(long offset, String pubAgentName, ReqType reqType) { + return new FullMessage<>(createInfo(offset), createTestMessage(pubAgentName, reqType)); + } + + private PackageMessage createTestMessage(String pubAgentName, ReqType reqType) { + return PackageMessage.builder() .pkgType("pkgType") .pkgId(UUID.randomUUID().toString()) .pubSlingId("pubSlingId") .reqType(reqType) .pubAgentName(pubAgentName) .build(); - simulateMessage(handler, msg, offset); } - private void simulateMessage(MessageHandler<PackageMessage> handler, PackageMessage msg, long offset) { - log.info("Simulate msg @ offset {}", offset); - handler.handle(new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis()), msg); + private MessageInfo createInfo(long offset) { + return new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis()); } Future<OffsetQueue<DistributionQueueItem>> consumer(String pubAgentName, long minOffset) { diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java index ea71824..dc8bf32 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java @@ -45,6 +45,7 @@ import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.impl.queue.CacheCallback; import org.apache.sling.distribution.journal.impl.queue.QueueId; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; @@ -77,7 +78,7 @@ public class PubQueueProviderTest { private MessagingProvider clientProvider; @Captor - private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; + private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor; @Captor private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> statHandlerCaptor; @@ -94,6 +95,9 @@ public class PubQueueProviderTest { @Mock private MessageSender<Object> sender; + @Mock + private CacheCallback callback; + private PubQueueCacheService pubQueueCacheService; private MessageHandler<PackageMessage> handler; @@ -104,24 +108,18 @@ public class PubQueueProviderTest { @Before public void before() throws PersistenceException { MockitoAnnotations.initMocks(this); - when(clientProvider.createPoller( - Mockito.eq(Topics.PACKAGE_TOPIC), - Mockito.any(Reset.class), - handlerCaptor.capture())) - .thenReturn(poller); + when(callback.createConsumer(handlerCaptor.capture())) + .thenReturn(poller); when(clientProvider.createPoller( Mockito.eq(Topics.STATUS_TOPIC), Mockito.any(Reset.class), statHandlerCaptor.capture())) .thenReturn(statPoller); - when(clientProvider.createSender(Mockito.anyString())) - .thenReturn(sender); - Topics topics = new Topics(); - pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin); + pubQueueCacheService = new PubQueueCacheService(eventAdmin, callback); pubQueueCacheService.activate(); queueProvider = new PubQueueProviderImpl(pubQueueCacheService); queueProvider.activate(); - handler = handlerCaptor.getValue().getHandler(); + handler = handlerCaptor.getValue(); } @After
