This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 24147b6  AMBARI-25471. After node reboot autostart of components takes too much time. (aonishuk)
24147b6 is described below

commit 24147b6191801fba23e2dfaf9ef6cf9a51d4670c
Author: Andrew Onishuk <aonis...@hortonworks.com>
AuthorDate: Mon Feb 3 14:16:54 2020 +0200

    AMBARI-25471. After node reboot autostart of components takes too much time. (aonishuk)
---
 .../src/main/python/ambari_agent/ActionQueue.py      | 20 +++++++++++++++-----
 .../python/ambari_agent/ComponentStatusExecutor.py   |  2 +-
 .../src/main/python/ambari_agent/RecoveryManager.py  |  9 ++++++++-
 .../src/test/python/ambari_agent/TestAlerts.py       |  2 +-
 .../test/python/ambari_agent/TestRecoveryManager.py  |  1 +
 5 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 3403b98..eede227 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -58,6 +58,9 @@ class ActionQueue(threading.Thread):
   # How much time(in seconds) we need wait for new incoming execution command before checking status command queue
   EXECUTION_COMMAND_WAIT_TIME = 2
 
+  # key name in command dictionary
+  IS_RECOVERY_COMMAND = "isRecoveryCommand"
+
   def __init__(self, initializer_module):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
@@ -134,7 +137,11 @@ class ActionQueue(threading.Thread):
             if command is None:
               break
 
-            self.process_command(command)
+            # Recovery commands should be run in parallel (since we don't know the ordering on agent)
+            if self.IS_RECOVERY_COMMAND in command and command[self.IS_RECOVERY_COMMAND]:
+              self.start_parallel_command(command)
+            else:
+              self.process_command(command)
           else:
             # If parallel execution is enabled, just kick off all available
             # commands using separate threads
@@ -150,10 +157,7 @@ class ActionQueue(threading.Thread):
               if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
                 retry_able = command['commandParams']['command_retry_enabled'] == "true"
               if retry_able:
-                logger.info("Kicking off a thread for the command, id={} taskId={}".format(command['commandId'], command['taskId']))
-                t = threading.Thread(target=self.process_command, args=(command,))
-                t.daemon = True
-                t.start()
+                self.start_parallel_command(command)
               else:
                 self.process_command(command)
                 break
@@ -165,6 +169,12 @@ class ActionQueue(threading.Thread):
         logger.exception("ActionQueue thread failed with exception. Re-running it")
     logger.info("ActionQueue thread has successfully finished")
 
+  def start_parallel_command(self, command):
+    logger.info("Kicking off a thread for the command, id={} taskId={}".format(command['commandId'], command['taskId']))
+    t = threading.Thread(target=self.process_command, args=(command,))
+    t.daemon = True
+    t.start()
+
   def fill_recovery_commands(self):
     if self.recovery_manager.enabled() and not self.tasks_in_progress_or_pending():
       self.put(self.recovery_manager.get_recovery_commands())
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 7bf00df..e1fe52b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -112,7 +112,7 @@ class ComponentStatusExecutor(threading.Thread):
               if result:
                 cluster_reports[cluster_id].append(result)
 
-
+        self.recovery_manager.statuses_computed_at_least_once = True
         cluster_reports = self.discard_stale_reports(cluster_reports)
         self.send_updates_to_server(cluster_reports)
       except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 66da323..6712997 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -45,6 +45,7 @@ class RecoveryManager:
   HAS_STALE_CONFIG = "hasStaleConfigs"
   EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
   ROLE_COMMAND = "roleCommand"
+  IS_RECOVERY_COMMAND = "isRecoveryCommand"
   COMMAND_ID = "commandId"
   PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
   PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
@@ -101,6 +102,7 @@ class RecoveryManager:
 
     self.actions = {}
     self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start)
+    self.statuses_computed_at_least_once = False
 
   def on_execution_command_start(self):
     with self.__active_command_lock:
@@ -274,6 +276,10 @@ class RecoveryManager:
     INSTALLED_FAILED --> INSTALLED
     INSTALLED_FAILED --> STARTED
     """
+    # wait until all component statuses are computed
+    if not self.statuses_computed_at_least_once:
+      return []
+
     commands = []
     for component in self.statuses.keys():
       if self.configured_for_recovery(component) and self.requires_recovery(component) and self.may_execute(component):
@@ -659,7 +665,8 @@ class RecoveryManager:
         self.COMMAND_TYPE: AgentCommand.auto_execution,
         self.TASK_ID: command_id,
         self.ROLE: component,
-        self.COMMAND_ID: command_id
+        self.COMMAND_ID: command_id,
+        self.IS_RECOVERY_COMMAND: True
       }
 
       if component in self.__component_to_service_map:
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
index 47e38f0..4f0398e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py
@@ -1403,7 +1403,7 @@ class TestAlerts(TestCase):
     """
     with patch("__builtin__.open") as open_mock:
       open_mock.side_effect = self.open_side_effect
-      cluster_configuration = ClusterConfigurationCache("/tmp/test_cache")
+      cluster_configuration = ClusterConfigurationCache("/tmp/test_cache", None)
       return cluster_configuration
 
 
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index e5947cf..4c863ce 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -347,6 +347,7 @@ class _TestRecoveryManager(TestCase):
       [1000, 1001, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
 
     rm = RecoveryManager(MagicMock())
+    rm.statuses_computed_at_least_once = True
     rm.update_config(5, 5, 0, 11, True, False, False)
 
     command1 = copy.deepcopy(self.command)

Reply via email to