This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new d28b841  SLING-9460 - Avoid seeding messages in PubQueueCache
d28b841 is described below

commit d28b841a1d1007bae8d370e41986d4cb6d6e02a0
Author: Christian Schneider <[email protected]>
AuthorDate: Sat May 23 15:55:34 2020 +0200

    SLING-9460 - Avoid seeding messages in PubQueueCache
---
 .../journal/impl/publisher/DiscoveryService.java   |  12 +-
 .../journal/impl/queue/impl/PubQueueCache.java     | 112 +++---------------
 .../impl/queue/impl/PubQueueCacheService.java      |  13 +-
 .../impl/publisher/DiscoveryServiceTest.java       | 108 +++++++++++------
 .../journal/impl/queue/impl/PubQueueCacheTest.java | 131 +++++----------------
 .../impl/queue/impl/PubQueueProviderTest.java      |   2 +
 6 files changed, 140 insertions(+), 238 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 128d453..2e0b665 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -29,6 +29,7 @@ import java.util.Hashtable;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import 
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
@@ -78,6 +79,9 @@ public class DiscoveryService implements Runnable {
 
     @Reference
     private TopologyChangeHandler topologyChangeHandler;
+    
+    @Reference
+    private PubQueueCacheService pubQueueCacheService;
 
     private volatile ServiceRegistration<?> reg;
 
@@ -91,10 +95,12 @@ public class DiscoveryService implements Runnable {
     public DiscoveryService(
             MessagingProvider messagingProvider,
             TopologyChangeHandler topologyChangeHandler,
-            Topics topics) {
+            Topics topics,
+            PubQueueCacheService pubQueueCacheService) {
         this.messagingProvider = messagingProvider;
         this.topologyChangeHandler = topologyChangeHandler;
         this.topics = topics;
+        this.pubQueueCacheService = pubQueueCacheService;
     }
 
     @Activate
@@ -151,12 +157,14 @@ public class DiscoveryService implements Runnable {
 
             long now = System.currentTimeMillis();
             AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), 
disMsg.getSubAgentName());
-
+            long minOffset = Long.MAX_VALUE;
             for (Messages.SubscriberState subStateMsg : 
disMsg.getSubscriberStateList()) {
                 SubscriberConfiguration subConfig = 
disMsg.getSubscriberConfiguration();
                 State subState = new State(subStateMsg.getPubAgentName(), 
subAgentId.getAgentId(), now, subStateMsg.getOffset(), 
subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.getEditable());
                 viewManager.refreshState(subState);
+                minOffset = Math.min(minOffset, subState.getOffset());
             }
+            pubQueueCacheService.seed(minOffset);
         }
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 438af2e..ede183d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -19,7 +19,6 @@
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
 
-import static java.lang.System.currentTimeMillis;
 import static org.apache.sling.distribution.journal.HandlerAdapter.create;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -30,9 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -55,11 +52,8 @@ import 
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.FullMessage;
 import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.RunnableUtil;
 
 /**
  * Cache the distribution packages fetched from the package topic.
@@ -85,12 +79,6 @@ public class PubQueueCache {
     private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues 
= new ConcurrentHashMap<>();
 
     /**
-     * Blocks the threads awaiting until the agentQueues
-     * cache has been seeded.
-     */
-    private final CountDownLatch seeded = new CountDownLatch(1);
-
-    /**
      * Only allows to fetch data from the journal
      * with a single thread.
      */
@@ -110,37 +98,35 @@ public class PubQueueCache {
 
     private final EventAdmin eventAdmin;
 
-    private final Closeable tailPoller;
+    private volatile Closeable tailPoller;
 
     private final String topic;
 
-    private final long seedingDelayMs;
-
     private final DistributionMetricsService distributionMetricsService;
-
-    /**
-     * Way out for the threads awaiting on the seeded
-     * latch, when the component is deactivated.
-     */
+    
     private volatile boolean closed;
 
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic, long seedingDelayMs) {
+    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
         this.distributionMetricsService = distributionMetricsService;
-        this.seedingDelayMs = seedingDelayMs;
         this.topic = topic;
+    }
 
