This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new d28b841 SLING-9460 - Avoid seeding messages in PubQueueCache
d28b841 is described below
commit d28b841a1d1007bae8d370e41986d4cb6d6e02a0
Author: Christian Schneider <[email protected]>
AuthorDate: Sat May 23 15:55:34 2020 +0200
SLING-9460 - Avoid seeding messages in PubQueueCache
---
.../journal/impl/publisher/DiscoveryService.java | 12 +-
.../journal/impl/queue/impl/PubQueueCache.java | 112 +++---------------
.../impl/queue/impl/PubQueueCacheService.java | 13 +-
.../impl/publisher/DiscoveryServiceTest.java | 108 +++++++++++------
.../journal/impl/queue/impl/PubQueueCacheTest.java | 131 +++++----------------
.../impl/queue/impl/PubQueueProviderTest.java | 2 +
6 files changed, 140 insertions(+), 238 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 128d453..2e0b665 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -29,6 +29,7 @@ import java.util.Hashtable;
import javax.annotation.ParametersAreNonnullByDefault;
+import
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
@@ -78,6 +79,9 @@ public class DiscoveryService implements Runnable {
@Reference
private TopologyChangeHandler topologyChangeHandler;
+
+ @Reference
+ private PubQueueCacheService pubQueueCacheService;
private volatile ServiceRegistration<?> reg;
@@ -91,10 +95,12 @@ public class DiscoveryService implements Runnable {
public DiscoveryService(
MessagingProvider messagingProvider,
TopologyChangeHandler topologyChangeHandler,
- Topics topics) {
+ Topics topics,
+ PubQueueCacheService pubQueueCacheService) {
this.messagingProvider = messagingProvider;
this.topologyChangeHandler = topologyChangeHandler;
this.topics = topics;
+ this.pubQueueCacheService = pubQueueCacheService;
}
@Activate
@@ -151,12 +157,14 @@ public class DiscoveryService implements Runnable {
long now = System.currentTimeMillis();
AgentId subAgentId = new AgentId(disMsg.getSubSlingId(),
disMsg.getSubAgentName());
-
+ long minOffset = Long.MAX_VALUE;
for (Messages.SubscriberState subStateMsg :
disMsg.getSubscriberStateList()) {
SubscriberConfiguration subConfig =
disMsg.getSubscriberConfiguration();
State subState = new State(subStateMsg.getPubAgentName(),
subAgentId.getAgentId(), now, subStateMsg.getOffset(),
subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.getEditable());
viewManager.refreshState(subState);
+ minOffset = Math.min(minOffset, subState.getOffset());
}
+ pubQueueCacheService.seed(minOffset);
}
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 438af2e..ede183d 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -19,7 +19,6 @@
package org.apache.sling.distribution.journal.impl.queue.impl;
-import static java.lang.System.currentTimeMillis;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -30,9 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -55,11 +52,8 @@ import
org.apache.sling.distribution.journal.impl.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.RunnableUtil;
/**
* Cache the distribution packages fetched from the package topic.
@@ -85,12 +79,6 @@ public class PubQueueCache {
private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues
= new ConcurrentHashMap<>();
/**
- * Blocks the threads awaiting until the agentQueues
- * cache has been seeded.
- */
- private final CountDownLatch seeded = new CountDownLatch(1);
-
- /**
* Only allows to fetch data from the journal
* with a single thread.
*/
@@ -110,37 +98,35 @@ public class PubQueueCache {
private final EventAdmin eventAdmin;
- private final Closeable tailPoller;
+ private volatile Closeable tailPoller;
private final String topic;
- private final long seedingDelayMs;
-
private final DistributionMetricsService distributionMetricsService;
-
- /**
- * Way out for the threads awaiting on the seeded
- * latch, when the component is deactivated.
- */
+
private volatile boolean closed;
- public PubQueueCache(MessagingProvider messagingProvider, EventAdmin
eventAdmin, DistributionMetricsService distributionMetricsService, String
topic, long seedingDelayMs) {
+ public PubQueueCache(MessagingProvider messagingProvider, EventAdmin
eventAdmin, DistributionMetricsService distributionMetricsService, String
topic) {
this.messagingProvider = messagingProvider;
this.eventAdmin = eventAdmin;
this.distributionMetricsService = distributionMetricsService;
- this.seedingDelayMs = seedingDelayMs;
this.topic = topic;
+ }
- tailPoller = messagingProvider.createPoller(
- topic,
+ public void seed(long offset) {
+ if (tailPoller == null) {
+ String assignTo = messagingProvider.assignTo(offset);
+ tailPoller = messagingProvider.createPoller(
+ this.topic,
Reset.latest,
+ assignTo,
create(PackageMessage.class, this::handlePackage));
-
- RunnableUtil.startBackgroundThread(this::seedCache, "queue seeding");
+ }
}
@Nonnull
public OffsetQueue<DistributionQueueItem> getOffsetQueue(String
pubAgentName, long minOffset) throws InterruptedException {
+ waitSeeded();
fetchIfNeeded(minOffset);
return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
}
@@ -150,62 +136,11 @@ public class PubQueueCache {
}
public void close() {
-
- /*
- * Note that we don't close resources using Thread.interrupt()
- * because interrupts can stop the Apache Oak repository.
- *
- * See SLING-9340, OAK-2609 and
https://jackrabbit.apache.org/oak/docs/dos_and_donts.html
- */
-
closed = true;
IOUtils.closeQuietly(tailPoller);
jmxRegs.forEach(IOUtils::closeQuietly);
}
- private void seedCache() {
- LOG.info("Start message seeder");
- try {
- MessageSender<PackageMessage> sender =
messagingProvider.createSender();
- do {
- sendSeedingMessage(sender);
- } while (! closed && ! seeded.await(seedingDelayMs, MILLISECONDS));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- LOG.info("Stop message seeder");
- }
- }
-
- private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
- PackageMessage pkgMsg = createTestMessage();
- LOG.info("Send seeding message");
- try {
- sender.send(topic, pkgMsg);
- } catch (MessagingException e) {
- LOG.warn(e.getMessage(), e);
- delay(seedingDelayMs * 10);
- }
- }
-
- private static void delay(long sleepMs) {
- try {
- Thread.sleep(sleepMs);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private PackageMessage createTestMessage() {
- String pkgId = UUID.randomUUID().toString();
- return PackageMessage.newBuilder()
- .setPubSlingId("seeder")
- .setPkgId(pkgId)
- .setPkgType("seeder")
- .setReqType(PackageMessage.ReqType.TEST)
- .build();
- }
-
/**
* Fetch the package messages from the requested min offset,
* up to the current cached min offset.
@@ -213,13 +148,7 @@ public class PubQueueCache {
* @param requestedMinOffset the min offset to start fetching data from
*/
private void fetchIfNeeded(long requestedMinOffset) throws
InterruptedException {
-
- // We wait on the cache to be seeded (at least one message handled)
- // before computing potential missing offsets.
- waitSeeded();
-
long cachedMinOffset = getMinOffset();
-
if (requestedMinOffset < cachedMinOffset) {
LOG.debug("Requested min offset {} smaller than cached min offset
{}", requestedMinOffset, cachedMinOffset);
@@ -271,15 +200,14 @@ public class PubQueueCache {
}
private void waitSeeded() throws InterruptedException {
- long start = currentTimeMillis();
- while (!closed && currentTimeMillis() - start < MAX_FETCH_WAIT_MS) {
- if (seeded.await(seedingDelayMs, MILLISECONDS)) {
- return;
- } else {
- LOG.debug("Waiting for seeded cache");
+ long start = System.currentTimeMillis();
+ while (getMinOffset() == Long.MAX_VALUE) {
+ LOG.debug("Waiting for seeded cache");
+ if (closed || System.currentTimeMillis() - start >
MAX_FETCH_WAIT_MS) {
+ throw new RuntimeException("Gave up waiting for seeded cache");
}
+ Thread.sleep(1000);
}
- throw new RuntimeException("Gave up waiting for seeded cache");
}
protected long getMinOffset() {
@@ -340,9 +268,5 @@ public class PubQueueCache {
private void handlePackage(final MessageInfo info, final PackageMessage
message) {
merge(singletonList(new FullMessage<>(info, message)));
- if (seeded.getCount() > 0) {
- LOG.info("Cache has been seeded");
- }
- seeded.countDown();
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 8f49687..6609b8b 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -57,11 +57,6 @@ public class PubQueueCacheService implements Runnable {
private static final int CLEANUP_THRESHOLD = 10_000;
/**
- * Interval in millisecond between two seeding messages to seed the cache.
- */
- private static final long CACHE_SEEDING_DELAY_MS = 10_000;
-
- /**
* Will cause the cache to be cleared when we loose the journal
*/
@Reference
@@ -115,6 +110,12 @@ public class PubQueueCacheService implements Runnable {
}
}
+ public void seed(long offset) {
+ if (cache != null) {
+ cache.seed(offset);
+ }
+ }
+
private void cleanup() {
if (cache != null) {
int size = cache.size();
@@ -129,7 +130,7 @@ public class PubQueueCacheService implements Runnable {
}
private PubQueueCache newCache() {
- return new PubQueueCache(messagingProvider, eventAdmin,
distributionMetricsService, topics.getPackageTopic(), CACHE_SEEDING_DELAY_MS);
+ return new PubQueueCache(messagingProvider, eventAdmin,
distributionMetricsService, topics.getPackageTopic());
}
@Override
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index 02a58c1..736618e 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -18,50 +18,84 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
-import static
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.UUID;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
+import
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
import org.mockito.Mockito;
-
-import
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.apache.sling.distribution.journal.messages.Messages.SubscriberState;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
-@SuppressWarnings({ "unchecked", "rawtypes" })
+@RunWith(MockitoJUnitRunner.class)
public class DiscoveryServiceTest {
private static final String SUB1_SLING_ID = UUID.randomUUID().toString();
private static final String SUB1_AGENT = "subagent";
private static final String PUB1_AGENT_NAME = "pubagent1";
+ @Mock
private Closeable poller;
+
+ @Mock
+ BundleContext bundleContext;
+
+ @Mock
+ MessagingProvider clientProvider;
+
+ @Captor
+ ArgumentCaptor<HandlerAdapter<DiscoveryMessage>> captureHandler;
+
+ @Spy
+ Topics topics = new Topics();
+
+ @Mock
+ TopologyChangeHandler topologyChangeHandler;
+
+ @Mock
+ private PubQueueCacheService pubQueueCacheService;
+
private MessageHandler<DiscoveryMessage> discoveryHandler;
+
private DiscoveryService discoveryService;
+
@Before
public void before() {
- mockDiscoveryService();
+ discoveryService = new DiscoveryService(
+ clientProvider, topologyChangeHandler,
+ topics, pubQueueCacheService);
+ when(clientProvider.createPoller(
+ Mockito.anyString(),
+ Mockito.any(Reset.class),
+ captureHandler.capture())).thenReturn(poller);
+ discoveryService.activate(bundleContext);
+ discoveryHandler = captureHandler.getValue().getHandler();
}
@Test
@@ -69,13 +103,25 @@ public class DiscoveryServiceTest {
String subAgentId = SUB1_SLING_ID + "-" + SUB1_AGENT;
assertTrue(discoveryService.getTopologyView().getSubscriberAgentStates(subAgentId).isEmpty());
- MessageInfo info = new TestMessageInfo("topic", 0, 0, 0);
- DiscoveryMessage message = discoveryMessage(SUB1_SLING_ID, SUB1_AGENT,
PUB1_AGENT_NAME, 10);
- discoveryHandler.handle(info, message);
+ DiscoveryMessage message = discoveryMessage(SUB1_SLING_ID, SUB1_AGENT,
+ subscriberState(PUB1_AGENT_NAME, 10));
+ discoveryHandler.handle((MessageInfo) messageInfo(0), message);
discoveryService.run();
assertThat(discoveryService.getTopologyView().getState(subAgentId,
PUB1_AGENT_NAME).getOffset(), equalTo(10L));
}
+
+ @Test
+ public void testPubQueueCacheSeed() throws IOException {
+ DiscoveryMessage message = discoveryMessage(
+ SUB1_SLING_ID,
+ SUB1_AGENT,
+ subscriberState(PUB1_AGENT_NAME, 20),
+ subscriberState(PUB1_AGENT_NAME, 10)
+ );
+ discoveryHandler.handle(messageInfo(0), message);
+ verify(pubQueueCacheService).seed(Mockito.eq(10l));
+ }
@After
public void after() throws IOException {
@@ -88,27 +134,11 @@ public class DiscoveryServiceTest {
// TODO If a subscriber does not respond after a certain timeout its
offsets must be purged
}
- private void mockDiscoveryService() {
- poller = mock(Closeable.class);
- BundleContext bundleContext = mock(BundleContext.class);
- MessagingProvider clientProvider = mock(MessagingProvider.class);
- ArgumentCaptor<HandlerAdapter> captureHandler =
ArgumentCaptor.forClass(HandlerAdapter.class);
- when(clientProvider.createPoller(
- Mockito.anyString(),
- Mockito.any(Reset.class),
- captureHandler.capture())).thenReturn(poller);
- Topics topics = mock(Topics.class);
- TopologyChangeHandler topologyChangeHandler =
mock(TopologyChangeHandler.class);
- discoveryService = new DiscoveryService(clientProvider,
topologyChangeHandler, topics);
- discoveryService.activate(bundleContext);
- discoveryHandler = captureHandler.getValue().getHandler();
+ private MessageInfo messageInfo(int offset) {
+ return new TestMessageInfo("topic", 0, offset, 0);
}
- private DiscoveryMessage discoveryMessage(String subSlingId, String
subAgentName, String pubAgentName, int offset) {
- SubscriberState queueoffset = SubscriberState.newBuilder()
- .setPubAgentName(pubAgentName)
- .setOffset(offset).build();
-
+ private DiscoveryMessage discoveryMessage(String subSlingId, String
subAgentName, SubscriberState... subStates) {
return DiscoveryMessage.newBuilder()
.setSubSlingId(subSlingId)
.setSubAgentName(subAgentName)
@@ -117,6 +147,12 @@ public class DiscoveryServiceTest {
.setEditable(false)
.setMaxRetries(-1)
.build())
- .addSubscriberState(queueoffset).build();
+ .addAllSubscriberState(Arrays.asList(subStates)).build();
+ }
+
+ private SubscriberState subscriberState(String pubAgentName, int offset) {
+ return SubscriberState.newBuilder()
+ .setPubAgentName(pubAgentName)
+ .setOffset(offset).build();
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index c6245b8..7d0808a 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -23,37 +23,31 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.ADD;
import static
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
import static
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
-import static org.awaitility.Awaitility.await;
-import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
-import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeoutException;
import java.util.stream.LongStream;
import org.apache.sling.commons.metrics.Counter;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
@@ -71,7 +65,6 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -96,10 +89,7 @@ public class PubQueueCacheTest {
private ArgumentCaptor<PackageMessage> seedingMessageCaptor;
@Captor
- private ArgumentCaptor<HandlerAdapter<PackageMessage>> tailHandlerCaptor;
-
- @Captor
- private ArgumentCaptor<HandlerAdapter<PackageMessage>> headHandlerCaptor;
+ private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
@Captor
private ArgumentCaptor<String> headAssignCaptor;
@@ -117,9 +107,6 @@ public class PubQueueCacheTest {
private Counter counter;
@Mock
- private MessageSender<PackageMessage> pkgSender;
-
- @Mock
private Closeable poller;
private PubQueueCache cache;
@@ -135,28 +122,18 @@ public class PubQueueCacheTest {
when(clientProvider.createPoller(
eq(TOPIC),
any(Reset.class),
- tailHandlerCaptor.capture()))
- .thenReturn(poller);
-
- when(clientProvider.createPoller(
- eq(TOPIC),
- any(Reset.class),
headAssignCaptor.capture(),
- headHandlerCaptor.capture()))
+ handlerCaptor.capture()))
.thenReturn(poller);
- when(clientProvider.<PackageMessage>createSender())
- .thenReturn(pkgSender);
-
when(distributionMetricsService.getQueueCacheFetchCount())
.thenReturn(counter);
- cache = new PubQueueCache(clientProvider, eventAdmin,
distributionMetricsService, TOPIC, 250);
- verify(pkgSender, timeout(5000)).send(Mockito.eq(TOPIC),
seedingMessageCaptor.capture());
-
- executor = Executors.newFixedThreadPool(10);
+ cache = new PubQueueCache(clientProvider, eventAdmin,
distributionMetricsService, TOPIC);
+ cache.seed(0);
- tailHandler = tailHandlerCaptor.getValue().getHandler();
+ executor = Executors.newFixedThreadPool(10);
+ tailHandler = handlerCaptor.getValue().getHandler();
}
@After
@@ -167,7 +144,7 @@ public class PubQueueCacheTest {
@Test
public void testSeedingFromNewPackageMessage() throws Exception {
- Future<OffsetQueue<DistributionQueueItem>> consumer =
executor.submit(new Consumer(PUB_AGENT_NAME_1, 0));
+ Future<OffsetQueue<DistributionQueueItem>> consumer =
consumer(PUB_AGENT_NAME_1, 0);
// The consumer is blocked until the cache is seeded
assertFalse(consumer.isDone());
// sending any package message seeds the cache
@@ -176,27 +153,13 @@ public class PubQueueCacheTest {
}
@Test
- public void testSeedingFromSeedingMessage() throws Exception {
- ArgumentCaptor<PackageMessage> seedingMsgCaptor =
ArgumentCaptor.forClass(PackageMessage.class);
- Future<OffsetQueue<DistributionQueueItem>> consumer =
executor.submit(new Consumer(PUB_AGENT_NAME_1, 0));
- // The consumer is blocked until the cache is seeded
- assertFalse(consumer.isDone());
- // wait until a seeding message is sent and captured
- verify(pkgSender, timeout(15000).atLeastOnce()).send(eq(TOPIC),
seedingMsgCaptor.capture());
- // sending the captured seeding message seeds the cache
- simulateMessage(tailHandler, seedingMsgCaptor.getValue(), 0);
- consumer.get(15, SECONDS);
- }
-
- @Test
public void testSeedingConcurrentConsumers() throws Exception {
List<Future<OffsetQueue<DistributionQueueItem>>> consumers = new
ArrayList<>();
- consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_1, 0)));
- consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_2, 0)));
- consumers.add(executor.submit(new Consumer(PUB_AGENT_NAME_3, 0)));
+ consumers.add(consumer(PUB_AGENT_NAME_1, 0));
+ consumers.add(consumer(PUB_AGENT_NAME_2, 0));
+ consumers.add(consumer(PUB_AGENT_NAME_3, 0));
// All consumers are blocked until the cache is seeded
consumers.forEach(future -> assertFalse(future.isDone()));
- // sending any package message seeds the cache
simulateMessage(tailHandler, 0);
consumers.forEach(future -> assertNotNull(get(future)));
}
@@ -204,45 +167,47 @@ public class PubQueueCacheTest {
@Test
public void testFetchWithSingleConsumer() throws Exception {
// build a consumer form offset 100
- Future<OffsetQueue<DistributionQueueItem>> consumer =
executor.submit(new Consumer(PUB_AGENT_NAME_1, 100));
+ Future<OffsetQueue<DistributionQueueItem>> consumer =
consumer(PUB_AGENT_NAME_1, 100);
// seeding the cache with a message at offset 200
simulateMessage(tailHandler, 200);
// wait that the consumer has started fetching the offsets from 100 to
200
- awaitUntil(() -> headAssignCaptor.getValue() != null);
- awaitUntil(() -> headHandlerCaptor.getValue() != null);
+ awaitHeadHandler();
// simulate messages for the fetched offsets
long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
- simulateMessages(headHandlerCaptor.getValue().getHandler(),
fromOffset, cache.getMinOffset());
+ simulateMessages(handlerCaptor.getValue().getHandler(), fromOffset,
cache.getMinOffset());
// the consumer returns the offset queue
consumer.get(15, SECONDS);
assertEquals(100, cache.getMinOffset());
}
+ private MessageHandler<PackageMessage> awaitHeadHandler() {
+ return Awaitility.await().ignoreExceptions().until(() ->
handlerCaptor.getAllValues().get(1).getHandler(), notNullValue());
+ }
+
@Test
public void testFetchWithConcurrentConsumer() throws Exception {
// build two consumers for same agent queue, from offset 100
- Future<OffsetQueue<DistributionQueueItem>> consumer1 =
executor.submit(new Consumer(PUB_AGENT_NAME_1, 100));
- Future<OffsetQueue<DistributionQueueItem>> consumer2 =
executor.submit(new Consumer(PUB_AGENT_NAME_1, 100));
+ Future<OffsetQueue<DistributionQueueItem>> consumer1 =
consumer(PUB_AGENT_NAME_1, 100);
+ Future<OffsetQueue<DistributionQueueItem>> consumer2 =
consumer(PUB_AGENT_NAME_1, 100);
// seeding the cache with a message at offset 200
simulateMessage(tailHandler, 200);
// wait that one consumer has started fetching the offsets from 100 to
200
- awaitUntil(() -> headAssignCaptor.getValue() != null);
- awaitUntil(() -> headHandlerCaptor.getValue() != null);
+ MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
// simulate messages for the fetched offsets
long fromOffset = offsetFromAssign(headAssignCaptor.getValue());
- simulateMessages(headHandlerCaptor.getValue().getHandler(),
fromOffset, cache.getMinOffset());
+ simulateMessages(headHandler, fromOffset, cache.getMinOffset());
// both consumers returns the offset queue
OffsetQueue<DistributionQueueItem> q1 = consumer1.get(5, SECONDS);
OffsetQueue<DistributionQueueItem> q2 = consumer2.get(5, SECONDS);
assertEquals(q1.getSize(), q2.getSize());
assertEquals(100, cache.getMinOffset());
// the offsets have been fetched only once
- assertEquals(1, headHandlerCaptor.getAllValues().size());
+ assertEquals(2, handlerCaptor.getAllValues().size());
}
@Test
public void testCacheSize() throws Exception {
- Future<OffsetQueue<DistributionQueueItem>> consumer =
executor.submit(new Consumer(PUB_AGENT_NAME_1, 0));
+ Future<OffsetQueue<DistributionQueueItem>> consumer =
consumer(PUB_AGENT_NAME_1, 0);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
@@ -255,34 +220,14 @@ public class PubQueueCacheTest {
@Test(expected = ExecutionException.class)
public void testCloseUnseededPoller() throws Throwable {
- FutureTask<OffsetQueue<DistributionQueueItem>> task = new
FutureTask<>(new Consumer(PUB_AGENT_NAME_1, 0));
- Thread th = new Thread(task);
- th.start();
+ Future<OffsetQueue<DistributionQueueItem>> task =
consumer(PUB_AGENT_NAME_1, 0);
Awaitility.setDefaultPollDelay(Duration.ZERO);
- await().until(th::getState, equalTo(State.TIMED_WAITING));
cache.close();
task.get();
}
- @Test
- public void testFetchWithOnlyTestMessage() throws Exception {
- long requestedMinOffset = 0;
- PackageMessage seedingMessage = seedingMessageCaptor.getValue();
- simulateMessage(tailHandler, seedingMessage, 200000);
- Future<OffsetQueue<DistributionQueueItem>> consumer =
executor.submit(new Consumer(PUB_AGENT_NAME_1, requestedMinOffset));
- awaitUntil(() -> headHandlerCaptor.getValue() != null);
- MessageHandler<PackageMessage> headHandler =
headHandlerCaptor.getValue().getHandler();
- simulateMessage(headHandler, seedingMessage, 200000);
- consumer.get(10, SECONDS);
- assertEquals("After we fetched from 0 we expect this to be the cached
min offset.",
- requestedMinOffset, cache.getMinOffset());
- }
-
- private void awaitUntil(Callable<Boolean> callable) {
- await().atMost(15, SECONDS).ignoreExceptions().until(callable);
- }
- private void simulateMessages(MessageHandler<PackageMessage> handler,
long fromOffset, long toOffset) {
+ private void simulateMessages(MessageHandler<PackageMessage> handler, long
fromOffset, long toOffset) {
LongStream.rangeClosed(fromOffset, toOffset).forEach(offset ->
simulateMessage(handler, offset));
}
@@ -308,28 +253,14 @@ public class PubQueueCacheTest {
handler.handle(new TestMessageInfo(TOPIC, 0, offset,
currentTimeMillis()), msg);
}
-
- private class Consumer implements
Callable<OffsetQueue<DistributionQueueItem>> {
-
- final String pubAgentName;
-
- final long minOffset;
-
- private Consumer(String pubAgentName, long minOffset) {
- this.pubAgentName = pubAgentName;
- this.minOffset = minOffset;
- }
-
- @Override
- public OffsetQueue<DistributionQueueItem> call() throws Exception {
- return cache.getOffsetQueue(pubAgentName, minOffset);
- }
+ Future<OffsetQueue<DistributionQueueItem>> consumer(String pubAgentName,
long minOffset) {
+ return executor.submit(() -> cache.getOffsetQueue(pubAgentName,
minOffset));
}
private OffsetQueue<DistributionQueueItem>
get(Future<OffsetQueue<DistributionQueueItem>> future) {
try {
- return future.get();
- } catch (Exception e) {
+ return future.get(1, SECONDS);
+ } catch (TimeoutException | InterruptedException | ExecutionException
e) {
throw new RuntimeException(e);
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 7edfb05..3a1a27a 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -114,6 +114,7 @@ public class PubQueueProviderTest {
when(clientProvider.createPoller(
Mockito.eq(Topics.PACKAGE_TOPIC),
Mockito.any(Reset.class),
+ Mockito.anyString(),
handlerCaptor.capture()))
.thenReturn(poller);
when(clientProvider.createPoller(
@@ -129,6 +130,7 @@ public class PubQueueProviderTest {
pubQueueCacheService.activate();
queueProvider = new PubQueueProviderImpl(pubQueueCacheService,
clientProvider, topics);
queueProvider.activate();
+ pubQueueCacheService.seed(0);
handler = handlerCaptor.getValue().getHandler();
statHandler = statHandlerCaptor.getValue().getHandler();
}