This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch SLING-9482-1
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit e68eb0b5a66571c8730e2b5405884fca0ade8991
Author: tmaret <[email protected]>
AuthorDate: Wed Jun 3 22:52:05 2020 +0200

    SLING-9482 - Ensure the cache can be seeded when the seed no longer exists 
on the journal
---
 .../journal/impl/queue/impl/PubQueueCache.java     |  23 ++--
 .../impl/queue/impl/PubQueueCacheService.java      |   4 +-
 .../journal/impl/queue/impl/QueueCacheSeeder.java  | 124 +++++++++++++++++++++
 .../journal/impl/subscriber/LocalStore.java        |   4 +
 .../journal/impl/queue/impl/PubQueueCacheTest.java |  15 ++-
 .../impl/queue/impl/PubQueueProviderTest.java      |   8 +-
 .../impl/queue/impl/QueueCacheSeederTest.java      | 119 ++++++++++++++++++++
 .../journal/impl/subscriber/LocalStoreTest.java    |  12 +-
 8 files changed, 288 insertions(+), 21 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index c6bf1de..ec811d2 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -106,7 +106,7 @@ public class PubQueueCache {
      * Holds the last known seed offset stored to the
      * seed store.
      */
-    private volatile long seedOffset = 0L;
+    private volatile long seedOffset = -1L;
 
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
@@ -116,34 +116,42 @@ public class PubQueueCache {
 
     private volatile Closeable tailPoller;
 
+    private final QueueCacheSeeder seeder;
+
     private final String topic;
 
     private final LocalStore seedStore;
 
     private final DistributionMetricsService distributionMetricsService;
     
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic, LocalStore seedStore) {
+    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin 
eventAdmin, DistributionMetricsService distributionMetricsService, String 
topic, LocalStore seedStore, QueueCacheSeeder seeder) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
         this.distributionMetricsService = distributionMetricsService;
         this.topic = topic;
         this.seedStore = seedStore;
-        Long offset = seedStore.load().get("offset", Long.class);
+        this.seeder = seeder;
+        Long offset = seedStore.load("offset", Long.class);
         if (offset != null) {
             seedOffset = offset;
+            startPoller(seedOffset);
+            /*
+             * We need at least one seeding message
+             * for cases where the seedOffset is no
+             * longer on the journal.
+             */
+            seeder.seedOne();
         } else {
             /*
              * Fallback to seeding messages when
              * no offset could be found in the
              * repository.
              */
-            seedOffset = messagingProvider.retrieveOffset(topic, Reset.latest);
-            storeSeed(seedOffset);
+            seeder.seed(this::startPoller);
         }
-        seed(seedOffset);
     }
 
