Repository: ambari
Updated Branches:
  refs/heads/trunk 995fc0be4 -> 1046c8fec


AMBARI-15795. Parallel execution should only be allowed on commands that have 
auto retry enabled (smohanty)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1046c8fe
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1046c8fe
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1046c8fe

Branch: refs/heads/trunk
Commit: 1046c8fec72285dee07342afd8ff941a000a6718
Parents: 995fc0b
Author: Sumit Mohanty <[email protected]>
Authored: Mon Apr 11 15:49:23 2016 -0700
Committer: Sumit Mohanty <[email protected]>
Committed: Mon Apr 11 15:49:23 2016 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 22 +++++++--
 .../test/python/ambari_agent/TestActionQueue.py | 48 +++++++++++++++++++-
 2 files changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1046c8fe/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index d603566..ccae62c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -156,11 +156,23 @@ class ActionQueue(threading.Thread):
           # commands using separate threads
           while (True):
             command = self.commandQueue.get(True, 
self.EXECUTION_COMMAND_WAIT_TIME)
-            logger.info("Kicking off a thread for the command, id=" +
-                        str(command['commandId']) + " taskId=" + 
str(command['taskId']))
-            t = threading.Thread(target=self.process_command, args=(command,))
-            t.daemon = True
-            t.start()
+            # If command is not retry_enabled then do not start them in 
parallel
+            # checking just one command is enough as all commands for a stage 
is sent
+            # at the same time and retry is only enabled for initial 
start/install
+            retryAble = False
+            if 'command_retry_enabled' in command['commandParams']:
+              retryAble = command['commandParams']['command_retry_enabled'] == 
"true"
+            if retryAble:
+              logger.info("Kicking off a thread for the command, id=" +
+                          str(command['commandId']) + " taskId=" + 
str(command['taskId']))
+              t = threading.Thread(target=self.process_command, 
args=(command,))
+              t.daemon = True
+              t.start()
+            else:
+              self.process_command(command)
+              break;
+            pass
+          pass
       except (Queue.Empty):
         pass
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/1046c8fe/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py 
b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 8bd5ddc..2adf4ed 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -59,7 +59,26 @@ class TestActionQueue(TestCase):
     'serviceName': u'HDFS',
     'hostLevelParams': {},
     'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v1' }}
+    'configurationTags':{'global' : { 'tag': 'v1' }},
+    'commandParams': {
+      'command_retry_enabled': 'true'
+    }
+  }
+
+  datanode_install_no_retry_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 3,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {},
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v1' }},
+    'commandParams': {
+      'command_retry_enabled': 'false'
+    }
   }
 
   datanode_auto_start_command = {
@@ -124,8 +143,11 @@ class TestActionQueue(TestCase):
     'taskId': 7,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
-    'hostLevelParams': {}
+    'hostLevelParams': {},
+    'commandParams': {
+      'command_retry_enabled': 'true'
     }
+  }
 
   status_command = {
     "serviceName" : 'HDFS',
@@ -883,6 +905,28 @@ class TestActionQueue(TestCase):
     self.assertEqual(2, process_command_mock.call_count)
     
process_command_mock.assert_any_calls([call(self.datanode_install_command), 
call(self.hbase_install_command)])
 
+  @patch("threading.Thread")
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
+                         process_command_mock, gpeo_mock, threading_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    gpeo_mock.return_value = 1
+    config.get_parallel_exec_option = gpeo_mock
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.put([self.datanode_install_no_retry_command, 
self.snamenode_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.start()
+    time.sleep(1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not 
stopped.')
+    self.assertEqual(1, process_command_mock.call_count)
+    self.assertEqual(0, threading_mock.call_count)
+    
process_command_mock.assert_any_calls([call(self.datanode_install_command), 
call(self.hbase_install_command)])
 
   @not_for_platform(PLATFORM_LINUX)
   @patch("time.sleep")

Reply via email to