This is an automated email from the ASF dual-hosted git repository.
smolnar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/knox.git
The following commit(s) were added to refs/heads/master by this push:
new f96b62d KNOX-2351 - Catching any errors while monitoring CM
configuration changes (#324)
f96b62d is described below
commit f96b62d133d300519fc45f295f5e7a2b58922949
Author: Sandor Molnar <[email protected]>
AuthorDate: Mon Apr 27 16:58:09 2020 +0200
KNOX-2351 - Catching any errors while monitoring CM configuration changes
(#324)
---
.../ClouderaManagerServiceDiscoveryMessages.java | 3 +
.../cm/monitor/PollingConfigurationAnalyzer.java | 153 +++++++++++----------
2 files changed, 82 insertions(+), 74 deletions(-)
diff --git
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
index 289c752..7f7c644 100644
---
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
+++
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/ClouderaManagerServiceDiscoveryMessages.java
@@ -126,6 +126,9 @@ public interface ClouderaManagerServiceDiscoveryMessages {
@Message(level = MessageLevel.DEBUG, text = "Checking {0} @ {1} for
configuration changes...")
void checkingClusterConfiguration(String clusterName, String
discoveryAddress);
+ @Message(level = MessageLevel.ERROR, text = "Error while monitoring
ClouderaManager configuration changes: {0}")
+ void clouderaManagerConfigurationChangesMonitoringError(@StackTrace(level =
MessageLevel.DEBUG) Exception e);
+
@Message(level = MessageLevel.ERROR,
text = "Error getting service configuration details from
ClouderaManager: {0}")
void clouderaManagerConfigurationAPIError(@StackTrace(level =
MessageLevel.DEBUG) ApiException e);
diff --git
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
index fb8d73c..4b7935f 100644
---
a/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
+++
b/gateway-discovery-cm/src/main/java/org/apache/knox/gateway/topology/discovery/cm/monitor/PollingConfigurationAnalyzer.java
@@ -162,93 +162,98 @@ public class PollingConfigurationAnalyzer implements
Runnable {
isActive = true;
while (isActive) {
- List<String> clustersToStopMonitoring = new ArrayList<>();
+ try {
+ final List<String> clustersToStopMonitoring = new ArrayList<>();
+
+ for (Map.Entry<String, List<String>> entry :
configCache.getClusterNames().entrySet()) {
+ String address = entry.getKey();
+ for (String clusterName : entry.getValue()) {
+ log.checkingClusterConfiguration(clusterName, address);
+
+ // Check here for existing descriptor references, and add to the
removal list if there are not any
+ if (!clusterReferencesExist(address, clusterName)) {
+ clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
+ continue;
+ }
- for (Map.Entry<String, List<String>> entry :
configCache.getClusterNames().entrySet()) {
- String address = entry.getKey();
- for (String clusterName : entry.getValue()) {
- log.checkingClusterConfiguration(clusterName, address);
+ // Configuration changes don't mean anything without corresponding
service start/restarts. Therefore, monitor
+ // start events, and check the configuration only of the restarted
service(s) to identify changes
+ // that should trigger re-discovery.
+ final List<StartEvent> relevantEvents = getRelevantEvents(address,
clusterName);
- // Check here for existing descriptor references, and add to the
removal list if there are not any
- if (!clusterReferencesExist(address, clusterName)) {
- clustersToStopMonitoring.add(address + FQCN_DELIM + clusterName);
- continue;
+ // If there are no recent start events, then nothing to do now
+ if (!relevantEvents.isEmpty()) {
+ // If a change has occurred, notify the listeners
+ if (hasConfigChanged(address, clusterName, relevantEvents)) {
+ notifyChangeListener(address, clusterName);
+ }
+ }
}
+ }
- // Configuration changes don't mean anything without corresponding
service start/restarts. Therefore, monitor
- // start events, and check the configuration only of the restarted
service(s) to identify changes
- // that should trigger re-discovery.
- List<StartEvent> relevantEvents = getRelevantEvents(address,
clusterName);
-
- // If there are no recent start events, then nothing to do now
- if (!relevantEvents.isEmpty()) {
- boolean configHasChanged = false;
-
- // If there are start events, then check the previously-recorded
properties for the same service to
- // identify if the configuration has changed
- Map<String, ServiceConfigurationModel> serviceConfigurations =
-
configCache.getClusterServiceConfigurations(address, clusterName);
-
- // Those services for which a start even has been handled
- List<String> handledServiceTypes = new ArrayList<>();
-
- for (StartEvent re : relevantEvents) {
- String serviceType = re.getServiceType();
-
- // Determine if we've already handled a start event for this
service type
- if (!handledServiceTypes.contains(serviceType)) {
-
- // Get the previously-recorded configuration
- ServiceConfigurationModel serviceConfig =
serviceConfigurations.get(re.getServiceType());
-
- if (serviceConfig != null) {
- // Get the current config for the started service, and
compare with the previously-recorded config
- ServiceConfigurationModel currentConfig =
- getCurrentServiceConfiguration(address,
clusterName, re.getService());
-
- if (currentConfig != null) {
- log.analyzingCurrentServiceConfiguration(re.getService());
- try {
- configHasChanged =
hasConfigurationChanged(serviceConfig, currentConfig);
- } catch (Exception e) {
-
log.errorAnalyzingCurrentServiceConfiguration(re.getService(), e);
- }
- }
- } else {
- // A new service (no prior config) represent a config
change, since a descriptor may have referenced
- // the "new" service, but discovery had previously not
succeeded because the service had not been
- // configured (appropriately) at that time.
- log.serviceEnabled(re.getService());
- configHasChanged = true;
- }
-
- handledServiceTypes.add(serviceType);
- }
+ // Remove outdated entries from the cache
+ for (String fqcn : clustersToStopMonitoring) {
+ String[] parts = fqcn.split(FQCN_DELIM);
+ stopMonitoring(parts[0], parts[1]);
+ }
+ clustersToStopMonitoring.clear(); // reset the removal list
- if (configHasChanged) {
- break; // No need to continue checking once we've identified
one reason to perform discovery again
- }
- }
+ waitFor(interval);
+ } catch (Exception e) {
+ log.clouderaManagerConfigurationChangesMonitoringError(e);
+ }
+ }
+
+ log.stoppedClouderaManagerConfigMonitor();
+ }
+
+ private boolean hasConfigChanged(String address, String clusterName,
List<StartEvent> relevantEvents) {
+ // If there are start events, then check the previously-recorded
properties for the same service to
+ // identify if the configuration has changed
+ final Map<String, ServiceConfigurationModel> serviceConfigurations =
configCache.getClusterServiceConfigurations(address, clusterName);
+
+ // Those services for which a start event has been handled
+ final List<String> handledServiceTypes = new ArrayList<>();
+
+ boolean configHasChanged = false;
+ for (StartEvent re : relevantEvents) {
+ String serviceType = re.getServiceType();
+
+ // Determine if we've already handled a start event for this service type
+ if (!handledServiceTypes.contains(serviceType)) {
+
+ // Get the previously-recorded configuration
+ ServiceConfigurationModel serviceConfig =
serviceConfigurations.get(re.getServiceType());
- // If a change has occurred, notify the listeners
- if (configHasChanged) {
- notifyChangeListener(address, clusterName);
+ if (serviceConfig != null) {
+ // Get the current config for the started service, and compare with
the previously-recorded config
+ ServiceConfigurationModel currentConfig =
+ getCurrentServiceConfiguration(address, clusterName,
re.getService());
+
+ if (currentConfig != null) {
+ log.analyzingCurrentServiceConfiguration(re.getService());
+ try {
+ configHasChanged = hasConfigurationChanged(serviceConfig,
currentConfig);
+ } catch (Exception e) {
+ log.errorAnalyzingCurrentServiceConfiguration(re.getService(),
e);
}
}
+ } else {
+ // A new service (no prior config) represents a config change, since
a descriptor may have referenced
+ // the "new" service, but discovery had previously not succeeded
because the service had not been
+ // configured (appropriately) at that time.
+ log.serviceEnabled(re.getService());
+ configHasChanged = true;
}
- }
- // Remove outdated entries from the cache
- for (String fqcn : clustersToStopMonitoring) {
- String[] parts = fqcn.split(FQCN_DELIM);
- stopMonitoring(parts[0], parts[1]);
+ handledServiceTypes.add(serviceType);
}
- clustersToStopMonitoring.clear(); // reset the removal list
- waitFor(interval);
+ if (configHasChanged) {
+ break; // No need to continue checking once we've identified one
reason to perform discovery again
+ }
}
-
- log.stoppedClouderaManagerConfigMonitor();
+ return configHasChanged;
}
private TopologyService getTopologyService() {