This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new b76750c Revert "Revert "SLING-9465 - DiscoveryService is activated on
all services (#32)""
b76750c is described below
commit b76750c73c3811044156daba97c8819c681109df
Author: tmaret <[email protected]>
AuthorDate: Mon May 25 13:58:11 2020 +0200
Revert "Revert "SLING-9465 - DiscoveryService is activated on all services
(#32)""
This reverts commit f06a4140d5ad35028c00be4000c72c213c6ac202.
---
.../journal/impl/publisher/DiscoveryService.java | 9 +-
.../impl/publisher/DistributionPublisher.java | 4 +-
.../shared/PublisherConfigurationAvailable.java | 108 +++++++++++++++++++
.../PublisherConfigurationAvailableTest.java | 116 +++++++++++++++++++++
4 files changed, 233 insertions(+), 4 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 2e0b665..2cce5bb 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -30,6 +30,7 @@ import java.util.Hashtable;
import javax.annotation.ParametersAreNonnullByDefault;
import
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import
org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
import
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
@@ -55,8 +56,7 @@ import org.slf4j.LoggerFactory;
* Listens for discovery messages and tracks presence of Subscribers as well as
* the last processed offset of each Subscriber
*
- * This component uses lazy starting so it is only started when there is at
least one Agent
- * that requires it.
+ * This component is only activated when there is at least one
DistributionPublisher agent configured.
*
* This component is meant to be shared by Publisher agents.
*/
@@ -70,7 +70,10 @@ public class DiscoveryService implements Runnable {
@Reference
private JournalAvailable journalAvailable;
-
+
+ @Reference
+ private PublisherConfigurationAvailable publisherConfigurationAvailable;
+
@Reference
private MessagingProvider messagingProvider;
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 55787dc..d366ab3 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -86,12 +86,14 @@ import
org.apache.sling.distribution.journal.JournalAvailable;
@Component(
service = {},
immediate = true,
- configurationPid =
"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"
+ configurationPid = DistributionPublisher.FACTORY_PID
)
@Designate(ocd = PublisherConfiguration.class, factory = true)
@ParametersAreNonnullByDefault
public class DistributionPublisher implements DistributionAgent {
+ public static final String FACTORY_PID =
"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
+
private final Map<DistributionRequestType, Consumer<PackageMessage>>
REQ_TYPES = new HashMap<>();
private final DefaultDistributionLog log;
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java
new file mode 100644
index 0000000..c0a6db4
--- /dev/null
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.IOException;
+
+import
org.apache.sling.distribution.journal.impl.publisher.DistributionPublisher;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
+import static
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.osgi.service.cm.ConfigurationAdmin.SERVICE_FACTORYPID;
+
+/**
+ * Periodic task that looks for DistributionPublisher agent configurations
+ * and registers itself as the marker service
+ * {@link PublisherConfigurationAvailable} as soon as at least one is found.
+ * Other components can reference the marker service in order to be
+ * activated only where a publisher agent is configured.
+ * <p>
+ * To avoid costly reactivation cycles, the marker service, once registered,
+ * remains registered until this component is deactivated, even if the
+ * configurations disappear in the meantime.
+ * <p>
+ * This task is meant to be executed on every instance, even in a cluster.
+ */
+@Component(
+    property = {
+        PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+        PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=false",
+        PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60, // 1 minute
+    }
+)
+public class PublisherConfigurationAvailable implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PublisherConfigurationAvailable.class);
+
+    /** Registration of the marker service; {@code null} while not registered. */
+    private volatile ServiceRegistration<PublisherConfigurationAvailable> reg; //NOSONAR
+
+    /** Bundle context used to register the marker service; {@code null} once deactivated. */
+    private volatile BundleContext context; //NOSONAR
+
+    /** Guards the check-then-register sequence in {@link #run()} against {@link #deactivate()}. */
+    private final Object lock = new Object();
+
+    @Reference
+    private ConfigurationAdmin configAdmin;
+
+    /**
+     * Stores the bundle context later used to register the marker service.
+     *
+     * @param context the bundle context of this component
+     */
+    @Activate
+    public void activate(BundleContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Unregisters the marker service if it was registered and resets the
+     * internal state, so that {@link #isAvailable()} reports {@code false}
+     * afterwards and repeated calls are harmless.
+     */
+    @Deactivate
+    public void deactivate() {
+        synchronized (lock) {
+            // Dropping the context prevents a late run() from registering
+            // the marker service again after deactivation.
+            context = null;
+            ServiceRegistration<PublisherConfigurationAvailable> current = reg;
+            // Clear the field first so the state is reset even if unregister() throws.
+            reg = null;
+            if (current != null) {
+                current.unregister();
+                LOG.info("Unregistered marker service");
+            }
+        }
+    }
+
+    /**
+     * Registers the marker service the first time a DistributionPublisher
+     * agent configuration is found. Does nothing when the marker service is
+     * already registered or when the component has been deactivated.
+     */
+    @Override
+    public void run() {
+        synchronized (lock) {
+            BundleContext ctx = context;
+            if (ctx != null && reg == null && hasPublisherConfigurations()) {
+                reg = ctx.registerService(PublisherConfigurationAvailable.class, this, null);
+                LOG.info("Registered marker service");
+            }
+        }
+    }
+
+    /**
+     * @return {@code true} while the marker service is registered
+     */
+    protected boolean isAvailable() {
+        return reg != null;
+    }
+
+    /**
+     * Asks the configuration admin for configurations bound to the
+     * DistributionPublisher factory PID.
+     *
+     * @return {@code true} if at least one such configuration exists,
+     *         {@code false} if none exists or the lookup failed
+     */
+    private boolean hasPublisherConfigurations() {
+        String filter = "(" + SERVICE_FACTORYPID + "=" + DistributionPublisher.FACTORY_PID + ")";
+        try {
+            // listConfigurations may return null, which is treated as "none found"
+            Configuration[] configs = configAdmin.listConfigurations(filter);
+            return configs != null && configs.length > 0;
+        } catch (IOException | InvalidSyntaxException e) {
+            LOG.warn("Failed to search for DistributionPublisher agent configurations", e);
+        }
+        return false;
+    }
+}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java
new file mode 100644
index 0000000..9b968e9
--- /dev/null
+++
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link PublisherConfigurationAvailable}: the marker service must
+ * be registered once a publisher configuration is listed, must stay
+ * registered afterwards, and must be unregistered on deactivation.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class PublisherConfigurationAvailableTest {
+
+    @InjectMocks
+    private PublisherConfigurationAvailable pca;
+
+    @Mock
+    private BundleContext context;
+
+    @Mock
+    private ConfigurationAdmin configAdmin;
+
+    // Created by hand in before() rather than via @Mock, so that it is not a
+    // candidate for injection into the component under test.
+    private ServiceRegistration<PublisherConfigurationAvailable> serviceReg;
+
+    @Before
+    @SuppressWarnings("unchecked") // mock() of a generic interface returns the raw type
+    public void before() throws Exception {
+        serviceReg = mock(ServiceRegistration.class);
+        when(context.registerService(eq(PublisherConfigurationAvailable.class),
+                any(PublisherConfigurationAvailable.class), any()))
+                .thenReturn(serviceReg);
+        when(configAdmin.listConfigurations(anyString()))
+                .thenReturn(null);
+        pca.activate(context);
+    }
+
+    @After
+    public void after() {
+        pca.deactivate();
+    }
+
+    @Test
+    public void testNoConfiguration() {
+        assertFalse(pca.isAvailable());
+        pca.run();
+        assertFalse(pca.isAvailable());
+        verifyRegistrations(0);
+    }
+
+    @Test
+    public void testWithZeroConfiguration() throws Exception {
+        assertFalse(pca.isAvailable());
+        addConfigurations(0);
+        pca.run();
+        assertFalse(pca.isAvailable());
+        verifyRegistrations(0);
+    }
+
+    @Test
+    public void testWithOneConfiguration() throws Exception {
+        assertFalse(pca.isAvailable());
+        addConfigurations(1);
+        pca.run();
+        assertTrue(pca.isAvailable());
+        verifyRegistrations(1);
+    }
+
+    @Test
+    public void testWithManyConfigurations() throws Exception {
+        assertFalse(pca.isAvailable());
+        addConfigurations(10);
+        pca.run();
+        assertTrue(pca.isAvailable());
+        verifyRegistrations(1);
+    }
+
+    @Test
+    public void testRemainAvailable() throws Exception {
+        addConfigurations(1);
+        pca.run();
+        assertTrue(pca.isAvailable());
+        removeAllConfigurations();
+        pca.run();
+        assertTrue(pca.isAvailable());
+        // The second run must neither drop nor repeat the registration
+        verifyRegistrations(1);
+        verify(serviceReg, never()).unregister();
+    }
+
+    @Test
+    public void testDeactivateUnregistersMarkerService() throws Exception {
+        addConfigurations(1);
+        pca.run();
+        assertTrue(pca.isAvailable());
+        pca.deactivate();
+        verify(serviceReg).unregister();
+    }
+
+    @Test
+    public void testDeactivateWithoutRegistration() {
+        pca.run();
+        pca.deactivate();
+        verify(serviceReg, never()).unregister();
+    }
+
+    /** Asserts how many times the marker service has been registered so far. */
+    private void verifyRegistrations(int expected) {
+        verify(context, times(expected)).registerService(eq(PublisherConfigurationAvailable.class),
+                any(PublisherConfigurationAvailable.class), any());
+    }
+
+    /** Makes the configuration admin report no configuration at all. */
+    private void removeAllConfigurations() throws Exception {
+        when(configAdmin.listConfigurations(anyString()))
+                .thenReturn(null);
+    }
+
+    /** Makes the configuration admin report an array of the given size. */
+    private void addConfigurations(int nbConfigurations) throws Exception {
+        when(configAdmin.listConfigurations(anyString()))
+                .thenReturn(new Configuration[nbConfigurations]);
+    }
+
+}
\ No newline at end of file