This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9577
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 6345a750c435f444c289d00f194ff390d6668dbf
Author: Christian Schneider <[email protected]>
AuthorDate: Fri Jul 10 10:18:30 2020 +0200

    SLING-9577 - Switch to seeding thread
---
 .../journal/impl/queue/impl/PubQueueCache.java     | 60 +++-------------
 .../impl/queue/impl/PubQueueCacheService.java      | 28 ++------
 .../journal/impl/queue/impl/QueueCacheSeeder.java  | 82 +++++++++++-----------
 .../impl/queue/impl/QueueCacheSeederTask.java      | 61 ----------------
 .../journal/impl/queue/impl/PubQueueCacheTest.java | 54 +++++++-------
 .../impl/queue/impl/PubQueueProviderTest.java      |  4 --
 .../impl/queue/impl/QueueCacheSeederTest.java      | 61 ++--------------
 7 files changed, 90 insertions(+), 260 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 940313b..5546c63 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -37,13 +37,11 @@ import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.shared.JMXRegistration;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -102,12 +100,6 @@ public class PubQueueCache {
      */
     private final AtomicLong maxOffset = new AtomicLong(-1L);
 
-    /**
-     * Holds the last known seed offset stored to the
-     * seed store.
-     */
-    private volatile long seedOffset = -1L;
-
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
     private final MessagingProvider messagingProvider;
@@ -120,44 +112,23 @@ public class PubQueueCache {
 
     private final String topic;
 
-    private final LocalStore seedStore;
-
     private final DistributionMetricsService distributionMetricsService;
     
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic, LocalStore seedStore, QueueCacheSeeder seeder) {
+    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic, QueueCacheSeeder seeder) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
         this.distributionMetricsService = distributionMetricsService;
         this.topic = topic;
-        this.seedStore = seedStore;
         this.seeder = seeder;
-        Long offset = seedStore.load("offset", Long.class);
-        if (offset != null) {
-            seedOffset = offset;
-            startPoller(seedOffset);
-            /*
-             * We need at least one seeding message
-             * for cases where the seedOffset is no
-             * longer on the journal.
-             */
-            seeder.seedOne();
-        } else {
-            /*
-             * Fallback to seeding messages when
-             * no offset could be found in the
-             * repository.
-             */
-            seeder.seed(this::startPoller);
-        }
+        startPoller();
+        this.seeder.startSeeding();
     }
 
