This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9583
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit bb6d005ccd289d75fc92a4d01287b230b1f728ec
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Jul 13 14:52:16 2020 +0200

    SLING-9583 - Extract messaging code from PubQueueCache
---
 .../impl/publisher/MessagingCacheCallback.java     |  84 ++++++++++++++
 .../impl/publisher/PackageDistributedNotifier.java |   4 +-
 .../impl => publisher}/QueueCacheSeeder.java       |   2 +-
 .../{queue/impl => publisher}/RangePoller.java     |   8 +-
 .../{PubQueueProvider.java => CacheCallback.java}  |  25 ++---
 .../journal/impl/queue/PubQueueProvider.java       |   4 +
 .../journal/impl/queue/impl/PubQueueCache.java     |  43 ++------
 .../impl/queue/impl/PubQueueCacheService.java      |  42 +------
 .../impl/queue/impl/PubQueueProviderImpl.java      |   5 +
 .../impl/publisher/MessagingCacheCallbackTest.java | 122 +++++++++++++++++++++
 .../publisher/PackageDistributedNotifierTest.java  |   4 +-
 .../impl => publisher}/QueueCacheSeederTest.java   |   2 +-
 .../{queue/impl => publisher}/RangePollerTest.java |   4 +-
 .../journal/impl/queue/impl/PubQueueCacheTest.java | 115 ++++++-------------
 .../impl/queue/impl/PubQueueProviderTest.java      |  20 ++--
 15 files changed, 293 insertions(+), 191 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