-    private void seed(long offset) {
+    private void startPoller(long offset) {
         LOG.info("Seed with offset: {}", offset);
         String assignTo = messagingProvider.assignTo(offset);
         tailPoller = messagingProvider.createPoller(
@@ -185,6 +193,7 @@ public class PubQueueCache {
 
     public void close() {
         IOUtils.closeQuietly(tailPoller);
+        IOUtils.closeQuietly(seeder);
         jmxRegs.forEach(IOUtils::closeQuietly);
     }
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 304bfb4..2eddc6f 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -155,6 +155,8 @@ public class PubQueueCacheService {
 
     private PubQueueCache newCache() {
         LocalStore seedStore = new LocalStore(resolverFactory, "seeds", 
pubSlingId);
-        return new PubQueueCache(messagingProvider, eventAdmin, 
distributionMetricsService, topics.getPackageTopic(), seedStore);
+        String topic = topics.getPackageTopic();
+        QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, 
topic);
+        return new PubQueueCache(messagingProvider, eventAdmin, 
distributionMetricsService, topic, seedStore, seeder);
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
new file mode 100644
index 0000000..79f6063
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.io.Closeable;
+import java.util.UUID;
+import java.util.function.LongConsumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
+public class QueueCacheSeeder implements Closeable {
+
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(QueueCacheSeeder.class);
+
+    /**
+     * Interval in millisecond between two seeding messages to seed the cache.
+     */
+    private static final long CACHE_SEEDING_DELAY_MS = 10_000;
+
+    private final String topic;
+
+    private final MessagingProvider provider;
+
+    private volatile Closeable poller;
+
+    private volatile boolean closed;
+
+    public QueueCacheSeeder(MessagingProvider provider, String topic) {
+        this.provider = provider;
+        this.topic = topic;
+    }
+
+    public void seedOne() {
+        startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one 
seed");
+    }
+
+    public void seed(LongConsumer callback) {
+        poller = provider.createPoller(topic, Reset.latest,
+                create(PackageMessage.class, (info, msg) -> {
+                    close();
+                    callback.accept(info.getOffset());
+                }));
+        startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+        IOUtils.closeQuietly(poller);
+    }
+
+    private void sendSeedingMessages() {
+        LOG.info("Start message seeder");
+        try {
+            MessageSender<PackageMessage> sender = provider.createSender();
+            while (!closed) {
+                sendSeedingMessage(sender);
+                delay(CACHE_SEEDING_DELAY_MS);
+            }
+        } finally {
+            LOG.info("Stop message seeder");
+        }
+    }
+
+    private void sendSeedingMessage() {
+        sendSeedingMessage(provider.createSender());
+    }
+
+    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+        PackageMessage pkgMsg = createTestMessage();
+        LOG.info("Send seeding message");
+        try {
+            sender.send(topic, pkgMsg);
+        } catch (MessagingException e) {
+            LOG.warn(e.getMessage(), e);
+            delay(CACHE_SEEDING_DELAY_MS * 10);
+        }
+    }
+
+    private static void delay(long sleepMs) {
+        try {
+            Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    protected PackageMessage createTestMessage() {
+        String pkgId = UUID.randomUUID().toString();
+        return PackageMessage.newBuilder()
+                .setPubSlingId("seeder")
+                .setPkgId(pkgId)
+                .setPkgType("seeder")
+                .setReqType(PackageMessage.ReqType.TEST)
+                .build();
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
index b59fbde..878c831 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
@@ -91,6 +91,10 @@ public class LocalStore {
         LOG.debug(String.format("Stored data %s for storeId %s", 
map.toString(), storeId));
     }
 
+    public <T> T load(String key, Class<T> clazz) {
+        return load().get(key, clazz);
+    }
+
     public <T> T load(String key, T defaultValue) {
         LOG.debug(String.format("Loading key %s for storeId %s with default 
value %s", key, storeId, defaultValue));
         return load().get(key, defaultValue);
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index e8881bf..3f0dc5e 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
@@ -84,9 +85,6 @@ public class PubQueueCacheTest {
     private static final String PUB_AGENT_NAME_3 = "pubAgentName3";
 
     private static final Random RAND = new Random();
-    
-    @Captor
-    private ArgumentCaptor<PackageMessage> seedingMessageCaptor;
 
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
@@ -101,12 +99,18 @@ public class PubQueueCacheTest {
     private MessagingProvider clientProvider;
 
     @Mock
+    private QueueCacheSeeder cacheSeeder;
+
+    @Mock
     private DistributionMetricsService distributionMetricsService;
 
     @Mock
     private Counter counter;
 
     @Mock
+    private LocalStore seedStore;
+
+    @Mock
     private Closeable poller;
 
     private ResourceResolverFactory resolverFactory = new 
MockResourceResolverFactory();
@@ -131,8 +135,9 @@ public class PubQueueCacheTest {
         when(distributionMetricsService.getQueueCacheFetchCount())
                 .thenReturn(counter);
 
-        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", 
PUB_SLING_ID);
-        cache = new PubQueueCache(clientProvider, eventAdmin, 
distributionMetricsService, TOPIC, seedStore);
+        when(seedStore.load(anyString(), any())).thenReturn(0L);
+
+        cache = new PubQueueCache(clientProvider, eventAdmin, 
distributionMetricsService, TOPIC, seedStore, cacheSeeder);
         cache.storeSeed();
 
         executor = Executors.newFixedThreadPool(10);
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index b93003a..009235c 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,10 +38,13 @@ import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
+import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import com.google.protobuf.GeneratedMessage;
+
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -113,7 +116,7 @@ public class PubQueueProviderTest {
     private MBeanServer mbeanServer;
     
     @Before
-    public void before() {
+    public void before() throws PersistenceException {
         MockitoAnnotations.initMocks(this);
         when(clientProvider.createPoller(
                 Mockito.eq(Topics.PACKAGE_TOPIC),
@@ -131,11 +134,12 @@ public class PubQueueProviderTest {
         Topics topics = new Topics();
         String slingId = UUID.randomUUID().toString();
         when(slingSettings.getSlingId()).thenReturn(slingId);
+        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", 
slingId);
+        seedStore.store("offset", 1L);
         pubQueueCacheService = new PubQueueCacheService(clientProvider, 
topics, eventAdmin, slingSettings, resolverFactory, slingId);
         pubQueueCacheService.activate();
         queueProvider = new PubQueueProviderImpl(pubQueueCacheService, 
clientProvider, topics);
         queueProvider.activate();
-        pubQueueCacheService.storeSeed();
         handler = handlerCaptor.getValue().getHandler();
         statHandler = statHandlerCaptor.getValue().getHandler();
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
new file mode 100644
index 0000000..eeebb7a
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.LongConsumer;
+
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static java.lang.System.currentTimeMillis;
+import static 
org.apache.sling.distribution.journal.impl.shared.Topics.PACKAGE_TOPIC;
+import static 
org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class QueueCacheSeederTest {
+
+    @Mock
+    private MessagingProvider clientProvider;
+
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> pkgHandlerCaptor;
+
+    @Captor
+    private ArgumentCaptor<PackageMessage> pkgMsgCaptor;
+
+    @Mock
+    private Closeable poller;
+
+    @Mock
+    private MessageSender<PackageMessage> sender;
+
+    @Mock
+    private LongConsumer callback;
+
+    private QueueCacheSeeder seeder;
+
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+        when(clientProvider.createPoller(
+                eq(PACKAGE_TOPIC),
+                any(Reset.class),
+                pkgHandlerCaptor.capture()))
+                .thenReturn(poller);
+        doNothing().when(sender).send(eq(PACKAGE_TOPIC), 
pkgMsgCaptor.capture());
+        when(clientProvider.<PackageMessage>createSender())
+                .thenReturn(sender);
+        seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
+    }
+
+    @Test
+    public void testSeededCallback() throws IOException {
+        seeder.seed(callback);
+        long offset = 15L;
+        simulateSeedingMsg(offset);
+        verify(callback).accept(offset);
+        verify(poller).close();
+    }
+
+    @Test
+    public void testSendingSeeds() {
+        seeder.seed(callback);
+        verify(sender, timeout(5000).atLeastOnce()).send(eq(PACKAGE_TOPIC), 
pkgMsgCaptor.capture());
+        PackageMessage seedMsg = pkgMsgCaptor.getValue();
+        assertNotNull(seedMsg);
+        assertEquals(TEST, seedMsg.getReqType());
+    }
+
+    @After
+    public void after() {
+        seeder.close();
+    }
+
+    private void simulateSeedingMsg(long offset) {
+        PackageMessage msg = seeder.createTestMessage();
+        pkgHandlerCaptor.getValue().getHandler().handle(
+                new TestMessageInfo(PACKAGE_TOPIC, 0, offset, 
currentTimeMillis()),
+                msg);
+    }
+}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
index 4bc8761..c989b4a 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
@@ -54,9 +54,9 @@ public class LocalStoreTest {
         ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null);
         LocalStore offsetStore = new LocalStore(resolverFactory, "packages", 
"store3");
         offsetStore.store(resolver, "key1", "value1");
-        assertNull(offsetStore.load("key1", null));
+        assertNull(offsetStore.load("key1", String.class));
         resolver.commit();
-        assertEquals("value1", offsetStore.load("key1", null));
+        assertEquals("value1", offsetStore.load("key1", String.class));
     }
 
     @Test
@@ -72,8 +72,8 @@ public class LocalStoreTest {
         statusStore.store(resolver, map);
         resolver.commit();
 
-        assertEquals("value1", statusStore.load("key1", null));
-        assertEquals(false, statusStore.load("key2", null));
+        assertEquals("value1", statusStore.load("key1", String.class));
+        assertEquals(false, statusStore.load("key2", Boolean.class));
     }
 
     @Test
@@ -92,7 +92,7 @@ public class LocalStoreTest {
         statusStore.store(resolver, "key2", true);
         resolver.commit();
 
-        assertEquals("value1", statusStore.load("key1", null));
-        assertEquals(true, statusStore.load("key2", null));
+        assertEquals("value1", statusStore.load("key1", String.class));
+        assertEquals(true, statusStore.load("key2", Boolean.class));
     }
 }

Reply via email to