This is an automated email from the ASF dual-hosted git repository.
pzampino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new 49d08ba KNOX-2298 - ClouderaManager cluster config monitor should
stop monitoring unreferenced clusters (#291)
49d08ba is described below
commit 49d08ba57cdec116f25503d73570b47439873ee2
Author: Phil Zampino <[email protected]>
AuthorDate: Tue Mar 17 17:38:33 2020 -0400
KNOX-2298 - ClouderaManager cluster config monitor should stop monitoring
unreferenced clusters (#291)
---
gateway-discovery-cm/pom.xml | 4 +
.../ClouderaManagerServiceDiscoveryMessages.java | 4 +
.../cm/monitor/PollingConfigurationAnalyzer.java | 98 +++++++++++++++
.../monitor/PollingConfigurationAnalyzerTest.java | 133 +++++++++++++++++++++
.../topology/impl/DefaultTopologyService.java | 8 +-
5 files changed, 243 insertions(+), 4 deletions(-)
diff --git a/gateway-discovery-cm/pom.xml b/gateway-discovery-cm/pom.xml
index f3ee23e..3107fb2 100644
--- a/gateway-discovery-cm/pom.xml
+++ b/gateway-discovery-cm/pom.xml
@@ -38,6 +38,10 @@
</dependency>
<dependency>
<groupId>org.apache.knox</groupId>
+ <artifactId>gateway-topology-simple</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.knox</groupId>
<artifactId>gateway-i18n</artifactId>
</dependency>
<dependency>
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index aecb990..289c752 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -119,6 +119,10 @@ public interface ClouderaManagerServiceDiscoveryMessages {
text = "Stopped ClouderaManager cluster configuration monitor")
void stoppedClouderaManagerConfigMonitor();
+ /**
+ * Message emitted when monitoring of a cluster for configuration changes is terminated because no
+ * deployed descriptors reference it any longer.
+ *
+ * @param discoverySource The discovery source (address); rendered as {0}
+ * @param clusterName The name of the cluster; rendered as {1}
+ */
+ @Message(level = MessageLevel.INFO,
+ text = "Terminating monitoring of {1} @ {0} for configuration
changes because there are no referencing descriptors.")
+ void stoppingConfigMonitoring(String discoverySource, String clusterName);
+
@Message(level = MessageLevel.DEBUG, text = "Checking {0} @ {1} for
configuration changes...")
void checkingClusterConfiguration(String clusterName, String
discoveryAddress);
diff --git a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index 5631c41..b9f163b 100644
--- a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++ b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -29,13 +29,22 @@ import com.cloudera.api.swagger.model.ApiEventQueryResult;
import com.cloudera.api.swagger.model.ApiRole;
import com.cloudera.api.swagger.model.ApiRoleList;
import com.cloudera.api.swagger.model.ApiServiceConfig;
+import org.apache.knox.gateway.GatewayServer;
import org.apache.knox.gateway.i18n.messages.MessagesFactory;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
import org.apache.knox.gateway.services.security.AliasService;
import org.apache.knox.gateway.services.security.KeystoreService;
+import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import
org.apache.knox.gateway.topology.discovery.cm.ClouderaManagerServiceDiscoveryMessages;
import org.apache.knox.gateway.topology.discovery.cm.DiscoveryApiClient;
+import org.apache.knox.gateway.topology.simple.SimpleDescriptor;
+import org.apache.knox.gateway.topology.simple.SimpleDescriptorFactory;
+import java.io.File;
+import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -69,6 +78,9 @@ public class PollingConfigurationAnalyzer implements Runnable
{
private static final ClouderaManagerServiceDiscoveryMessages log =
MessagesFactory.get(ClouderaManagerServiceDiscoveryMessages.class);
+ // Fully-qualified cluster name delimiter, used to join a discovery address and a cluster name into a
+ // single key (address + FQCN_DELIM + clusterName) for the per-poll removal list in run().
+ // NOTE(review): run() splits that key back apart with String.split(FQCN_DELIM). An address that itself
+ // contains "::" (e.g., an IPv6 literal such as http://[::1]:7180) or a cluster name containing "::"
+ // would be split incorrectly; consider collecting address/cluster pairs instead of delimited strings.
+ private static final String FQCN_DELIM = "::";
+
private ClusterConfigurationCache configCache;
// Single listener for configuration change events
@@ -78,6 +90,10 @@ public class PollingConfigurationAnalyzer implements
Runnable {
private KeystoreService keystoreService;
+ // Resolved lazily from the GatewayServices by getTopologyService(); used to enumerate deployed descriptors.
+ private TopologyService topologyService;
+
+ // Resolved lazily from the GatewayServices by getConfigMonitorService(); used to clear cached cluster config.
+ private ClusterConfigurationMonitorService ccms;
+
// Polling interval in seconds
private int interval;
@@ -134,11 +150,19 @@ public class PollingConfigurationAnalyzer implements
Runnable {
isActive = true;
while (isActive) {
+ List<String> clustersToStopMonitoring = new ArrayList<>();
+
for (Map.Entry<String, List<String>> entry :
configCache.getClusterNames().entrySet()) {
String address = entry.getKey();
for (String clusterName : entry.getValue()) {
log.checkingClusterConfiguration(clusterName, address);
+ // Check here for existing descriptor references, and add to the
removal list if there are not any
+ if (!clusterReferencesExist(address, clusterName)) {
+ clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
+ continue;
+ }
+
// Configuration changes don't mean anything without corresponding
service restarts. Therefore, monitor
// restart events, and check the configuration only of the restarted
service(s) to identify changes
// that should trigger re-discovery.
@@ -202,12 +226,86 @@ public class PollingConfigurationAnalyzer implements
Runnable {
}
}
+ // Remove outdated entries from the cache
+ for (String fqcn : clustersToStopMonitoring) {
+ String[] parts = fqcn.split(FQCN_DELIM);
+ stopMonitoring(parts[0], parts[1]);
+ }
+ clustersToStopMonitoring.clear(); // reset the removal list
+
waitFor(interval);
}
log.stoppedClouderaManagerConfigMonitor();
}
+  /**
+   * Get the gateway TopologyService, looking it up from the GatewayServices until it has been resolved.
+   *
+   * @return The TopologyService, or null if it could not (yet) be resolved.
+   */
+  private TopologyService getTopologyService() {
+    if (topologyService != null) {
+      return topologyService;
+    }
+    final GatewayServices services = GatewayServer.getGatewayServices();
+    if (services != null) {
+      topologyService = services.getService(ServiceType.TOPOLOGY_SERVICE);
+    }
+    return topologyService;
+  }
+
+  /**
+   * Get the gateway ClusterConfigurationMonitorService, looking it up from the GatewayServices until it has
+   * been resolved.
+   *
+   * @return The ClusterConfigurationMonitorService, or null if it could not (yet) be resolved.
+   */
+  private ClusterConfigurationMonitorService getConfigMonitorService() {
+    if (ccms != null) {
+      return ccms;
+    }
+    final GatewayServices services = GatewayServer.getGatewayServices();
+    if (services != null) {
+      ccms = services.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE);
+    }
+    return ccms;
+  }
+
+  /**
+   * Determine if any descriptors reference the specified discovery source and cluster.
+   *
+   * <p>Whenever a definitive answer cannot be given (the TopologyService is unavailable, or a descriptor
+   * could not be read or parsed), this errs on the side of reporting that references remain. This avoids
+   * terminating the monitoring of a cluster that may still be in use.
+   *
+   * @param source      A discovery source
+   * @param clusterName A discovery cluster name
+   *
+   * @return true, if at least one descriptor references (or may reference) the specified discovery
+   *         information; false if either argument is null, or if no descriptor references it.
+   */
+  private boolean clusterReferencesExist(final String source, final String clusterName) {
+    if (source == null || clusterName == null) {
+      return false;
+    }
+
+    final TopologyService ts = getTopologyService();
+    if (ts == null) {
+      return true; // If the TopologyService is unavailable, assume references remain
+    }
+
+    boolean unreadableDescriptorEncountered = false;
+    for (File f : ts.getDescriptors()) {
+      try {
+        SimpleDescriptor sd = SimpleDescriptorFactory.parse(f.toPath().toAbsolutePath().toString());
+        if (source.equals(sd.getDiscoveryAddress()) && clusterName.equals(sd.getCluster())) {
+          return true;
+        }
+      } catch (IOException e) {
+        // It is unknown whether this descriptor references the cluster; remember that, but keep checking
+        // the remaining descriptors for a definite match.
+        unreadableDescriptorEncountered = true;
+      }
+    }
+
+    // No definite reference was found. If any descriptor could not be parsed, conservatively assume it may
+    // reference the cluster, consistent with the TopologyService-unavailable case above.
+    return unreadableDescriptorEncountered;
+  }
+
+  /**
+   * Stop monitoring the specified cluster for configuration changes by clearing its entry from the
+   * ClusterConfigurationMonitorService cache. Nothing is done if that service cannot be resolved.
+   *
+   * @param source      The discovery source
+   * @param clusterName The name of the cluster
+   */
+  private void stopMonitoring(final String source, final String clusterName) {
+    final ClusterConfigurationMonitorService monitorService = getConfigMonitorService();
+    if (monitorService == null) {
+      return;
+    }
+    log.stoppingConfigMonitoring(source, clusterName);
+    monitorService.clearCache(source, clusterName);
+  }
+
/**
* Notify the registered change listener.
*
diff --git a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
index 2001bea..a817c83 100644
--- a/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
+++ b/gateway-discovery-cm/src/test/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzerTest.java
@@ -20,11 +20,21 @@ import com.cloudera.api.swagger.client.ApiClient;
import com.cloudera.api.swagger.model.ApiEvent;
import com.cloudera.api.swagger.model.ApiEventAttribute;
import com.cloudera.api.swagger.model.ApiEventCategory;
+import org.apache.commons.io.FileUtils;
+import org.apache.knox.gateway.GatewayServer;
+import org.apache.knox.gateway.services.GatewayServices;
+import org.apache.knox.gateway.services.ServiceType;
+import org.apache.knox.gateway.services.topology.TopologyService;
+import org.apache.knox.gateway.topology.ClusterConfigurationMonitorService;
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import
org.apache.knox.gateway.topology.discovery.cm.model.hdfs.NameNodeServiceModelGenerator;
import org.easymock.EasyMock;
import org.junit.Test;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@@ -36,6 +46,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static
org.apache.knox.gateway.topology.discovery.ClusterConfigurationMonitor.ConfigurationChangeListener;
+import static org.easymock.EasyMock.getCurrentArguments;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -150,6 +161,128 @@ public class PollingConfigurationAnalyzerTest {
}
+  /**
+   * Verify that the analyzer stops monitoring (clears the cache entry for) a cluster which is no longer
+   * referenced by any deployed descriptor.
+   */
+  @Test
+  public void testClusterConfigMonitorTerminationForNoLongerReferencedClusters() throws Exception {
+    final String address = "http://host1:1234";
+    final String clusterName = "Cluster 5";
+
+    // The only deployed descriptor references the same cluster name at a *different* discovery address, so
+    // the cluster being monitored at the original address is no longer referenced by any descriptor.
+    final String updatedAddress = "http://host2:1234";
+    final String descContent =
+        "{\n" +
+        " \"discovery-type\": \"ClouderaManager\",\n" +
+        " \"discovery-address\": \"" + updatedAddress + "\",\n" +
+        " \"cluster\": \"" + clusterName + "\",\n" +
+        " \"provider-config-ref\": \"ldap\",\n" +
+        " \"services\": [\n" +
+        " {\n" +
+        " \"name\": \"WEBHDFS\"\n" +
+        " }\n" +
+        " ]\n" +
+        "}";
+
+    // Fail fast if the descriptor cannot be created, rather than handing the analyzer a null File
+    final File descriptor = File.createTempFile("test", ".json");
+    try {
+      FileUtils.writeStringToFile(descriptor, descContent, StandardCharsets.UTF_8);
+
+      // Mock the service discovery details
+      ServiceDiscoveryConfig sdc = EasyMock.createNiceMock(ServiceDiscoveryConfig.class);
+      EasyMock.expect(sdc.getCluster()).andReturn(clusterName).anyTimes();
+      EasyMock.expect(sdc.getAddress()).andReturn(address).anyTimes();
+      EasyMock.expect(sdc.getUser()).andReturn("u").anyTimes();
+      EasyMock.expect(sdc.getPasswordAlias()).andReturn("a").anyTimes();
+      EasyMock.replay(sdc);
+
+      // Create the original ServiceConfigurationModel details
+      final Map<String, ServiceConfigurationModel> serviceConfigurationModels = new HashMap<>();
+      final Map<String, String> nnServiceConf = new HashMap<>();
+      final Map<String, Map<String, String>> nnRoleConf = new HashMap<>();
+      nnRoleConf.put(NameNodeServiceModelGenerator.ROLE_TYPE, Collections.emptyMap());
+      serviceConfigurationModels.put(NameNodeServiceModelGenerator.SERVICE_TYPE + "-1",
+                                     createModel(nnServiceConf, nnRoleConf));
+
+      // Create a ClusterConfigurationCache for the monitor to use
+      final ClusterConfigurationCache configCache = new ClusterConfigurationCache();
+      configCache.addDiscoveryConfig(sdc);
+      configCache.addServiceConfiguration(address, clusterName, serviceConfigurationModels);
+      assertEquals(1, configCache.getClusterNames().get(address).size());
+
+      // TopologyService mock, which reports only the temporary descriptor
+      TopologyService ts = EasyMock.createNiceMock(TopologyService.class);
+      EasyMock.expect(ts.getDescriptors()).andReturn(Collections.singletonList(descriptor)).anyTimes();
+
+      // Counted down by the mocked clearCache(), which the analyzer invokes from its polling thread. This lets
+      // the test wait only as long as necessary, and makes the cache update visible to this thread.
+      final java.util.concurrent.CountDownLatch cacheCleared = new java.util.concurrent.CountDownLatch(1);
+
+      // ClusterConfigurationMonitorService mock, implementing the clearing of the cache
+      ClusterConfigurationMonitorService ccms = EasyMock.createNiceMock(ClusterConfigurationMonitorService.class);
+      ccms.clearCache(address, clusterName);
+      EasyMock.expectLastCall().andAnswer(() -> {
+        Object[] args = getCurrentArguments();
+        configCache.removeServiceConfiguration((String) args[0], (String) args[1]);
+        cacheCleared.countDown();
+        return null;
+      }).once();
+
+      // GatewayServices mock
+      GatewayServices gws = EasyMock.createNiceMock(GatewayServices.class);
+      EasyMock.expect(gws.getService(ServiceType.TOPOLOGY_SERVICE)).andReturn(ts).anyTimes();
+      EasyMock.expect(gws.getService(ServiceType.CLUSTER_CONFIGURATION_MONITOR_SERVICE)).andReturn(ccms).anyTimes();
+      EasyMock.replay(ts, ccms, gws);
+
+      try {
+        setGatewayServices(gws);
+
+        // Create the monitor
+        TestablePollingConfigAnalyzer pca = new TestablePollingConfigAnalyzer(configCache);
+        pca.setInterval(5);
+
+        // Start the polling thread
+        ExecutorService pollingThreadExecutor = Executors.newSingleThreadExecutor();
+        pollingThreadExecutor.execute(pca);
+        pollingThreadExecutor.shutdown();
+
+        boolean cleared;
+        try {
+          cleared = cacheCleared.await(10, TimeUnit.SECONDS);
+        } finally {
+          // Stop the config analyzer thread, even if the wait is interrupted
+          pca.stop();
+        }
+
+        assertTrue("Timed out waiting for monitoring of " + clusterName + " to be terminated.", cleared);
+        assertEquals("Expected the config cache entry for " + clusterName + " to have been removed.",
+                     0,
+                     configCache.getClusterNames().get(address).size());
+      } finally {
+        // Reset the GatewayServices field of GatewayServer
+        setGatewayServices(null);
+      }
+    } finally {
+      // Remove the temporary descriptor regardless of the test outcome
+      if (!descriptor.delete()) {
+        descriptor.deleteOnExit();
+      }
+    }
+  }
+
+  /**
+   * Set the static GatewayServices field of GatewayServer to the specified value, via reflection.
+   *
+   * @param gws A GatewayServices object, or null.
+   *
+   * @throws IllegalStateException if the field cannot be set. Failing loudly prevents a test from running
+   *                               against unexpected gateway state.
+   */
+  private void setGatewayServices(final GatewayServices gws) {
+    try {
+      Field gwsField = GatewayServer.class.getDeclaredField("services");
+      gwsField.setAccessible(true);
+      gwsField.set(null, gws);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new IllegalStateException("Failed to set GatewayServer.services for the test", e);
+    }
+  }
+
private ApiEvent createApiEvent(final ApiEventCategory category, final
List<ApiEventAttribute> attrs) {
ApiEvent event = EasyMock.createNiceMock(ApiEvent.class);
EasyMock.expect(event.getTimeOccurred()).andReturn(Instant.now().toString()).anyTimes();
diff --git a/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java b/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
index 411307e..2c1a62f 100644
--- a/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
+++ b/gateway-server/src/main/java/org/apache/knox/gateway/services/topology/impl/DefaultTopologyService.java
@@ -789,7 +789,7 @@ public class DefaultTopologyService extends
FileAlterationListenerAdaptor implem
/**
- * Listener for Ambari config change events, which will trigger
re-generation (including re-discovery) of the
+ * Listener for cluster config change events, which will trigger
re-generation (including re-discovery) of the
* affected topologies.
*/
private static class TopologyDiscoveryTrigger implements
ClusterConfigurationMonitor.ConfigurationChangeListener {
@@ -803,14 +803,14 @@ public class DefaultTopologyService extends
FileAlterationListenerAdaptor implem
}
@Override
- public void onConfigurationChange(String source, String clusterName) {
+ public void onConfigurationChange(final String source, final String
clusterName) {
log.noticedClusterConfigurationChange(source, clusterName);
try {
boolean affectedDescriptors = false;
// Identify any descriptors associated with the cluster configuration
change
for (File descriptor : topologyService.getDescriptors()) {
- String descriptorContent = FileUtils.readFileToString(descriptor,
StandardCharsets.UTF_8);
- if (descriptorContent.contains(source) &&
descriptorContent.contains(clusterName)) {
+ SimpleDescriptor sd =
SimpleDescriptorFactory.parse(descriptor.getAbsolutePath());
+ if (source.equals(sd.getDiscoveryAddress()) &&
clusterName.equals(sd.getCluster())) {
affectedDescriptors = true;
log.triggeringTopologyRegeneration(source, clusterName,
descriptor.getAbsolutePath());
// 'Touch' the descriptor to trigger re-generation of the
associated topology