This is an automated email from the ASF dual-hosted git repository.

pzampino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d08ba  KNOX-2298 - ClouderaManager cluster config monitor should 
stop monitoring unreferenced clusters (#291)
49d08ba is described below

commit 49d08ba57cdec116f25503d73570b47439873ee2
Author: Phil Zampino <[email protected]>
AuthorDate: Tue Mar 17 17:38:33 2020 -0400

    KNOX-2298 - ClouderaManager cluster config monitor should stop monitoring 
unreferenced clusters (#291)
---
 gateway-discovery-cm/pom.xml                       |   4 +
 .../ClouderaManagerServiceDiscoveryMessages.java   |   4 +
 .../cm/monitor/PollingConfigurationAnalyzer.java   |  98 +++++++++++++++
 .../monitor/PollingConfigurationAnalyzerTest.java  | 133 +++++++++++++++++++++
 .../topology/impl/DefaultTopologyService.java      |   8 +-
 5 files changed, 243 insertions(+), 4 deletions(-)

diff --git a/gateway-discovery-cm/pom.xml b/gateway-discovery-cm/pom.xml
index f3ee23e..3107fb2 100644
--- a/gateway-discovery-cm/pom.xml
+++ b/gateway-discovery-cm/pom.xml
@@ -38,6 +38,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.knox</groupId>
+            <artifactId>gateway-topology-simple</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.knox</groupId>
             <artifactId>gateway-i18n</artifactId>
         </dependency>
         <dependency>
diff --git 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index aecb990..289c752 100644
--- 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -119,6 +119,10 @@ public interface ClouderaManagerServiceDiscoveryMessages {
       text = "Stopped ClouderaManager cluster configuration monitor")
   void stoppedClouderaManagerConfigMonitor();
 
+  @Message(level = MessageLevel.INFO,
+           text = "Terminating monitoring of {1} @ {0} for configuration 
changes because there are no referencing descriptors.")
+  void stoppingConfigMonitoring(String discoverySource, String clusterName);
+
   @Message(level = MessageLevel.DEBUG, text = "Checking {0} @ {1} for 
configuration changes...")
   void checkingClusterConfiguration(String clusterName, String 
discoveryAddress);
 
diff --git 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index 5631c41..b9f163b 100644
--- 
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ 
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -29,13 +29,22 @@ import com.cloudera.api.swagger.model.ApiEventQueryResult;
 import com.cloudera.api.swagger.model.ApiRole;
 import com.cloudera.api.swagger.model.ApiRoleList;
 import com.cloudera.api.swagger.model.ApiServiceConfig;
+import org.apache.knox.gateway.GatewayServer;
 import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
 import org.apache.knox.gateway.services.security.AliasService;
 import org.apache.knox.gateway.services.security.KeystoreService;
+import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
 import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
 import 
org.apache.knox.gateway.topology.discovery.cm.ClouderaManagerServiceDiscoveryMessages;
 import org.apache.knox.gateway.topology.discovery.cm.DiscoveryApiClient;
+import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
+import org.apache.knox.gateway.topology.simple.SimpleDescriptorFactory;
 
+import java.io.File;
+import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
@@ -69,6 +78,9 @@ public class PollingConfigurationAnalyzer implements Runnable 
{
   private static final ClouderaManagerServiceDiscoveryMessages log =
                   
MessagesFactory.get(ClouderaManagerServiceDiscoveryMessages.class);
 
+  // Fully-qualified cluster name delimiter
+  private static final String FQCN_DELIM = "::";
+
   private ClusterConfigurationCache configCache;
 
   // Single listener for configuration change events
@@ -78,6 +90,10 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
 
   private KeystoreService keystoreService;
 
+  private TopologyService topologyService;
+
+  private ClusterConfigurationMonitorService ccms;
+
   // Polling interval in seconds
   private int interval;
 
@@ -134,11 +150,19 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
     isActive = true;
 
     while (isActive) {
+      List<String> clustersToStopMonitoring = new ArrayList<>();
+
       for (Map.Entry<String, List<String>> entry : 
configCache.getClusterNames().entrySet()) {
         String address = entry.getKey();
         for (String clusterName : entry.getValue()) {
           log.checkingClusterConfiguration(clusterName, address);
 
+          // Check here for existing descriptor references, and add to the 
removal list if there are not any
+          if (!clusterReferencesExist(address, clusterName)) {
+            clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
+            continue;
+          }
+
           // Configuration changes don't mean anything without corresponding 
service restarts. Therefore, monitor
           // restart events, and check the configuration only of the restarted 
service(s) to identify changes
           // that should trigger re-discovery.
@@ -202,12 +226,86 @@ public class PollingConfigurationAnalyzer implements 
Runnable {
         }
       }
 
+      // Remove outdated entries from the cache
+      for (String fqcn : clustersToStopMonitoring) {
+        String[] parts = fqcn.split(FQCN_DELIM);
+        stopMonitoring(parts[0], parts[1]);
+      }
+      clustersToStopMonitoring.clear(); // reset the removal list
+
       waitFor(interval);
     }
 
     log.stoppedClouderaManagerConfigMonitor();
   }
 
+  private TopologyService getTopologyService() {
+    if (topologyService == null) {
+      GatewayServices gws = GatewayServer.getGatewayServices();
+      if (gws != null) {
+        topologyService = gws.getService(ServiceType.TOPOLOGY_SERVICE);
+      }
+    }
+    return topologyService;
+  }
+
+  private ClusterConfigurationMonitorService getConfigMonitorService() {
+    if (ccms == null) {
+      GatewayServices gws = GatewayServer.getGatewayServices();
+      if (gws != null) {
+        ccms = 
gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
+      }
+    }
+    return ccms;
+  }
+
+  /**
+   * Determine if any descriptors reference the specified discovery source and 
cluster.
+   *
+   * @param source      A discovery source
+   * @param clusterName A discovery cluster name
+   *
+   * @return true, if at least one descriptor references the specified 
discovery information; Otherwise, false.
+   */
+  private boolean clusterReferencesExist(final String source, final String 
clusterName) {
+    boolean remainingClusterRefs = false;
+
+    if (source != null && clusterName != null) {
+      TopologyService ts = getTopologyService();
+      if (ts != null) {
+        for (File f : ts.getDescriptors()) {
+          try {
+            SimpleDescriptor sd = 
SimpleDescriptorFactory.parse(f.toPath().toAbsolutePath().toString());
+            if (source.equals(sd.getDiscoveryAddress()) && 
clusterName.equals(sd.getCluster())) {
+              remainingClusterRefs = true;
+              break;
+            }
+          } catch (IOException e) {
+            // Ignore these errors
+          }
+        }
+      } else {
+        remainingClusterRefs = true; // If the TopologyService is unavailable, 
assume references remain
+      }
+    }
+
+    return remainingClusterRefs;
+  }
+
+  /**
+   * Stop monitoring the specified cluster for configuration changes.
+   *
+   * @param source      The discovery source
+   * @param clusterName The name of the cluster
+   */
+  private void stopMonitoring(final String source, final String clusterName) {
+    ClusterConfigurationMonitorService ms = getConfigMonitorService();
+    if (ms != null) {
+      log.stoppingConfigMonitoring(source, clusterName);
+      ms.clearCache(source, clusterName);
+    }
+  }
+
   /**
    * Notify the registered change listener.
    *
diff --git 
a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
 
b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index 2001bea..a817c83 100644
--- 
a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ 
b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -20,11 +20,21 @@ import com.cloudera.api.swagger.client.ApiClient;
 import com.cloudera.api.swagger.model.ApiEvent;
 import com.cloudera.api.swagger.model.ApiEventAttribute;
 import com.cloudera.api.swagger.model.ApiEventCategory;
+import org.apache.commons.io.FileUtils;
+import org.apache.knox.gateway.GatewayServer;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
 import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
 import 
org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
 import org.easymock.EasyMock;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -36,6 +46,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor.ConfigurationChangeListener;
+import static org.easymock.EasyMock.getCurrentArguments;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -150,6 +161,128 @@ public class PollingConfigurationAnalyzerTest {
   }
 
 
+  @Test
+  public void 
testClusterConfigMonitorTerminationForNoLongerReferencedClusters() {
+    final String address = "http://host1:1234";;
+    final String clusterName = "Cluster 5";
+
+    final String updatedAddress = "http://host2:1234";;
+    final String descContent =
+        "{\n" +
+        "  \"discovery-type\": \"ClouderaManager\",\n" +
+        "  \"discovery-address\": \"" + updatedAddress + "\",\n" +
+        "  \"cluster\": \"" + clusterName + "\",\n" +
+        "  \"provider-config-ref\": \"ldap\",\n" +
+        "  \"services\": [\n" +
+        "    {\n" +
+        "      \"name\": \"WEBHDFS\"\n" +
+        "    }\n" +
+        "  ]\n" +
+        "}";
+
+    File descriptor = null;
+    try {
+      descriptor = File.createTempFile("test", ".json");
+      FileUtils.writeStringToFile(descriptor, descContent, 
StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    // Mock the service discovery details
+    ServiceDiscoveryConfig sdc = 
EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
+    EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
+    EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
+    EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
+    EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
+    EasyMock.replay(sdc);
+
+    final Map<String, List<String>> clusterNames = new HashMap<>();
+    clusterNames.put(address, Collections.singletonList(clusterName));
+
+    // Create the original ServiceConfigurationModel details
+    final Map<String, ServiceConfigurationModel> serviceConfigurationModels = 
new HashMap<>();
+    final Map<String, String> nnServiceConf = new HashMap<>();
+    final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
+    nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, 
Collections.emptyMap());
+    serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE 
+ "-1", createModel(nnServiceConf, nnRoleConf));
+
+    // Create a ClusterConfigurationCache for the monitor to use
+    final ClusterConfigurationCache configCache = new 
ClusterConfigurationCache();
+    configCache.addDiscoveryConfig(sdc);
+    configCache.addServiceConfiguration(address, clusterName, 
serviceConfigurationModels);
+    assertEquals(1, configCache.getClusterNames().get(address).size());
+
+    // Set up GatewayServices
+
+    // TopologyService mock
+    TopologyService ts = EasyMock.createNiceMock(TopologyService.class);
+    
EasyMock.expect(ts.getDescriptors()).andReturn(Collections.singletonList(descriptor)).anyTimes();
+
+    // ClusterConfigurationMonitorService mock
+    ClusterConfigurationMonitorService ccms = 
EasyMock.createNiceMock(ClusterConfigurationMonitorService.class);
+    // Implement the clearing of the cache for the mock
+    ccms.clearCache(address, clusterName);
+    EasyMock.expectLastCall().andAnswer(() -> {
+                                              Object[] args = 
getCurrentArguments();
+                                              
configCache.removeServiceConfiguration((String)args[0], (String)args[1]);
+                                              return null;
+                                            }).once();
+
+    // GatewayServices mock
+    GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
+    
EasyMock.expect(gws.getService(ServiceType.TOPOLOGY_SERVICE)).andReturn(ts).anyTimes();
+    
EasyMock.expect(gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE)).andReturn(ccms).anyTimes();
+    EasyMock.replay(ts, ccms, gws);
+
+    try {
+      setGatewayServices(gws);
+
+      // Create the monitor
+      TestablePollingConfigAnalyzer pca = new 
TestablePollingConfigAnalyzer(configCache);
+      pca.setInterval(5);
+
+      // Start the polling thread
+      ExecutorService pollingThreadExecutor = 
Executors.newSingleThreadExecutor();
+      pollingThreadExecutor.execute(pca);
+      pollingThreadExecutor.shutdown();
+
+      try {
+        pollingThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        //
+      }
+
+      // Stop the config analyzer thread
+      pca.stop();
+
+      if (descriptor != null && descriptor.exists()) {
+        descriptor.deleteOnExit();
+      }
+
+      assertEquals("Expected the config cache entry for " + clusterName + " to 
have been removed.",
+                   0,
+                   configCache.getClusterNames().get(address).size());
+    } finally {
+      // Reset the GatewayServices field of GatewayServer
+      setGatewayServices(null);
+    }
+  }
+
+  /**
+   * Set the static GatewayServices field to the specified value.
+   *
+   * @param gws A GatewayServices object, or null.
+   */
+  private void setGatewayServices(final GatewayServices gws) {
+    try {
+      Field gwsField = GatewayServer.class.getDeclaredField("services");
+      gwsField.setAccessible(true);
+      gwsField.set(null, gws);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
   private ApiEvent createApiEvent(final ApiEventCategory category, final 
List<ApiEventAttribute> attrs) {
     ApiEvent event = EasyMock.createNiceMock(ApiEvent.class);
     
EasyMock.expect(event.getTimeOccurred()).andReturn(Instant.now().toString()).anyTimes();
diff --git 
a/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
 
b/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
index 411307e..2c1a62f 100644
--- 
a/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
+++ 
b/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
@@ -789,7 +789,7 @@ public class DefaultTopologyService extends 
FileAlterationListenerAdaptor implem
 
 
   /**
-   * Listener for Ambari config change events, which will trigger 
re-generation (including re-discovery) of the
+   * Listener for cluster config change events, which will trigger 
re-generation (including re-discovery) of the
    * affected topologies.
    */
   private static class TopologyDiscoveryTrigger implements 
ClusterConfigurationMonitor.ConfigurationChangeListener {
@@ -803,14 +803,14 @@ public class DefaultTopologyService extends 
FileAlterationListenerAdaptor implem
     }
 
     @Override
-    public void onConfigurationChange(String source, String clusterName) {
+    public void onConfigurationChange(final String source, final String 
clusterName) {
       log.noticedClusterConfigurationChange(source, clusterName);
       try {
         boolean affectedDescriptors = false;
         // Identify any descriptors associated with the cluster configuration 
change
         for (File descriptor : topologyService.getDescriptors()) {
-          String descriptorContent = FileUtils.readFileToString(descriptor, 
StandardCharsets.UTF_8);
-          if (descriptorContent.contains(source) && 
descriptorContent.contains(clusterName)) {
+          SimpleDescriptor sd = 
SimpleDescriptorFactory.parse(descriptor.getAbsolutePath());
+          if (source.equals(sd.getDiscoveryAddress()) && 
clusterName.equals(sd.getCluster())) {
             affectedDescriptors = true;
             log.triggeringTopologyRegeneration(source, clusterName, 
descriptor.getAbsolutePath());
             // 'Touch' the descriptor to trigger re-generation of the 
associated topology

Reply via email to