-    private void startPoller(long offset) {
-        LOG.info("Seed with offset: {}", offset);
-        String assignTo = messagingProvider.assignTo(offset);
+    private void startPoller() {
+        LOG.info("Starting consumer");
         tailPoller = messagingProvider.createPoller(
                 this.topic,
-                Reset.earliest,
-                assignTo,
+                Reset.latest,
                 create(PackageMessage.class, this::handlePackage) 
                 );
     }
@@ -171,23 +142,6 @@ public class PubQueueCache {
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
 
-    public void storeSeed() {
-        long newSeed = maxOffset.longValue();
-        if (newSeed > seedOffset) {
-            storeSeed(newSeed);
-            seedOffset = newSeed;
-        }
-    }
-
-    private void storeSeed(long offset) {
-        LOG.info("Store seed offset {}", offset);
-        try {
-            seedStore.store("offset", offset);
-        } catch (PersistenceException e) {
-            LOG.warn("Failed to persist seed offset", e);
-        }
-    }
-
     public int size() {
         return 
agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
     }
@@ -320,6 +274,8 @@ public class PubQueueCache {
     }
 
     private void handlePackage(final MessageInfo info, final PackageMessage 
message) {
+        LOG.info("Receive package {} {}", info, message);
+        seeder.close();
         merge(singletonList(new FullMessage<>(info, message)));
         updateMaxOffset(info.getOffset());
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index f9291ed..779e8bd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -23,12 +23,13 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import 
org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.service.component.annotations.Activate;
@@ -75,16 +76,8 @@ public class PubQueueCacheService {
     @Reference
     private DistributionMetricsService distributionMetricsService;
 
-    @Reference
-    private SlingSettingsService slingSettings;
-
-    @Reference
-    private ResourceResolverFactory resolverFactory;
-
     private volatile PubQueueCache cache;
 
-    private String pubSlingId;
-
     public PubQueueCacheService() {}
 
     public PubQueueCacheService(MessagingProvider messagingProvider,
@@ -96,14 +89,10 @@ public class PubQueueCacheService {
         this.messagingProvider = messagingProvider;
         this.topics = topics;
         this.eventAdmin = eventAdmin;
-        this.slingSettings = slingSettingsService;
-        this.resolverFactory = resolverFactory;
-        this.pubSlingId = pubSlingId;
     }
 
     @Activate
     public void activate() {
-        pubSlingId = slingSettings.getSlingId();
         cache = newCache();
         LOG.info("Started Publisher queue cache service");
     }
@@ -145,17 +134,10 @@ public class PubQueueCacheService {
         }
     }
 
-    public void storeSeed() {
-        PubQueueCache queueCache = this.cache;
-        if (queueCache != null) {
-            queueCache.storeSeed();
-        }
-    }
-
     private PubQueueCache newCache() {
-        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", 
pubSlingId);
         String topic = topics.getPackageTopic();
-        QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, 
topic);
-        return new PubQueueCache(messagingProvider, eventAdmin, 
distributionMetricsService, topic, seedStore, seeder);
+        MessageSender<PackageMessage> sender = 
messagingProvider.createSender(topic);
+        QueueCacheSeeder seeder = new QueueCacheSeeder(sender);
+        return new PubQueueCache(messagingProvider, eventAdmin, 
distributionMetricsService, topic, seeder);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
index e3ac96c..9ffd37c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -18,81 +18,71 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
+import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
 import java.io.Closeable;
 import java.util.UUID;
-import java.util.function.LongConsumer;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-
 public class QueueCacheSeeder implements Closeable {
-
-
     private static final Logger LOG = 
LoggerFactory.getLogger(QueueCacheSeeder.class);
 
     /**
      * Interval in millisecond between two seeding messages to seed the cache.
      */
-    private static final long CACHE_SEEDING_DELAY_MS = 10_000;
+    private static final long DEFAULT_CACHE_SEEDING_DELAY_MS = 10_000;
 
-    private final String topic;
-
-    private final MessagingProvider provider;
+    private volatile boolean closed;
 
-    private volatile Closeable poller;
+    private final long seedingDelay;
 
-    private volatile boolean closed;
+    private MessageSender<PackageMessage> sender;
 
-    public QueueCacheSeeder(MessagingProvider provider, String topic) {
-        this.provider = provider;
-        this.topic = topic;
+    private Thread seedingThread;
+    
+    public QueueCacheSeeder(MessageSender<PackageMessage> sender) {
+        this(sender, DEFAULT_CACHE_SEEDING_DELAY_MS);
     }
 
-    public void seedOne() {
-        startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one 
seed");
+    public QueueCacheSeeder(MessageSender<PackageMessage> sender, long 
seedingDelay) {
+        this.sender = sender;
+        this.seedingDelay = seedingDelay;
     }
 
-    public void seed(LongConsumer callback) {
-        poller = provider.createPoller(topic, Reset.latest,
-                create(PackageMessage.class, (info, msg) -> {
-                    close();
-                    callback.accept(info.getOffset());
-                }));
-        startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
+    public void startSeeding() {
+        seedingThread = startBackgroundThread(this::sendSeedingMessages, 
"Seeder thread");
     }
 
     @Override
     public void close() {
         closed = true;
-        IOUtils.closeQuietly(poller);
+        try {
+            seedingThread.join();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
     }
 
+    /**
+     * We repeatedly send seeding messages as the first message is sometimes 
not received by the consumer.
+     */
     private void sendSeedingMessages() {
         LOG.info("Start message seeder");
         try {
-            MessageSender<PackageMessage> sender = 
provider.createSender(topic);
             while (!closed) {
                 sendSeedingMessage(sender);
-                delay(CACHE_SEEDING_DELAY_MS);
+                delay(seedingDelay);
             }
         } finally {
             LOG.info("Stop message seeder");
         }
     }
 
-    private void sendSeedingMessage() {
-        sendSeedingMessage(provider.createSender(topic));
-    }
-
     private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
         PackageMessage pkgMsg = createTestMessage();
         LOG.info("Send seeding message");
@@ -100,15 +90,27 @@ public class QueueCacheSeeder implements Closeable {
             sender.send(pkgMsg);
         } catch (MessagingException e) {
             LOG.warn(e.getMessage(), e);
-            delay(CACHE_SEEDING_DELAY_MS * 10);
+            delay(seedingDelay * 10);
         }
     }
 
-    private static void delay(long sleepMs) {
-        try {
-            Thread.sleep(sleepMs);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
+    /**
+     * Sleep with handling of interrupts and quick exit in case of closed.
+     * We do not interrupt the seeder thread from outside as this sometimes 
fails in the messaging impl code.
+     * 
+     * @param sleepMs milliseconds to sleep
+     */
+    private void delay(long sleepMs) {
+        long sleepCycles = sleepMs / 100;
+        for (int curCycle=0; curCycle < sleepCycles; curCycle++) {
+            if (closed) {
+                return;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
deleted file mode 100644
index b18ee55..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.queue.impl;
-
-import javax.annotation.ParametersAreNonnullByDefault;
-
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
-import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
-
-/**
- * Periodical task to persist a cache seed
- * to the repository. The task must run only
- * on the leader instance to avoid concurrent
- * writes and reduce write operations in
- * clustered deployments.
- */
-@Component(
-        service = Runnable.class,
-        property = {
-                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
-                PROPERTY_SCHEDULER_RUN_ON + "=" +  VALUE_RUN_ON_LEADER,
-                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 15 * 60 // 15 minutes
-        })
-@ParametersAreNonnullByDefault
-public class QueueCacheSeederTask implements Runnable {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(QueueCacheSeederTask.class);
-
-    @Reference
-    private PubQueueCacheService queueCacheService;
-
-    @Override
-    public void run() {
-        LOG.debug("Starting package cache seeder task");
-        queueCacheService.storeSeed();
-        LOG.debug("Stopping package cache seeder task");
-    }
-}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 0c9b634..aca7d01 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -25,9 +25,9 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
@@ -42,6 +42,7 @@ import java.util.stream.LongStream;
 import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
@@ -59,6 +60,7 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.service.event.EventAdmin;
 import org.slf4j.Logger;
@@ -82,9 +84,6 @@ public class PubQueueCacheTest {
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
 
-    @Captor
-    private ArgumentCaptor<String> headAssignCaptor;
-
     @Mock
     private EventAdmin eventAdmin;
 
@@ -106,31 +105,42 @@ public class PubQueueCacheTest {
     @Mock
     private Closeable poller;
 
+    @Mock
+    private MessageSender<Object> sender;
+
+    @Mock
+    private QueueCacheSeeder seeder;
+
     private PubQueueCache cache;
 
     private ExecutorService executor;
 
     private MessageHandler<PackageMessage> tailHandler;
 
+
     @Before
     public void before() {
-        when(clientProvider.assignTo(anyLong())).then(
-                answer -> "0:" + answer.getArguments()[0]);
         when(clientProvider.createPoller(
                 eq(TOPIC),
-                any(Reset.class),
-                headAssignCaptor.capture(),
+                eq(Reset.latest),
+                handlerCaptor.capture()))
+                .thenReturn(poller);
+        when(clientProvider.createPoller(
+                eq(TOPIC),
+                eq(Reset.earliest),
+                Mockito.anyString(),
                 handlerCaptor.capture()))
                 .thenReturn(poller);
+        when(clientProvider.createSender(Mockito.anyString()))
+            .thenReturn(sender);
 
         when(distributionMetricsService.getQueueCacheFetchCount())
-                .thenReturn(counter);
+            .thenReturn(counter);
 
         when(seedStore.load(anyString(), any())).thenReturn(0L);
 
-        cache = new PubQueueCache(clientProvider, eventAdmin, 
distributionMetricsService, TOPIC, seedStore, cacheSeeder);
-        cache.storeSeed();
-
+        cache = new PubQueueCache(clientProvider, eventAdmin, 
distributionMetricsService, TOPIC, seeder);
+        verify(seeder).startSeeding();
         executor = Executors.newFixedThreadPool(10);
         tailHandler = handlerCaptor.getValue().getHandler();
     }
@@ -159,17 +169,17 @@ public class PubQueueCacheTest {
         Future<OffsetQueue<DistributionQueueItem>> consumer = 
consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
         // wait that the consumer has started fetching the offsets from 100 to 
200
-        awaitHeadHandler();
+        MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
         // simulate messages for the fetched offsets
-        long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
-        simulateMessages(handlerCaptor.getValue().getHandler(), fromOffset, 
cache.getMinOffset());
+        simulateMessages(headHandler, 100, cache.getMinOffset());
         // the consumer returns the offset queue
         consumer.get(15, SECONDS);
         assertEquals(100, cache.getMinOffset());
     }
 
     private MessageHandler<PackageMessage> awaitHeadHandler() {
-        return Awaitility.await().ignoreExceptions().until(() -> 
handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
+        return Awaitility.await().ignoreExceptions()
+                .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), 
notNullValue());
     }
 
        @Test
@@ -182,8 +192,7 @@ public class PubQueueCacheTest {
         // wait that one consumer has started fetching the offsets from 100 to 
200
         MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
         // simulate messages for the fetched offsets
-        long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
-        simulateMessages(headHandler, fromOffset, cache.getMinOffset());
+        simulateMessages(headHandler, 100, cache.getMinOffset());
         // both consumers returns the offset queue
         OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
         OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
@@ -205,7 +214,8 @@ public class PubQueueCacheTest {
     }
 
     private void simulateMessages(MessageHandler<PackageMessage> handler, long 
fromOffset, long toOffset) {
-        LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> 
simulateMessage(handler, offset));
+        LongStream.rangeClosed(fromOffset, toOffset)
+            .forEach(offset -> simulateMessage(handler, offset));
     }
     
     private void simulateMessage(MessageHandler<PackageMessage> handler, long 
offset) {
@@ -242,10 +252,4 @@ public class PubQueueCacheTest {
         return c[RAND.nextInt(c.length)];
     }
 
-    private Long offsetFromAssign(String assign) {
-        String[] chunks = assign.split(":");
-        return Long.parseLong(chunks[1]);
-    }
-
-
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 846386c..db2c02f 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -51,7 +51,6 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.apache.sling.distribution.journal.shared.LocalStore;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -117,7 +116,6 @@ public class PubQueueProviderTest {
         when(clientProvider.createPoller(
                 Mockito.eq(Topics.PACKAGE_TOPIC),
                 Mockito.any(Reset.class),
-                Mockito.anyString(),
                 handlerCaptor.capture()))
         .thenReturn(poller);
         when(clientProvider.createPoller(
@@ -130,8 +128,6 @@ public class PubQueueProviderTest {
         Topics topics = new Topics();
         String slingId = UUID.randomUUID().toString();
         when(slingSettings.getSlingId()).thenReturn(slingId);
-        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", 
slingId);
-        seedStore.store("offset", 1L);
         pubQueueCacheService = new PubQueueCacheService(clientProvider, 
topics, eventAdmin, slingSettings, resolverFactory, slingId);
         pubQueueCacheService.activate();
         queueProvider = new PubQueueProviderImpl(pubQueueCacheService, 
clientProvider, topics);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
index 2bddbe7..612647b 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -18,28 +18,14 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
-import static java.lang.System.currentTimeMillis;
-import static 
org.apache.sling.distribution.journal.shared.Topics.PACKAGE_TOPIC;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.util.function.LongConsumer;
 
-import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
-import org.apache.sling.distribution.journal.shared.TestMessageInfo;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -47,62 +33,33 @@ import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.mockito.runners.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class QueueCacheSeederTest {
 
-    @Mock
-    private MessagingProvider clientProvider;
-
-    @Captor
-    private ArgumentCaptor<HandlerAdapter<PackageMessage>> pkgHandlerCaptor;
-
     @Captor
     private ArgumentCaptor<PackageMessage> pkgMsgCaptor;
 
     @Mock
-    private Closeable poller;
-
-    @Mock
     private MessageSender<PackageMessage> sender;
 
-    @Mock
-    private LongConsumer callback;
-
     private QueueCacheSeeder seeder;
 
     @Before
     public void before() {
         MockitoAnnotations.initMocks(this);
-        when(clientProvider.createPoller(
-                eq(PACKAGE_TOPIC),
-                any(Reset.class),
-                pkgHandlerCaptor.capture()))
-                .thenReturn(poller);
         doNothing().when(sender).send(pkgMsgCaptor.capture());
-        when(clientProvider.<PackageMessage>createSender(eq(PACKAGE_TOPIC)))
-                .thenReturn(sender);
-        seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
-    }
-
-    @Test
-    public void testSeededCallback() throws IOException {
-        seeder.seed(callback);
-        long offset = 15L;
-        simulateSeedingMsg(offset);
-        verify(callback).accept(offset);
-        verify(poller).close();
+        seeder = new QueueCacheSeeder(sender, 100);
     }
 
     @Test
-    public void testSendingSeeds() {
-        seeder.seed(callback);
-        verify(sender, 
timeout(5000).atLeastOnce()).send(pkgMsgCaptor.capture());
-        PackageMessage seedMsg = pkgMsgCaptor.getValue();
-        assertNotNull(seedMsg);
-        assertEquals(ReqType.TEST, seedMsg.getReqType());
+    public void testSeeding() throws IOException {
+        seeder.startSeeding();
+        
+        verify(sender, timeout(1000)).send(Mockito.anyObject());
     }
 
     @After
@@ -110,10 +67,4 @@ public class QueueCacheSeederTest {
         seeder.close();
     }
 
-    private void simulateSeedingMsg(long offset) {
-        PackageMessage msg = seeder.createTestMessage();
-        pkgHandlerCaptor.getValue().getHandler().handle(
-                new TestMessageInfo(PACKAGE_TOPIC, 0, offset, 
currentTimeMillis()),
-                msg);
-    }
 }
\ No newline at end of file

Reply via email to