Repository: ambari Updated Branches: refs/heads/trunk b960e606e -> 37434acae
AMBARI-5065 Rolling restart should also handle clients on the same machine as the restarting component (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/37434aca Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/37434aca Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/37434aca Branch: refs/heads/trunk Commit: 37434acae16a5b5036321d9cef426eb08c342cbe Parents: b960e60 Author: Dmitry Sen <[email protected]> Authored: Mon Mar 17 21:49:17 2014 +0200 Committer: Dmitry Sen <[email protected]> Committed: Mon Mar 17 21:49:17 2014 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 9 ++- .../python/ambari_agent/ActualConfigHandler.py | 44 +++++++---- .../src/main/python/ambari_agent/LiveStatus.py | 6 +- .../ambari_agent/TestActualConfigHandler.py | 77 ++++++++++++++++++-- .../test/python/ambari_agent/TestLiveStatus.py | 15 ++-- 5 files changed, 114 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/37434aca/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 549651a..551b59e 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -28,7 +28,6 @@ import os from LiveStatus import LiveStatus from shell import shellRunner import PuppetExecutor -import PythonExecutor from ActualConfigHandler import ActualConfigHandler from CommandStatusDict import CommandStatusDict from CustomServiceOrchestrator import CustomServiceOrchestrator @@ -75,6 +74,7 @@ class ActionQueue(threading.Thread): self.config = config self.controller = controller self.sh = shellRunner() + self.configTags = {} self._stop = threading.Event() self.tmpdir = config.get('agent', 'prefix') self.customServiceOrchestrator = CustomServiceOrchestrator(config, @@ -214,7 +214,7 @@ class ActionQueue(threading.Thread): roleResult['structuredOut'] = '' # let ambari know that configuration tags were applied if status == self.COMPLETED_STATUS: - configHandler = ActualConfigHandler(self.config) + configHandler = ActualConfigHandler(self.config, self.configTags) if command.has_key('configurationTags'): configHandler.write_actual(command['configurationTags']) roleResult['configurationTags'] = command['configurationTags'] @@ -226,7 +226,8 @@ class ActionQueue(threading.Thread): (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \ command['hostLevelParams'].has_key('custom_command') and \ command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)): - configHandler.copy_to_component(command['role']) + configHandler.write_actual_component(command['role'], command['configurationTags']) + configHandler.write_client_components(command['serviceName'], command['configurationTags']) roleResult['configurationTags'] = configHandler.read_actual_component(command['role']) self.commandStatuses.put_command_status(command, roleResult) @@ -248,7 +249,7 @@ class ActionQueue(threading.Thread): command_format = self.determine_command_format_version(command) livestatus = LiveStatus(cluster, service, component, - globalConfig, self.config) + globalConfig, self.config, self.configTags) component_status = None if command_format == self.COMMAND_FORMAT_V2: # For custom services, responsibility to determine service status is http://git-wip-us.apache.org/repos/asf/ambari/blob/37434aca/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py b/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py index 024c575..483b940 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py @@ -21,15 +21,16 @@ limitations under the License. import json import logging import os -import shutil +import LiveStatus logger = logging.getLogger() class ActualConfigHandler: CONFIG_NAME = 'config.json' - def __init__(self, config): - self.config = config; + def __init__(self, config, configTags): + self.config = config + self.configTags = configTags def findRunDir(self): runDir = '/var/run/ambari-agent' @@ -39,18 +40,28 @@ class ActualConfigHandler: runDir = '/tmp' return runDir - def write_actual(self, configTags): - runDir = self.findRunDir() - conf_file = open(os.path.join(runDir, self.CONFIG_NAME), 'w') - json.dump(configTags, conf_file) - conf_file.close() + def write_actual(self, tags): + self.write_file(self.CONFIG_NAME, tags) + + def write_actual_component(self, component, tags): + self.configTags[component] = tags + filename = component + "_" + self.CONFIG_NAME + self.write_file(filename, tags) - def copy_to_component(self, componentName): + def write_client_components(self, serviceName, tags): + for comp in LiveStatus.LiveStatus.CLIENT_COMPONENTS: + if comp['serviceName'] == serviceName: + componentName = comp['componentName'] + if componentName in self.configTags and \ + tags != self.configTags[componentName]: + self.write_actual_component(componentName, tags) + pass + + def write_file(self, filename, tags): runDir = self.findRunDir() - srcfile = os.path.join(runDir, self.CONFIG_NAME) - if os.path.isfile(srcfile): - dstfile = os.path.join(runDir, componentName + "_" + self.CONFIG_NAME) - shutil.copy(srcfile, dstfile) + conf_file = open(os.path.join(runDir, filename), 'w') + json.dump(tags, conf_file) + conf_file.close() def read_file(self, filename): runDir = self.findRunDir() @@ -75,6 +86,7 @@ class ActualConfigHandler: return self.read_file(self.CONFIG_NAME) def read_actual_component(self, componentName): - return self.read_file(componentName + "_" + self.CONFIG_NAME) - - + if componentName not in self.configTags.keys(): + self.configTags[componentName] = \ + self.read_file(componentName + "_" + self.CONFIG_NAME) + return self.configTags[componentName] http://git-wip-us.apache.org/repos/asf/ambari/blob/37434aca/ambari-agent/src/main/python/ambari_agent/LiveStatus.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py index 6f9f4db..746f70a 100644 --- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py +++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py @@ -151,14 +151,16 @@ class LiveStatus: LIVE_STATUS = "STARTED" DEAD_STATUS = "INSTALLED" - def __init__(self, cluster, service, component, globalConfig, config): + def __init__(self, cluster, service, component, globalConfig, config, + configTags): self.cluster = cluster self.service = service self.component = component self.globalConfig = globalConfig versionsFileDir = config.get('agent', 'prefix') self.versionsHandler = StackVersionsFileHandler(versionsFileDir) - self.actualConfigHandler = ActualConfigHandler(config) + self.configTags = configTags + self.actualConfigHandler = ActualConfigHandler(config, configTags) def belongsToService(self, component): #TODO: Should also check belonging of server to cluster http://git-wip-us.apache.org/repos/asf/ambari/blob/37434aca/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py b/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py index e62dd1a..bfe48a5 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py @@ -23,7 +23,7 @@ from ambari_agent.AmbariConfig import AmbariConfig from ambari_agent.ActualConfigHandler import ActualConfigHandler import os import logging -import json +from mock.mock import patch class TestActualConfigHandler(TestCase): @@ -33,9 +33,9 @@ class TestActualConfigHandler(TestCase): config = AmbariConfig().getConfig() tmpdir = tempfile.gettempdir() config.set('agent', 'prefix', tmpdir) - handler = ActualConfigHandler(config) - + tags = { "global": "version1", "core-site": "version2" } + handler = ActualConfigHandler(config, tags) handler.write_actual(tags) output = handler.read_actual() self.assertEquals(tags, output) @@ -45,7 +45,7 @@ class TestActualConfigHandler(TestCase): config = AmbariConfig().getConfig() tmpdir = tempfile.gettempdir() config.set('agent', 'prefix', tmpdir) - handler = ActualConfigHandler(config) + handler = ActualConfigHandler(config, {}) conf_file = open(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME), 'w') conf_file.write("") @@ -59,14 +59,14 @@ class TestActualConfigHandler(TestCase): config = AmbariConfig().getConfig() tmpdir = tempfile.gettempdir() config.set('agent', 'prefix', tmpdir) - handler = ActualConfigHandler(config) tags1 = { "global": "version1", "core-site": "version2" } + handler = ActualConfigHandler(config, {}) handler.write_actual(tags1) - handler.copy_to_component('FOO') + handler.write_actual_component('FOO', tags1) output1 = handler.read_actual_component('FOO') - output2 = handler.read_actual_component('GOO') + output2 = handler.read_actual_component('GOO') self.assertEquals(tags1, output1) self.assertEquals(None, output2) @@ -80,3 +80,66 @@ class TestActualConfigHandler(TestCase): self.assertEquals(tags1, output4) os.remove(os.path.join(tmpdir, "FOO_" + ActualConfigHandler.CONFIG_NAME)) os.remove(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME)) + + def test_write_actual_component_and_client_components(self): + config = AmbariConfig().getConfig() + tmpdir = tempfile.gettempdir() + config.set('agent', 'prefix', tmpdir) + + tags1 = { "global": "version1", "core-site": "version2" } + tags2 = { "global": "version33", "core-site": "version33" } + handler = ActualConfigHandler(config, {}) + handler.write_actual_component('HDFS_CLIENT', tags1) + handler.write_actual_component('HBASE_CLIENT', tags1) + self.assertEquals(tags1, handler.read_actual_component('HDFS_CLIENT')) + self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT')) + handler.write_actual_component('DATANODE', tags2) + self.assertEquals(tags2, handler.read_actual_component('DATANODE')) + self.assertEquals(tags1, handler.read_actual_component('HDFS_CLIENT')) + handler.write_client_components('HDFS', tags2) + self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT')) + + os.remove(os.path.join(tmpdir, "DATANODE_" + ActualConfigHandler.CONFIG_NAME)) + os.remove(os.path.join(tmpdir, "HBASE_CLIENT_" + ActualConfigHandler.CONFIG_NAME)) + os.remove(os.path.join(tmpdir, "HDFS_CLIENT_" + ActualConfigHandler.CONFIG_NAME)) + + @patch.object(ActualConfigHandler, "write_file") + def test_write_client_components(self, write_file_mock): + config = AmbariConfig().getConfig() + tmpdir = tempfile.gettempdir() + config.set('agent', 'prefix', tmpdir) + + tags0 = {"global": "version0", "core-site": "version0"} + tags1 = {"global": "version1", "core-site": "version2"} + tags2 = {"global": "version33", "core-site": "version33"} + configTags = {'HDFS_CLIENT': tags0, 'HBASE_CLIENT': tags1} + handler = ActualConfigHandler(config, configTags) + self.assertEquals(tags0, handler.read_actual_component('HDFS_CLIENT')) + self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT')) + handler.write_client_components('HDFS', tags2) + self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT')) + self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT')) + self.assertTrue(write_file_mock.called) + self.assertEqual(1, write_file_mock.call_count) + + @patch.object(ActualConfigHandler, "write_file") + @patch.object(ActualConfigHandler, "read_file") + def test_read_actual_component_inmemory(self, read_file_mock, write_file_mock): + config = AmbariConfig().getConfig() + tmpdir = tempfile.gettempdir() + config.set('agent', 'prefix', tmpdir) + + tags1 = { "global": "version1", "core-site": "version2" } + read_file_mock.return_value = tags1 + + handler = ActualConfigHandler(config, {}) + + handler.write_actual_component('NAMENODE', tags1) + self.assertTrue(write_file_mock.called) + self.assertEquals(tags1, handler.read_actual_component('NAMENODE')) + self.assertFalse(read_file_mock.called) + self.assertEquals(tags1, handler.read_actual_component('DATANODE')) + self.assertTrue(read_file_mock.called) + self.assertEquals(1, read_file_mock.call_count) + self.assertEquals(tags1, handler.read_actual_component('DATANODE')) + self.assertEquals(1, read_file_mock.call_count) http://git-wip-us.apache.org/repos/asf/ambari/blob/37434aca/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py index 96a21b9..345bd44 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py +++ b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py @@ -21,10 +21,9 @@ limitations under the License. from unittest import TestCase from ambari_agent.LiveStatus import LiveStatus from ambari_agent.AmbariConfig import AmbariConfig -import socket import os, sys, StringIO from ambari_agent import ActualConfigHandler -from mock.mock import patch, MagicMock, call +from mock.mock import patch import pprint from ambari_agent import StatusCheck @@ -46,7 +45,7 @@ class TestLiveStatus(TestCase): for component in LiveStatus.COMPONENTS: config = AmbariConfig().getConfig() config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files") - livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config) + livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config, {}) livestatus.versionsHandler.versionsFilePath = "ambari_agent" + os.sep + "dummy_files" + os.sep + "dummy_current_stack" result = livestatus.build() print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result)) @@ -57,23 +56,23 @@ class TestLiveStatus(TestCase): # Test build status for CLIENT component (in LiveStatus.CLIENT_COMPONENTS) read_actual_component_mock.return_value = "some tags" - livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) result = livestatus.build() self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result.has_key('configurationTags')) # Test build status with forsed_component_status ## Alive - livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.LIVE_STATUS) ## Dead - livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) result = livestatus.build(forsed_component_status = LiveStatus.DEAD_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.DEAD_STATUS) - livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config, {}) result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.LIVE_STATUS) @@ -89,7 +88,7 @@ class TestLiveStatus(TestCase): config = AmbariConfig().getConfig() config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files") livestatus = LiveStatus('', 'SOME_UNKNOWN_SERVICE', - 'SOME_UNKNOWN_COMPONENT', {}, config) + 'SOME_UNKNOWN_COMPONENT', {}, config, {}) livestatus.versionsHandler.versionsFilePath = "ambari_agent" + \ os.sep + "dummy_files" + os.sep + "dummy_current_stack" result = livestatus.build(forsed_component_status = "STARTED")
