Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 917898cdb -> d4cd91436


AMBARI-21401. Support cancel commands, some changes to status/command reports, 
bugfixes (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d4cd9143
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d4cd9143
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d4cd9143

Branch: refs/heads/branch-3.0-perf
Commit: d4cd914367bf2e1716454d5dd5775ddb989f0382
Parents: 917898c
Author: Andrew Onishuk <[email protected]>
Authored: Wed Jul 5 11:52:45 2017 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Wed Jul 5 11:52:45 2017 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  4 +--
 .../python/ambari_agent/CommandStatusDict.py    | 12 +++++---
 .../ambari_agent/CommandStatusReporter.py       |  4 ++-
 .../ambari_agent/ComponentStatusExecutor.py     |  2 +-
 .../ambari_agent/CustomServiceOrchestrator.py   | 18 +++++------
 .../src/main/python/ambari_agent/Utils.py       | 17 +++++++++++
 .../listeners/CommandsEventListener.py          | 12 ++++++--
 .../ambari_agent/TestAgentStompResponses.py     | 14 ++++-----
 .../dummy_files/stomp/execution_commands.json   | 13 ++------
 .../stomp/topology_add_component.json           |  8 +++--
 .../stomp/topology_cache_expected.json          | 18 +++++++++--
 .../dummy_files/stomp/topology_create.json      | 32 +++++++++++++++-----
 12 files changed, 101 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index a470697..f2fc253 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -90,7 +90,7 @@ class ActionQueue(threading.Thread):
     for command in commands:
       if not command.has_key('serviceName'):
         command['serviceName'] = "null"
-      if command.has_key('clusterId'):
+      if not command.has_key('clusterId'):
         command['clusterId'] = "null"
 
       logger.info("Adding " + command['commandType'] + " for role " + \
@@ -302,7 +302,7 @@ class ActionQueue(threading.Thread):
         retryDuration -= delay  # allow one last attempt
         commandresult['stderr'] += "\n\nCommand failed. Retrying command 
execution ...\n\n"
         logger.info("Retrying command with taskId = {cid} after a wait of 
{delay}".format(cid=taskId, delay=delay))
-        command['commandBeingRetried'] = "true"
+        command['agentLevelParams']['commandBeingRetried'] = "true"
         time.sleep(delay)
         continue
       else:

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py 
b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 133701f..ff526e3 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -21,6 +21,7 @@ limitations under the License.
 import logging
 import threading
 import copy
+from collections import defaultdict
 from Grep import Grep
 
 from ambari_agent import Constants
@@ -55,10 +56,10 @@ class CommandStatusDict():
       self.current_state[key] = (command, new_report)
       self.reported_reports.discard(key)
 
-    self.force_update_to_server([new_report])
+    self.force_update_to_server(command['clusterId'], new_report)
 
-  def force_update_to_server(self, reports):
-    self.initializer_module.connection.send(message=reports, 
destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+  def force_update_to_server(self, cluster_id, report):
+    self.initializer_module.connection.send(message={'clusters':{cluster_id: 
[report]}}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
 
   def get_command_status(self, taskId):
     with self.lock:
@@ -74,13 +75,14 @@ class CommandStatusDict():
     self.generated_reports = []
     from ActionQueue import ActionQueue
     with self.lock: # Synchronized
-      resultReports = []
+      resultReports = defaultdict(lambda:[])
       for key, item in self.current_state.items():
         command = item[0]
         report = item[1]
+        cluster_id = report['clusterId']
         if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, 
ActionQueue.BACKGROUND_EXECUTION_COMMAND]:
           if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
-            resultReports.append(report)
+            resultReports[cluster_id].append(report)
             self.reported_reports.add(key)
           else:
             in_progress_report = self.generate_in_progress_report(command, 
report)

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py 
b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
index 6ee4474..5e5eb0d 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -44,8 +44,10 @@ class CommandStatusReporter(threading.Thread):
       try:
         if self.initializer_module.is_registered:
           report = self.commandStatuses.generate_report()
+
           if report:
-            self.initializer_module.connection.send(message=report, 
destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+            self.initializer_module.connection.send(message={'clusters': 
report}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+
           self.commandStatuses.clear_reported_reports()
       except:
         logger.exception("Exception in CommandStatusReporter. Re-running it")

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py 
b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 2ac904f..5e53ed8 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -105,7 +105,7 @@ class ComponentStatusExecutor(threading.Thread):
     if not cluster_reports or not self.initializer_module.is_registered:
       return
 
-    self.initializer_module.connection.send(message=cluster_reports, 
destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
+    self.initializer_module.connection.send(message={'clusters': 
cluster_reports}, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT)
 
     for cluster_id, reports in cluster_reports.iteritems():
       for report in reports:

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py 
b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index c0b20ed..6389878 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -31,6 +31,7 @@ from AgentException import AgentException
 from PythonExecutor import PythonExecutor
 from resource_management.libraries.functions.log_process_information import 
log_process_information
 from resource_management.core.utils import PasswordString
+from ambari_agent.Utils import Utils
 import subprocess
 import Constants
 import hostname
@@ -310,9 +311,9 @@ class CustomServiceOrchestrator():
     """
     try:
       command = self.generate_command(command_header)
-      script_type = command['script_type'] # TODO STOMP: take this from 
command?
-      script = command['componentLevelParams']['script']
-      timeout = int('300') # TODO STOMP: fix it
+      script_type = command['commandParams']['script_type']
+      script = command['commandParams']['script']
+      timeout = int(command['commandParams']['command_timeout'])
 
       server_url_prefix = command['clusterLevelParams']['jdk_location']
 
@@ -366,8 +367,7 @@ class CustomServiceOrchestrator():
         credentialStoreEnabled = 
(command['serviceLevelParams']['credentialStoreEnabled'] == "true")
 
       if credentialStoreEnabled == True:
-        # TODO STOMP: fix this with execution commands
-        if 'commandBeingRetried' not in command or 
command['commandBeingRetried'] != "true":
+        if 'commandBeingRetried' not in command['agentLevelParams'] or 
command['agentLevelParams']['commandBeingRetried'] != "true":
           self.generateJceks(command)
         else:
           logger.info("Skipping generation of jceks files as this is a retry 
of the command")
@@ -469,12 +469,9 @@ class CustomServiceOrchestrator():
       'serviceLevelParams': metadata_cache.serviceLevelParams[service_name],
       'hostLevelParams': host_level_params_cache,
       'componentLevelParams': component_dict.componentLevelParams,
-      'script_type': self.SCRIPT_TYPE_PYTHON
+      'commandParams': component_dict.commandParams
     }
     command_dict.update(configurations_cache)
-    #command_dict['componentLevelParams']['script'] = 
component_dict.statusCommandParams['script']
-    #command_dict['serviceLevelParams']['hooks_folder'] = 
metadata_cache['hooks_folder']
-    #command_dict['serviceLevelParams']['service_package_folder'] = 
component_dict.statusCommandParams['service_package_folder']
 
     command_dict['agentLevelParams'] = {
       'public_hostname': self.public_fqdn,
@@ -486,9 +483,8 @@ class CustomServiceOrchestrator():
         "use_system_proxy_settings": self.config.use_system_proxy_setting()
       }
     }
-    command = copy.copy(command_header)
-    command.update(command_dict)
 
+    command = Utils.update_nested(command_dict, command_header)
     return command
 
   def requestComponentStatus(self, command_header):

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py 
b/ambari-agent/src/main/python/ambari_agent/Utils.py
index 46aa955..af70d77 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -19,6 +19,7 @@ limitations under the License.
 """
 import os
 import threading
+import collections
 from functools import wraps
 from ambari_agent.ExitHelper import ExitHelper
 
@@ -103,6 +104,22 @@ class Utils(object):
     return True
 
   @staticmethod
+  def update_nested(d, u):
+    """
+    Update the dictionary 'd' and its sub-dictionaries with values of 
dictionary 'u' and its sub-dictionaries.
+    """
+    for k, v in u.iteritems():
+      if isinstance(d, collections.Mapping):
+        if isinstance(v, collections.Mapping):
+          r = Utils.update_nested(d.get(k, {}), v)
+          d[k] = r
+        else:
+          d[k] = u[k]
+      else:
+        d = {k: u[k]}
+    return d
+
+  @staticmethod
   def make_immutable(value):
     if isinstance(value, ImmutableDictionary):
       return value

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py 
b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
index c3839cb..ae8d400 100644
--- 
a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -42,12 +42,18 @@ class CommandsEventListener(EventListener):
     """
     ""
     commands = []
+    cancel_commands = []
     for cluster_id in message['clusters'].keys():
       cluster_dict = message['clusters'][cluster_id]
-      for command in cluster_dict['commands']:
-        commands.append(command)
 
-    self.action_queue.put(commands)
+      if 'commands' in cluster_dict:
+        commands += cluster_dict['commands']
+      if 'cancelCommands' in cluster_dict:
+        cancel_commands += cluster_dict['cancelCommands']
+
+    with self.action_queue.lock:
+      self.action_queue.cancel(cancel_commands)
+      self.action_queue.put(commands)
 
   def get_handled_path(self):
     return Constants.COMMANDS_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py 
b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index feaf7dd..26c40b3 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -136,13 +136,13 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     
self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'],
 'c6401.ambari.apache.org')
     
self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'],
 ('STATUS',))
     
self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'],
 '2181')
-    self.assertEquals(dn_install_in_progress_frame[0]['roleCommand'], 
'INSTALL')
-    self.assertEquals(dn_install_in_progress_frame[0]['role'], 'DATANODE')
-    self.assertEquals(dn_install_in_progress_frame[0]['status'], 'IN_PROGRESS')
-    self.assertEquals(dn_install_failed_frame[0]['status'], 'FAILED')
-    self.assertEquals(dn_recovery_in_progress_frame[0]['roleCommand'], 
'INSTALL')
-    self.assertEquals(dn_recovery_in_progress_frame[0]['role'], 'DATANODE')
-    self.assertEquals(dn_recovery_in_progress_frame[0]['status'], 
'IN_PROGRESS')
+    
self.assertEquals(dn_install_in_progress_frame['clusters']['0'][0]['roleCommand'],
 'INSTALL')
+    
self.assertEquals(dn_install_in_progress_frame['clusters']['0'][0]['role'], 
'DATANODE')
+    
self.assertEquals(dn_install_in_progress_frame['clusters']['0'][0]['status'], 
'IN_PROGRESS')
+    self.assertEquals(dn_install_failed_frame['clusters']['0'][0]['status'], 
'FAILED')
+    
self.assertEquals(dn_recovery_in_progress_frame['clusters']['0'][0]['roleCommand'],
 'INSTALL')
+    
self.assertEquals(dn_recovery_in_progress_frame['clusters']['0'][0]['role'], 
'DATANODE')
+    
self.assertEquals(dn_recovery_in_progress_frame['clusters']['0'][0]['status'], 
'IN_PROGRESS')
 
     
#============================================================================================
     
#============================================================================================

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 075699e..76dac1b 100644
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -15,22 +15,15 @@
 
           },
           "commandParams":{
-            "service_package_folder":"common-services/HDFS/2.1.0.2.0/package",
-            "hooks_folder":"HDP/2.0.6/hooks",
-            "script":"scripts/datanode.py",
-            "phase":"INITIAL_INSTALL",
-            "max_duration_for_retries":"600",
-            "command_retry_enabled":"false",
-            "command_timeout":"1200",
-            "refresh_topology":"True",
-            "script_type":"PYTHON"
+            "command_retry_enabled": "true",
+            "refresh_topology":"true"
           }
         },
         {
           "requestId":6,
           "taskId":9,
           "commandId":0,
-          "clusterId": "null",
+          "clusterId": "0",
           "serviceName":"ZOOKEEPER",
           "role":"ZOOKEEPER_SERVER",
           "commandType":"EXECUTION_COMMAND",

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
index 1514516..d6deecf 100644
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
@@ -14,8 +14,12 @@
           ],
           "componentLevelParams": {
             "unlimited_key_jce_required": "false",
-            "clientsToUpdateConfigs": "[\"*\"]",
-            "script":"scripts/snamenode.py"
+            "clientsToUpdateConfigs": "[\"*\"]"
+          },
+          "commandParams": {
+            "script": "scripts/namenode.py",
+            "script_type":"PYTHON",
+            "command_timeout":"1200"
           }
         }
       ]

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
index ff2b3fd..08bccd7 100644
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
@@ -2,9 +2,13 @@
   "0": {
     "components": [
       {
+        "commandParams": {
+          "command_timeout": "1200",
+          "script": "scripts/datanode.py",
+          "script_type": "PYTHON"
+        },
         "componentLevelParams": {
           "clientsToUpdateConfigs": "[\"*\"]",
-          "script": "scripts/namenode.py",
           "unlimited_key_jce_required": "false"
         },
         "componentName": "DATANODE",
@@ -15,9 +19,13 @@
         "version": "2.6.0.3-8"
       },
       {
+        "commandParams": {
+          "command_timeout": "1200",
+          "script": "scripts/hdfs_client.py",
+          "script_type": "PYTHON"
+        },
         "componentLevelParams": {
           "clientsToUpdateConfigs": "[\"*\"]",
-          "script": "scripts/hdfs_client.py",
           "unlimited_key_jce_required": "false"
         },
         "componentName": "HDFS_CLIENT",
@@ -29,9 +37,13 @@
         "version": "2.6.0.3-8"
       },
       {
+        "commandParams": {
+          "command_timeout": "1200",
+          "script": "scripts/namenode.py",
+          "script_type": "PYTHON"
+        },
         "componentLevelParams": {
           "clientsToUpdateConfigs": "[\"*\"]",
-          "script": "scripts/snamenode.py",
           "unlimited_key_jce_required": "false"
         },
         "componentName": "SECONDARY_NAMENODE",

http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
index 6df4bc3..758fe6d 100644
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
@@ -13,8 +13,12 @@
           ],
           "componentLevelParams": {
             "unlimited_key_jce_required": "false",
-            "clientsToUpdateConfigs": "[\"*\"]",
-            "script": "scripts/namenode.py"
+            "clientsToUpdateConfigs": "[\"*\"]"
+          },
+          "commandParams": {
+            "script": "scripts/namenode.py",
+            "script_type":"PYTHON",
+            "command_timeout":"1200"
           }
         },
         {
@@ -27,8 +31,12 @@
           ],
           "componentLevelParams": {
             "unlimited_key_jce_required": "false",
-            "clientsToUpdateConfigs": "[\"*\"]",
-            "script": "scripts/namenode.py"
+            "clientsToUpdateConfigs": "[\"*\"]"
+          },
+          "commandParams": {
+            "script": "scripts/datanode.py",
+            "script_type":"PYTHON",
+            "command_timeout":"1200"
           }
         },
         {
@@ -40,8 +48,12 @@
           ],
           "componentLevelParams": {
             "unlimited_key_jce_required": "false",
-            "clientsToUpdateConfigs": "[\"*\"]",
-            "script": "scripts/hdfs_client.py"
+            "clientsToUpdateConfigs": "[\"*\"]"
+          },
+          "commandParams": {
+            "script": "scripts/hdfs_client.py",
+            "script_type":"PYTHON",
+            "command_timeout":"1200"
           }
         }
       ],
@@ -71,8 +83,12 @@
           ],
           "componentLevelParams": {
             "unlimited_key_jce_required": "false",
-            "clientsToUpdateConfigs": "[\"*\"]",
-            "script": "scripts/namenode.py"
+            "clientsToUpdateConfigs": "[\"*\"]"
+          },
+          "commandParams": {
+            "script": "scripts/namenode.py",
+            "script_type":"PYTHON",
+            "command_timeout":"1200"
           }
         }
       ],

Reply via email to