This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 24147b6 AMBARI-25471. After node reboot autostart of components takes too much time. (aonishuk)
24147b6 is described below
commit 24147b6191801fba23e2dfaf9ef6cf9a51d4670c
Author: Andrew Onishuk <[email protected]>
AuthorDate: Mon Feb 3 14:16:54 2020 +0200
AMBARI-25471. After node reboot autostart of components takes too much time. (aonishuk)
---
.../src/main/python/ambari_agent/ActionQueue.py | 20 +++++++++++++++-----
.../python/ambari_agent/ComponentStatusExecutor.py | 2 +-
.../src/main/python/ambari_agent/RecoveryManager.py | 9 ++++++++-
.../src/test/python/ambari_agent/TestAlerts.py | 2 +-
.../test/python/ambari_agent/TestRecoveryManager.py | 1 +
5 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 3403b98..eede227 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -58,6 +58,9 @@ class ActionQueue(threading.Thread):
# How much time(in seconds) we need wait for new incoming execution command
before checking status command queue
EXECUTION_COMMAND_WAIT_TIME = 2
+ # key name in command dictionary
+ IS_RECOVERY_COMMAND = "isRecoveryCommand"
+
def __init__(self, initializer_module):
super(ActionQueue, self).__init__()
self.commandQueue = Queue.Queue()
@@ -134,7 +137,11 @@ class ActionQueue(threading.Thread):
if command is None:
break
- self.process_command(command)
+ # Recovery commands should be run in parallel (since we don't know
the ordering on agent)
+ if self.IS_RECOVERY_COMMAND in command and
command[self.IS_RECOVERY_COMMAND]:
+ self.start_parallel_command(command)
+ else:
+ self.process_command(command)
else:
# If parallel execution is enabled, just kick off all available
# commands using separate threads
@@ -150,10 +157,7 @@ class ActionQueue(threading.Thread):
if 'commandParams' in command and 'command_retry_enabled' in
command['commandParams']:
retry_able = command['commandParams']['command_retry_enabled']
== "true"
if retry_able:
- logger.info("Kicking off a thread for the command, id={}
taskId={}".format(command['commandId'], command['taskId']))
- t = threading.Thread(target=self.process_command,
args=(command,))
- t.daemon = True
- t.start()
+ self.start_parallel_command(command)
else:
self.process_command(command)
break
@@ -165,6 +169,12 @@ class ActionQueue(threading.Thread):
logger.exception("ActionQueue thread failed with exception. Re-running
it")
logger.info("ActionQueue thread has successfully finished")
+ def start_parallel_command(self, command):
+ logger.info("Kicking off a thread for the command, id={}
taskId={}".format(command['commandId'], command['taskId']))
+ t = threading.Thread(target=self.process_command, args=(command,))
+ t.daemon = True
+ t.start()
+
def fill_recovery_commands(self):
if self.recovery_manager.enabled() and not
self.tasks_in_progress_or_pending():
self.put(self.recovery_manager.get_recovery_commands())
diff --git
a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 7bf00df..e1fe52b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -112,7 +112,7 @@ class ComponentStatusExecutor(threading.Thread):
if result:
cluster_reports[cluster_id].append(result)
-
+ self.recovery_manager.statuses_computed_at_least_once = True
cluster_reports = self.discard_stale_reports(cluster_reports)
self.send_updates_to_server(cluster_reports)
except ConnectionIsAlreadyClosed: # server and agent disconnected during
sending data. Not an issue
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 66da323..6712997 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -45,6 +45,7 @@ class RecoveryManager:
HAS_STALE_CONFIG = "hasStaleConfigs"
EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
ROLE_COMMAND = "roleCommand"
+ IS_RECOVERY_COMMAND = "isRecoveryCommand"
COMMAND_ID = "commandId"
PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
@@ -101,6 +102,7 @@ class RecoveryManager:
self.actions = {}
self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only,
auto_install_start)
+ self.statuses_computed_at_least_once = False
def on_execution_command_start(self):
with self.__active_command_lock:
@@ -274,6 +276,10 @@ class RecoveryManager:
INSTALLED_FAILED --> INSTALLED
INSTALLED_FAILED --> STARTED
"""
+ # wait until all component statuses are computed
+ if not self.statuses_computed_at_least_once:
+ return []
+
commands = []
for component in self.statuses.keys():
if self.configured_for_recovery(component) and
self.requires_recovery(component) and self.may_execute(component):
@@ -659,7 +665,8 @@ class RecoveryManager:
self.COMMAND_TYPE: AgentCommand.auto_execution,
self.TASK_ID: command_id,
self.ROLE: component,
- self.COMMAND_ID: command_id
+ self.COMMAND_ID: command_id,
+ self.IS_RECOVERY_COMMAND: True
}
if component in self.__component_to_service_map:
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
index 47e38f0..4f0398e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -1403,7 +1403,7 @@ class TestAlerts(TestCase):
"""
with patch("__builtin__.open") as open_mock:
open_mock.side_effect = self.open_side_effect
- cluster_configuration = ClusterConfigurationCache("/tmp/test_cache")
+ cluster_configuration = ClusterConfigurationCache("/tmp/test_cache",
None)
return cluster_configuration
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index e5947cf..4c863ce 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -347,6 +347,7 @@ class _TestRecoveryManager(TestCase):
[1000, 1001, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
rm = RecoveryManager(MagicMock())
+ rm.statuses_computed_at_least_once = True
rm.update_config(5, 5, 0, 11, True, False, False)
command1 = copy.deepcopy(self.command)