new file mode 100644
index 0000000..01bf428
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallback.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.CacheCallback;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import 
org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(property = "type=messaging")
+public class MessagingCacheCallback implements CacheCallback {
+    private Logger log = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    private MessagingProvider messagingProvider;
+    
+    @Reference
+    private Topics topics;
+    
+    @Reference
+    private JournalAvailable journalAvailable;
+    
+    @Reference
+    private DistributionMetricsService distributionMetricsService;
+    
+    /**
+     * The cache is active only when at least one DistributionSubscriber agent 
is configured.
+     */
+    @Reference
+    private PublisherConfigurationAvailable publisherConfigurationAvailable;
+
+    @Override
+    public Closeable createConsumer(MessageHandler<PackageMessage> handler) {
+        log.info("Starting consumer");
+        QueueCacheSeeder seeder = new 
QueueCacheSeeder(messagingProvider.createSender(topics.getPackageTopic()));
+        Closeable poller = messagingProvider.createPoller(
+                topics.getPackageTopic(),
+                Reset.latest,
+                create(PackageMessage.class, (info, message) -> { 
seeder.close(); handler.handle(info, message); }) 
+                );
+        seeder.startSeeding();
+        return () -> IOUtils.closeQuietly(seeder, poller);
+    }
+    
+    @Override
+    public List<FullMessage<PackageMessage>> fetchRange(long minOffset, long 
maxOffset) throws InterruptedException {
+        distributionMetricsService.getQueueCacheFetchCount().increment();
+        return new RangePoller(messagingProvider, topics.getPackageTopic(), 
minOffset, maxOffset)
+                .fetchRange();
+    }
+
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
index ea3adc5..01f4ef0 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifier.java
@@ -28,8 +28,8 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
-import 
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
 import 
org.apache.sling.distribution.journal.messages.PackageDistributedMessage;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -57,7 +57,7 @@ public class PackageDistributedNotifier implements 
TopologyChangeHandler {
     private EventAdmin eventAdmin;
 
     @Reference
-    private PubQueueCacheService pubQueueCacheService;
+    private PubQueueProvider pubQueueCacheService;
 
     @Reference
     private MessagingProvider messagingProvider;
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
similarity index 98%
rename from 
src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
rename to 
src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
index 26a6eaa..bbf27b6 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeeder.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
similarity index 94%
rename from 
src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
rename to 
src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
index 72e7c4a..040db85 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePoller.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/RangePoller.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
@@ -55,12 +55,12 @@ public class RangePoller {
     public RangePoller(MessagingProvider messagingProvider,
                           String packageTopic,
                           long minOffset,
-                          long maxOffset) {
-        this.maxOffset = maxOffset;
+                          long maxOffsetExclusive) {
+        this.maxOffset = maxOffsetExclusive;
         this.minOffset = minOffset;
         this.messages = new ArrayList<>();
         String assign = messagingProvider.assignTo(minOffset);
-        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffset);
+        LOG.info("Fetching offsets [{},{}[", minOffset, maxOffsetExclusive);
         headPoller = messagingProvider.createPoller(
                 packageTopic, Reset.earliest, assign,
                 create(PackageMessage.class, this::handlePackage)
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/CacheCallback.java
similarity index 56%
copy from 
src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
copy to 
src/main/java/org/apache/sling/distribution/journal/impl/queue/CacheCallback.java
index bc349b9..43bfeb9 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/CacheCallback.java
@@ -18,23 +18,14 @@
  */
 package org.apache.sling.distribution.journal.impl.queue;
 
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.ParametersAreNonnullByDefault;
+import java.io.Closeable;
+import java.util.List;
 
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
-
-@ParametersAreNonnullByDefault
-public interface PubQueueProvider {
-
-    @Nonnull
-    DistributionQueue getQueue(QueueId queueId, long minOffset, int 
headRetries, @Nullable ClearCallback clearCallback);
-
-    @Nonnull
-    DistributionQueue getErrorQueue(QueueId queueId);
-
-    void handleStatus(MessageInfo info, PackageStatusMessage message);
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 
+public interface CacheCallback {
+    Closeable createConsumer(MessageHandler<PackageMessage> handler);
+    List<FullMessage<PackageMessage>> fetchRange(long minOffset, long 
maxOffset) throws InterruptedException;
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
index bc349b9..de57e77 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/PubQueueProvider.java
@@ -24,6 +24,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
 @ParametersAreNonnullByDefault
@@ -34,6 +35,9 @@ public interface PubQueueProvider {
 
     @Nonnull
     DistributionQueue getErrorQueue(QueueId queueId);
+    
+    @Nonnull
+    OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, 
long minOffset);
 
     void handleStatus(MessageInfo info, PackageStatusMessage message);
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index fdd6ff7..11f410a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -22,7 +22,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.stream.Collectors.groupingBy;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 
 import java.io.Closeable;
 import java.util.HashSet;
@@ -40,20 +39,17 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.sling.distribution.journal.impl.queue.CacheCallback;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 
 /**
  * Cache the distribution packages fetched from the package topic.
@@ -102,35 +98,16 @@ public class PubQueueCache {
 
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
-    private final MessagingProvider messagingProvider;
-
     private final EventAdmin eventAdmin;
 
     private volatile Closeable tailPoller;
 
-    private final QueueCacheSeeder seeder;
-
-    private final String topic;
-
-    private final DistributionMetricsService distributionMetricsService;
+    private final CacheCallback callback;
     
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic, QueueCacheSeeder seeder) {
-        this.messagingProvider = messagingProvider;
+    public PubQueueCache(EventAdmin eventAdmin, CacheCallback callback) {
         this.eventAdmin = eventAdmin;
-        this.distributionMetricsService = distributionMetricsService;
-        this.topic = topic;
-        this.seeder = seeder;
-        startPoller();
-        this.seeder.startSeeding();
-    }
-
-    private void startPoller() {
-        LOG.info("Starting consumer");
-        tailPoller = messagingProvider.createPoller(
-                this.topic,
-                Reset.latest,
-                create(PackageMessage.class, this::handlePackage) 
-                );
+        this.callback = callback;
+        tailPoller = callback.createConsumer(this::handlePackage);
     }
 
     @Nonnull
@@ -148,7 +125,6 @@ public class PubQueueCache {
 
     public void close() {
         IOUtils.closeQuietly(tailPoller);
-        IOUtils.closeQuietly(seeder);
         jmxRegs.forEach(IOUtils::closeQuietly);
     }
 
@@ -201,12 +177,8 @@ public class PubQueueCache {
      * cache.
      */
     private void fetch(long requestedMinOffset, long cachedMinOffset) throws 
InterruptedException {
-        distributionMetricsService.getQueueCacheFetchCount().increment();
-        RangePoller headPoller = new RangePoller(messagingProvider,
-                topic,
-                requestedMinOffset,
-                cachedMinOffset);
-        merge(headPoller.fetchRange());
+        List<FullMessage<PackageMessage>> messages = 
callback.fetchRange(requestedMinOffset, cachedMinOffset);
+        merge(messages);
         updateMinOffset(requestedMinOffset);
     }
 
@@ -274,7 +246,6 @@ public class PubQueueCache {
     }
 
     private void handlePackage(final MessageInfo info, final PackageMessage 
message) {
-        seeder.close();
         merge(singletonList(new FullMessage<>(info, message)));
         updateMaxOffset(info.getOffset());
     }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index b8d6692..cc9789b 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -21,14 +21,8 @@ package 
org.apache.sling.distribution.journal.impl.queue.impl;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.distribution.journal.impl.queue.CacheCallback;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import 
org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
-import org.apache.sling.distribution.journal.shared.Topics;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.JournalAvailable;
-import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -38,7 +32,7 @@ import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(immediate = true, service = PubQueueCacheService.class)
+@Component(service = PubQueueCacheService.class)
 @ParametersAreNonnullByDefault
 public class PubQueueCacheService {
 
@@ -50,40 +44,19 @@ public class PubQueueCacheService {
      */
     private static final int CLEANUP_THRESHOLD = 10_000;
 
-    /**
-     * Will cause the cache to be cleared when we loose the journal
-     */
-    @Reference
-    private JournalAvailable journalAvailable;
-
-    /**
-     * The cache is active only when at least one DistributionSubscriber agent 
is configured.
-     */
-    @Reference
-    private PublisherConfigurationAvailable publisherConfigurationAvailable;
-
-    @Reference
-    private MessagingProvider messagingProvider;
-
-    @Reference
-    private Topics topics;
-
     @Reference
     private EventAdmin eventAdmin;
 
     @Reference
-    private DistributionMetricsService distributionMetricsService;
+    private CacheCallback callback;
 
     private volatile PubQueueCache cache;
 
     public PubQueueCacheService() {}
 
-    public PubQueueCacheService(MessagingProvider messagingProvider,
-                                Topics topics,
-                                EventAdmin eventAdmin) {
-        this.messagingProvider = messagingProvider;
-        this.topics = topics;
+    public PubQueueCacheService(EventAdmin eventAdmin, CacheCallback callback) 
{
         this.eventAdmin = eventAdmin;
+        this.callback = callback;
     }
 
     @Activate
@@ -130,9 +103,6 @@ public class PubQueueCacheService {
     }
 
     private PubQueueCache newCache() {
-        String topic = topics.getPackageTopic();
-        MessageSender<PackageMessage> sender = 
messagingProvider.createSender(topic);
-        QueueCacheSeeder seeder = new QueueCacheSeeder(sender);
-        return new PubQueueCache(messagingProvider, eventAdmin, 
distributionMetricsService, topic, seeder);
+        return new PubQueueCache(eventAdmin, callback);
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index f866d36..b34436f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -99,6 +99,11 @@ public class PubQueueProviderImpl implements 
PubQueueProvider {
         return new PubErrQueue(queueId.getQueueName(), agentQueue, errorQueue);
     }
 
+    @Override
+    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String 
pubAgentName, long minOffset) {
+        return pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
+    }
+
     public void handleStatus(MessageInfo info, PackageStatusMessage message) {
         if (message.getStatus() == Status.REMOVED_FAILED) {
             QueueId queueId = new QueueId(message.getPubAgentName(), 
message.getSubSlingId(), message.getSubAgentName(), "");
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
new file mode 100644
index 0000000..8c66c85
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/MessagingCacheCallbackTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.publisher;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.distribution.journal.FullMessage;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
+import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.shared.Topics;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MessagingCacheCallbackTest {
+    @Mock
+    private MessagingProvider messagingProvider;
+    
+    @Spy
+    private Topics topics;
+    
+    @Mock
+    private JournalAvailable journalAvailable;
+    
+    @Mock
+    private DistributionMetricsService distributionMetricsService;
+    
+    @Mock
+    private MessageHandler<PackageMessage> handler;
+
+    @Mock
+    private MessageSender<Object> sender;
+    
+    @Mock
+    private Counter counter;
+
+    @InjectMocks
+    private MessagingCacheCallback callback;
+
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+
+    @Test
+    public void testCreateConsumer() throws Exception {
+        when(messagingProvider.createSender(Mockito.any())).thenReturn(sender);
+        Closeable poller = callback.createConsumer(handler);
+        assertThat(poller, notNullValue());
+        
+        poller.close();
+    }
+
+    @Test
+    public void testFetchRange() throws Exception {
+        
when(distributionMetricsService.getQueueCacheFetchCount()).thenReturn(counter);
+        when(messagingProvider.assignTo(Mockito.eq(10l))).thenReturn("0:10");
+        CompletableFuture<List<FullMessage<PackageMessage>>> result = 
CompletableFuture.supplyAsync(this::fetch);
+        verify(messagingProvider, timeout(100000)).createPoller(
+                Mockito.anyString(), 
+                Mockito.eq(Reset.earliest), 
+                Mockito.eq("0:10"),
+                handlerCaptor.capture());
+        simulateMessage(19);
+        simulateMessage(20);
+        List<FullMessage<PackageMessage>> messages = result.get(100, 
TimeUnit.SECONDS);
+        assertThat(messages.size(), equalTo(1));
+    }
+
+    private void simulateMessage(int offset) {
+        FullMessage<PackageMessage> message = 
RangePollerTest.createMessage(ReqType.ADD, offset);
+        handlerCaptor.getValue().getHandler().handle(message.getInfo(), 
message.getMessage());
+    }
+
+    List<FullMessage<PackageMessage>> fetch() {
+        try {
+            return callback.fetchRange(10l, 20l);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException();
+        }
+    }
+
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
index b41029c..e256c63 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/PackageDistributedNotifierTest.java
@@ -33,7 +33,7 @@ import java.util.HashSet;
 
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
-import 
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.junit.Before;
@@ -45,7 +45,7 @@ import org.osgi.service.event.EventAdmin;
 public class PackageDistributedNotifierTest {
 
     @Mock
-    private PubQueueCacheService pubQueueCacheService;
+    private PubQueueProvider pubQueueCacheService;
 
     @Mock
     private OffsetQueue<DistributionQueueItem> offsetQueue;
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
similarity index 97%
rename from 
src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
rename to 
src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
index 29a5133..8a3cf76 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/QueueCacheSeederTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.timeout;
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
similarity index 96%
rename from 
src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
rename to 
src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
index 6d82121..217bd59 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/RangePollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/RangePollerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.impl.queue.impl;
+package org.apache.sling.distribution.journal.impl.publisher;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.contains;
@@ -107,7 +107,7 @@ public class RangePollerTest {
         }
     }
 
-    private FullMessage<PackageMessage> createMessage(ReqType reqType, int 
offset) {
+    public static FullMessage<PackageMessage> createMessage(ReqType reqType, 
long offset) {
         MessageInfo info = new TestMessageInfo(TOPIC, 0, offset, 
System.currentTimeMillis());
         PackageMessage message = PackageMessage.builder()
                 .pubAgentName("agent1")
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index aca7d01..a1db345 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -20,39 +20,32 @@ package 
org.apache.sling.distribution.journal.impl.queue.impl;
 
 import static java.lang.System.currentTimeMillis;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.stream.LongStream;
 
 import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.impl.queue.CacheCallback;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,35 +75,20 @@ public class PubQueueCacheTest {
     private static final Random RAND = new Random();
 
     @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+    private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
 
     @Mock
     private EventAdmin eventAdmin;
 
     @Mock
-    private MessagingProvider clientProvider;
-
-    @Mock
-    private QueueCacheSeeder cacheSeeder;
-
-    @Mock
-    private DistributionMetricsService distributionMetricsService;
+    private CacheCallback callback;
 
     @Mock
     private Counter counter;
 
     @Mock
-    private LocalStore seedStore;
-
-    @Mock
     private Closeable poller;
 
-    @Mock
-    private MessageSender<Object> sender;
-
-    @Mock
-    private QueueCacheSeeder seeder;
-
     private PubQueueCache cache;
 
     private ExecutorService executor;
@@ -120,29 +98,12 @@ public class PubQueueCacheTest {
 
     @Before
     public void before() {
-        when(clientProvider.createPoller(
-                eq(TOPIC),
-                eq(Reset.latest),
-                handlerCaptor.capture()))
+        when(callback.createConsumer(handlerCaptor.capture()))
                 .thenReturn(poller);
-        when(clientProvider.createPoller(
-                eq(TOPIC),
-                eq(Reset.earliest),
-                Mockito.anyString(),
-                handlerCaptor.capture()))
-                .thenReturn(poller);
-        when(clientProvider.createSender(Mockito.anyString()))
-            .thenReturn(sender);
-
-        when(distributionMetricsService.getQueueCacheFetchCount())
-            .thenReturn(counter);
 
-        when(seedStore.load(anyString(), any())).thenReturn(0L);
-
-        cache = new PubQueueCache(clientProvider, eventAdmin, 
distributionMetricsService, TOPIC, seeder);
-        verify(seeder).startSeeding();
+        cache = new PubQueueCache(eventAdmin, callback);
         executor = Executors.newFixedThreadPool(10);
-        tailHandler = handlerCaptor.getValue().getHandler();
+        tailHandler = handlerCaptor.getValue();
     }
 
     @After
@@ -165,21 +126,13 @@ public class PubQueueCacheTest {
 
     @Test
     public void testFetchWithSingleConsumer() throws Exception {
-        simulateMessage(tailHandler, 200);
+        simulateMessage(tailHandler, 200l);
+        when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l)))
+                .thenReturn(Arrays.asList(createTestMessage(100, 
PUB_AGENT_NAME_1, ReqType.ADD)));
         Future<OffsetQueue<DistributionQueueItem>> consumer = 
consumer(PUB_AGENT_NAME_1, 100);
-        // seeding the cache with a message at offset 200
-        // wait that the consumer has started fetching the offsets from 100 to 
200
-        MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
-        // simulate messages for the fetched offsets
-        simulateMessages(headHandler, 100, cache.getMinOffset());
         // the consumer returns the offset queue
         consumer.get(15, SECONDS);
-        assertEquals(100, cache.getMinOffset());
-    }
-
-    private MessageHandler<PackageMessage> awaitHeadHandler() {
-        return Awaitility.await().ignoreExceptions()
-                .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), 
notNullValue());
+        assertEquals(100l, cache.getMinOffset());
     }
 
        @Test
@@ -188,18 +141,15 @@ public class PubQueueCacheTest {
         // build two consumers for same agent queue, from offset 100
         Future<OffsetQueue<DistributionQueueItem>> consumer1 = 
consumer(PUB_AGENT_NAME_1, 100);
         Future<OffsetQueue<DistributionQueueItem>> consumer2 = 
consumer(PUB_AGENT_NAME_1, 100);
-        // seeding the cache with a message at offset 200
-        // wait that one consumer has started fetching the offsets from 100 to 
200
-        MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
-        // simulate messages for the fetched offsets
-        simulateMessages(headHandler, 100, cache.getMinOffset());
-        // both consumers returns the offset queue
+        when(callback.fetchRange(Mockito.eq(100l), Mockito.eq(200l)))
+        .thenReturn(Arrays.asList(createTestMessage(100, PUB_AGENT_NAME_1, 
ReqType.ADD)));
         OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
         OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
         assertEquals(q1.getSize(), q2.getSize());
         assertEquals(100, cache.getMinOffset());
-        // the offsets have been fetched only once
-        assertEquals(2, handlerCaptor.getAllValues().size());
+        
+        // Fetch should only happen once
+        verify(callback, times(1)).fetchRange(Mockito.anyLong(), 
Mockito.anyLong());
     }
 
     @Test
@@ -213,11 +163,6 @@ public class PubQueueCacheTest {
         assertEquals(4, cache.size());
     }
 
-    private void simulateMessages(MessageHandler<PackageMessage> handler, long 
fromOffset, long toOffset) {
-        LongStream.rangeClosed(fromOffset, toOffset)
-            .forEach(offset -> simulateMessage(handler, offset));
-    }
-    
     private void simulateMessage(MessageHandler<PackageMessage> handler, long 
offset) {
         simulateMessage(handler,
                 pickAny(PUB_AGENT_NAME_1, PUB_AGENT_NAME_2, PUB_AGENT_NAME_3),
@@ -225,19 +170,31 @@ public class PubQueueCacheTest {
     }
 
     private void simulateMessage(MessageHandler<PackageMessage> handler, 
String pubAgentName, ReqType reqType, long offset) {
-        PackageMessage msg = PackageMessage.builder()
+        PackageMessage msg = createTestMessage(pubAgentName, reqType);
+        simulateMessage(handler, msg, offset);
+    }
+
+    private void simulateMessage(MessageHandler<PackageMessage> handler, 
PackageMessage msg, long offset) {
+        log.info("Simulate msg @ offset {}", offset);
+        handler.handle(createInfo(offset), msg);
+    }
+    
+    private FullMessage<PackageMessage> createTestMessage(long offset, String 
pubAgentName, ReqType reqType) {
+        return new FullMessage<>(createInfo(offset), 
createTestMessage(pubAgentName, reqType));
+    }
+
+    private PackageMessage createTestMessage(String pubAgentName, ReqType 
reqType) {
+        return PackageMessage.builder()
                 .pkgType("pkgType")
                 .pkgId(UUID.randomUUID().toString())
                 .pubSlingId("pubSlingId")
                 .reqType(reqType)
                 .pubAgentName(pubAgentName)
                 .build();
-        simulateMessage(handler, msg, offset);
     }
 
-    private void simulateMessage(MessageHandler<PackageMessage> handler, 
PackageMessage msg, long offset) {
-        log.info("Simulate msg @ offset {}", offset);
-        handler.handle(new TestMessageInfo(TOPIC, 0, offset, 
currentTimeMillis()), msg);
+    private MessageInfo createInfo(long offset) {
+        return new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis());
     }
 
     Future<OffsetQueue<DistributionQueueItem>> consumer(String pubAgentName, 
long minOffset) {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index ea71824..dc8bf32 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -45,6 +45,7 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.queue.CacheCallback;
 import org.apache.sling.distribution.journal.impl.queue.QueueId;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
@@ -77,7 +78,7 @@ public class PubQueueProviderTest {
     private MessagingProvider clientProvider;
     
     @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+    private ArgumentCaptor<MessageHandler<PackageMessage>> handlerCaptor;
 
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageStatusMessage>> 
statHandlerCaptor;
@@ -94,6 +95,9 @@ public class PubQueueProviderTest {
     @Mock
     private MessageSender<Object> sender;
 
+    @Mock
+    private CacheCallback callback;
+
     private PubQueueCacheService pubQueueCacheService;
 
     private MessageHandler<PackageMessage> handler;
@@ -104,24 +108,18 @@ public class PubQueueProviderTest {
     @Before
     public void before() throws PersistenceException {
         MockitoAnnotations.initMocks(this);
-        when(clientProvider.createPoller(
-                Mockito.eq(Topics.PACKAGE_TOPIC),
-                Mockito.any(Reset.class),
-                handlerCaptor.capture()))
-        .thenReturn(poller);
+        when(callback.createConsumer(handlerCaptor.capture()))
+                .thenReturn(poller);
         when(clientProvider.createPoller(
                 Mockito.eq(Topics.STATUS_TOPIC), 
                 Mockito.any(Reset.class),
                 statHandlerCaptor.capture()))
         .thenReturn(statPoller);
-        when(clientProvider.createSender(Mockito.anyString()))
-        .thenReturn(sender);
-        Topics topics = new Topics();
-        pubQueueCacheService = new PubQueueCacheService(clientProvider, 
topics, eventAdmin);
+        pubQueueCacheService = new PubQueueCacheService(eventAdmin, callback);
         pubQueueCacheService.activate();
         queueProvider = new PubQueueProviderImpl(pubQueueCacheService);
         queueProvider.activate();
-        handler = handlerCaptor.getValue().getHandler();
+        handler = handlerCaptor.getValue();
     }
 
     @After

Reply via email to