This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch SLING-9482-1 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit e68eb0b5a66571c8730e2b5405884fca0ade8991 Author: tmaret <[email protected]> AuthorDate: Wed Jun 3 22:52:05 2020 +0200 SLING-9482 - Ensure the cache can be seeded when the seed no longer exists on the journal --- .../journal/impl/queue/impl/PubQueueCache.java | 23 ++-- .../impl/queue/impl/PubQueueCacheService.java | 4 +- .../journal/impl/queue/impl/QueueCacheSeeder.java | 124 +++++++++++++++++++++ .../journal/impl/subscriber/LocalStore.java | 4 + .../journal/impl/queue/impl/PubQueueCacheTest.java | 15 ++- .../impl/queue/impl/PubQueueProviderTest.java | 8 +- .../impl/queue/impl/QueueCacheSeederTest.java | 119 ++++++++++++++++++++ .../journal/impl/subscriber/LocalStoreTest.java | 12 +- 8 files changed, 288 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java index c6bf1de..ec811d2 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java @@ -106,7 +106,7 @@ public class PubQueueCache { * Holds the last known seed offset stored to the * seed store. */ - private volatile long seedOffset = 0L; + private volatile long seedOffset = -1L; private final Set<JMXRegistration> jmxRegs = new HashSet<>(); @@ -116,34 +116,42 @@ public class PubQueueCache { private volatile Closeable tailPoller; + private final QueueCacheSeeder seeder; + private final String topic; private final LocalStore seedStore; private final DistributionMetricsService distributionMetricsService; - public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore) { + public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore, QueueCacheSeeder seeder) { this.messagingProvider = messagingProvider; this.eventAdmin = eventAdmin; this.distributionMetricsService = distributionMetricsService; this.topic = topic; this.seedStore = seedStore; - Long offset = seedStore.load().get("offset", Long.class); + this.seeder = seeder; + Long offset = seedStore.load("offset", Long.class); if (offset != null) { seedOffset = offset; + startPoller(seedOffset); + /* + * We need at least one seeding message + * for cases where the seedOffset is no + * longer on the journal. + */ + seeder.seedOne(); } else { /* * Fallback to seeding messages when * no offset could be found in the * repository. */ - seedOffset = messagingProvider.retrieveOffset(topic, Reset.latest); - storeSeed(seedOffset); + seeder.seed(this::startPoller); } - seed(seedOffset); } - private void seed(long offset) { + private void startPoller(long offset) { LOG.info("Seed with offset: {}", offset); String assignTo = messagingProvider.assignTo(offset); tailPoller = messagingProvider.createPoller( @@ -185,6 +193,7 @@ public class PubQueueCache { public void close() { IOUtils.closeQuietly(tailPoller); + IOUtils.closeQuietly(seeder); jmxRegs.forEach(IOUtils::closeQuietly); } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java index 304bfb4..2eddc6f 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java @@ -155,6 +155,8 @@ public class PubQueueCacheService { private PubQueueCache newCache() { LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId); - return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic(), seedStore); + String topic = topics.getPackageTopic(); + QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, topic); + return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seedStore, seeder); } } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java new file mode 100644 index 0000000..79f6063 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.queue.impl; + +import java.io.Closeable; +import java.util.UUID; +import java.util.function.LongConsumer; + +import org.apache.commons.io.IOUtils; +import org.apache.sling.distribution.journal.MessageSender; +import org.apache.sling.distribution.journal.MessagingException; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.sling.distribution.journal.HandlerAdapter.create; +import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread; + +public class QueueCacheSeeder implements Closeable { + + + private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class); + + /** + * Interval in millisecond between two seeding messages to seed the cache. + */ + private static final long CACHE_SEEDING_DELAY_MS = 10_000; + + private final String topic; + + private final MessagingProvider provider; + + private volatile Closeable poller; + + private volatile boolean closed; + + public QueueCacheSeeder(MessagingProvider provider, String topic) { + this.provider = provider; + this.topic = topic; + } + + public void seedOne() { + startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one seed"); + } + + public void seed(LongConsumer callback) { + poller = provider.createPoller(topic, Reset.latest, + create(PackageMessage.class, (info, msg) -> { + close(); + callback.accept(info.getOffset()); + })); + startBackgroundThread(this::sendSeedingMessages, "Seeder thread"); + } + + @Override + public void close() { + closed = true; + IOUtils.closeQuietly(poller); + } + + private void sendSeedingMessages() { + LOG.info("Start message seeder"); + try { + MessageSender<PackageMessage> sender = provider.createSender(); + while (!closed) { + sendSeedingMessage(sender); + delay(CACHE_SEEDING_DELAY_MS); + } + } finally { + LOG.info("Stop message seeder"); + } + } + + private void sendSeedingMessage() { + sendSeedingMessage(provider.createSender()); + } + + private void sendSeedingMessage(MessageSender<PackageMessage> sender) { + PackageMessage pkgMsg = createTestMessage(); + LOG.info("Send seeding message"); + try { + sender.send(topic, pkgMsg); + } catch (MessagingException e) { + LOG.warn(e.getMessage(), e); + delay(CACHE_SEEDING_DELAY_MS * 10); + } + } + + private static void delay(long sleepMs) { + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + protected PackageMessage createTestMessage() { + String pkgId = UUID.randomUUID().toString(); + return PackageMessage.newBuilder() + .setPubSlingId("seeder") + .setPkgId(pkgId) + .setPkgType("seeder") + .setReqType(PackageMessage.ReqType.TEST) + .build(); + } +} diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java index b59fbde..878c831 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java @@ -91,6 +91,10 @@ public class LocalStore { LOG.debug(String.format("Stored data %s for storeId %s", map.toString(), storeId)); } + public <T> T load(String key, Class<T> clazz) { + return load().get(key, clazz); + } + public <T> T load(String key, T defaultValue) { LOG.debug(String.format("Loading key %s for storeId %s with default value %s", key, storeId, defaultValue)); return load().get(key, defaultValue); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java index e8881bf..3f0dc5e 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java @@ -29,6 +29,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; @@ -84,9 +85,6 @@ public class PubQueueCacheTest { private static final String PUB_AGENT_NAME_3 = "pubAgentName3"; private static final Random RAND = new Random(); - - @Captor - private ArgumentCaptor<PackageMessage> seedingMessageCaptor; @Captor private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor; @@ -101,12 +99,18 @@ public class PubQueueCacheTest { private MessagingProvider clientProvider; @Mock + private QueueCacheSeeder cacheSeeder; + + @Mock private DistributionMetricsService distributionMetricsService; @Mock private Counter counter; @Mock + private LocalStore seedStore; + + @Mock private Closeable poller; private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory(); @@ -131,8 +135,9 @@ public class PubQueueCacheTest { when(distributionMetricsService.getQueueCacheFetchCount()) .thenReturn(counter); - LocalStore seedStore = new LocalStore(resolverFactory, "seeds", PUB_SLING_ID); - cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore); + when(seedStore.load(anyString(), any())).thenReturn(0L); + + cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore, cacheSeeder); cache.storeSeed(); executor = Executors.newFixedThreadPool(10); diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java index b93003a..009235c 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java @@ -38,10 +38,13 @@ import javax.management.ObjectInstance; import javax.management.ObjectName; import javax.management.ReflectionException; +import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.MessageSender; import com.google.protobuf.GeneratedMessage; + +import org.apache.sling.distribution.journal.impl.subscriber.LocalStore; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; import org.apache.sling.distribution.queue.spi.DistributionQueue; @@ -113,7 +116,7 @@ public class PubQueueProviderTest { private MBeanServer mbeanServer; @Before - public void before() { + public void before() throws PersistenceException { MockitoAnnotations.initMocks(this); when(clientProvider.createPoller( Mockito.eq(Topics.PACKAGE_TOPIC), @@ -131,11 +134,12 @@ public class PubQueueProviderTest { Topics topics = new Topics(); String slingId = UUID.randomUUID().toString(); when(slingSettings.getSlingId()).thenReturn(slingId); + LocalStore seedStore = new LocalStore(resolverFactory, "seeds", slingId); + seedStore.store("offset", 1L); pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId); pubQueueCacheService.activate(); queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics); queueProvider.activate(); - pubQueueCacheService.storeSeed(); handler = handlerCaptor.getValue().getHandler(); statHandler = statHandlerCaptor.getValue().getHandler(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java new file mode 100644 index 0000000..eeebb7a --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.queue.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.util.function.LongConsumer; + +import org.apache.sling.distribution.journal.HandlerAdapter; +import org.apache.sling.distribution.journal.MessageSender; +import org.apache.sling.distribution.journal.MessagingProvider; +import org.apache.sling.distribution.journal.Reset; +import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo; +import org.apache.sling.distribution.journal.messages.Messages.PackageMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.mockito.runners.MockitoJUnitRunner; + +import static java.lang.System.currentTimeMillis; +import static org.apache.sling.distribution.journal.impl.shared.Topics.PACKAGE_TOPIC; +import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class QueueCacheSeederTest { + + @Mock + private MessagingProvider clientProvider; + + @Captor + private ArgumentCaptor<HandlerAdapter<PackageMessage>> pkgHandlerCaptor; + + @Captor + private ArgumentCaptor<PackageMessage> pkgMsgCaptor; + + @Mock + private Closeable poller; + + @Mock + private MessageSender<PackageMessage> sender; + + @Mock + private LongConsumer callback; + + private QueueCacheSeeder seeder; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + when(clientProvider.createPoller( + eq(PACKAGE_TOPIC), + any(Reset.class), + pkgHandlerCaptor.capture())) + .thenReturn(poller); + doNothing().when(sender).send(eq(PACKAGE_TOPIC), pkgMsgCaptor.capture()); + when(clientProvider.<PackageMessage>createSender()) + .thenReturn(sender); + seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC); + } + + @Test + public void testSeededCallback() throws IOException { + seeder.seed(callback); + long offset = 15L; + simulateSeedingMsg(offset); + verify(callback).accept(offset); + verify(poller).close(); + } + + @Test + public void testSendingSeeds() { + seeder.seed(callback); + verify(sender, timeout(5000).atLeastOnce()).send(eq(PACKAGE_TOPIC), pkgMsgCaptor.capture()); + PackageMessage seedMsg = pkgMsgCaptor.getValue(); + assertNotNull(seedMsg); + assertEquals(TEST, seedMsg.getReqType()); + } + + @After + public void after() { + seeder.close(); + } + + private void simulateSeedingMsg(long offset) { + PackageMessage msg = seeder.createTestMessage(); + pkgHandlerCaptor.getValue().getHandler().handle( + new TestMessageInfo(PACKAGE_TOPIC, 0, offset, currentTimeMillis()), + msg); + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java index 4bc8761..c989b4a 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java @@ -54,9 +54,9 @@ public class LocalStoreTest { ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null); LocalStore offsetStore = new LocalStore(resolverFactory, "packages", "store3"); offsetStore.store(resolver, "key1", "value1"); - assertNull(offsetStore.load("key1", null)); + assertNull(offsetStore.load("key1", String.class)); resolver.commit(); - assertEquals("value1", offsetStore.load("key1", null)); + assertEquals("value1", offsetStore.load("key1", String.class)); } @Test @@ -72,8 +72,8 @@ public class LocalStoreTest { statusStore.store(resolver, map); resolver.commit(); - assertEquals("value1", statusStore.load("key1", null)); - assertEquals(false, statusStore.load("key2", null)); + assertEquals("value1", statusStore.load("key1", String.class)); + assertEquals(false, statusStore.load("key2", Boolean.class)); } @Test @@ -92,7 +92,7 @@ public class LocalStoreTest { statusStore.store(resolver, "key2", true); resolver.commit(); - assertEquals("value1", statusStore.load("key1", null)); - assertEquals(true, statusStore.load("key2", null)); + assertEquals("value1", statusStore.load("key1", String.class)); + assertEquals(true, statusStore.load("key2", Boolean.class)); } }
