This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9460 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit c06ef5fb8861c4206b244b95fc61cbac25ef41c3 Author: Christian Schneider <[email protected]> AuthorDate: Wed May 20 16:54:37 2020 +0200 SLING-9460 - Avoid seeding messages in PubQueueCache --- .../journal/impl/publisher/DiscoveryService.java | 12 +- .../journal/impl/queue/impl/PubQueueCache.java | 112 +++---------------- .../impl/queue/impl/PubQueueCacheService.java | 13 ++- .../impl/publisher/DiscoveryServiceTest.java | 4 +- .../journal/impl/queue/impl/PubQueueCacheTest.java | 123 +++++---------------- .../impl/queue/impl/PubQueueProviderTest.java | 2 + 6 files changed, 70 insertions(+), 196 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java index 128d453..f9b2ece 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java @@ -29,6 +29,7 @@ import java.util.Hashtable; import javax.annotation.ParametersAreNonnullByDefault; +import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.messages.Messages; import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; @@ -78,6 +79,9 @@ public class DiscoveryService implements Runnable { @Reference private TopologyChangeHandler topologyChangeHandler; + + @Reference + private PubQueueCacheService pubQueueCacheService; private volatile ServiceRegistration<?> reg; @@ -91,10 +95,12 @@ public class DiscoveryService implements Runnable { public DiscoveryService( MessagingProvider messagingProvider, TopologyChangeHandler topologyChangeHandler, - Topics topics) { + Topics topics, + PubQueueCacheService pubQueueCacheService) { this.messagingProvider = messagingProvider; this.topologyChangeHandler = topologyChangeHandler; this.topics = topics; + this.pubQueueCacheService = pubQueueCacheService; } @Activate @@ -151,12 +157,14 @@ public class DiscoveryService implements Runnable { long now = System.currentTimeMillis(); AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName()); - + long minOffset = 0; for (Messages.SubscriberState subStateMsg : disMsg.getSubscriberStateList()) { SubscriberConfiguration subConfig = disMsg.getSubscriberConfiguration(); State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.getEditable()); viewManager.refreshState(subState); + minOffset = Math.min(minOffset, subState.getOffset()); } + pubQueueCacheService.seed(minOffset); } } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java index 438af2e..ede183d 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java @@ -19,7 +19,6 @@ package org.apache.sling.distribution.journal.impl.queue.impl; -import static java.lang.System.currentTimeMillis; import static org.apache.sling.distribution.journal.HandlerAdapter.create; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -30,9 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -55,11 +52,8 @@ import org.apache.sling.distribution.journal.impl.queue.QueueItemFactory; import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; import org.apache.sling.distribution.journal.FullMessage; import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessageSender; -import org.apache.sling.distribution.journal.MessagingException; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; -import org.apache.sling.distribution.journal.RunnableUtil; /** * Cache the distribution packages fetched from the package topic. @@ -85,12 +79,6 @@ public class PubQueueCache { private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues = new ConcurrentHashMap<>(); /** - * Blocks the threads awaiting until the agentQueues - * cache has been seeded. - */ - private final CountDownLatch seeded = new CountDownLatch(1); - - /** * Only allows to fetch data from the journal * with a single thread. */ @@ -110,37 +98,35 @@ public class PubQueueCache { private final EventAdmin eventAdmin; - private final Closeable tailPoller; + private volatile Closeable tailPoller; private final String topic; - private final long seedingDelayMs; - private final DistributionMetricsService distributionMetricsService; - - /** - * Way out for the threads awaiting on the seeded - * latch, when the component is deactivated. - */ + private volatile boolean closed; - public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, long seedingDelayMs) { + public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic) { this.messagingProvider = messagingProvider; this.eventAdmin = eventAdmin; this.distributionMetricsService = distributionMetricsService; - this.seedingDelayMs = seedingDelayMs; this.topic = topic; + } - tailPoller = messagingProvider.createPoller( - topic, + public void seed(long offset) { + if (tailPoller == null) { + String assignTo = messagingProvider.assignTo(offset); + tailPoller = messagingProvider.createPoller( + this.topic, Reset.latest, + assignTo, create(PackageMessage.class, this::handlePackage)); - - RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding"); + } } @Nonnull public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException { + waitSeeded(); fetchIfNeeded(minOffset); return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>()); } @@ -150,62 +136,11 @@ public class PubQueueCache { } public void close() { - - /* - * Note that we don't close resources using Thread.interrupt() - * because interrupts can stop the Apache Oak repository. - * - * See SLING-9340, OAK-2609 and https://jackrabbit.apache.org/oak/docs/dos_and_donts.html - */ - closed = true; IOUtils.closeQuietly(tailPoller); jmxRegs.forEach(IOUtils::closeQuietly); } - private void seedCache() { - LOG.info("Start message seeder"); - try { - MessageSender<PackageMessage> sender = messagingProvider.createSender(); - do { - sendSeedingMessage(sender); - } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - LOG.info("Stop message seeder"); - } - } - - private void sendSeedingMessage(MessageSender<PackageMessage> sender) { - PackageMessage pkgMsg = createTestMessage(); - LOG.info("Send seeding message"); - try { - sender.send(topic, pkgMsg); - } catch (MessagingException e) { - LOG.warn(e.getMessage(), e); - delay(seedingDelayMs * 10); - } - } - - private static void delay(long sleepMs) { - try { - Thread.sleep(sleepMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private PackageMessage createTestMessage() { - String pkgId = UUID.randomUUID().toString(); - return PackageMessage.newBuilder() - .setPubSlingId("seeder") - .setPkgId(pkgId) - .setPkgType("seeder") - .setReqType(PackageMessage.ReqType.TEST) - .build(); - } - /** * Fetch the package messages from the requested min offset, * up to the current cached min offset. @@ -213,13 +148,7 @@ public class PubQueueCache { * @param requestedMinOffset the min offset to start fetching data from */ private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException { - - // We wait on the cache to be seeded (at least one message handled) - // before computing potential missing offsets. - waitSeeded(); - long cachedMinOffset = getMinOffset(); - if (requestedMinOffset < cachedMinOffset) { LOG.debug("Requested min offset {} smaller than cached min offset {}", requestedMinOffset, cachedMinOffset); @@ -271,15 +200,14 @@ public class PubQueueCache { } private void waitSeeded() throws InterruptedException { - long start = currentTimeMillis(); - while (!closed && currentTimeMillis() - start < MAX_FETCH_WAIT_MS) { - if (seeded.await(seedingDelayMs, MILLISECONDS)) { - return; - } else { - LOG.debug("Waiting for seeded cache"); + long start = System.currentTimeMillis(); + while (getMinOffset() == Long.MAX_VALUE) { + LOG.debug("Waiting for seeded cache"); + if (closed || System.currentTimeMillis() - start > MAX_FETCH_WAIT_MS) { + throw new RuntimeException("Gave up waiting for seeded cache"); } + Thread.sleep(1000); } - throw new RuntimeException("Gave up waiting for seeded cache"); } protected long getMinOffset() { @@ -340,9 +268,5 @@ public class PubQueueCache { private void handlePackage(final MessageInfo info, final PackageMessage message) { merge(singletonList(new FullMessage<>(info, message))); - if (seeded.getCount() > 0) { - LOG.info("Cache has been seeded"); - } - seeded.countDown(); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java index 8f49687..6609b8b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java @@ -57,11 +57,6 @@ public class PubQueueCacheService implements Runnable { private static final int CLEANUP_THRESHOLD = 10_000; /** - * Interval in millisecond between two seeding messages to seed the cache. - */ - private static final long CACHE_SEEDING_DELAY_MS = 10_000; - - /** * Will cause the cache to be cleared when we loose the journal */ @Reference @@ -115,6 +110,12 @@ public class PubQueueCacheService implements Runnable { } } + public void seed(long offset) { + if (cache != null) { + cache.seed(offset); + } + } + private void cleanup() { if (cache != null) { int size = cache.size(); @@ -129,7 +130,7 @@ public class PubQueueCacheService implements Runnable { } private PubQueueCache newCache() { - return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic(), CACHE_SEEDING_DELAY_MS); + return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic()); } @Override diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java index 02a58c1..782cdd9 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java @@ -30,6 +30,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.UUID; +import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService; import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.junit.After; @@ -99,7 +100,8 @@ public class DiscoveryServiceTest { captureHandler.capture())).thenReturn(poller); Topics topics = mock(Topics.class); TopologyChangeHandler topologyChangeHandler = mock(TopologyChangeHandler.class); - discoveryService = new DiscoveryService(clientProvider, topologyChangeHandler, topics); + PubQueueCacheService pubQueueCacheService = mock(PubQueueCacheService.class); + discoveryService = new DiscoveryService(clientProvider, topologyChangeHandler, topics, pubQueueCacheService); discoveryService.activate(bundleContext); discoveryHandler = captureHandler.getValue().getHandler(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java index c6245b8..5c624cf 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java @@ -24,20 +24,17 @@ import static org.apache.sling.distribution.journal.messages.Messages.PackageMes import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE; import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST; import static org.awaitility.Awaitility.await; -import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.Closeable; import java.io.IOException; -import java.lang.Thread.State; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -47,13 +44,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeoutException; import java.util.stream.LongStream; import org.apache.sling.commons.metrics.Counter; import org.apache.sling.distribution.journal.HandlerAdapter; import org.apache.sling.distribution.journal.MessageHandler; -import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.impl.queue.OffsetQueue; @@ -71,7 +67,6 @@ import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; import org.osgi.service.event.EventAdmin; import org.slf4j.Logger; @@ -96,10 +91,7 @@ public class PubQueueCacheTest { private ArgumentCaptor<PackageMessage> seedingMessageCaptor; @Captor - private ArgumentCaptor<HandlerAdapter<PackageMessage>> tailHandlerCaptor; - - @Captor - private ArgumentCaptor<HandlerAdapter<PackageMessage>> headHandlerCaptor; + private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; @Captor private ArgumentCaptor<String> headAssignCaptor; @@ -117,9 +109,6 @@ public class PubQueueCacheTest { private Counter counter; @Mock - private MessageSender<PackageMessage> pkgSender; - - @Mock private Closeable poller; private PubQueueCache cache; @@ -135,28 +124,18 @@ public class PubQueueCacheTest { when(clientProvider.createPoller( eq(TOPIC), any(Reset.class), - tailHandlerCaptor.capture())) - .thenReturn(poller); - - when(clientProvider.createPoller( - eq(TOPIC), - any(Reset.class), headAssignCaptor.capture(), - headHandlerCaptor.capture())) + handlerCaptor.capture())) .thenReturn(poller); - when(clientProvider.<PackageMessage>createSender()) - .thenReturn(pkgSender); - when(distributionMetricsService.getQueueCacheFetchCount()) .thenReturn(counter); - cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, 250); - verify(pkgSender, timeout(5000)).send(Mockito.eq(TOPIC), seedingMessageCaptor.capture()); - - executor = Executors.newFixedThreadPool(10); + cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC); + cache.seed(0); - tailHandler = tailHandlerCaptor.getValue().getHandler(); + executor = Executors.newFixedThreadPool(10); + tailHandler = handlerCaptor.getValue().getHandler(); } @After @@ -167,7 +146,7 @@ public class PubQueueCacheTest { @Test public void testSeedingFromNewPackageMessage() throws Exception { - Future<OffsetQueue<DistributionQueueItem>> consumer = executor.submit(new Consumer(PUB_AGENT_NAME_1, 0)); + Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0); // The consumer is blocked until the cache is seeded assertFalse(consumer.isDone()); // sending any package message seeds the cache @@ -176,27 +155,13 @@ public class PubQueueCacheTest { } @Test - public void testSeedingFromSeedingMessage() throws Exception { - ArgumentCaptor<PackageMessage> seedingMsgCaptor = ArgumentCaptor.forClass(PackageMessage.class); - Future<OffsetQueue<DistributionQueueItem>> consumer = executor.submit(new Consumer(PUB_AGENT_NAME_1, 0)); - // The consumer is blocked until the cache is seeded - assertFalse(consumer.isDone()); - // wait until a seeding message is sent and captured - verify(pkgSender, timeout(15000).atLeastOnce()).send(eq(TOPIC), seedingMsgCaptor.capture()); - // sending the captured seeding message seeds the cache - simulateMessage(tailHandler, seedingMsgCaptor.getValue(), 0); - consumer.get(15, SECONDS); - } - - @Test public void testSeedingConcurrentConsumers() throws Exception { List<Future<OffsetQueue<DistributionQueueItem>>> consumers = new ArrayList<>(); - consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_1, 0))); - consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_2, 0))); - consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_3, 0))); + consumers.add(consumer(PUB_AGENT_NAME_1, 0)); + consumers.add(consumer(PUB_AGENT_NAME_2, 0)); + consumers.add(consumer(PUB_AGENT_NAME_3, 0)); // All consumers are blocked until the cache is seeded consumers.forEach(future -> assertFalse(future.isDone())); - // sending any package message seeds the cache simulateMessage(tailHandler, 0); consumers.forEach(future -> assertNotNull(get(future))); } @@ -204,45 +169,47 @@ public class PubQueueCacheTest { @Test public void testFetchWithSingleConsumer() throws Exception { // build a consumer form offset 100 - Future<OffsetQueue<DistributionQueueItem>> consumer = executor.submit(new Consumer(PUB_AGENT_NAME_1, 100)); + Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100); // seeding the cache with a message at offset 200 simulateMessage(tailHandler, 200); // wait that the consumer has started fetching the offsets from 100 to 200 - awaitUntil(() -> headAssignCaptor.getValue() != null); - awaitUntil(() -> headHandlerCaptor.getValue() != null); + awaitHeadHandler(); // simulate messages for the fetched offsets long fromOffset = offsetFromAssign(headAssignCaptor.getValue()); - simulateMessages(headHandlerCaptor.getValue().getHandler(), fromOffset, cache.getMinOffset()); + simulateMessages(handlerCaptor.getValue().getHandler(), fromOffset, cache.getMinOffset()); // the consumer returns the offset queue consumer.get(15, SECONDS); assertEquals(100, cache.getMinOffset()); } + private MessageHandler<PackageMessage> awaitHeadHandler() { + return Awaitility.await().ignoreExceptions().until(() -> handlerCaptor.getAllValues().get(1).getHandler(), notNullValue()); + } + @Test public void testFetchWithConcurrentConsumer() throws Exception { // build two consumers for same agent queue, from offset 100 - Future<OffsetQueue<DistributionQueueItem>> consumer1 = executor.submit(new Consumer(PUB_AGENT_NAME_1, 100)); - Future<OffsetQueue<DistributionQueueItem>> consumer2 = executor.submit(new Consumer(PUB_AGENT_NAME_1, 100)); + Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100); + Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100); // seeding the cache with a message at offset 200 simulateMessage(tailHandler, 200); // wait that one consumer has started fetching the offsets from 100 to 200 - awaitUntil(() -> headAssignCaptor.getValue() != null); - awaitUntil(() -> headHandlerCaptor.getValue() != null); + MessageHandler<PackageMessage> headHandler = awaitHeadHandler(); // simulate messages for the fetched offsets long fromOffset = offsetFromAssign(headAssignCaptor.getValue()); - simulateMessages(headHandlerCaptor.getValue().getHandler(), fromOffset, cache.getMinOffset()); + simulateMessages(headHandler, fromOffset, cache.getMinOffset()); // both consumers returns the offset queue OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS); OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS); assertEquals(q1.getSize(), q2.getSize()); assertEquals(100, cache.getMinOffset()); // the offsets have been fetched only once - assertEquals(1, headHandlerCaptor.getAllValues().size()); + assertEquals(2, handlerCaptor.getAllValues().size()); } @Test public void testCacheSize() throws Exception { - Future<OffsetQueue<DistributionQueueItem>> consumer = executor.submit(new Consumer(PUB_AGENT_NAME_1, 0)); + Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0); simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0); simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1); simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2); @@ -255,28 +222,12 @@ public class PubQueueCacheTest { @Test(expected = ExecutionException.class) public void testCloseUnseededPoller() throws Throwable { - FutureTask<OffsetQueue<DistributionQueueItem>> task = new FutureTask<>(new Consumer(PUB_AGENT_NAME_1, 0)); - Thread th = new Thread(task); - th.start(); + Future<OffsetQueue<DistributionQueueItem>> task = consumer(PUB_AGENT_NAME_1, 0); Awaitility.setDefaultPollDelay(Duration.ZERO); - await().until(th::getState, equalTo(State.TIMED_WAITING)); cache.close(); task.get(); } - @Test - public void testFetchWithOnlyTestMessage() throws Exception { - long requestedMinOffset = 0; - PackageMessage seedingMessage = seedingMessageCaptor.getValue(); - simulateMessage(tailHandler, seedingMessage, 200000); - Future<OffsetQueue<DistributionQueueItem>> consumer = executor.submit(new Consumer(PUB_AGENT_NAME_1, requestedMinOffset)); - awaitUntil(() -> headHandlerCaptor.getValue() != null); - MessageHandler<PackageMessage> headHandler = headHandlerCaptor.getValue().getHandler(); - simulateMessage(headHandler, seedingMessage, 200000); - consumer.get(10, SECONDS); - assertEquals("After we fetched from 0 we expect this to be the cached min offset.", - requestedMinOffset, cache.getMinOffset()); - } private void awaitUntil(Callable<Boolean> callable) { await().atMost(15, SECONDS).ignoreExceptions().until(callable); @@ -308,28 +259,14 @@ public class PubQueueCacheTest { handler.handle(new TestMessageInfo(TOPIC, 0, offset, currentTimeMillis()), msg); } - - private class Consumer implements Callable<OffsetQueue<DistributionQueueItem>> { - - final String pubAgentName; - - final long minOffset; - - private Consumer(String pubAgentName, long minOffset) { - this.pubAgentName = pubAgentName; - this.minOffset = minOffset; - } - - @Override - public OffsetQueue<DistributionQueueItem> call() throws Exception { - return cache.getOffsetQueue(pubAgentName, minOffset); - } + Future<OffsetQueue<DistributionQueueItem>> consumer(String pubAgentName, long minOffset) { + return executor.submit(() -> cache.getOffsetQueue(pubAgentName, minOffset)); } private OffsetQueue<DistributionQueueItem> get(Future<OffsetQueue<DistributionQueueItem>> future) { try { - return future.get(); - } catch (Exception e) { + return future.get(1, SECONDS); + } catch (TimeoutException | InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java index 7edfb05..3a1a27a 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java @@ -114,6 +114,7 @@ public class PubQueueProviderTest { when(clientProvider.createPoller( Mockito.eq(Topics.PACKAGE_TOPIC), Mockito.any(Reset.class), + Mockito.anyString(), handlerCaptor.capture())) .thenReturn(poller); when(clientProvider.createPoller( @@ -129,6 +130,7 @@ public class PubQueueProviderTest { pubQueueCacheService.activate(); queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics); queueProvider.activate(); + pubQueueCacheService.seed(0); handler = handlerCaptor.getValue().getHandler(); statHandler = statHandlerCaptor.getValue().getHandler(); }
