This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new b76750c  Revert "Revert "SLING-9465 - DiscoveryService is activated on 
all services (#32)""
b76750c is described below

commit b76750c73c3811044156daba97c8819c681109df
Author: tmaret <tma...@adobe.com>
AuthorDate: Mon May 25 13:58:11 2020 +0200

    Revert "Revert "SLING-9465 - DiscoveryService is activated on all services 
(#32)""
    
    This reverts commit f06a4140d5ad35028c00be4000c72c213c6ac202.
---
 .../journal/impl/publisher/DiscoveryService.java   |   9 +-
 .../impl/publisher/DistributionPublisher.java      |   4 +-
 .../shared/PublisherConfigurationAvailable.java    | 108 +++++++++++++++++++
 .../PublisherConfigurationAvailableTest.java       | 116 +++++++++++++++++++++
 4 files changed, 233 insertions(+), 4 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 2e0b665..2cce5bb 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -30,6 +30,7 @@ import java.util.Hashtable;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import 
org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
+import 
org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
 import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
@@ -55,8 +56,7 @@ import org.slf4j.LoggerFactory;
  * Listens for discovery messages and tracks presence of Subscribers as well as
  * the last processed offset of each Subscriber
  *
- * This component uses lazy starting so it is only started when there is at 
least one Agent
- * that requires it.
+ * This component is only activated when there is at least one 
DistributionPublisher agent configured.
  *
  * This component is meant to be shared by Publisher agents.
  */
@@ -70,7 +70,10 @@ public class DiscoveryService implements Runnable {
 
     @Reference
     private JournalAvailable journalAvailable;
-    
+
+    @Reference
+    private PublisherConfigurationAvailable publisherConfigurationAvailable;
+
     @Reference
     private MessagingProvider messagingProvider;
 
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 55787dc..d366ab3 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -86,12 +86,14 @@ import 
org.apache.sling.distribution.journal.JournalAvailable;
 @Component(
         service = {}, 
         immediate = true,
-        configurationPid = 
"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"
+        configurationPid = DistributionPublisher.FACTORY_PID
 )
 @Designate(ocd = PublisherConfiguration.class, factory = true)
 @ParametersAreNonnullByDefault
 public class DistributionPublisher implements DistributionAgent {
 
+    public static final String FACTORY_PID = 
"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
+
     private final Map<DistributionRequestType, Consumer<PackageMessage>> 
REQ_TYPES = new HashMap<>();
 
     private final DefaultDistributionLog log;
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java
new file mode 100644
index 0000000..c0a6db4
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailable.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import java.io.IOException;
+
+import 
org.apache.sling.distribution.journal.impl.publisher.DistributionPublisher;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_IMMEDIATE;
+import static 
org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.osgi.service.cm.ConfigurationAdmin.SERVICE_FACTORYPID;
+
+/**
+ * This task periodically checks for DistributionPublisher agent
+ * configuration availability and registers the marker service
+ * {@link PublisherConfigurationAvailable} when such a configuration
+ * is found. To avoid costly reactivation cycles, the marker
+ * service remains registered until this task is deactivated.
+ * This task is meant to be executed on every instance, even in a cluster.
+ */
+@Component(
+        property = {
+                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+                PROPERTY_SCHEDULER_IMMEDIATE + ":Boolean=false",
+                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 60, // 1 minute
+        }
+)
+public class PublisherConfigurationAvailable implements Runnable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PublisherConfigurationAvailable.class);
+
+    private volatile ServiceRegistration<PublisherConfigurationAvailable> reg; 
//NOSONAR
+
+    private volatile BundleContext context; //NOSONAR
+
+    private final Object lock = new Object();
+
+    @Reference
+    private ConfigurationAdmin configAdmin;
+
+    @Activate
+    public void activate(BundleContext context) {
+        this.context = context;
+    }
+
+    @Deactivate
+    public void deactivate() {
+        synchronized (lock) {
+            if (reg != null) {
+                reg.unregister();
+                LOG.info("Unregistered marker service");
+            }
+        }
+    }
+
+    @Override
+    public void run() {
+        synchronized (lock) {
+            if (reg == null && hasPublisherConfigurations()) {
+                reg = 
context.registerService(PublisherConfigurationAvailable.class, this, null);
+                LOG.info("Registered marker service");
+            }
+        }
+    }
+
+    protected boolean isAvailable() {
+        return reg != null;
+    }
+
+    private boolean hasPublisherConfigurations() {
+        String filter = "(" + SERVICE_FACTORYPID + "=" + 
DistributionPublisher.FACTORY_PID + ")";
+        try {
+            Configuration[] configs = configAdmin.listConfigurations(filter);
+            return configs != null && configs.length > 0;
+        } catch (IOException | InvalidSyntaxException e) {
+            LOG.warn("Failed to search for DistributionPublisher agent 
configurations", e);
+        }
+        return false;
+    }
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java
new file mode 100644
index 0000000..9b968e9
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/impl/shared/PublisherConfigurationAvailableTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PublisherConfigurationAvailableTest {
+
+    @InjectMocks
+    private PublisherConfigurationAvailable pca;
+
+    @Mock
+    private BundleContext context;
+
+    @Mock
+    private ConfigurationAdmin configAdmin;
+
+    @Before
+    public void before() throws Exception {
+        ServiceRegistration<PublisherConfigurationAvailable> serviceReg = 
mock(ServiceRegistration.class);
+        
when(context.registerService(eq(PublisherConfigurationAvailable.class), 
any(PublisherConfigurationAvailable.class), any()))
+                .thenReturn(serviceReg);
+        when(configAdmin.listConfigurations(anyString()))
+                .thenReturn(null);
+        pca.activate(context);
+    }
+
+    @After
+    public void after() {
+        pca.deactivate();
+    }
+
+    @Test
+    public void testNoConfiguration() {
+        assertFalse(pca.isAvailable());
+        pca.run();
+        assertFalse(pca.isAvailable());
+    }
+
+    @Test
+    public void testWithZeroConfiguration() throws Exception {
+        assertFalse(pca.isAvailable());
+        addConfigurations(0);
+        pca.run();
+        assertFalse(pca.isAvailable());
+    }
+
+    @Test
+    public void testWithOneConfiguration() throws Exception {
+        assertFalse(pca.isAvailable());
+        addConfigurations(1);
+        pca.run();
+        assertTrue(pca.isAvailable());
+    }
+
+    @Test
+    public void testWithManyConfigurations() throws Exception {
+        assertFalse(pca.isAvailable());
+        addConfigurations(10);
+        pca.run();
+        assertTrue(pca.isAvailable());
+    }
+    
+    @Test
+    public void testRemainAvailable() throws Exception {
+        addConfigurations(1);
+        pca.run();
+        assertTrue(pca.isAvailable());
+        removeAllConfigurations();
+        pca.run();
+        assertTrue(pca.isAvailable());
+    }
+    
+    private void removeAllConfigurations() throws Exception {
+        when(configAdmin.listConfigurations(anyString()))
+                .thenReturn(null);
+    }
+
+    private void addConfigurations(int nbConfigurations) throws Exception {
+        when(configAdmin.listConfigurations(anyString()))
+                .thenReturn(new Configuration[nbConfigurations]);
+    }
+
+}
\ No newline at end of file

Reply via email to