Repository: ambari Updated Branches: refs/heads/trunk 8d27ac2b7 -> 7330d0eac
AMBARI-9154. Upgrade pack for Flume (Yurii Shylov via ncole) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7330d0ea Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7330d0ea Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7330d0ea Branch: refs/heads/trunk Commit: 7330d0eacc2e825a441cddbfa1684e4aae6557f9 Parents: 8d27ac2 Author: Nate Cole <[email protected]> Authored: Fri Jan 16 15:56:47 2015 -0500 Committer: Nate Cole <[email protected]> Committed: Fri Jan 16 15:56:47 2015 -0500 ---------------------------------------------------------------------- .../1.4.0.2.0/package/scripts/flume_handler.py | 19 ++++ .../1.4.0.2.0/package/scripts/flume_upgrade.py | 96 ++++++++++++++++++++ .../FLUME/1.4.0.2.0/package/scripts/params.py | 3 + .../stacks/HDP/2.2/upgrades/upgrade-2.2.xml | 26 +++++- .../python/stacks/2.0.6/FLUME/test_flume.py | 11 +++ .../python/stacks/2.0.6/configs/flume_22.json | 1 + 6 files changed, 152 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py index 849fcf1..a24e6ce 100644 --- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py +++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_handler.py @@ -17,12 +17,15 @@ limitations under the License. """ +import flume_upgrade + from flume import flume from flume import get_desired_state from resource_management import * from resource_management.libraries.functions.flume_agent_helper import find_expected_agent_names from resource_management.libraries.functions.flume_agent_helper import get_flume_status +from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version class FlumeHandler(Script): @@ -50,6 +53,9 @@ class FlumeHandler(Script): flume(action='stop') + if rolling_restart: + flume_upgrade.post_stop_backup() + def configure(self, env): import params @@ -79,5 +85,18 @@ class FlumeHandler(Script): elif len(expected_agents) == 0 and 'INSTALLED' == get_desired_state(): raise ComponentIsNotRunning() + def pre_rolling_restart(self, env): + import params + env.set_params(params) + + # this function should not execute if the version can't be determined or + # is not at least HDP 2.2.0.0 + if not params.version or compare_versions(format_hdp_stack_version(params.version), '2.2.0.0') < 0: + return + + Logger.info("Executing Flume Rolling Upgrade pre-restart") + Execute(format("hdp-select set flume-server {version}")) + flume_upgrade.pre_start_restore() + if __name__ == "__main__": FlumeHandler().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py new file mode 100644 index 0000000..e2028fc --- /dev/null +++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/flume_upgrade.py @@ -0,0 +1,96 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os +import shutil +import tarfile +import tempfile + +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail + +BACKUP_TEMP_DIR = "flume-upgrade-backup" +BACKUP_CONF_DIR_ARCHIVE = "flume-conf-backup.tar" + +def post_stop_backup(): + """ + Backs up the flume config, config dir, file/spillable channels as part of the + upgrade process. + :return: + """ + Logger.info('Backing up Flume data and configuration before upgrade...') + directoryMappings = _get_directory_mappings() + + absolute_backup_dir = os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR) + if not os.path.isdir(absolute_backup_dir): + os.makedirs(absolute_backup_dir) + + for directory in directoryMappings: + if not os.path.isdir(directory): + raise Fail("Unable to backup missing directory {0}".format(directory)) + + archive = os.path.join(absolute_backup_dir, directoryMappings[directory]) + Logger.info('Compressing {0} to {1}'.format(directory, archive)) + + if os.path.exists(archive): + os.remove(archive) + + tarball = None + try: + tarball = tarfile.open(archive, "w") + tarball.add(directory, arcname=os.path.basename(directory)) + finally: + if tarball: + tarball.close() + +def pre_start_restore(): + """ + Restores the flume config, config dir, file/spillable channels to their proper locations + after an upgrade has completed. + :return: + """ + Logger.info('Restoring Flume data and configuration after upgrade...') + directoryMappings = _get_directory_mappings() + + for directory in directoryMappings: + archive = os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR, + directoryMappings[directory]) + + if os.path.isfile(archive): + Logger.info('Extracting {0} to {1}'.format(archive, directory)) + tarball = None + try: + tarball = tarfile.open(archive, "r") + tarball.extractall(directory) + finally: + if tarball: + tarball.close() + + # cleanup + if os.path.exists(os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR)): + shutil.rmtree(os.path.join(tempfile.gettempdir(), BACKUP_TEMP_DIR)) + +def _get_directory_mappings(): + """ + Gets a dictionary of directory to archive name that represents the + directories that need to be backed up and their output tarball archive targets + :return: the dictionary of directory to tarball mappings + """ + import params + + return { params.flume_conf_dir : BACKUP_CONF_DIR_ARCHIVE} http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py index 28c4240..efa22d4 100644 --- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py @@ -25,6 +25,9 @@ config = Script.get_config() stack_name = default("/hostLevelParams/stack_name", None) +# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade +version = default("/commandParams/version", None) + user_group = config['configurations']['cluster-env']['user_group'] proxyuser_group = config['configurations']['hadoop-env']['proxyuser_group'] http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml index 4a4b158..325df21 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml +++ b/ambari-server/src/main/resources/stacks/HDP/2.2/upgrades/upgrade-2.2.xml @@ -118,6 +118,20 @@ </service> </group> + <group name="SLIDER" title="Slider"> + <skippable>true</skippable> + <service name="SLIDER"> + <component>SLIDER</component> + </service> + </group> + + <group name="FLUME" title="Flume"> + <skippable>true</skippable> + <service name="FLUME"> + <component>FLUME_HANDLER</component> + </service> + </group> + <group name="CLIENTS" title="Client Components"> <service name="HDFS"> <component>HDFS_CLIENT</component> @@ -147,10 +161,6 @@ <component>HIVE_CLIENT</component> <component>HCAT</component> </service> - - <service name="SLIDER"> - <component>SLIDER</component> - </service> </group> <group xsi:type="cluster" name="POST_CLUSTER" title="Finalize Upgrade"> @@ -483,5 +493,13 @@ </post-upgrade> </component> </service> + + <service name="FLUME"> + <component name="FLUME_HANDLER"> + <upgrade> + <task xsi:type="restart" /> + </upgrade> + </component> + </service> </processing> </upgrade> http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py index 193eefb..ada29c2 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py +++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py @@ -410,6 +410,17 @@ class TestFlumeHandler(RMFTestCase): owner="flume", content=content) + def test_pre_rolling_restart(self): + self.executeScript(self.COMMON_SERVICES_PACKAGE_DIR + "/scripts/flume_handler.py", + classname = "FlumeHandler", + command = "pre_rolling_restart", + config_file="flume_22.json", + hdp_stack_version = self.STACK_VERSION, + target = RMFTestCase.TARGET_COMMON_SERVICES) + + self.assertResourceCalled("Execute", "hdp-select set flume-server 2.2.1.0-2067") + + def build_flume(content): result = {} agent_names = [] http://git-wip-us.apache.org/repos/asf/ambari/blob/7330d0ea/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json b/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json index 3a7aa33..909e965 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json +++ b/ambari-server/src/test/python/stacks/2.0.6/configs/flume_22.json @@ -23,6 +23,7 @@ "serviceName": "HIVE", "role": "HIVE_SERVER", "commandParams": { + "version": "2.2.1.0-2067", "command_timeout": "300", "service_package_folder": "OOZIE", "script_type": "PYTHON",