-        tailPoller = messagingProvider.createPoller(
-                topic,
+    public void seed(long offset) {
+        if (tailPoller == null) {
+            String assignTo = messagingProvider.assignTo(offset);
+            tailPoller = messagingProvider.createPoller(
+                this.topic,
                 Reset.latest,
+                assignTo,
                 create(PackageMessage.class, this::handlePackage));
-
-        RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
+        }
     }
 
     @Nonnull
     public OffsetQueue<DistributionQueueItem> getOffsetQueue(String 
pubAgentName, long minOffset) throws InterruptedException {
+        waitSeeded();
         fetchIfNeeded(minOffset);
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
@@ -150,62 +136,11 @@ public class PubQueueCache {
     }
 
     public void close() {
-
-        /*
-         * Note that we don't close resources using Thread.interrupt()
-         * because interrupts can stop the Apache Oak repository.
-         *
-         * See SLING-9340, OAK-2609 and 
https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
-         */
-
         closed = true;
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.forEach(IOUtils::closeQuietly);
     }
 
-    private void seedCache() {
-        LOG.info("Start message seeder");
-        try {
-            MessageSender<PackageMessage> sender = 
messagingProvider.createSender();
-            do {
-                sendSeedingMessage(sender);
-            } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS));
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } finally {
-            LOG.info("Stop message seeder");
-        }
-    }
-
-    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
-        PackageMessage pkgMsg = createTestMessage();
-        LOG.info("Send seeding message");
-        try {
-            sender.send(topic, pkgMsg);
-        } catch (MessagingException e) {
-            LOG.warn(e.getMessage(), e);
-            delay(seedingDelayMs * 10);
-        }
-    }
-
-    private static void delay(long sleepMs) {
-        try {
-            Thread.sleep(sleepMs);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    private PackageMessage createTestMessage() {
-        String pkgId = UUID.randomUUID().toString();
-        return PackageMessage.newBuilder()
-                .setPubSlingId("seeder")
-                .setPkgId(pkgId)
-                .setPkgType("seeder")
-                .setReqType(PackageMessage.ReqType.TEST)
-                .build();
-    }
-
     /**
      * Fetch the package messages from the requested min offset,
      * up to the current cached min offset.
@@ -213,13 +148,7 @@ public class PubQueueCache {
      * @param requestedMinOffset the min offset to start fetching data from
      */
     private void fetchIfNeeded(long requestedMinOffset) throws 
InterruptedException {
-
-        // We wait on the cache to be seeded (at least one message handled)
-        // before computing potential missing offsets.
-        waitSeeded();
-
         long cachedMinOffset = getMinOffset();
-
         if (requestedMinOffset < cachedMinOffset) {
 
             LOG.debug("Requested min offset {} smaller than cached min offset 
{}", requestedMinOffset, cachedMinOffset);
@@ -271,15 +200,14 @@ public class PubQueueCache {
     }
 
     private void waitSeeded() throws InterruptedException {
-        long start = currentTimeMillis();
-        while (!closed && currentTimeMillis() - start < MAX_FETCH_WAIT_MS) {
-            if (seeded.await(seedingDelayMs, MILLISECONDS)) {
-                return;
-            } else {
-                LOG.debug("Waiting for seeded cache");
+        long start = System.currentTimeMillis();
+        while (getMinOffset() == Long.MAX_VALUE) {
+            LOG.debug("Waiting for seeded cache");
+            if (closed || System.currentTimeMillis() - start > 
MAX_FETCH_WAIT_MS) {
+                throw new RuntimeException("Gave up waiting for seeded cache");
             }
+            Thread.sleep(1000);
         }
-        throw new RuntimeException("Gave up waiting for seeded cache");
     }
 
     protected long getMinOffset() {
@@ -340,9 +268,5 @@ public class PubQueueCache {
 
     private void handlePackage(final MessageInfo info, final PackageMessage 
message) {
         merge(singletonList(new FullMessage<>(info, message)));
-        if (seeded.getCount() > 0) {
-            LOG.info("Cache has been seeded");
-        }
-        seeded.countDown();
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 8f49687..6609b8b 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -57,11 +57,6 @@ public class PubQueueCacheService implements Runnable {
     private static final int CLEANUP_THRESHOLD = 10_000;
 
     /**
-     * Interval in millisecond between two seeding messages to seed the cache.
-     */
-    private static final long CACHE_SEEDING_DELAY_MS = 10_000;
-
-    /**
      * Will cause the cache to be cleared when we loose the journal
      */
     @Reference
@@ -115,6 +110,12 @@ public class PubQueueCacheService implements Runnable {
         }
     }
 
+    public void seed(long offset) {
+        if (cache != null) {
+            cache.seed(offset);
+        }
+    }
+
     private void cleanup() {
         if (cache != null) {
             int size = cache.size();
@@ -129,7 +130,7 @@ public class PubQueueCacheService implements Runnable {
     }
 
     private PubQueueCache newCache() {
-        return new PubQueueCache(messagingProvider, eventAdmin, 
distributionMetricsService, topics.getPackageTopic(), CACHE_SEEDING_DELAY_MS);
+        return new PubQueueCache(messagingProvider, eventAdmin, 
distributionMetricsService, topics.getPackageTopic());
     }
 
     @Override
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 02a58c1..736618e 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -18,50 +18,84 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import static 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.UUID;
 
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import 
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
+import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
 import org.mockito.Mockito;
-
-import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.framework.BundleContext;
 
-@SuppressWarnings({ "unchecked", "rawtypes" })
+@RunWith(MockitoJUnitRunner.class)
 public class DiscoveryServiceTest {
 
     private static final String SUB1_SLING_ID = UUID.randomUUID().toString();
     private static final String SUB1_AGENT = "subagent";
     private static final String PUB1_AGENT_NAME = "pubagent1";
     
+    @Mock
     private Closeable poller;
+    
+    @Mock
+    BundleContext bundleContext;
+
+    @Mock
+    MessagingProvider clientProvider;
+    
+    @Captor
+    ArgumentCaptor<HandlerAdapter<DiscoveryMessage>> captureHandler;
+
+    @Spy
+    Topics topics = new Topics();
+
+    @Mock
+    TopologyChangeHandler topologyChangeHandler;
+    
+    @Mock
+    private PubQueueCacheService pubQueueCacheService;
+
     private MessageHandler<DiscoveryMessage> discoveryHandler;
+    
     private DiscoveryService discoveryService;
+    
 
     @Before
     public void before() {
-        mockDiscoveryService();
+        discoveryService = new DiscoveryService(
+                clientProvider, topologyChangeHandler, 
+                topics, pubQueueCacheService);
+        when(clientProvider.createPoller(
+                Mockito.anyString(), 
+                Mockito.any(Reset.class),
+                captureHandler.capture())).thenReturn(poller);
+        discoveryService.activate(bundleContext);
+        discoveryHandler = captureHandler.getValue().getHandler();
     }
     
     @Test
@@ -69,13 +103,25 @@ public class DiscoveryServiceTest {
         String subAgentId = SUB1_SLING_ID + "-" + SUB1_AGENT; 
         
assertTrue(discoveryService.getTopologyView().getSubscriberAgentStates(subAgentId).isEmpty());
         
-        MessageInfo info = new TestMessageInfo("topic", 0, 0, 0);
-        DiscoveryMessage message = discoveryMessage(SUB1_SLING_ID, SUB1_AGENT, 
PUB1_AGENT_NAME, 10);
-        discoveryHandler.handle(info, message);
+        DiscoveryMessage message = discoveryMessage(SUB1_SLING_ID, SUB1_AGENT,
+                subscriberState(PUB1_AGENT_NAME, 10));
+        discoveryHandler.handle((MessageInfo) messageInfo(0), message);
 
         discoveryService.run();
         assertThat(discoveryService.getTopologyView().getState(subAgentId, 
PUB1_AGENT_NAME).getOffset(), equalTo(10L));
     }
+
+    @Test
+    public void testPubQueueCacheSeed() throws IOException {
+        DiscoveryMessage message = discoveryMessage(
+                SUB1_SLING_ID, 
+                SUB1_AGENT, 
+                subscriberState(PUB1_AGENT_NAME, 20),
+                subscriberState(PUB1_AGENT_NAME, 10)
+                );
+        discoveryHandler.handle(messageInfo(0), message);
+        verify(pubQueueCacheService).seed(Mockito.eq(10l));
+    }
     
     @After
     public void after() throws IOException {
@@ -88,27 +134,11 @@ public class DiscoveryServiceTest {
         // TODO If a subscriber does not respond after a certain timeout its 
offsets must be purged
     }
 
-    private void mockDiscoveryService() {
-        poller = mock(Closeable.class);
-        BundleContext bundleContext = mock(BundleContext.class);
-        MessagingProvider clientProvider = mock(MessagingProvider.class);
-        ArgumentCaptor<HandlerAdapter> captureHandler =  
ArgumentCaptor.forClass(HandlerAdapter.class);
-        when(clientProvider.createPoller(
-                Mockito.anyString(), 
-                Mockito.any(Reset.class),
-                captureHandler.capture())).thenReturn(poller);
-        Topics topics = mock(Topics.class);
-        TopologyChangeHandler topologyChangeHandler = 
mock(TopologyChangeHandler.class);
-        discoveryService = new DiscoveryService(clientProvider, 
topologyChangeHandler, topics);
-        discoveryService.activate(bundleContext);
-        discoveryHandler = captureHandler.getValue().getHandler();
+    private MessageInfo messageInfo(int offset) {
+        return new TestMessageInfo("topic", 0, offset, 0);
     }
 
-    private DiscoveryMessage discoveryMessage(String subSlingId, String 
subAgentName, String pubAgentName, int offset) {
-        SubscriberState queueoffset = SubscriberState.newBuilder()
-                .setPubAgentName(pubAgentName)
-                .setOffset(offset).build();
-        
+    private DiscoveryMessage discoveryMessage(String subSlingId, String 
subAgentName, SubscriberState... subStates) {
         return DiscoveryMessage.newBuilder()
                 .setSubSlingId(subSlingId)
                 .setSubAgentName(subAgentName)
@@ -117,6 +147,12 @@ public class DiscoveryServiceTest {
                         .setEditable(false)
                         .setMaxRetries(-1)
                         .build())
-                .addSubscriberState(queueoffset).build();
+                .addAllSubscriberState(Arrays.asList(subStates)).build();
+    }
+
+    private SubscriberState subscriberState(String pubAgentName, int offset) {
+        return SubscriberState.newBuilder()
+                .setPubAgentName(pubAgentName)
+                .setOffset(offset).build();
     }
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index c6245b8..7d0808a 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -23,37 +23,31 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.ADD;
 import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
 import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
-import static org.awaitility.Awaitility.await;
-import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.lang.Thread.State;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.LongStream;
 
 import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
@@ -71,7 +65,6 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -96,10 +89,7 @@ public class PubQueueCacheTest {
     private ArgumentCaptor<PackageMessage> seedingMessageCaptor;
 
     @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> tailHandlerCaptor;
-
-    @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> headHandlerCaptor;
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
 
     @Captor
     private ArgumentCaptor<String> headAssignCaptor;
@@ -117,9 +107,6 @@ public class PubQueueCacheTest {
     private Counter counter;
 
     @Mock
-    private MessageSender<PackageMessage> pkgSender;
-
-    @Mock
     private Closeable poller;
 
     private PubQueueCache cache;
@@ -135,28 +122,18 @@ public class PubQueueCacheTest {
         when(clientProvider.createPoller(
                 eq(TOPIC),
                 any(Reset.class),
-                tailHandlerCaptor.capture()))
-        .thenReturn(poller);
-
-        when(clientProvider.createPoller(
-                eq(TOPIC),
-                any(Reset.class),
                 headAssignCaptor.capture(),
-                headHandlerCaptor.capture()))
+                handlerCaptor.capture()))
                 .thenReturn(poller);
 
-        when(clientProvider.<PackageMessage>createSender())
-        .thenReturn(pkgSender);
-
         when(distributionMetricsService.getQueueCacheFetchCount())
                 .thenReturn(counter);
 
-        cache = new PubQueueCache(clientProvider, eventAdmin, 
distributionMetricsService, TOPIC, 250);
-        verify(pkgSender, timeout(5000)).send(Mockito.eq(TOPIC), 
seedingMessageCaptor.capture());
-        
-        executor = Executors.newFixedThreadPool(10);
+        cache = new PubQueueCache(clientProvider, eventAdmin, 
distributionMetricsService, TOPIC);
+        cache.seed(0);
 
-        tailHandler = tailHandlerCaptor.getValue().getHandler();
+        executor = Executors.newFixedThreadPool(10);
+        tailHandler = handlerCaptor.getValue().getHandler();
     }
 
     @After
@@ -167,7 +144,7 @@ public class PubQueueCacheTest {
 
     @Test
     public void testSeedingFromNewPackageMessage() throws Exception {
-        Future<OffsetQueue<DistributionQueueItem>> consumer = 
executor.submit(new Consumer(PUB_AGENT_NAME_1, 0));
+        Future<OffsetQueue<DistributionQueueItem>> consumer = 
consumer(PUB_AGENT_NAME_1, 0);
         // The consumer is blocked until the cache is seeded
         assertFalse(consumer.isDone());
         // sending any package message seeds the cache
@@ -176,27 +153,13 @@ public class PubQueueCacheTest {
     }
 
     @Test
-    public void testSeedingFromSeedingMessage() throws Exception {
-        ArgumentCaptor<PackageMessage> seedingMsgCaptor = 
ArgumentCaptor.forClass(PackageMessage.class);
-        Future<OffsetQueue<DistributionQueueItem>> consumer = 
executor.submit(new Consumer(PUB_AGENT_NAME_1, 0));
-        // The consumer is blocked until the cache is seeded
-        assertFalse(consumer.isDone());
-        // wait until a seeding message is sent and captured
-        verify(pkgSender, timeout(15000).atLeastOnce()).send(eq(TOPIC), 
seedingMsgCaptor.capture());
-        // sending the captured seeding message seeds the cache
-        simulateMessage(tailHandler, seedingMsgCaptor.getValue(), 0);
-        consumer.get(15, SECONDS);
-    }
-
-    @Test
     public void testSeedingConcurrentConsumers() throws Exception {
         List<Future<OffsetQueue<DistributionQueueItem>>> consumers = new 
ArrayList<>();
-        consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_1, 0)));
-        consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_2, 0)));
-        consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_3, 0)));
+        consumers.add(consumer(PUB_AGENT_NAME_1, 0));
+        consumers.add(consumer(PUB_AGENT_NAME_2, 0));
+        consumers.add(consumer(PUB_AGENT_NAME_3, 0));
         // All consumers are blocked until the cache is seeded
         consumers.forEach(future -> assertFalse(future.isDone()));
-        // sending any package message seeds the cache
         simulateMessage(tailHandler, 0);
         consumers.forEach(future -> assertNotNull(get(future)));
     }
@@ -204,45 +167,47 @@ public class PubQueueCacheTest {
     @Test
     public void testFetchWithSingleConsumer() throws Exception {
         // build a consumer form offset 100
-        Future<OffsetQueue<DistributionQueueItem>> consumer = 
executor.submit(new Consumer(PUB_AGENT_NAME_1, 100));
+        Future<OffsetQueue<DistributionQueueItem>> consumer = 
consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
         simulateMessage(tailHandler, 200);
         // wait that the consumer has started fetching the offsets from 100 to 
200
-        awaitUntil(() -> headAssignCaptor.getValue() != null);
-        awaitUntil(() -> headHandlerCaptor.getValue() != null);
+        awaitHeadHandler();
         // simulate messages for the fetched offsets
         long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
-        simulateMessages(headHandlerCaptor.getValue().getHandler(), 
fromOffset, cache.getMinOffset());
+        simulateMessages(handlerCaptor.getValue().getHandler(), fromOffset, 
cache.getMinOffset());
         // the consumer returns the offset queue
         consumer.get(15, SECONDS);
         assertEquals(100, cache.getMinOffset());
     }
 
+    private MessageHandler<PackageMessage> awaitHeadHandler() {
+        return Awaitility.await().ignoreExceptions().until(() -> 
handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
+    }
+
        @Test
     public void testFetchWithConcurrentConsumer() throws Exception {
         // build two consumers for same agent queue, from offset 100
-        Future<OffsetQueue<DistributionQueueItem>> consumer1 = 
executor.submit(new Consumer(PUB_AGENT_NAME_1, 100));
-        Future<OffsetQueue<DistributionQueueItem>> consumer2 = 
executor.submit(new Consumer(PUB_AGENT_NAME_1, 100));
+        Future<OffsetQueue<DistributionQueueItem>> consumer1 = 
consumer(PUB_AGENT_NAME_1, 100);
+        Future<OffsetQueue<DistributionQueueItem>> consumer2 = 
consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
         simulateMessage(tailHandler, 200);
         // wait that one consumer has started fetching the offsets from 100 to 
200
-        awaitUntil(() -> headAssignCaptor.getValue() != null);
-        awaitUntil(() -> headHandlerCaptor.getValue() != null);
+        MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
         // simulate messages for the fetched offsets
         long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
-        simulateMessages(headHandlerCaptor.getValue().getHandler(), 
fromOffset, cache.getMinOffset());
+        simulateMessages(headHandler, fromOffset, cache.getMinOffset());
         // both consumers returns the offset queue
         OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
         OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
         assertEquals(q1.getSize(), q2.getSize());
         assertEquals(100, cache.getMinOffset());
         // the offsets have been fetched only once
-        assertEquals(1, headHandlerCaptor.getAllValues().size());
+        assertEquals(2, handlerCaptor.getAllValues().size());
     }
 
     @Test
     public void testCacheSize() throws Exception {
-        Future<OffsetQueue<DistributionQueueItem>> consumer = 
executor.submit(new Consumer(PUB_AGENT_NAME_1, 0));
+        Future<OffsetQueue<DistributionQueueItem>> consumer = 
consumer(PUB_AGENT_NAME_1, 0);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
         simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
@@ -255,34 +220,14 @@ public class PubQueueCacheTest {
 
     @Test(expected = ExecutionException.class)
     public void testCloseUnseededPoller() throws Throwable {
-        FutureTask<OffsetQueue<DistributionQueueItem>> task = new 
FutureTask<>(new Consumer(PUB_AGENT_NAME_1, 0));
-        Thread th = new Thread(task);
-        th.start();
+        Future<OffsetQueue<DistributionQueueItem>> task = 
consumer(PUB_AGENT_NAME_1, 0);
         Awaitility.setDefaultPollDelay(Duration.ZERO);
-        await().until(th::getState, equalTo(State.TIMED_WAITING));
         cache.close();
         task.get();
     }
     
-    @Test
-    public void testFetchWithOnlyTestMessage() throws Exception {
-       long requestedMinOffset = 0;
-               PackageMessage seedingMessage = seedingMessageCaptor.getValue();
-               simulateMessage(tailHandler, seedingMessage, 200000);
-               Future<OffsetQueue<DistributionQueueItem>> consumer = 
executor.submit(new Consumer(PUB_AGENT_NAME_1, requestedMinOffset));
-        awaitUntil(() -> headHandlerCaptor.getValue() != null);
-        MessageHandler<PackageMessage> headHandler = 
headHandlerCaptor.getValue().getHandler();
-        simulateMessage(headHandler, seedingMessage, 200000);
-        consumer.get(10, SECONDS);
-        assertEquals("After we fetched from 0 we expect this to be the cached 
min offset.", 
-                       requestedMinOffset, cache.getMinOffset());
-    }
-
-    private void awaitUntil(Callable<Boolean> callable) {
-               await().atMost(15, SECONDS).ignoreExceptions().until(callable);
-       }
 
-       private void simulateMessages(MessageHandler<PackageMessage> handler, 
long fromOffset, long toOffset) {
+    private void simulateMessages(MessageHandler<PackageMessage> handler, long 
fromOffset, long toOffset) {
         LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> 
simulateMessage(handler, offset));
     }
     
@@ -308,28 +253,14 @@ public class PubQueueCacheTest {
         handler.handle(new TestMessageInfo(TOPIC, 0, offset, 
currentTimeMillis()), msg);
     }
 
-
-    private class Consumer implements 
Callable<OffsetQueue<DistributionQueueItem>> {
-
-        final String pubAgentName;
-
-        final long minOffset;
-
-        private Consumer(String pubAgentName, long minOffset) {
-            this.pubAgentName = pubAgentName;
-            this.minOffset = minOffset;
-        }
-
-        @Override
-        public OffsetQueue<DistributionQueueItem> call() throws Exception {
-            return cache.getOffsetQueue(pubAgentName, minOffset);
-        }
+    Future<OffsetQueue<DistributionQueueItem>> consumer(String pubAgentName, 
long minOffset) {
+        return executor.submit(() -> cache.getOffsetQueue(pubAgentName, 
minOffset));
     }
 
     private OffsetQueue<DistributionQueueItem> 
get(Future<OffsetQueue<DistributionQueueItem>> future) {
         try {
-            return future.get();
-        } catch (Exception e) {
+            return future.get(1, SECONDS);
+        } catch (TimeoutException | InterruptedException | ExecutionException 
e) {
             throw new RuntimeException(e);
         }
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 7edfb05..3a1a27a 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -114,6 +114,7 @@ public class PubQueueProviderTest {
         when(clientProvider.createPoller(
                 Mockito.eq(Topics.PACKAGE_TOPIC),
                 Mockito.any(Reset.class),
+                Mockito.anyString(),
                 handlerCaptor.capture()))
         .thenReturn(poller);
         when(clientProvider.createPoller(
@@ -129,6 +130,7 @@ public class PubQueueProviderTest {
         pubQueueCacheService.activate();
         queueProvider = new PubQueueProviderImpl(pubQueueCacheService, 
clientProvider, topics);
         queueProvider.activate();
+        pubQueueCacheService.seed(0);
         handler = handlerCaptor.getValue().getHandler();
         statHandler = statHandlerCaptor.getValue().getHandler();
     }

Reply via email to