This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9577 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 6345a750c435f444c289d00f194ff390d6668dbf Author: Christian Schneider <[email protected]> AuthorDate: Fri Jul 10 10:18:30 2020 +0200 SLING-9577 - Switch to seeding thread --- .../journal/impl/queue/impl/PubQueueCache.java | 60 +++------------- .../impl/queue/impl/PubQueueCacheService.java | 28 ++------ .../journal/impl/queue/impl/QueueCacheSeeder.java | 82 +++++++++++----------- .../impl/queue/impl/QueueCacheSeederTask.java | 61 ---------------- .../journal/impl/queue/impl/PubQueueCacheTest.java | 54 +++++++------- .../impl/queue/impl/PubQueueProviderTest.java | 4 -- .../impl/queue/impl/QueueCacheSeederTest.java | 61 ++-------------- 7 files changed, 90 insertions(+), 260 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java index 940313b..5546c63 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java @@ -37,13 +37,11 @@ import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nonnull; import javax.annotation.ParametersAreNonnullByDefault; -import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.shared.DistributionMetricsService; import org.apache.sling.distribution.journal.shared.JMXRegistration; -import org.apache.sling.distribution.journal.shared.LocalStore; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.osgi.service.event.Event; import org.osgi.service.event.EventAdmin; @@ -102,12 +100,6 @@ public class PubQueueCache { */ private final AtomicLong maxOffset = new AtomicLong(-1L); - /** - * Holds the last known seed offset stored to the - * seed store. - */ - private volatile long seedOffset = -1L; - private final Set<JMXRegistration> jmxRegs = new HashSet<>(); private final MessagingProvider messagingProvider; @@ -120,44 +112,23 @@ public class PubQueueCache { private final String topic; - private final LocalStore seedStore; - private final DistributionMetricsService distributionMetricsService; - public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore, QueueCacheSeeder seeder) { + public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, QueueCacheSeeder seeder) { this.messagingProvider = messagingProvider; this.eventAdmin = eventAdmin; this.distributionMetricsService = distributionMetricsService; this.topic = topic; - this.seedStore = seedStore; this.seeder = seeder; - Long offset = seedStore.load("offset", Long.class); - if (offset != null) { - seedOffset = offset; - startPoller(seedOffset); - /* - * We need at least one seeding message - * for cases where the seedOffset is no - * longer on the journal. - */ - seeder.seedOne(); - } else { - /* - * Fallback to seeding messages when - * no offset could be found in the - * repository. - */ - seeder.seed(this::startPoller); - } + startPoller(); + this.seeder.startSeeding(); } - private void startPoller(long offset) { - LOG.info("Seed with offset: {}", offset); - String assignTo = messagingProvider.assignTo(offset); + private void startPoller() { + LOG.info("Starting consumer"); tailPoller = messagingProvider.createPoller( this.topic, - Reset.earliest, - assignTo, + Reset.latest, create(PackageMessage.class, this::handlePackage) ); } @@ -171,23 +142,6 @@ public class PubQueueCache { return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>()); } - public void storeSeed() { - long newSeed = maxOffset.longValue(); - if (newSeed > seedOffset) { - storeSeed(newSeed); - seedOffset = newSeed; - } - } - - private void storeSeed(long offset) { - LOG.info("Store seed offset {}", offset); - try { - seedStore.store("offset", offset); - } catch (PersistenceException e) { - LOG.warn("Failed to persist seed offset", e); - } - } - public int size() { return agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum(); } @@ -320,6 +274,8 @@ public class PubQueueCache { } private void handlePackage(final MessageInfo info, final PackageMessage message) { + LOG.info("Receive package {} {}", info, message); + seeder.close(); merge(singletonList(new FullMessage<>(info, message))); updateMaxOffset(info.getOffset()); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java index f9291ed..779e8bd 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java @@ -23,12 +23,13 @@ import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; +import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.shared.DistributionMetricsService; -import org.apache.sling.distribution.journal.shared.LocalStore; import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.JournalAvailable; +import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.settings.SlingSettingsService; import org.osgi.service.component.annotations.Activate; @@ -75,16 +76,8 @@ public class PubQueueCacheService { @Reference private DistributionMetricsService distributionMetricsService; - @Reference - private SlingSettingsService slingSettings; - - @Reference - private ResourceResolverFactory resolverFactory; - private volatile PubQueueCache cache; - private String pubSlingId; - public PubQueueCacheService() {} public PubQueueCacheService(MessagingProvider messagingProvider, @@ -96,14 +89,10 @@ public class PubQueueCacheService { this.messagingProvider = messagingProvider; this.topics = topics; this.eventAdmin = eventAdmin; - this.slingSettings = slingSettingsService; - this.resolverFactory = resolverFactory; - this.pubSlingId = pubSlingId; } @Activate public void activate() { - pubSlingId = slingSettings.getSlingId(); cache = newCache(); LOG.info("Started Publisher queue cache service"); } @@ -145,17 +134,10 @@ public class PubQueueCacheService { } } - public void storeSeed() { - PubQueueCache queueCache = this.cache; - if (queueCache != null) { - queueCache.storeSeed(); - } - } - private PubQueueCache newCache() { - LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId); String topic = topics.getPackageTopic(); - QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, topic); - return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seedStore, seeder); + MessageSender<PackageMessage> sender = messagingProvider.createSender(topic); + QueueCacheSeeder seeder = new QueueCacheSeeder(sender); + return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seeder); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java index e3ac96c..9ffd37c 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java @@ -18,81 +18,71 @@ */ package org.apache.sling.distribution.journal.impl.queue.impl; +import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread; + import java.io.Closeable; import java.util.UUID; -import java.util.function.LongConsumer; -import org.apache.commons.io.IOUtils; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingException; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.sling.distribution.journal.HandlerAdapter.create; -import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread; - public class QueueCacheSeeder implements Closeable { - - private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class); /** * Interval in millisecond between two seeding messages to seed the cache. */ - private static final long CACHE_SEEDING_DELAY_MS = 10_000; + private static final long DEFAULT_CACHE_SEEDING_DELAY_MS = 10_000; - private final String topic; - - private final MessagingProvider provider; + private volatile boolean closed; - private volatile Closeable poller; + private final long seedingDelay; - private volatile boolean closed; + private MessageSender<PackageMessage> sender; - public QueueCacheSeeder(MessagingProvider provider, String topic) { - this.provider = provider; - this.topic = topic; + private Thread seedingThread; + + public QueueCacheSeeder(MessageSender<PackageMessage> sender) { + this(sender, DEFAULT_CACHE_SEEDING_DELAY_MS); } - public void seedOne() { - startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one seed"); + public QueueCacheSeeder(MessageSender<PackageMessage> sender, long seedingDelay) { + this.sender = sender; + this.seedingDelay = seedingDelay; } - public void seed(LongConsumer callback) { - poller = provider.createPoller(topic, Reset.latest, - create(PackageMessage.class, (info, msg) -> { - close(); - callback.accept(info.getOffset()); - })); - startBackgroundThread(this::sendSeedingMessages, "Seeder thread"); + public void startSeeding() { + seedingThread = startBackgroundThread(this::sendSeedingMessages, "Seeder thread"); } @Override public void close() { closed = true; - IOUtils.closeQuietly(poller); + try { + seedingThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } + /** + * We repeatedly send seeding messages as the first message is sometimes not received by the consumer. + */ private void sendSeedingMessages() { LOG.info("Start message seeder"); try { - MessageSender<PackageMessage> sender = provider.createSender(topic); while (!closed) { sendSeedingMessage(sender); - delay(CACHE_SEEDING_DELAY_MS); + delay(seedingDelay); } } finally { LOG.info("Stop message seeder"); } } - private void sendSeedingMessage() { - sendSeedingMessage(provider.createSender(topic)); - } - private void sendSeedingMessage(MessageSender<PackageMessage> sender) { PackageMessage pkgMsg = createTestMessage(); LOG.info("Send seeding message"); @@ -100,15 +90,27 @@ public class QueueCacheSeeder implements Closeable { sender.send(pkgMsg); } catch (MessagingException e) { LOG.warn(e.getMessage(), e); - delay(CACHE_SEEDING_DELAY_MS * 10); + delay(seedingDelay * 10); } } - private static void delay(long sleepMs) { - try { - Thread.sleep(sleepMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + /** + * Sleep with handling of interrupts and quick exit in case of closed. + * We do not interrupt the seeder thread from outside as this sometimes fails in the messaging impl code. + * + * @param sleepMs milliseconds to sleep + */ + private void delay(long sleepMs) { + long sleepCycles = sleepMs / 100; + for (int curCycle=0; curCycle < sleepCycles; curCycle++) { + if (closed) { + return; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java deleted file mode 100644 index b18ee55..0000000 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.impl.queue.impl; - -import javax.annotation.ParametersAreNonnullByDefault; - -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Reference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT; -import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD; -import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON; -import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER; - -/** - * Periodical task to persist a cache seed - * to the repository. The task must run only - * on the leader instance to avoid concurrent - * writes and reduce write operations in - * clustered deployments. - */ -@Component( - service = Runnable.class, - property = { - PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false", - PROPERTY_SCHEDULER_RUN_ON + "=" + VALUE_RUN_ON_LEADER, - PROPERTY_SCHEDULER_PERIOD + ":Long=" + 15 * 60 // 15 minutes - }) -@ParametersAreNonnullByDefault -public class QueueCacheSeederTask implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeederTask.class); - - @Reference - private PubQueueCacheService queueCacheService; - - @Override - public void run() { - LOG.debug("Starting package cache seeder task"); - queueCacheService.storeSeed(); - LOG.debug("Stopping package cache seeder task"); - } -} diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java index 0c9b634..aca7d01 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java @@ -25,9 +25,9 @@ import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.Closeable; @@ -42,6 +42,7 @@ import java.util.stream.LongStream; import org.apache.sling.commons.metrics.Counter; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; +import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; @@ -59,6 +60,7 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; @@ -82,9 +84,6 @@ public class PubQueueCacheTest { @Captor private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; - @Captor - private ArgumentCaptor<String> headAssignCaptor; - @Mock private EventAdmin eventAdmin; @@ -106,31 +105,42 @@ public class PubQueueCacheTest { @Mock private Closeable poller; + @Mock + private MessageSender<Object> sender; + + @Mock + private QueueCacheSeeder seeder; + private PubQueueCache cache; private ExecutorService executor; private MessageHandler<PackageMessage> tailHandler; + @Before public void before() { - when(clientProvider.assignTo(anyLong())).then( - answer -> "0:" + answer.getArguments()[0]); when(clientProvider.createPoller( eq(TOPIC), - any(Reset.class), - headAssignCaptor.capture(), + eq(Reset.latest), + handlerCaptor.capture())) + .thenReturn(poller); + when(clientProvider.createPoller( + eq(TOPIC), + eq(Reset.earliest), + Mockito.anyString(), handlerCaptor.capture())) .thenReturn(poller); + when(clientProvider.createSender(Mockito.anyString())) + .thenReturn(sender); when(distributionMetricsService.getQueueCacheFetchCount()) - .thenReturn(counter); + .thenReturn(counter); when(seedStore.load(anyString(), any())).thenReturn(0L); - cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore, cacheSeeder); - cache.storeSeed(); - + cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seeder); + verify(seeder).startSeeding(); executor = Executors.newFixedThreadPool(10); tailHandler = handlerCaptor.getValue().getHandler(); } @@ -159,17 +169,17 @@ public class PubQueueCacheTest { Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100); // seeding the cache with a message at offset 200 // wait that the consumer has started fetching the offsets from 100 to 200 - awaitHeadHandler(); + MessageHandler<PackageMessage> headHandler = awaitHeadHandler(); // simulate messages for the fetched offsets - long fromOffset = offsetFromAssign(headAssignCaptor.getValue()); - simulateMessages(handlerCaptor.getValue().getHandler(), fromOffset, cache.getMinOffset()); + simulateMessages(headHandler, 100, cache.getMinOffset()); // the consumer returns the offset queue consumer.get(15, SECONDS); assertEquals(100, cache.getMinOffset()); } private MessageHandler<PackageMessage> awaitHeadHandler() { - return Awaitility.await().ignoreExceptions().until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue()); + return Awaitility.await().ignoreExceptions() + .until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue()); } @Test @@ -182,8 +192,7 @@ public class PubQueueCacheTest { // wait that one consumer has started fetching the offsets from 100 to 200 MessageHandler<PackageMessage> headHandler = awaitHeadHandler(); // simulate messages for the fetched offsets - long fromOffset = offsetFromAssign(headAssignCaptor.getValue()); - simulateMessages(headHandler, fromOffset, cache.getMinOffset()); + simulateMessages(headHandler, 100, cache.getMinOffset()); // both consumers returns the offset queue OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS); OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS); @@ -205,7 +214,8 @@ public class PubQueueCacheTest { } private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) { - LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> simulateMessage(handler, offset)); + LongStream.rangeClosed(fromOffset, toOffset) + .forEach(offset -> simulateMessage(handler, offset)); } private void simulateMessage(MessageHandler<PackageMessage> handler, long offset) { @@ -242,10 +252,4 @@ public class PubQueueCacheTest { return c[RAND.nextInt(c.length)]; } - private Long offsetFromAssign(String assign) { - String[] chunks = assign.split(":"); - return Long.parseLong(chunks[1]); - } - - } \ No newline at end of file diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java index 846386c..db2c02f 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java @@ -51,7 +51,6 @@ import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status; -import org.apache.sling.distribution.journal.shared.LocalStore; import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; @@ -117,7 +116,6 @@ public class PubQueueProviderTest { when(clientProvider.createPoller( Mockito.eq(Topics.PACKAGE_TOPIC), Mockito.any(Reset.class), - Mockito.anyString(), handlerCaptor.capture())) .thenReturn(poller); when(clientProvider.createPoller( @@ -130,8 +128,6 @@ public class PubQueueProviderTest { Topics topics = new Topics(); String slingId = UUID.randomUUID().toString(); when(slingSettings.getSlingId()).thenReturn(slingId); - LocalStore seedStore = new LocalStore(resolverFactory, "seeds", slingId); - seedStore.store("offset", 1L); pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId); pubQueueCacheService.activate(); queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java index 2bddbe7..612647b 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java @@ -18,28 +18,14 @@ */ package org.apache.sling.distribution.journal.impl.queue.impl; -import static java.lang.System.currentTimeMillis; -import static org.apache.sling.distribution.journal.shared.Topics.PACKAGE_TOPIC; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import java.io.Closeable; import java.io.IOException; -import java.util.function.LongConsumer; -import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageSender; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.messages.PackageMessage; -import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; -import org.apache.sling.distribution.journal.shared.TestMessageInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -47,62 +33,33 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.runners.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class QueueCacheSeederTest { - @Mock - private MessagingProvider clientProvider; - - @Captor - private ArgumentCaptor<HandlerAdapter<PackageMessage>> pkgHandlerCaptor; - @Captor private ArgumentCaptor<PackageMessage> pkgMsgCaptor; @Mock - private Closeable poller; - - @Mock private MessageSender<PackageMessage> sender; - @Mock - private LongConsumer callback; - private QueueCacheSeeder seeder; @Before public void before() { MockitoAnnotations.initMocks(this); - when(clientProvider.createPoller( - eq(PACKAGE_TOPIC), - any(Reset.class), - pkgHandlerCaptor.capture())) - .thenReturn(poller); doNothing().when(sender).send(pkgMsgCaptor.capture()); - when(clientProvider.<PackageMessage>createSender(eq(PACKAGE_TOPIC))) - .thenReturn(sender); - seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC); - } - - @Test - public void testSeededCallback() throws IOException { - seeder.seed(callback); - long offset = 15L; - simulateSeedingMsg(offset); - verify(callback).accept(offset); - verify(poller).close(); + seeder = new QueueCacheSeeder(sender, 100); } @Test - public void testSendingSeeds() { - seeder.seed(callback); - verify(sender, timeout(5000).atLeastOnce()).send(pkgMsgCaptor.capture()); - PackageMessage seedMsg = pkgMsgCaptor.getValue(); - assertNotNull(seedMsg); - assertEquals(ReqType.TEST, seedMsg.getReqType()); + public void testSeeding() throws IOException { + seeder.startSeeding(); + + verify(sender, timeout(1000)).send(Mockito.anyObject()); } @After @@ -110,10 +67,4 @@ public class QueueCacheSeederTest { seeder.close(); } - private void simulateSeedingMsg(long offset) { - PackageMessage msg = seeder.createTestMessage(); - pkgHandlerCaptor.getValue().getHandler().handle( - new TestMessageInfo(PACKAGE_TOPIC, 0, offset, currentTimeMillis()), - msg); - } } \ No newline at end of file
