This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 7460d7a  SLING-10154 - Allow to seamlessly move clients to different 
set of to… (#66)
7460d7a is described below

commit 7460d7ad56e1d6067aaa4b2f8131d33b4f2b3743
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Mar 1 19:37:35 2021 +0100

    SLING-10154 - Allow to seamlessly move clients to different set of to… (#66)
    
    * SLING-10154 - Allow to seamlessly move clients to different set of topics
    
    * SLING-10154 - Make sure we start from latest if no offset is stored
    
    * SLING-10154 - Add logging in case we force imported status
---
 pom.xml                                            |  9 +-
 .../journal/bookkeeper/BookKeeper.java             |  3 +-
 .../journal/bookkeeper/BookKeeperConfig.java       | 13 ++-
 .../journal/bookkeeper/LocalStore.java             |  4 +-
 .../journal/bookkeeper/LocalStoreJMX.java          | 94 ++++++++++++++++++++
 .../impl/precondition/PackageStatusWatcher.java    | 25 +++++-
 .../impl/subscriber/DistributionSubscriber.java    | 13 ++-
 .../journal/shared/JMXRegistration.java            | 13 ++-
 .../journal/shared/LocalStoreJMXMBean.java         | 23 +++++
 .../journal/bookkeeper/BookKeeperTest.java         |  2 +-
 .../journal/bookkeeper/LocalStoreJMXTest.java      | 99 ++++++++++++++++++++++
 .../precondition/PackageStatusWatcherTest.java     | 15 ++--
 .../journal/impl/subscriber/EscapeTest.java        | 38 +++++++++
 .../journal/impl/subscriber/SubscriberTest.java    | 18 ++--
 14 files changed, 336 insertions(+), 33 deletions(-)

diff --git a/pom.xml b/pom.xml
index 873176e..7fe140a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.osgi</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.commons.metrics</artifactId>
             <version>1.2.6</version>
         </dependency>
@@ -140,7 +145,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.2.0</version>
+            <version>0.2.1-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.felix</groupId>
@@ -202,7 +207,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.testing.sling-mock-oak</artifactId>
-            <version>2.0.2</version>
+            <version>2.1.2</version>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 3d87eb2..5120d08 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -75,7 +75,6 @@ import org.slf4j.MDC;
  * agent on the leader instance.
  */
 public class BookKeeper implements Closeable {
-    public static final String STORE_TYPE_PACKAGE = "packages";
     public static final String STORE_TYPE_STATUS = "statuses";
     public static final String KEY_OFFSET = "offset";
     public static final int COMMIT_AFTER_NUM_SKIPPED = 10;
@@ -120,7 +119,7 @@ public class BookKeeper implements Closeable {
         // of retry attempts is limited ; disabled otherwise
         this.errorQueueEnabled = (config.getMaxRetries() >= 0);
         this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS, 
config.getSubAgentName());
-        this.processedOffsets = new LocalStore(resolverFactory, 
STORE_TYPE_PACKAGE, config.getSubAgentName());
+        this.processedOffsets = new LocalStore(resolverFactory, 
config.getPackageNodeName(), config.getSubAgentName());
         log.info("Started bookkeeper {}.", config);
     }
     
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
index abaff72..8179e0e 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
@@ -24,17 +24,20 @@ public class BookKeeperConfig {
     private final boolean editable;
     private final int maxRetries;
     private final PackageHandling packageHandling;
+    private final String packageNodeName;
 
     public BookKeeperConfig(String subAgentName,
             String subSlingId,
             boolean editable, 
             int maxRetries,
-            PackageHandling packageHandling) {
+            PackageHandling packageHandling, 
+            String packageNodeName) {
                 this.subAgentName = subAgentName;
                 this.subSlingId = subSlingId;
                 this.editable = editable;
                 this.maxRetries = maxRetries;
                 this.packageHandling = packageHandling;
+                this.packageNodeName = packageNodeName;
     }
     
     public String getSubAgentName() {
@@ -57,9 +60,13 @@ public class BookKeeperConfig {
         return packageHandling;
     }
     
+    public String getPackageNodeName() {
+        return packageNodeName;
+    }
+    
     @Override
     public String toString() {
-        return String.format("subAgentName=%S, subSlingId=%s, editable=%s, 
maxRetries=%s, packageHandling=%s",
-                subAgentName, subSlingId, editable, maxRetries, 
packageHandling);
+        return String.format("subAgentName=%S, subSlingId=%s, editable=%s, 
maxRetries=%s, packageHandling=%s, packageNodeName=%s",
+                subAgentName, subSlingId, editable, maxRetries, 
packageHandling, packageNodeName);
     }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
index e6db4e2..c3c510a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
@@ -46,7 +46,7 @@ import static 
org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
 @ParametersAreNonnullByDefault
 public class LocalStore {
 
-    private static final String ROOT_PATH = 
"/var/sling/distribution/journal/stores";
+    static final String ROOT_PATH = "/var/sling/distribution/journal/stores";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(LocalStore.class);
 
@@ -115,7 +115,7 @@ public class LocalStore {
 
     @Nonnull
     private Resource getParent(ResourceResolver resolver) {
-        String msg = "Parent path " + rootPath + " must be created during 
provisioning.";
+        String msg = "Parent path " + rootPath + " should have been created on 
construction. Possibly the stores were reset";
         return requireNonNull(resolver.getResource(rootPath), msg);
     }
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMX.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMX.java
new file mode 100644
index 0000000..45761b4
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMX.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.bookkeeper;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.LocalStoreJMXMBean;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(
+    immediate = true
+)
+public class LocalStoreJMX implements LocalStoreJMXMBean {
+    
+    Logger log = LoggerFactory.getLogger(this.getClass());
+    
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+    
+    private JMXRegistration jmxRegistration;
+
+    @Activate
+    public void activate() throws NotCompliantMBeanException {
+        Objects.requireNonNull(resolverFactory, "resolverFactory must not be 
null");
+        StandardMBean mbean = new StandardMBean(this, 
LocalStoreJMXMBean.class);
+        jmxRegistration = new JMXRegistration(mbean, "offsetReset", "default");
+    }
+    
+    @Deactivate
+    public void close() throws IOException {
+        jmxRegistration.close();
+    }
+
+    @Override
+    public void resetStores() {
+        String path = LocalStore.ROOT_PATH;
+        try (ResourceResolver resolver = getBookKeeperServiceResolver()) {
+            Resource rootResource = 
Objects.requireNonNull(resolver.getResource(path), path + " not found");
+            // We must not delete the root Resource as it is provisioned by 
the feature. So we delete the children instead
+            Iterable<Resource> children = resolver.getChildren(rootResource);
+            children.forEach(res -> delete(resolver, res));
+            resolver.commit();
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+    
+    private void delete(ResourceResolver resolver, Resource resource) {
+        try {
+            log.info("Deleting store {}", resource.getPath());
+            resolver.delete(resource);
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+    
+    private ResourceResolver getBookKeeperServiceResolver() throws 
LoginException {
+        return 
resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE, 
"bookkeeper"));
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0d7d25c..5d42ca3 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -32,14 +33,20 @@ import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import 
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.shared.Topics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PackageStatusWatcher implements Closeable {
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    
     private final Closeable poller;
+    private final AtomicLong lowestStatusOffset;
     
     // subAgentName -> pkgOffset -> Status
     private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new 
ConcurrentHashMap<>();
 
     public PackageStatusWatcher(MessagingProvider messagingProvider, Topics 
topics) {
+        this.lowestStatusOffset = new AtomicLong(Long.MAX_VALUE);
         String topicName = topics.getStatusTopic();
 
         poller = messagingProvider.createPoller(
@@ -56,7 +63,16 @@ public class PackageStatusWatcher implements Closeable {
      */
     public PackageStatusMessage.Status getStatus(String subAgentName, long 
pkgOffset) {
         Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
-        return statusPerAgent.get(pkgOffset);
+        Status status = statusPerAgent.get(pkgOffset);
+        if (status == null && statusCanNotArriveAnymore(pkgOffset)) {
+            log.info("Considering offset {} as imported as status for this 
package can not arrive anymore.", pkgOffset);
+            return Status.IMPORTED;
+        }
+        return status;
+    }
+
+    private boolean statusCanNotArriveAnymore(long pkgOffset) {
+        return lowestStatusOffset.get()!=Long.MAX_VALUE && pkgOffset < 
lowestStatusOffset.get();
     }
 
     private Map<Long, Status> getAgentStatus(String subAgentName) {
@@ -73,8 +89,13 @@ public class PackageStatusWatcher implements Closeable {
     }
 
     private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) {
+        long statusOffset = pkgStatusMsg.getOffset();
+        long lowest = lowestStatusOffset.get();
+        if (statusOffset < lowest) {
+            lowestStatusOffset.set(statusOffset);
+        }
         // TODO: check revision
         Map<Long, Status> agentStatus = 
getAgentStatus(pkgStatusMsg.getSubAgentName());
-        agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus());
+        agentStatus.put(statusOffset, pkgStatusMsg.getStatus());
     }
 }
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 4243466..b306cf9 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -25,6 +25,7 @@ import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackground
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -39,6 +40,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.util.Text;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.commons.metrics.Timer;
@@ -174,13 +176,14 @@ public class DistributionSubscriber {
         Consumer<PackageStatusMessage> statusSender = 
messagingProvider.createSender(topics.getStatusTopic());
         Consumer<LogMessage> logSender = 
messagingProvider.createSender(topics.getDiscoveryTopic());
 
-        BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, 
subSlingId, config.editable(), config.maxRetries(), config.packageHandling());
+        String packageNodeName = 
escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
+        BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName, 
subSlingId, config.editable(), config.maxRetries(), config.packageHandling(), 
packageNodeName);
         bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, 
statusSender, logSender);
         
         long startOffset = bookKeeper.loadOffset() + 1;
-        String assign = messagingProvider.assignTo(startOffset);
+        String assign = startOffset > 0 ? 
messagingProvider.assignTo(startOffset) : null;
 
-        packagePoller = 
messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
+        packagePoller = 
messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
                 HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage));
 
 
@@ -193,6 +196,10 @@ public class DistributionSubscriber {
 
         LOG.info("Started Subscriber agent {} at offset {}, subscribed to 
agent names {}", subAgentName, startOffset, queueNames);
     }
+    
+    public static String escapeTopicName(URI messagingUri, String topicName) {
+        return messagingUri.getHost() + "_" + 
Text.escapeIllegalJcrChars(topicName);
+    }
 
     private Set<String> getNotEmpty(String[] agentNames) {
         return 
Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(toSet());
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
 
b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
index e62c367..9a98822 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Hashtable;
 
 import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 public class JMXRegistration implements Closeable {
@@ -33,11 +34,8 @@ public class JMXRegistration implements Closeable {
     private ObjectName name;
     
     public JMXRegistration(Object bean, String type, String id) {
-        Hashtable<String, String> props = new Hashtable<>();
-        props.put("type", type);
-        props.put("id", id);
         try {
-            this.name = ObjectName.getInstance(DOMAIN, props);
+            this.name = nameOf(type, id);
             MBeanServer server = getPlatformMBeanServer();
             if (!server.isRegistered(name)) {
                 server.registerMBean(bean, name);
@@ -46,6 +44,13 @@ public class JMXRegistration implements Closeable {
             throw new RuntimeException(e.getMessage(), e);
         }
     }
+    
+    public static ObjectName nameOf(String type, String id) throws 
MalformedObjectNameException {
+        Hashtable<String, String> props = new Hashtable<>();
+        props.put("type", type);
+        props.put("id", id);
+        return ObjectName.getInstance(DOMAIN, props);
+    }
 
     public void close() throws IOException {
         try {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/shared/LocalStoreJMXMBean.java
 
b/src/main/java/org/apache/sling/distribution/journal/shared/LocalStoreJMXMBean.java
new file mode 100644
index 0000000..8fda5d4
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/shared/LocalStoreJMXMBean.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+public interface LocalStoreJMXMBean {
+    void resetStores();
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index bfc41d7..a3189c1 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -67,7 +67,7 @@ public class BookKeeperTest {
 
     @Before
     public void before() {
-        BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", 
"subSlingId", true, 10, PackageHandling.Extract);
+        BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", 
"subSlingId", true, 10, PackageHandling.Extract, "package");
         bookKeeper = new BookKeeper(resolverFactory, 
distributionMetricsService, packageHandler, eventAdmin, sender, logSender, 
bkConfig);
     }
 
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMXTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMXTest.java
new file mode 100644
index 0000000..63c19c6
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMXTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.bookkeeper;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LocalStoreJMXTest {
+    
+    private LocalStoreJMX resetOffsets;
+    
+    private ResourceResolverFactory resolverFactory;
+
+    private BundleContext context;
+    
+    @Before
+    public void before() {
+        context = MockOsgi.newBundleContext();
+        resolverFactory = 
MockSling.newResourceResolverFactory(ResourceResolverType.JCR_OAK, context);
+        resetOffsets = new LocalStoreJMX();
+        MockOsgi.injectServices(resetOffsets, context);
+        MockOsgi.activate(resetOffsets, context);
+    }
+
+    @Test
+    public void testResetViaService() throws LoginException, 
PersistenceException {
+        try ( ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
+            ResourceUtil.getOrCreateResource(resolver, LocalStore.ROOT_PATH, 
"sling:Folder", "sling:Folder", true);
+        }
+        LocalStore store = new LocalStore(resolverFactory, "package", 
"publish");
+        store.store("offset", 10l);
+        assertThat(store.load("offset", Long.class), equalTo(10l));
+        resetOffsets.resetStores();
+        try {
+            store.load("offset", Long.class);
+            fail("NPE expected");
+        } catch (NullPointerException e) {
+            assertThat(e.getMessage(), containsString("Possibly the stores 
were reset"));
+        }
+    }
+    
+    @Test
+    public void testResetViaJMX() throws Exception {
+        try ( ResourceResolver resolver = 
resolverFactory.getServiceResourceResolver(null)) {
+            ResourceUtil.getOrCreateResource(resolver, LocalStore.ROOT_PATH, 
"sling:Folder", "sling:Folder", true);
+        }
+        LocalStore store = new LocalStore(resolverFactory, "package", 
"publish");
+        store.store("offset", 10l);
+        assertThat(store.load("offset", Long.class), equalTo(10l));
+        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        ObjectName name = JMXRegistration.nameOf("offsetReset", "default");
+        mbeanServer.invoke(name, "resetStores", null, null);
+        try {
+            store.load("offset", Long.class);
+            fail("NPE expected");
+        } catch (NullPointerException e) {
+            assertThat(e.getMessage(), containsString("Possibly the stores 
were reset"));
+        }
+    }
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 2edc02f..cf8473e 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -81,8 +81,11 @@ public class PackageStatusWatcherTest {
 
         generateMessages(10, 50);
 
-        assertPackageStatus(1000, null);
+        // If offset is lower than lowest offset we received we assume it to 
be imported
+        assertPackageStatus(1000, Status.IMPORTED);
+
         assertPackageStatus(1010, Status.REMOVED_FAILED);
+        assertPackageStatus(1051, null);
     }
 
 
@@ -105,13 +108,9 @@ public class PackageStatusWatcherTest {
 
     }
 
-    void assertPackageStatus(long pkgOffset, Status status) {
-        if (status == null) {
-            assertEquals(null, statusWatcher.getStatus(SUB1_AGENT_NAME, 
pkgOffset));
-        } else {
-            assertEquals(status, statusWatcher.getStatus(SUB1_AGENT_NAME, 
pkgOffset));
-        }
+    void assertPackageStatus(long pkgOffset, Status expectedStatus) {
+        Status status = statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset);
+        assertEquals(expectedStatus, status);
     }
 
-
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/EscapeTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/EscapeTest.java
new file mode 100644
index 0000000..8731036
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/EscapeTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.junit.Test;
+
+public class EscapeTest {
+
+    @Test
+    public void test() throws URISyntaxException {
+        URI uri = new URI("http://myserver.apache.org:1234/somepath";);
+        String topicName = "some_topic_name1";
+        String escaped = DistributionSubscriber.escapeTopicName(uri, 
topicName);
+        assertThat(escaped, equalTo("myserver.apache.org_some_topic_name1"));
+    }
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 902aad7..6cbf062 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -38,6 +38,8 @@ import static org.mockito.Mockito.when;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Dictionary;
@@ -114,6 +116,8 @@ public class SubscriberTest {
     
     private static final String PUB1_SLING_ID = "pub1sling";
     private static final String PUB1_AGENT_NAME = "pub1agent";
+    
+    private static final String STORE_PACKAGE_NODE_NAME = 
"myserver.apache.org_aemdistribution_package";
 
     private static final PackageMessage BASIC_ADD_PACKAGE = 
PackageMessage.builder()
             .pkgId("myid")
@@ -134,6 +138,7 @@ public class SubscriberTest {
             .paths(Arrays.asList("/test"))
             .build();
 
+
     
     @Mock
     private BundleContext context;
@@ -203,7 +208,7 @@ public class SubscriberTest {
     private MessageHandler<ClearCommand> commandHandler;
 
     @Before
-    public void before() {
+    public void before() throws URISyntaxException {
         DistributionSubscriber.QUEUE_FETCH_DELAY = 100;
         DistributionSubscriber.RETRY_DELAY = 100;
         
@@ -214,13 +219,14 @@ public class SubscriberTest {
         when(slingSettings.getSlingId()).thenReturn(SUB1_SLING_ID);
 
         mockMetrics();
-
+        URI serverURI = new URI("http://myserver.apache.org:1234/somepath";);
+        when(clientProvider.getServerUri()).thenReturn(serverURI);
         
when(clientProvider.<PackageStatusMessage>createSender(Mockito.eq(topics.getStatusTopic()))).thenReturn(statusSender);
         
when(clientProvider.<DiscoveryMessage>createSender(Mockito.eq(topics.getDiscoveryTopic()))).thenReturn(discoverySender);
 
         when(clientProvider.createPoller(
                 Mockito.eq(topics.getPackageTopic()),
-                Mockito.eq(Reset.earliest), 
+                Mockito.eq(Reset.latest), 
                 Mockito.anyString(),
                 packageCaptor.capture()))
             .thenReturn(poller);
@@ -359,7 +365,7 @@ public class SubscriberTest {
         PackageMessage message = BASIC_ADD_PACKAGE;
         packageHandler.handle(info, message);
         
-        await().until(this::getStatus, 
equalTo(PackageStatusMessage.Status.REMOVED));
+        await().until(this::getStoredStatus, 
equalTo(PackageStatusMessage.Status.REMOVED));
         verifyStatusMessageSentWithStatus(Status.REMOVED);
     }
     
@@ -412,11 +418,11 @@ public class SubscriberTest {
     }
 
     private Long getStoredOffset() {
-        LocalStore store = new LocalStore(resolverFactory, 
BookKeeper.STORE_TYPE_PACKAGE, SUB1_AGENT_NAME);
+        LocalStore store = new LocalStore(resolverFactory, 
STORE_PACKAGE_NODE_NAME, SUB1_AGENT_NAME);
         return store.load(BookKeeper.KEY_OFFSET, Long.class);
     }
 
-    private Status getStatus() {
+    private Status getStoredStatus() {
         LocalStore statusStore = new LocalStore(resolverFactory, 
BookKeeper.STORE_TYPE_STATUS, SUB1_AGENT_NAME);
         return new BookKeeper.PackageStatus(statusStore.load()).status;
     }

Reply via email to