This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 7460d7a SLING-10154 - Allow to seamlessly move clients to different
set of to… (#66)
7460d7a is described below
commit 7460d7ad56e1d6067aaa4b2f8131d33b4f2b3743
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Mar 1 19:37:35 2021 +0100
SLING-10154 - Allow to seamlessly move clients to different set of to… (#66)
* SLING-10154 - Allow to seamlessly move clients to different set of topics
* SLING-10154 - Make sure we start from latest if no offset is stored
* SLING-10154 - Add logging in case we force imported status
---
pom.xml | 9 +-
.../journal/bookkeeper/BookKeeper.java | 3 +-
.../journal/bookkeeper/BookKeeperConfig.java | 13 ++-
.../journal/bookkeeper/LocalStore.java | 4 +-
.../journal/bookkeeper/LocalStoreJMX.java | 94 ++++++++++++++++++++
.../impl/precondition/PackageStatusWatcher.java | 25 +++++-
.../impl/subscriber/DistributionSubscriber.java | 13 ++-
.../journal/shared/JMXRegistration.java | 13 ++-
.../journal/shared/LocalStoreJMXMBean.java | 23 +++++
.../journal/bookkeeper/BookKeeperTest.java | 2 +-
.../journal/bookkeeper/LocalStoreJMXTest.java | 99 ++++++++++++++++++++++
.../precondition/PackageStatusWatcherTest.java | 15 ++--
.../journal/impl/subscriber/EscapeTest.java | 38 +++++++++
.../journal/impl/subscriber/SubscriberTest.java | 18 ++--
14 files changed, 336 insertions(+), 33 deletions(-)
diff --git a/pom.xml b/pom.xml
index 873176e..7fe140a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,11 @@
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.osgi</artifactId>
+ <version>2.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.commons.metrics</artifactId>
<version>1.2.6</version>
</dependency>
@@ -140,7 +145,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
- <version>0.2.0</version>
+ <version>0.2.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
@@ -202,7 +207,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.testing.sling-mock-oak</artifactId>
- <version>2.0.2</version>
+ <version>2.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 3d87eb2..5120d08 100644
---
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -75,7 +75,6 @@ import org.slf4j.MDC;
* agent on the leader instance.
*/
public class BookKeeper implements Closeable {
- public static final String STORE_TYPE_PACKAGE = "packages";
public static final String STORE_TYPE_STATUS = "statuses";
public static final String KEY_OFFSET = "offset";
public static final int COMMIT_AFTER_NUM_SKIPPED = 10;
@@ -120,7 +119,7 @@ public class BookKeeper implements Closeable {
// of retry attempts is limited ; disabled otherwise
this.errorQueueEnabled = (config.getMaxRetries() >= 0);
this.statusStore = new LocalStore(resolverFactory, STORE_TYPE_STATUS,
config.getSubAgentName());
- this.processedOffsets = new LocalStore(resolverFactory,
STORE_TYPE_PACKAGE, config.getSubAgentName());
+ this.processedOffsets = new LocalStore(resolverFactory,
config.getPackageNodeName(), config.getSubAgentName());
log.info("Started bookkeeper {}.", config);
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
index abaff72..8179e0e 100644
---
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
+++
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperConfig.java
@@ -24,17 +24,20 @@ public class BookKeeperConfig {
private final boolean editable;
private final int maxRetries;
private final PackageHandling packageHandling;
+ private final String packageNodeName;
public BookKeeperConfig(String subAgentName,
String subSlingId,
boolean editable,
int maxRetries,
- PackageHandling packageHandling) {
+ PackageHandling packageHandling,
+ String packageNodeName) {
this.subAgentName = subAgentName;
this.subSlingId = subSlingId;
this.editable = editable;
this.maxRetries = maxRetries;
this.packageHandling = packageHandling;
+ this.packageNodeName = packageNodeName;
}
public String getSubAgentName() {
@@ -57,9 +60,13 @@ public class BookKeeperConfig {
return packageHandling;
}
+ public String getPackageNodeName() {
+ return packageNodeName;
+ }
+
@Override
public String toString() {
- return String.format("subAgentName=%S, subSlingId=%s, editable=%s,
maxRetries=%s, packageHandling=%s",
- subAgentName, subSlingId, editable, maxRetries,
packageHandling);
+ return String.format("subAgentName=%S, subSlingId=%s, editable=%s,
maxRetries=%s, packageHandling=%s, packageNodeName=%s",
+ subAgentName, subSlingId, editable, maxRetries,
packageHandling, packageNodeName);
}
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
index e6db4e2..c3c510a 100644
---
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
+++
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStore.java
@@ -46,7 +46,7 @@ import static
org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
@ParametersAreNonnullByDefault
public class LocalStore {
- private static final String ROOT_PATH =
"/var/sling/distribution/journal/stores";
+ static final String ROOT_PATH = "/var/sling/distribution/journal/stores";
private static final Logger LOG =
LoggerFactory.getLogger(LocalStore.class);
@@ -115,7 +115,7 @@ public class LocalStore {
@Nonnull
private Resource getParent(ResourceResolver resolver) {
- String msg = "Parent path " + rootPath + " must be created during
provisioning.";
+ String msg = "Parent path " + rootPath + " should have been created on
construction. Possibly the stores were reset";
return requireNonNull(resolver.getResource(rootPath), msg);
}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMX.java
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMX.java
new file mode 100644
index 0000000..45761b4
--- /dev/null
+++
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMX.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.bookkeeper;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.sling.api.resource.ResourceResolverFactory.SUBSERVICE;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.distribution.journal.shared.LocalStoreJMXMBean;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(
+ immediate = true
+)
+public class LocalStoreJMX implements LocalStoreJMXMBean {
+
+ Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Reference
+ private ResourceResolverFactory resolverFactory;
+
+ private JMXRegistration jmxRegistration;
+
+ @Activate
+ public void activate() throws NotCompliantMBeanException {
+ Objects.requireNonNull(resolverFactory, "resolverFactory must not be
null");
+ StandardMBean mbean = new StandardMBean(this,
LocalStoreJMXMBean.class);
+ jmxRegistration = new JMXRegistration(mbean, "offsetReset", "default");
+ }
+
+ @Deactivate
+ public void close() throws IOException {
+ jmxRegistration.close();
+ }
+
+ @Override
+ public void resetStores() {
+ String path = LocalStore.ROOT_PATH;
+ try (ResourceResolver resolver = getBookKeeperServiceResolver()) {
+ Resource rootResource =
Objects.requireNonNull(resolver.getResource(path), path + " not found");
+ // We must not delete the root Resource as it is provisioned by
the feature. So we delete the children instead
+ Iterable<Resource> children = resolver.getChildren(rootResource);
+ children.forEach(res -> delete(resolver, res));
+ resolver.commit();
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ private void delete(ResourceResolver resolver, Resource resource) {
+ try {
+ log.info("Deleting store {}", resource.getPath());
+ resolver.delete(resource);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ private ResourceResolver getBookKeeperServiceResolver() throws
LoginException {
+ return
resolverFactory.getServiceResourceResolver(singletonMap(SUBSERVICE,
"bookkeeper"));
+ }
+}
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
index 0d7d25c..5d42ca3 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcher.java
@@ -25,6 +25,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
@@ -32,14 +33,20 @@ import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import
org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.shared.Topics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PackageStatusWatcher implements Closeable {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
private final Closeable poller;
+ private final AtomicLong lowestStatusOffset;
// subAgentName -> pkgOffset -> Status
private final Map<String, Map<Long, Status>> pkgStatusPerSubAgent = new
ConcurrentHashMap<>();
public PackageStatusWatcher(MessagingProvider messagingProvider, Topics
topics) {
+ this.lowestStatusOffset = new AtomicLong(Long.MAX_VALUE);
String topicName = topics.getStatusTopic();
poller = messagingProvider.createPoller(
@@ -56,7 +63,16 @@ public class PackageStatusWatcher implements Closeable {
*/
public PackageStatusMessage.Status getStatus(String subAgentName, long
pkgOffset) {
Map<Long, Status> statusPerAgent = getAgentStatus(subAgentName);
- return statusPerAgent.get(pkgOffset);
+ Status status = statusPerAgent.get(pkgOffset);
+ if (status == null && statusCanNotArriveAnymore(pkgOffset)) {
+ log.info("Considering offset {} as imported as status for this
package can not arrive anymore.", pkgOffset);
+ return Status.IMPORTED;
+ }
+ return status;
+ }
+
+ private boolean statusCanNotArriveAnymore(long pkgOffset) {
+ return lowestStatusOffset.get()!=Long.MAX_VALUE && pkgOffset <
lowestStatusOffset.get();
}
private Map<Long, Status> getAgentStatus(String subAgentName) {
@@ -73,8 +89,13 @@ public class PackageStatusWatcher implements Closeable {
}
private void handle(MessageInfo info, PackageStatusMessage pkgStatusMsg) {
+ long statusOffset = pkgStatusMsg.getOffset();
+ long lowest = lowestStatusOffset.get();
+ if (statusOffset < lowest) {
+ lowestStatusOffset.set(statusOffset);
+ }
// TODO: check revision
Map<Long, Status> agentStatus =
getAgentStatus(pkgStatusMsg.getSubAgentName());
- agentStatus.put(pkgStatusMsg.getOffset(), pkgStatusMsg.getStatus());
+ agentStatus.put(statusOffset, pkgStatusMsg.getStatus());
}
}
\ No newline at end of file
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 4243466..b306cf9 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -25,6 +25,7 @@ import static
org.apache.sling.distribution.journal.RunnableUtil.startBackground
import java.io.Closeable;
import java.io.IOException;
+import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -39,6 +40,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.jackrabbit.util.Text;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.commons.metrics.Timer;
@@ -174,13 +176,14 @@ public class DistributionSubscriber {
Consumer<PackageStatusMessage> statusSender =
messagingProvider.createSender(topics.getStatusTopic());
Consumer<LogMessage> logSender =
messagingProvider.createSender(topics.getDiscoveryTopic());
- BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName,
subSlingId, config.editable(), config.maxRetries(), config.packageHandling());
+ String packageNodeName =
escapeTopicName(messagingProvider.getServerUri(), topics.getPackageTopic());
+ BookKeeperConfig bkConfig = new BookKeeperConfig(subAgentName,
subSlingId, config.editable(), config.maxRetries(), config.packageHandling(),
packageNodeName);
bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig,
statusSender, logSender);
long startOffset = bookKeeper.loadOffset() + 1;
- String assign = messagingProvider.assignTo(startOffset);
+ String assign = startOffset > 0 ?
messagingProvider.assignTo(startOffset) : null;
- packagePoller =
messagingProvider.createPoller(topics.getPackageTopic(), Reset.earliest, assign,
+ packagePoller =
messagingProvider.createPoller(topics.getPackageTopic(), Reset.latest, assign,
HandlerAdapter.create(PackageMessage.class,
this::handlePackageMessage));
@@ -193,6 +196,10 @@ public class DistributionSubscriber {
LOG.info("Started Subscriber agent {} at offset {}, subscribed to
agent names {}", subAgentName, startOffset, queueNames);
}
+
+ public static String escapeTopicName(URI messagingUri, String topicName) {
+ return messagingUri.getHost() + "_" +
Text.escapeIllegalJcrChars(topicName);
+ }
private Set<String> getNotEmpty(String[] agentNames) {
return
Arrays.stream(agentNames).filter(StringUtils::isNotBlank).collect(toSet());
diff --git
a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
index e62c367..9a98822 100644
---
a/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
+++
b/src/main/java/org/apache/sling/distribution/journal/shared/JMXRegistration.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.Hashtable;
import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
public class JMXRegistration implements Closeable {
@@ -33,11 +34,8 @@ public class JMXRegistration implements Closeable {
private ObjectName name;
public JMXRegistration(Object bean, String type, String id) {
- Hashtable<String, String> props = new Hashtable<>();
- props.put("type", type);
- props.put("id", id);
try {
- this.name = ObjectName.getInstance(DOMAIN, props);
+ this.name = nameOf(type, id);
MBeanServer server = getPlatformMBeanServer();
if (!server.isRegistered(name)) {
server.registerMBean(bean, name);
@@ -46,6 +44,13 @@ public class JMXRegistration implements Closeable {
throw new RuntimeException(e.getMessage(), e);
}
}
+
+ public static ObjectName nameOf(String type, String id) throws
MalformedObjectNameException {
+ Hashtable<String, String> props = new Hashtable<>();
+ props.put("type", type);
+ props.put("id", id);
+ return ObjectName.getInstance(DOMAIN, props);
+ }
public void close() throws IOException {
try {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/shared/LocalStoreJMXMBean.java
b/src/main/java/org/apache/sling/distribution/journal/shared/LocalStoreJMXMBean.java
new file mode 100644
index 0000000..8fda5d4
--- /dev/null
+++
b/src/main/java/org/apache/sling/distribution/journal/shared/LocalStoreJMXMBean.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.shared;
+
+public interface LocalStoreJMXMBean {
+ void resetStores();
+}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index bfc41d7..a3189c1 100644
---
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -67,7 +67,7 @@ public class BookKeeperTest {
@Before
public void before() {
- BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName",
"subSlingId", true, 10, PackageHandling.Extract);
+ BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName",
"subSlingId", true, 10, PackageHandling.Extract, "package");
bookKeeper = new BookKeeper(resolverFactory,
distributionMetricsService, packageHandler, eventAdmin, sender, logSender,
bkConfig);
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMXTest.java
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMXTest.java
new file mode 100644
index 0000000..63c19c6
--- /dev/null
+++
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/LocalStoreJMXTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.bookkeeper;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.distribution.journal.shared.JMXRegistration;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+
+@RunWith(MockitoJUnitRunner.class)
+public class LocalStoreJMXTest {
+
+ private LocalStoreJMX resetOffsets;
+
+ private ResourceResolverFactory resolverFactory;
+
+ private BundleContext context;
+
+ @Before
+ public void before() {
+ context = MockOsgi.newBundleContext();
+ resolverFactory =
MockSling.newResourceResolverFactory(ResourceResolverType.JCR_OAK, context);
+ resetOffsets = new LocalStoreJMX();
+ MockOsgi.injectServices(resetOffsets, context);
+ MockOsgi.activate(resetOffsets, context);
+ }
+
+ @Test
+ public void testResetViaService() throws LoginException,
PersistenceException {
+ try ( ResourceResolver resolver =
resolverFactory.getServiceResourceResolver(null)) {
+ ResourceUtil.getOrCreateResource(resolver, LocalStore.ROOT_PATH,
"sling:Folder", "sling:Folder", true);
+ }
+ LocalStore store = new LocalStore(resolverFactory, "package",
"publish");
+ store.store("offset", 10l);
+ assertThat(store.load("offset", Long.class), equalTo(10l));
+ resetOffsets.resetStores();
+ try {
+ store.load("offset", Long.class);
+ fail("NPE expected");
+ } catch (NullPointerException e) {
+ assertThat(e.getMessage(), containsString("Possibly the stores
were reset"));
+ }
+ }
+
+ @Test
+ public void testResetViaJMX() throws Exception {
+ try ( ResourceResolver resolver =
resolverFactory.getServiceResourceResolver(null)) {
+ ResourceUtil.getOrCreateResource(resolver, LocalStore.ROOT_PATH,
"sling:Folder", "sling:Folder", true);
+ }
+ LocalStore store = new LocalStore(resolverFactory, "package",
"publish");
+ store.store("offset", 10l);
+ assertThat(store.load("offset", Long.class), equalTo(10l));
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ ObjectName name = JMXRegistration.nameOf("offsetReset", "default");
+ mbeanServer.invoke(name, "resetStores", null, null);
+ try {
+ store.load("offset", Long.class);
+ fail("NPE expected");
+ } catch (NullPointerException e) {
+ assertThat(e.getMessage(), containsString("Possibly the stores
were reset"));
+ }
+ }
+}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
index 2edc02f..cf8473e 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/precondition/PackageStatusWatcherTest.java
@@ -81,8 +81,11 @@ public class PackageStatusWatcherTest {
generateMessages(10, 50);
- assertPackageStatus(1000, null);
+ // If offset is lower than lowest offset we received we assume it to
be imported
+ assertPackageStatus(1000, Status.IMPORTED);
+
assertPackageStatus(1010, Status.REMOVED_FAILED);
+ assertPackageStatus(1051, null);
}
@@ -105,13 +108,9 @@ public class PackageStatusWatcherTest {
}
- void assertPackageStatus(long pkgOffset, Status status) {
- if (status == null) {
- assertEquals(null, statusWatcher.getStatus(SUB1_AGENT_NAME,
pkgOffset));
- } else {
- assertEquals(status, statusWatcher.getStatus(SUB1_AGENT_NAME,
pkgOffset));
- }
+ void assertPackageStatus(long pkgOffset, Status expectedStatus) {
+ Status status = statusWatcher.getStatus(SUB1_AGENT_NAME, pkgOffset);
+ assertEquals(expectedStatus, status);
}
-
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/EscapeTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/EscapeTest.java
new file mode 100644
index 0000000..8731036
--- /dev/null
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/EscapeTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.junit.Test;
+
+public class EscapeTest {
+
+ @Test
+ public void test() throws URISyntaxException {
+ URI uri = new URI("http://myserver.apache.org:1234/somepath");
+ String topicName = "some_topic_name1";
+ String escaped = DistributionSubscriber.escapeTopicName(uri,
topicName);
+ assertThat(escaped, equalTo("myserver.apache.org_some_topic_name1"));
+ }
+}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 902aad7..6cbf062 100644
---
a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -38,6 +38,8 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
@@ -114,6 +116,8 @@ public class SubscriberTest {
private static final String PUB1_SLING_ID = "pub1sling";
private static final String PUB1_AGENT_NAME = "pub1agent";
+
+ private static final String STORE_PACKAGE_NODE_NAME =
"myserver.apache.org_aemdistribution_package";
private static final PackageMessage BASIC_ADD_PACKAGE =
PackageMessage.builder()
.pkgId("myid")
@@ -134,6 +138,7 @@ public class SubscriberTest {
.paths(Arrays.asList("/test"))
.build();
+
@Mock
private BundleContext context;
@@ -203,7 +208,7 @@ public class SubscriberTest {
private MessageHandler<ClearCommand> commandHandler;
@Before
- public void before() {
+ public void before() throws URISyntaxException {
DistributionSubscriber.QUEUE_FETCH_DELAY = 100;
DistributionSubscriber.RETRY_DELAY = 100;
@@ -214,13 +219,14 @@ public class SubscriberTest {
when(slingSettings.getSlingId()).thenReturn(SUB1_SLING_ID);
mockMetrics();
-
+ URI serverURI = new URI("http://myserver.apache.org:1234/somepath");
+ when(clientProvider.getServerUri()).thenReturn(serverURI);
when(clientProvider.<PackageStatusMessage>createSender(Mockito.eq(topics.getStatusTopic()))).thenReturn(statusSender);
when(clientProvider.<DiscoveryMessage>createSender(Mockito.eq(topics.getDiscoveryTopic()))).thenReturn(discoverySender);
when(clientProvider.createPoller(
Mockito.eq(topics.getPackageTopic()),
- Mockito.eq(Reset.earliest),
+ Mockito.eq(Reset.latest),
Mockito.anyString(),
packageCaptor.capture()))
.thenReturn(poller);
@@ -359,7 +365,7 @@ public class SubscriberTest {
PackageMessage message = BASIC_ADD_PACKAGE;
packageHandler.handle(info, message);
- await().until(this::getStatus,
equalTo(PackageStatusMessage.Status.REMOVED));
+ await().until(this::getStoredStatus,
equalTo(PackageStatusMessage.Status.REMOVED));
verifyStatusMessageSentWithStatus(Status.REMOVED);
}
@@ -412,11 +418,11 @@ public class SubscriberTest {
}
private Long getStoredOffset() {
- LocalStore store = new LocalStore(resolverFactory,
BookKeeper.STORE_TYPE_PACKAGE, SUB1_AGENT_NAME);
+ LocalStore store = new LocalStore(resolverFactory,
STORE_PACKAGE_NODE_NAME, SUB1_AGENT_NAME);
return store.load(BookKeeper.KEY_OFFSET, Long.class);
}
- private Status getStatus() {
+ private Status getStoredStatus() {
LocalStore statusStore = new LocalStore(resolverFactory,
BookKeeper.STORE_TYPE_STATUS, SUB1_AGENT_NAME);
return new BookKeeper.PackageStatus(statusStore.load()).status;
}