This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 6f830c9 AMBARI-25464. Components autostart does not work sometimes and ambari-agent restart
6f830c9 is described below
commit 6f830c9382eb6e69d9e4182f60b649c2c8ef2d47
Author: aonishuk <[email protected]>
AuthorDate: Mon Jan 27 12:50:05 2020 +0200
AMBARI-25464. Components autostart does not work sometimes and ambari-agent restart
* AMBARI-25464. Components autostart does not work sometimes and ambari-agent restart (aonishuk)
* AMBARI-25464. Components autostart does not work sometimes and ambari-agent restart (aonishuk)
---
ambari-agent/src/main/python/ambari_agent/ClusterCache.py | 9 +++++----
.../main/python/ambari_agent/ClusterConfigurationCache.py | 9 ++++++++-
.../main/python/ambari_agent/ClusterHostLevelParamsCache.py | 13 ++++++++++++-
.../src/main/python/ambari_agent/InitializerModule.py | 10 ++++------
.../src/main/python/ambari_agent/RecoveryManager.py | 7 ++-----
.../ambari_agent/listeners/ConfigurationEventListener.py | 5 -----
.../ambari_agent/listeners/HostLevelParamsEventListener.py | 9 ---------
7 files changed, 31 insertions(+), 31 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 2e13f16..50ad8ee 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -75,6 +75,7 @@ class ClusterCache(dict):
# Example: hostname change and restart causes old topology loading to
fail with exception
logger.exception("Loading saved cache for {0}
failed".format(self.__class__.__name__))
self.rewrite_cache({}, None)
+ os.remove(self.__current_cache_hash_file)
def get_cluster_indepedent_data(self):
return self[ClusterCache.COMMON_DATA_CLUSTER]
@@ -99,7 +100,7 @@ class ClusterCache(dict):
del self[cache_id_to_delete]
self.on_cache_update()
- self.persist_cache()
+ self.persist_cache(cache_hash)
# if all of above are sucessful finally set the hash
self.hash = cache_hash
@@ -131,7 +132,7 @@ class ClusterCache(dict):
with self._cache_lock:
self[cluster_id] = immutable_cache
- def persist_cache(self):
+ def persist_cache(self, cache_hash):
# ensure that our cache directory exists
if not os.path.exists(self.cluster_cache_dir):
os.makedirs(self.cluster_cache_dir)
@@ -140,9 +141,9 @@ class ClusterCache(dict):
with open(self.__current_cache_json_file, 'w') as f:
json.dump(self, f, indent=2)
- if self.hash is not None:
+ if cache_hash is not None:
with open(self.__current_cache_hash_file, 'w') as fp:
- fp.write(self.hash)
+ fp.write(cache_hash)
def _get_mutable_copy(self):
with self._cache_lock:
diff --git
a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
index 677fff2..1bdc581 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
@@ -30,13 +30,20 @@ class ClusterConfigurationCache(ClusterCache):
configuration properties.
"""
- def __init__(self, cluster_cache_dir):
+ def __init__(self, cluster_cache_dir, initializer_module):
"""
Initializes the configuration cache.
:param cluster_cache_dir: directory the changed json are saved
:return:
"""
+ self.initializer_module = initializer_module
super(ClusterConfigurationCache, self).__init__(cluster_cache_dir)
+
+ def on_cache_update(self):
+ for cluster_id, configurations in self.iteritems():
+ # FIXME: Recovery manager does not support multiple cluster as of now.
+ self.initializer_module.recovery_manager.cluster_id = cluster_id
+ self.initializer_module.recovery_manager.on_config_update(configurations)
def get_cache_name(self):
return 'configurations'
diff --git
a/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
index 3e490c5..9dd062f 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
@@ -33,13 +33,24 @@ class ClusterHostLevelParamsCache(ClusterCache):
differently for every host.
"""
- def __init__(self, cluster_cache_dir):
+ def __init__(self, cluster_cache_dir, initializer_module):
"""
Initializes the host level params cache.
:param cluster_cache_dir:
:return:
"""
+ self.initializer_module = initializer_module
super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir)
+ def on_cache_update(self):
+ for cluster_id, host_level_params in self.iteritems():
+ # FIXME: Recovery manager does not support multiple cluster as of now.
+ if 'recoveryConfig' in host_level_params:
+ logger.info("Updating recoveryConfig from hostLevelParams")
+
+ self.initializer_module.recovery_manager.cluster_id = cluster_id
+
self.initializer_module.recovery_manager.update_recovery_config(host_level_params)
+
+
def get_cache_name(self):
return 'host_level_params'
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index b15aaec..5ef4ecc 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -76,26 +76,24 @@ class InitializerModule:
self.action_queue = None
self.alert_scheduler_handler = None
- self.init()
-
def init(self):
"""
Initialize properties
"""
self.is_registered = False
+ self.recovery_manager = RecoveryManager(self)
self.metadata_cache = ClusterMetadataCache(self.config.cluster_cache_dir)
self.topology_cache = ClusterTopologyCache(self.config.cluster_cache_dir,
self.config)
- self.host_level_params_cache =
ClusterHostLevelParamsCache(self.config.cluster_cache_dir)
- self.configurations_cache =
ClusterConfigurationCache(self.config.cluster_cache_dir)
+ self.host_level_params_cache =
ClusterHostLevelParamsCache(self.config.cluster_cache_dir, self)
+ self.configurations_cache =
ClusterConfigurationCache(self.config.cluster_cache_dir, self)
self.alert_definitions_cache =
ClusterAlertDefinitionsCache(self.config.cluster_cache_dir)
self.configuration_builder = ConfigurationBuilder(self)
self.stale_alerts_monitor = StaleAlertsMonitor(self)
self.server_responses_listener = ServerResponsesListener(self)
self.file_cache = FileCache(self.config)
- self.customServiceOrchestrator = CustomServiceOrchestrator(self)
self.hooks_orchestrator = HooksOrchestrator(self)
- self.recovery_manager = RecoveryManager(self)
+ self.customServiceOrchestrator = CustomServiceOrchestrator(self)
self.commandStatuses = CommandStatusDict(self)
self.init_threads()
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index ba6507c..66da323 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -98,7 +98,6 @@ class RecoveryManager:
self.active_command_count = 0
self.cluster_id = None
self.initializer_module = initializer_module
- self.host_level_params_cache = initializer_module.host_level_params_cache
self.actions = {}
self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only,
auto_install_start)
@@ -113,7 +112,7 @@ class RecoveryManager:
def is_blueprint_provisioning_for_component(self, component_name):
try:
- blueprint_state =
self.host_level_params_cache[self.cluster_id]['blueprint_provisioning_state'][component_name]
+ blueprint_state =
self.initializer_module.host_level_params_cache[self.cluster_id]['blueprint_provisioning_state'][component_name]
except KeyError:
blueprint_state = 'NONE'
@@ -481,7 +480,7 @@ class RecoveryManager:
self.enabled_components = enabled_components_list
- def on_config_update(self):
+ def on_config_update(self, cluster_cache):
recovery_enabled = False
auto_start_only = False
auto_install_start = False
@@ -490,8 +489,6 @@ class RecoveryManager:
retry_gap = 5
max_lifetime_count = 12
- cluster_cache =
self.initializer_module.configurations_cache[self.cluster_id]
-
if 'configurations' in cluster_cache and 'cluster-env' in
cluster_cache['configurations']:
config = cluster_cache['configurations']['cluster-env']
if "recovery_type" in config:
diff --git
a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index e5309bb..295a411 100644
---
a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++
b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -50,11 +50,6 @@ class ConfigurationEventListener(EventListener):
self.configurations_cache.rewrite_cache(message['clusters'],
message['hash'])
- if message['clusters']:
- # FIXME: Recovery manager does not support multiple cluster as of now.
- self.recovery_manager.cluster_id = message['clusters'].keys()[0]
- self.recovery_manager.on_config_update()
-
def get_handled_path(self):
return Constants.CONFIGURATIONS_TOPIC
diff --git
a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
index 18c1252..0a3eee5 100644
---
a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
+++
b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
@@ -48,14 +48,5 @@ class HostLevelParamsEventListener(EventListener):
self.host_level_params_cache.rewrite_cache(message['clusters'],
message['hash'])
- if message['clusters']:
- # FIXME: Recovery manager does not support multiple cluster as of now.
- cluster_id = message['clusters'].keys()[0]
-
- if 'recoveryConfig' in message['clusters'][cluster_id]:
- logging.info("Updating recoveryConfig from hostLevelParams")
- self.recovery_manager.cluster_id = cluster_id
-
self.recovery_manager.update_recovery_config(self.host_level_params_cache[cluster_id])
-
def get_handled_path(self):
return Constants.HOST_LEVEL_PARAMS_TOPIC
\ No newline at end of file