Repository: ambari Updated Branches: refs/heads/trunk 9e870b2cc -> 45de7eff7
AMBARI-5065 Rolling restart should also handle clients on the same machine as the restarting component (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/45de7eff Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/45de7eff Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/45de7eff Branch: refs/heads/trunk Commit: 45de7eff7c4587c72a8982ed0942f08105db7c3a Parents: 9e870b2 Author: Dmitry Sen <[email protected]> Authored: Thu Mar 13 20:04:28 2014 +0200 Committer: Dmitry Sen <[email protected]> Committed: Thu Mar 13 20:04:28 2014 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 8 +-- .../python/ambari_agent/ActualConfigHandler.py | 53 +++++++++----- .../src/main/python/ambari_agent/LiveStatus.py | 8 ++- .../ambari_agent/TestActualConfigHandler.py | 73 ++++++++++++++++++-- .../test/python/ambari_agent/TestLiveStatus.py | 15 ++-- 5 files changed, 119 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 549651a..b611027 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -28,7 +28,6 @@ import os from LiveStatus import LiveStatus from shell import shellRunner import PuppetExecutor -import PythonExecutor from ActualConfigHandler import ActualConfigHandler from CommandStatusDict import CommandStatusDict from CustomServiceOrchestrator import CustomServiceOrchestrator @@ -75,6 +74,7 @@ class ActionQueue(threading.Thread): self.config = config self.controller = controller self.sh = shellRunner() + self.configTags = {} self._stop = threading.Event() self.tmpdir = config.get('agent', 'prefix') self.customServiceOrchestrator = CustomServiceOrchestrator(config, @@ -214,7 +214,7 @@ class ActionQueue(threading.Thread): roleResult['structuredOut'] = '' # let ambari know that configuration tags were applied if status == self.COMPLETED_STATUS: - configHandler = ActualConfigHandler(self.config) + configHandler = ActualConfigHandler(self.config, self.configTags) if command.has_key('configurationTags'): configHandler.write_actual(command['configurationTags']) roleResult['configurationTags'] = command['configurationTags'] @@ -226,7 +226,7 @@ class ActionQueue(threading.Thread): (command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND and \ command['hostLevelParams'].has_key('custom_command') and \ command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART)): - configHandler.copy_to_component(command['role']) + configHandler.write_actual_component(command['role'], command['configurationTags']) roleResult['configurationTags'] = configHandler.read_actual_component(command['role']) self.commandStatuses.put_command_status(command, roleResult) @@ -248,7 +248,7 @@ class ActionQueue(threading.Thread): command_format = self.determine_command_format_version(command) livestatus = LiveStatus(cluster, service, component, - globalConfig, self.config) + globalConfig, self.config, self.configTags) component_status = None if command_format == self.COMMAND_FORMAT_V2: # For custom services, responsibility to determine service status is http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py b/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py index 024c575..3937f69 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/ActualConfigHandler.py @@ -21,15 +21,16 @@ limitations under the License. import json import logging import os -import shutil +import LiveStatus logger = logging.getLogger() class ActualConfigHandler: CONFIG_NAME = 'config.json' - def __init__(self, config): - self.config = config; + def __init__(self, config, configTags): + self.config = config + self.configTags = configTags def findRunDir(self): runDir = '/var/run/ambari-agent' @@ -39,18 +40,35 @@ class ActualConfigHandler: runDir = '/tmp' return runDir - def write_actual(self, configTags): - runDir = self.findRunDir() - conf_file = open(os.path.join(runDir, self.CONFIG_NAME), 'w') - json.dump(configTags, conf_file) - conf_file.close() + def write_actual(self, tags): + self.write_file(self.CONFIG_NAME, tags) + + def write_actual_component(self, component, tags): + self.configTags[component] = tags + filename = component + "_" + self.CONFIG_NAME + self.write_file(filename, tags) + self.write_client_components(tags) - def copy_to_component(self, componentName): + def write_client_components(self, tags): + for comp in LiveStatus.LiveStatus.CLIENT_COMPONENTS: + componentName = comp['componentName'] + if componentName in self.configTags.keys(): + tags_updated = False + for config_name in tags.keys(): + if config_name in self.configTags[componentName] and \ + tags[config_name] != self.configTags[componentName][config_name]: + self.configTags[componentName][config_name] = tags[config_name] + tags_updated = True + if tags_updated: + filename = componentName + "_" + self.CONFIG_NAME + self.write_file(filename, self.configTags[componentName]) + pass + + def write_file(self, filename, tags): runDir = self.findRunDir() - srcfile = os.path.join(runDir, self.CONFIG_NAME) - if os.path.isfile(srcfile): - dstfile = os.path.join(runDir, componentName + "_" + self.CONFIG_NAME) - shutil.copy(srcfile, dstfile) + conf_file = open(os.path.join(runDir, filename), 'w') + json.dump(tags, conf_file) + conf_file.close() def read_file(self, filename): runDir = self.findRunDir() @@ -74,7 +92,8 @@ class ActualConfigHandler: def read_actual(self): return self.read_file(self.CONFIG_NAME) - def read_actual_component(self, componentName): - return self.read_file(componentName + "_" + self.CONFIG_NAME) - - + def read_actual_component(self, component): + if component not in self.configTags.keys(): + self.configTags[component] = \ + self.read_file(component + "_" + self.CONFIG_NAME) + return self.configTags[component] http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/main/python/ambari_agent/LiveStatus.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py index 6f9f4db..2a6959f 100644 --- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py +++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py @@ -151,14 +151,16 @@ class LiveStatus: LIVE_STATUS = "STARTED" DEAD_STATUS = "INSTALLED" - def __init__(self, cluster, service, component, globalConfig, config): + def __init__(self, cluster, service, component, globalConfig, config, + configTags): self.cluster = cluster self.service = service self.component = component self.globalConfig = globalConfig versionsFileDir = config.get('agent', 'prefix') self.versionsHandler = StackVersionsFileHandler(versionsFileDir) - self.actualConfigHandler = ActualConfigHandler(config) + self.configTags = configTags + self.actualConfigHandler = ActualConfigHandler(config, configTags) def belongsToService(self, component): #TODO: Should also check belonging of server to cluster @@ -194,7 +196,7 @@ class LiveStatus: "stackVersion": self.versionsHandler. read_stack_version(self.component) } - active_config = self.actualConfigHandler.read_actual_component(self.component) + active_config = self.actualConfigHandler.read_actual_component(self.component) # if not active_config is None: livestatus['configurationTags'] = active_config http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py b/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py index e62dd1a..a33bf32 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActualConfigHandler.py @@ -23,7 +23,7 @@ from ambari_agent.AmbariConfig import AmbariConfig from ambari_agent.ActualConfigHandler import ActualConfigHandler import os import logging -import json +from mock.mock import patch class TestActualConfigHandler(TestCase): @@ -33,9 +33,9 @@ class TestActualConfigHandler(TestCase): config = AmbariConfig().getConfig() tmpdir = tempfile.gettempdir() config.set('agent', 'prefix', tmpdir) - handler = ActualConfigHandler(config) - + tags = { "global": "version1", "core-site": "version2" } + handler = ActualConfigHandler(config, tags) handler.write_actual(tags) output = handler.read_actual() self.assertEquals(tags, output) @@ -45,7 +45,7 @@ class TestActualConfigHandler(TestCase): config = AmbariConfig().getConfig() tmpdir = tempfile.gettempdir() config.set('agent', 'prefix', tmpdir) - handler = ActualConfigHandler(config) + handler = ActualConfigHandler(config, {}) conf_file = open(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME), 'w') conf_file.write("") @@ -59,11 +59,11 @@ class TestActualConfigHandler(TestCase): config = AmbariConfig().getConfig() tmpdir = tempfile.gettempdir() config.set('agent', 'prefix', tmpdir) - handler = ActualConfigHandler(config) tags1 = { "global": "version1", "core-site": "version2" } + handler = ActualConfigHandler(config, {}) handler.write_actual(tags1) - handler.copy_to_component('FOO') + handler.write_actual_component('FOO', tags1) output1 = handler.read_actual_component('FOO') output2 = handler.read_actual_component('GOO') @@ -80,3 +80,64 @@ class TestActualConfigHandler(TestCase): self.assertEquals(tags1, output4) os.remove(os.path.join(tmpdir, "FOO_" + ActualConfigHandler.CONFIG_NAME)) os.remove(os.path.join(tmpdir, ActualConfigHandler.CONFIG_NAME)) + + def test_write_actual_component(self): + config = AmbariConfig().getConfig() + tmpdir = tempfile.gettempdir() + config.set('agent', 'prefix', tmpdir) + + tags1 = { "global": "version1", "core-site": "version2" } + tags2 = { "global": "version33", "core-site": "version33" } + handler = ActualConfigHandler(config, {}) + handler.write_actual_component('HDFS_CLIENT', tags1) + handler.write_actual_component('HBASE_CLIENT', tags1) + self.assertEquals(tags1, handler.read_actual_component('HDFS_CLIENT')) + self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT')) + handler.write_actual_component('DATANODE', tags2) + self.assertEquals(tags2, handler.read_actual_component('DATANODE')) + self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT')) + + os.remove(os.path.join(tmpdir, "DATANODE_" + ActualConfigHandler.CONFIG_NAME)) + os.remove(os.path.join(tmpdir, "HBASE_CLIENT_" + ActualConfigHandler.CONFIG_NAME)) + os.remove(os.path.join(tmpdir, "HDFS_CLIENT_" + ActualConfigHandler.CONFIG_NAME)) + + @patch.object(ActualConfigHandler, "write_file") + def test_write_client_components(self, write_file_mock): + config = AmbariConfig().getConfig() + tmpdir = tempfile.gettempdir() + config.set('agent', 'prefix', tmpdir) + + tags0 = {"global": "version0", "core-site": "version0"} + tags1 = {"global": "version1", "core-site": "version2"} + tags2 = {"global": "version33", "core-site": "version33"} + configTags = {'HDFS_CLIENT': tags0, 'HBASE_CLIENT': tags1} + handler = ActualConfigHandler(config, configTags) + self.assertEquals(tags0, handler.read_actual_component('HDFS_CLIENT')) + self.assertEquals(tags1, handler.read_actual_component('HBASE_CLIENT')) + handler.write_client_components(tags2) + self.assertEquals(tags2, handler.read_actual_component('HDFS_CLIENT')) + self.assertEquals(tags2, handler.read_actual_component('HBASE_CLIENT')) + self.assertTrue(write_file_mock.called) + self.assertEqual(2, write_file_mock.call_count) + + @patch.object(ActualConfigHandler, "write_file") + @patch.object(ActualConfigHandler, "read_file") + def test_read_actual_component_inmemory(self, read_file_mock, write_file_mock): + config = AmbariConfig().getConfig() + tmpdir = tempfile.gettempdir() + config.set('agent', 'prefix', tmpdir) + + tags1 = { "global": "version1", "core-site": "version2" } + read_file_mock.return_value = tags1 + + handler = ActualConfigHandler(config, {}) + + handler.write_actual_component('NAMENODE', tags1) + self.assertTrue(write_file_mock.called) + self.assertEquals(tags1, handler.read_actual_component('NAMENODE')) + self.assertFalse(read_file_mock.called) + self.assertEquals(tags1, handler.read_actual_component('DATANODE')) + self.assertTrue(read_file_mock.called) + self.assertEquals(1, read_file_mock.call_count) + self.assertEquals(tags1, handler.read_actual_component('DATANODE')) + self.assertEquals(1, read_file_mock.call_count) http://git-wip-us.apache.org/repos/asf/ambari/blob/45de7eff/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py index 96a21b9..345bd44 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py +++ b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py @@ -21,10 +21,9 @@ limitations under the License. from unittest import TestCase from ambari_agent.LiveStatus import LiveStatus from ambari_agent.AmbariConfig import AmbariConfig -import socket import os, sys, StringIO from ambari_agent import ActualConfigHandler -from mock.mock import patch, MagicMock, call +from mock.mock import patch import pprint from ambari_agent import StatusCheck @@ -46,7 +45,7 @@ class TestLiveStatus(TestCase): for component in LiveStatus.COMPONENTS: config = AmbariConfig().getConfig() config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files") - livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config) + livestatus = LiveStatus('', component['serviceName'], component['componentName'], {}, config, {}) livestatus.versionsHandler.versionsFilePath = "ambari_agent" + os.sep + "dummy_files" + os.sep + "dummy_current_stack" result = livestatus.build() print "LiveStatus of {0}: {1}".format(component['serviceName'], str(result)) @@ -57,23 +56,23 @@ class TestLiveStatus(TestCase): # Test build status for CLIENT component (in LiveStatus.CLIENT_COMPONENTS) read_actual_component_mock.return_value = "some tags" - livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) result = livestatus.build() self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result.has_key('configurationTags')) # Test build status with forsed_component_status ## Alive - livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.LIVE_STATUS) ## Dead - livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) result = livestatus.build(forsed_component_status = LiveStatus.DEAD_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.DEAD_STATUS) - livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config) + livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config, {}) result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.LIVE_STATUS) @@ -89,7 +88,7 @@ class TestLiveStatus(TestCase): config = AmbariConfig().getConfig() config.set('agent', 'prefix', "ambari_agent" + os.sep + "dummy_files") livestatus = LiveStatus('', 'SOME_UNKNOWN_SERVICE', - 'SOME_UNKNOWN_COMPONENT', {}, config) + 'SOME_UNKNOWN_COMPONENT', {}, config, {}) livestatus.versionsHandler.versionsFilePath = "ambari_agent" + \ os.sep + "dummy_files" + os.sep + "dummy_current_stack" result = livestatus.build(forsed_component_status = "STARTED")
