This is an automated email from the ASF dual-hosted git repository. tmaret pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push: new b76750c Revert "Revert "SLING-9465 - DiscoveryService is activated on all services (#32)"" b76750c is described below commit b76750c73c3811044156daba97c8819c681109df Author: tmaret <tma...@adobe.com> AuthorDate: Mon May 25 13:58:11 2020 +0200 Revert "Revert "SLING-9465 - DiscoveryService is activated on all services (#32)"" This reverts commit f06a4140d5ad35028c00be4000c72c213c6ac202. --- .../journal/impl/publisher/DiscoveryService.java | 9 +- .../impl/publisher/DistributionPublisher.java | 4 +- .../shared/PublisherConfigurationAvailable.java | 108 +++++++++++++++++++ .../PublisherConfigurationAvailableTest.java | 116 +++++++++++++++++++++ 4 files changed, 233 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java index 2e0b665..2cce5bb 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java @@ -30,6 +30,7 @@ import java.util.Hashtable; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService; +import org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable; import org.apache.sling.distribution.journal.impl.shared.Topics; import org.apache.sling.distribution.journal.messages.Messages; import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; @@ -55,8 +56,7 @@ import org.slf4j.LoggerFactory; * Listens for discovery messages and tracks presence of Subscribers as well as * the last processed offset of each Subscriber * - * This component uses lazy starting so it is only started when there is at least one Agent - * that requires it. + * This component is only activated when there is at least one DistributionSubscriber agent configured. * * This component is meant to be shared by Publisher agents. */ @@ -70,7 +70,10 @@ public class DiscoveryService implements Runnable { @Reference private JournalAvailable journalAvailable; - + + @Reference + private PublisherConfigurationAvailable publisherConfigurationAvailable; + @Reference private MessagingProvider messagingProvider; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index 55787dc..d366ab3 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -86,12 +86,14 @@ import org.apache.sling.distribution.journal.JournalAvailable; @Component( service = {}, immediate = true, - configurationPid = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory" + configurationPid = DistributionPublisher.FACTORY_PID ) @Designate(ocd = PublisherConfiguration.class, factory = true) @ParametersAreNonnullByDefault public class DistributionPublisher implements DistributionAgent { + public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"; + private final Map<DistributionRequestType, Consumer<PackageMessage>> REQ_TYPES = new HashMap<>(); private final DefaultDistributionLog log; diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java new file mode 100644 index 0000000..c0a6db4 --- /dev/null +++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.shared; + +import java.io.IOException; + +import org.apache.sling.distribution.journal.impl.publisher.DistributionPublisher; +import org.osgi.framework.BundleContext; +import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT; +import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE; +import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD; +import static org.osgi.service.cm.ConfigurationAdmin.SERVICE_FACTORYPID; + +/** + * This task periodically checks for DistributionPublisher agent + * configuration availability and registers the marker service + * {@link PublisherConfigurationAvailable} when such configuration + * could be found. To avoid costly reactivation cycles, the marker + * service remains registered until this task is deactivated. + * This task is meant to be executed on every instance, even in a cluster. + */ +@Component( + property = { + PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false", + PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=false", + PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60, // 1 minute + } +) +public class PublisherConfigurationAvailable implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(PublisherConfigurationAvailable.class); + + private volatile ServiceRegistration<PublisherConfigurationAvailable> reg; //NOSONAR + + private volatile BundleContext context; //NOSONAR + + private final Object lock = new Object(); + + @Reference + private ConfigurationAdmin configAdmin; + + @Activate + public void activate(BundleContext context) { + this.context = context; + } + + @Deactivate + public void deactivate() { + synchronized (lock) { + if (reg != null) { + reg.unregister(); + LOG.info("Unregistered marker service"); + } + } + } + + @Override + public void run() { + synchronized (lock) { + if (reg == null && hasPublisherConfigurations()) { + reg = context.registerService(PublisherConfigurationAvailable.class, this, null); + LOG.info("Registered marker service"); + } + } + } + + protected boolean isAvailable() { + return reg != null; + } + + private boolean hasPublisherConfigurations() { + String filter = "(" + SERVICE_FACTORYPID + "=" + DistributionPublisher.FACTORY_PID + ")"; + try { + Configuration[] configs = configAdmin.listConfigurations(filter); + return configs != null && configs.length > 0; + } catch (IOException | InvalidSyntaxException e) { + LOG.warn("Failed to search for DistributionPublisher agent configurations", e); + } + return false; + } +} diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java new file mode 100644 index 0000000..9b968e9 --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.impl.shared; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PublisherConfigurationAvailableTest { + + @InjectMocks + private PublisherConfigurationAvailable pca; + + @Mock + private BundleContext context; + + @Mock + private ConfigurationAdmin configAdmin; + + @Before + public void before() throws Exception { + ServiceRegistration<PublisherConfigurationAvailable> serviceReg = mock(ServiceRegistration.class); + when(context.registerService(eq(PublisherConfigurationAvailable.class), any(PublisherConfigurationAvailable.class), any())) + .thenReturn(serviceReg); + when(configAdmin.listConfigurations(anyString())) + .thenReturn(null); + pca.activate(context); + } + + @After + public void after() { + pca.deactivate(); + } + + @Test + public void testNoConfiguration() { + assertFalse(pca.isAvailable()); + pca.run(); + assertFalse(pca.isAvailable()); + } + + @Test + public void testWithZeroConfiguration() throws Exception { + assertFalse(pca.isAvailable()); + addConfigurations(0); + pca.run(); + assertFalse(pca.isAvailable()); + } + + @Test + public void testWithOneConfiguration() throws Exception { + assertFalse(pca.isAvailable()); + addConfigurations(1); + pca.run(); + assertTrue(pca.isAvailable()); + } + + @Test + public void testWithManyConfigurations() throws Exception { + assertFalse(pca.isAvailable()); + addConfigurations(10); + pca.run(); + assertTrue(pca.isAvailable()); + } + + @Test + public void testRemainAvailable() throws Exception { + addConfigurations(1); + pca.run(); + assertTrue(pca.isAvailable()); + removeAllConfigurations(); + pca.run(); + assertTrue(pca.isAvailable()); + } + + private void removeAllConfigurations() throws Exception { + when(configAdmin.listConfigurations(anyString())) + .thenReturn(null); + } + + private void addConfigurations(int nbConfigurations) throws Exception { + when(configAdmin.listConfigurations(anyString())) + .thenReturn(new Configuration[nbConfigurations]); + } + +} \ No newline at end of file