http://git-wip-us.apache.org/repos/asf/ambari/blob/af1bf85c/ambari-server/src/main/python/upgradeHelper.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/upgradeHelper.py b/ambari-server/src/main/python/upgradeHelper.py deleted file mode 100644 index 31aa721..0000000 --- a/ambari-server/src/main/python/upgradeHelper.py +++ /dev/null @@ -1,2338 +0,0 @@ -#!/usr/bin/env python - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -""" -Upgrade catalog file format description: - - -Format version 1.0 - -Global section description: - STACKNAME - name of stack, for example HDP - OLDVERSION - version of stack from which upgrade should be done, used by fromStack script argument - NEWVERSION - version of stack to which upgrade should be done, used by toStack script argument - -Sub-section options: - config-types - contains global per-config settings - merged-copy - would merge latest server properties with properties defined in "properties" section, - without this option server properties would be rewritten by properties defined in "properties" section - required-services - properties from json catalog would be processed only if desired services are present on the cluster - property level definition will always override catalog level definition. - -Sub-section properties - Contains property definition -Sub-section property-mapping(optional) - contains mapping of property names in case, if some property changed their name in NEWVERSION - -Example: - -{ - "version": "1.0", - "stacks": [ - { - "name": "STACKNAME", - "old-version": "OLDVERSION", - "target-version": "NEWVERSION", - "options": { - "config-types": { - "CONFIGTYPE1": { - "merged-copy": "yes", - "required-services": ["HDFS"] - } - } - }, - "properties": { - "CONFIGTYPE1": { - "some_property": "some property value", - "some_second_property: { - "remove": "yes" - }, - "template_property": { - "value": "{TEMPLATE_TAG}", - "template": "yes", - "required-services": ["HDFS", "YARN"] - }, - "test_property": { - "value": "new value", - "override: "no", (optional, override already existed property yes/no. Default: yes) - "value-required": "old value", (optional, property would be set if the required value is present) - "can-create": "no", (optional, process property only if that property present on the server. - i.e. ability to create new property. Default: yes) - "required-services": ["HDFS", "YARN"], (optional, process property only if selected services existed) - "resolve-dependency": "no" (optional, use Stack Advisor to get depended properties changes. Default: no) - } - } - }, - "property-mapping": { - "old-property-name": "new-property-name", (short form, equal to "old-property-name": { "map-to": "new-property-name" }) - "old-property1-name": { (usually key is an name of the property which need to be mapped, but in case of same - property should be set to unique name and "map-from" option used instead) - "map-from": "old property name", (optional, define property name which should be mapped) - "map-to": "new_property1_name", (optional, new property name. If not set, would be used old property name) - "from-catalog": "test", (optional, require "to-catalog. Source of old-property1-name) - "to-catalog": "test", (optional, require "from-catalog. Target of new_property1_name) - "default": "default value", (optional, if set and old property not exists, new one would be created with default value) - "template": "yes", (optional, template parsing for default option) - "coerce-to": "pre-defined type", (optional, convert value from one type to another. Types supported: - yaml-array - converts string item1,item2 to ['item1', 'item2'] - ) - "replace-from": "something", (optional, should be present both from and to. Replace 'from' value to 'to') - "replace-to": "something, - "required-services": ["YARN"], (optional, process entry if services in the list existed on the cluster - } - } - } - ] -} - -More examples available in ambari-server/src/main/resources/upgrade/catalog/ -""" - -import getpass -import optparse -from pprint import pprint -import re -import sys -import os.path -import logging -import time -import base64 -from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError - -try: - # try to import new simplejson version, which should be faster than outdated python 2.6 version - import ambari_simplejson as json -except ImportError: - import json - - -# ============================== -# Error classes definition -# ============================== -class FatalException(Exception): - def __init__(self, code, reason): - self.code = code - self.reason = reason - - def __str__(self): - return repr("Fatal exception: %s, exit code %s" % (self.reason, self.code)) - - def _get_message(self): - return str(self) - - -class ReadOnlyPropertyException(Exception): - def __str__(self): - return "Property is read-only" - - def _get_message(self): - return self.__str__() - - -class NotSupportedCatalogVersion(Exception): - def __init__(self, catalog_version): - self._version = catalog_version - - def __str__(self): - return "Version %s of loaded catalog not supported" % self._version - - def _get_message(self): - return self.__str__() - - message = property(__str__) - - -class CatalogNotFoundException(Exception): - pass - - -class TemplateProcessingException(Exception): - pass - - -class CatalogExistException(Exception): - pass - - -class PropertyNotFoundException(Exception): - pass - - -class StackNotFoundException(Exception): - pass - - -class MalformedPropertyDefinitionException(Exception): - pass - - -# ============================== -# Constant class definition -# ============================== -class Const(object): - def __new__(cls, *args, **kwargs): - raise Exception("Class couldn't be created") - - -class Options(Const): - # action commands - API_PROTOCOL = "http" - API_PORT = "8080" - - GET_MR_MAPPING_ACTION = "save-mr-mapping" - VERIFY_ACTION = "verify" - DELETE_MR_ACTION = "delete-mr" - ADD_YARN_MR2_ACTION = "add-yarn-mr2" - MODIFY_CONFIG_ACTION = "update-configs" - BACKUP_CONFIG_ACTION = "backup-configs" - INSTALL_YARN_MR2_ACTION = "install-yarn-mr2" - - MR_MAPPING_FILE = "mr_mapping" - CAPACITY_SCHEDULER_TAG = "capacity-scheduler" - REPLACE_JH_HOST_NAME_TAG = "REPLACE_JH_HOST" - REPLACE_RM_HOST_NAME_TAG = "REPLACE_RM_HOST" - REPLACE_WITH_TAG = "REPLACE_WITH_" - PHOENIX_QUERY_SERVER = "PHOENIX_QUERY_SERVER" - ZK_OPTIONS = "zoo.cfg" - KAFKA_BROKER_CONF = "kafka-broker" - RANGER_ADMIN = "admin-properties" - RANGER_USERSYNC = "usersync-properties" - RANGER_ENV = "ranger-env" - KAFKA_PORT = "port" - RANGER_EXTERNAL_URL = "policymgr_external_url" - ZK_CLIENTPORT = "clientPort" - DELETE_OLD_TAG = "DELETE_OLD" - - ZOOKEEPER_SERVER = "ZOOKEEPER_SERVER" - KAFKA_BROKER = "KAFKA_BROKER" - NAMENODE = "NAMENODE" - - MR_MAPPING = None - logger = None - server_config_factory = None - """:type : ServerConfigFactory""" - stack_advisor = None - """:type : StackAdvisor""" - ambari_server = None - """:type : AmbariServer""" - - # Api constants - ROOT_URL = None - CLUSTER_URL = None - COMPONENTS_FORMAT = None - TEZ_VIEW_URL = None - - # Curl options - CURL_PRINT_ONLY = None - CURL_WRITE_ONLY = None - - ARGS = None - OPTIONS = None - HOST = None - CLUSTER_NAME = None - - # for verify action - REPORT_FILE = None - - SERVICES = [] - - API_TOKENS = { - "user": None, - "pass": None - } - - HEADERS = { - 'X-Requested-By': 'upgradeHelper' - } - - @classmethod - def initialize(cls): - cls.ROOT_URL = '%s://%s:%s/api/v1' % (cls.API_PROTOCOL, cls.HOST, cls.API_PORT) - cls.CLUSTER_URL = cls.ROOT_URL + "/clusters/%s" % cls.CLUSTER_NAME - cls.COMPONENTS_URL = cls.CLUSTER_URL + "/components?fields=ServiceComponentInfo/total_count" - cls.COMPONENTS_FORMAT = cls.CLUSTER_URL + "/components/{0}" - cls.TEZ_VIEW_URL = cls.ROOT_URL + "/views/TEZ" - cls.STACKS_URL = cls.ROOT_URL + "/stacks" - cls.STACKS_VERSIONS_URL = cls.STACKS_URL + "/{0}/versions" - cls.STACK_ADVISOR_URL = cls.STACKS_VERSIONS_URL + "/{1}/recommendations" - cls.AMBARI_SERVER_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_SERVER" - cls.AMBARI_AGENTS_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_AGENT" - if cls.CLUSTER_NAME is not None and cls.HOST is not None: - cls.SERVICES = set(map(lambda x: x.upper(), get_cluster_services())) - - cls.ambari_server = AmbariServer() - if not cls.isPropertyAttributesSupported(): - cls.logger.warning("Property attributes not supported by current Ambari version") - - @classmethod - def isPropertyAttributesSupported(cls): - if cls.ambari_server.server_version[0] * 10 + cls.ambari_server.server_version[1] >= 17: - return True - return False - - @classmethod - def initialize_logger(cls, filename=None): - cls.logger = logging.getLogger('UpgradeHelper') - cls.logger.setLevel(logging.DEBUG) - - if filename is not None: - handler = logging.FileHandler(filename) - handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s')) - cls.logger.addHandler(handler) - cls.logger.info("") - cls.logger.info("Start new logging section") - - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) - cls.logger.addHandler(handler) - - -class CatConst(Const): - VERSION_TAG = "version" - STACK_VERSION_OLD = "old-version" - STACK_VERSION_TARGET = "target-version" - STACK_STAGS_TAG = "stacks" - STACK_NAME = "name" - CONFIG_OPTIONS = "options" - CONFIG_TYPES = "config-types" - STACK_PROPERTIES = "properties" - STACK_PROPERTIES_ATTRIBUTES = "properties_attributes" - PROPERTY_VALUE_TAG = "value" - VERSIONS_TAG = "versions" - PROPERTY_REMOVE_TAG = "remove" - PROPERTY_MAP_TO = "map-to" - PROPERTY_MAP_FROM = "map-from" - PROPERTY_FROM_CATALOG = "from-catalog" - PROPERTY_TO_CATALOG = "to-catalog" - PROPERTY_DEFAULT = "default" - MERGED_COPY_TAG = "merged-copy" - REQUIRED_SERVICES = "required-services" - COERCE_TO_PROPERTY_TAG = "coerce-to" - RESOLVE_DEPENDENCY_TAG = "resolve-dependency" - COERCE_YAML_OPTION_TAG = "yaml-array" - REPLACE_FROM_TAG = "replace-from" - REPLACE_TO_TAG = "replace-to" - OVERRIDE_TAG = "override" - ITEMS_TAG = "items" - TYPE_TAG = "type" - TRUE_TAG = "yes" - VALUE_REQUIRED_TAG = "value-required" - PROPERTY_CAN_CREATE_TAG = "can-create" - STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping" - VALUE_TEMPLATE_TAG = "template" - SEARCH_PATTERN = "(\{[^\{\}]+\})" # {XXXXX} - ACTION_COMMIT = "commit" - ACTION_RELOAD = "reload" - ACTION_RENAME_PROPERTY = "rename-property" - TEMPLATE_HANDLER = "template_handler" - - -# ============================== -# Catalog classes definition -# ============================== - -class AmbariServer(object): - def __init__(self): - Options.logger.info("Resolving Ambari server configuration ...") - self._get_server_info() - self._get_agents_info() - self._get_components() - - def _get_components(self): - info = curl(Options.COMPONENTS_URL, parse=True) - self._components = [] - if CatConst.ITEMS_TAG in info: - for item in info[CatConst.ITEMS_TAG]: - if "ServiceComponentInfo" in item and "total_count" in item["ServiceComponentInfo"] and \ - int(item["ServiceComponentInfo"]["total_count"]) > 0 and "component_name" in item["ServiceComponentInfo"]: - self._components.append(item["ServiceComponentInfo"]["component_name"]) - - def _get_server_info(self): - info = curl(Options.AMBARI_SERVER_URL, parse=True) - self._server_version = [0, 0, 0] - - if "RootServiceComponents" in info: - server_props = info["RootServiceComponents"] - ver = server_props["component_version"] if "component_version" in server_props else None - try: - self._server_version = list(map(lambda x: int(x), ver.split("."))) - except ValueError: - pass - - def _get_agents_info(self): - info = curl(Options.AMBARI_AGENTS_URL, parse=True) - self._agents = [] - if "hostComponents" in info: - agent_props = info["hostComponents"] - self._agents = list(map(lambda x: x["RootServiceHostComponents"]["host_name"], agent_props)) - - @property - def components(self): - return self._components - - @property - def server_version(self): - return self._server_version - - @property - def agent_hosts(self): - return self._agents - -class StackAdvisorFactory(object): - def __init__(self): - self._stack_info = self._load_stack_info() - - def _load_stack_versions(self, stack): - versions = curl(Options.STACKS_VERSIONS_URL.format(stack), parse=True) - if CatConst.ITEMS_TAG in versions: - versions = list(map(lambda x: x["Versions"]["stack_version"], versions[CatConst.ITEMS_TAG])) - - return versions - - def _load_stack_info(self): - stacks = curl(Options.STACKS_URL, parse=True) - if CatConst.ITEMS_TAG in stacks: - stacks = list(map(lambda x: x["Stacks"]["stack_name"], stacks["items"])) - else: - stacks = {} - - stacks_dict = {} - - for stack in stacks: - stacks_dict[stack] = self._load_stack_versions(stack) - - return stacks_dict - - def get_instance(self, stack, version): - sversion = Options.ambari_server.server_version - if sversion[0] * 10 + sversion[1] < 21: - Options.logger.warning("Ambari server version \"%s.%s.%s\" doesn't support property dependencies suggestion" % - (sversion[0], sversion[1], sversion[2])) - return BaseStackAdvisor(stack, version) - - if stack in self._stack_info and version in self._stack_info[stack]: - return StackAdvisor(stack, version) - else: - raise StackNotFoundException("Stack %s-%s not exist on the server" % (stack, version)) - -class StackAdvisorRequestProperty(object): - def __init__(self, catalog, property_name): - self._catalog = catalog - self._property_name = property_name - - @property - def catalog(self): - return self._catalog - - @property - def name(self): - return self._property_name - - def get_json(self): - return { - "type": self.catalog, - "name": self.name - } - - -class BaseStackAdvisor(object): - def __init__(self, stack, version): - self._req_url = Options.STACK_ADVISOR_URL.format(stack, version) - - def get_suggestion(self, cfg_factory, changed_properties): - return {} - - -class StackAdvisor(BaseStackAdvisor): - def __init__(self, stack, version): - super(StackAdvisor, self).__init__(stack, version) - - def _transform_properties(self, cfg_factory): - """ - Transform properties list to blueprint output format - :type cfg_factory: ServerConfigFactory - :rtype dict - """ - props = cfg_factory.get_json() - for cfg in props: - props[cfg] = { - "properties": props[cfg] - } - - return props - - def _from_blueprint_properties_transform(self, props): - """ - Transform SA response to dict - """ - for p in props: - rprop = {} - if "properties" in props[p] and props[p]["properties"] is not None: - rprop = props[p]["properties"] - if "property_attributes" in props[p]: - for property_attribute in props[p]["property_attributes"]: - if "delete" in props[p]["property_attributes"][property_attribute] and \ - props[p]["property_attributes"][property_attribute]["delete"] == "true": - rprop[property_attribute] = None - - props[p] = rprop - - return props - - def _generate_req_properties(self, properties): - rlist = [] - for item in properties: - if isinstance(item, StackAdvisorRequestProperty): - rlist.append(item.get_json()) - return rlist - - def get_suggestion(self, cfg_factory, changed_properties): - """ - :type cfg_factory: ServerConfigFactory - :type catalog_name str - :type changed_properties: list - :rtype dict - """ - request = { - "recommend": "configuration-dependencies", - "hosts": Options.ambari_server.agent_hosts, - "services": list(Options.SERVICES), - "changed_configurations": self._generate_req_properties(changed_properties), - "recommendations": { - "blueprint": { - "host_groups": [], - "configurations": self._transform_properties(cfg_factory), - "blueprint_cluster_binding": {} - } - } - } - response = curl(self._req_url, request_type="POST", data=request, parse=True) - if "resources" in response and isinstance(response["resources"], list) and len(response["resources"]) > 0: - response = response["resources"][0] - if "recommendations" in response and "blueprint" in response["recommendations"] and \ - "configurations" in response["recommendations"]["blueprint"]: - return self._from_blueprint_properties_transform(response["recommendations"]["blueprint"]["configurations"]) - - return {} - - - -class UpgradeCatalogFactory(object): - # versions of catalog which is currently supported - _supported_catalog_versions = ["1.0"] - - # private variables - _json_catalog = None - - def __init__(self, path): - self._load(path) - - def _load(self, path): - f = None - try: - f = open(path, 'r') - json_string = f.read() - self._json_catalog = json.loads(json_string) - self._parse_upgrade_catalog() - except IOError as e: - raise FatalException(e.errno, "Couldn't open upgrade catalog file %s: %s" % (path, e.strerror)) - except NotSupportedCatalogVersion as e: - raise FatalException(1, e.message) - except ValueError as e: - raise FatalException(1, "Malformed upgrade catalog: %s" % e.message) - finally: - try: - if f is not None: - f.close() - except IOError as e: - pass - - def _parse_upgrade_catalog(self): - catalog_version = None - if CatConst.VERSION_TAG in self._json_catalog: - catalog_version = self._json_catalog[CatConst.VERSION_TAG] - - if catalog_version is None or catalog_version not in self._supported_catalog_versions: - raise NotSupportedCatalogVersion(str(catalog_version)) - - def get_catalog(self, from_version=None, to_version=None): - search_version = { - CatConst.STACK_VERSION_OLD: from_version, - CatConst.STACK_VERSION_TARGET: to_version - } - - for stack in self._json_catalog[CatConst.STACK_STAGS_TAG]: - version = { - CatConst.STACK_VERSION_OLD: stack[CatConst.STACK_VERSION_OLD], - CatConst.STACK_VERSION_TARGET: stack[CatConst.STACK_VERSION_TARGET] - } - if version == search_version: - return UpgradeCatalog(catalog=stack, version=version) - - return None - - -class UpgradeCatalog(object): - # private variables - _json_catalog = None - _properties_catalog = None - _properties_map_catalog = None - _version = None - _search_pattern = None - _catalog_options = None - - def __init__(self, catalog=None, version=None): - self._handlers = {} - self._json_catalog = catalog - self._version = version - self._search_pattern = re.compile(CatConst.SEARCH_PATTERN) - - if CatConst.STACK_PROPERTIES in catalog: - self._properties_catalog = self._format_catalog_properties(catalog[CatConst.STACK_PROPERTIES]) - - if CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG in catalog: - self._properties_map_catalog = PropertyMapping(catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG]) - else: - self._properties_map_catalog = PropertyMapping() - - if catalog is not None and CatConst.CONFIG_OPTIONS in catalog \ - and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]: - self._catalog_options = catalog[CatConst.CONFIG_OPTIONS] - - def add_handler(self, name, handler): - if name not in self._handlers: - self._handlers[name] = handler - - def _format_catalog_properties(self, properties): - """ - Transform properties from short form to normal one: - "property": "text" => "property": { "value": "text" } - :param properties: dict - :return: dict - """ - for config_item in properties: - cfg_item = properties[config_item] - - """ - case when "properties": { - "yarn-site": { - ..... - } - } - is set like "properties": { - "yarn-site": "" - } - """ - if not isinstance(cfg_item, dict): - raise MalformedPropertyDefinitionException("The property catalog '%s' definition error" % config_item) - - properties[config_item] = dict(zip( - cfg_item.keys(), - map(lambda x: x if isinstance(x, dict) or isinstance(x, list) else {CatConst.PROPERTY_VALUE_TAG: x}, cfg_item.values()) - )) - return properties - - @property - def version(self): - return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET]) - - @property - def target_version(self): - return self._version[CatConst.STACK_VERSION_TARGET] - - @property - def source_version(self): - return self._version[CatConst.STACK_VERSION_OLD] - - def get_parsed_version(self): - """ - Get numeric representation of the version for comparation purposes - - Example: - 1.3-2.1 will be represented as { from: 13, to: 21 } - - :return: Numeric version - """ - v_from = self._version[CatConst.STACK_VERSION_OLD].split(".") - v_to = self._version[CatConst.STACK_VERSION_TARGET].split(".") - try: - v_from = int(v_from[0]) * 10 + int(v_from[1]) - v_to = int(v_to[0]) * 10 + int(v_to[1]) - except ValueError: - v_from = 0 - v_to = 0 - - version = { - "from": v_from, - "to": v_to - } - - return version - - @property - def name(self): - if CatConst.STACK_NAME in self._json_catalog: - return self._json_catalog[CatConst.STACK_NAME] - return "" - - @property - def mapping(self): - return self._properties_map_catalog - - @property - def items(self): - return self._properties_catalog - - @property - def options(self): - if CatConst.CONFIG_TYPES in self._catalog_options: - return self._catalog_options[CatConst.CONFIG_TYPES] - return {} - - @property - def action_handlers(self): - return self._handlers - - @property - def tag_search_pattern(self): - return self._search_pattern - - def __handle_remove_tag(self, name, catalog_item_name, catalog_property_item, properties): - """ - :type name str - :type catalog_item_name str - :type catalog_property_item dict - :type properties dict - """ - if CatConst.PROPERTY_REMOVE_TAG in catalog_property_item and \ - catalog_property_item[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG and \ - catalog_item_name in properties: - del properties[catalog_item_name] - - def __handle_template_tag_sub(self, catalog_item_name, catalog_property_item): - """ - :type catalog_item_name str - :type catalog_property_item dict - """ - if CatConst.TEMPLATE_HANDLER in self._handlers and self._handlers is not None and \ - CatConst.VALUE_TEMPLATE_TAG in catalog_property_item and catalog_property_item[ - CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: - try: - parsed_value = self._handlers[CatConst.TEMPLATE_HANDLER]( - self, - self._search_pattern.findall(catalog_property_item[CatConst.PROPERTY_VALUE_TAG]), - catalog_property_item[CatConst.PROPERTY_VALUE_TAG] - ) - catalog_property_item[CatConst.PROPERTY_VALUE_TAG] = parsed_value - except TemplateProcessingException: - pass - - def __handle_add_new(self, name, catalog_item_name, catalog_property_item, properties): - """ - :type name str - :type catalog_item_name str - :type catalog_property_item dict - :type properties dict - """ - catalog_property_item = dict(catalog_property_item) - can_add_new = not (CatConst.PROPERTY_CAN_CREATE_TAG in catalog_property_item and - catalog_property_item[CatConst.PROPERTY_CAN_CREATE_TAG].upper() == "NO") - if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name not in properties and can_add_new: - self.__handle_template_tag_sub(catalog_item_name, catalog_property_item) - properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG] - - def __handle_change_existing(self, name, catalog_item_name, catalog_property_item, properties): - """ - :type name str - :type catalog_item_name str - :type catalog_property_item dict - :type properties dict - """ - catalog_property_item = dict(catalog_property_item) - can_override = True - - if CatConst.OVERRIDE_TAG in catalog_property_item and catalog_property_item[CatConst.OVERRIDE_TAG] != CatConst.TRUE_TAG: - can_override = False - - if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name in properties and can_override: - self.__handle_template_tag_sub(catalog_item_name, catalog_property_item) - properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG] - return properties - - def __handle_dependency_tag(self, name, catalog_item_name, catalog_property_item, properties): - """ - :type name str - :type catalog_item_name str - :type catalog_property_item dict - :type properties dict - """ - if CatConst.RESOLVE_DEPENDENCY_TAG in catalog_property_item and \ - catalog_property_item[CatConst.RESOLVE_DEPENDENCY_TAG] == CatConst.TRUE_TAG: - sa_suggestions = Options.stack_advisor.get_suggestion(Options.server_config_factory, - [StackAdvisorRequestProperty(name, catalog_item_name)]) - for sa_catalog in sa_suggestions: - # create new config group if not existed - if sa_catalog not in Options.server_config_factory.items(): - Options.server_config_factory.create_config(sa_catalog) - - catalog_properties = Options.server_config_factory.get_config(sa_catalog).properties - for sa_property in sa_suggestions[sa_catalog]: - if sa_suggestions[sa_catalog][sa_property] is None and sa_property in catalog_properties: - print "rem %s:%s" % (sa_catalog, sa_property) - del catalog_properties[sa_property] - elif sa_suggestions[sa_catalog][sa_property] is not None: - catalog_properties[sa_property] = sa_suggestions[sa_catalog][sa_property] - - - def __can_handler_execute(self, catalog_options, catalog_property_item, property_item, properties): - """ - :type catalog_options dict - :type catalog_property_item str - :type property_item dict - :type properties dict - """ - can_process = True - - # process required services tag - required_list = None - - if CatConst.REQUIRED_SERVICES in catalog_options and catalog_options[CatConst.REQUIRED_SERVICES] is not None and \ - isinstance(catalog_options[CatConst.REQUIRED_SERVICES], list): - required_list = catalog_options[CatConst.REQUIRED_SERVICES] - - if CatConst.REQUIRED_SERVICES in property_item and property_item[CatConst.REQUIRED_SERVICES] is not None and\ - isinstance(property_item[CatConst.REQUIRED_SERVICES], list): - required_list = property_item[CatConst.REQUIRED_SERVICES] - - if required_list is not None: - can_process = can_process and is_services_exists(required_list) - - if CatConst.VALUE_REQUIRED_TAG in property_item and property_item[CatConst.VALUE_REQUIRED_TAG] is not None and\ - CatConst.PROPERTY_VALUE_TAG in property_item and catalog_property_item in properties: - can_process = properties[catalog_property_item] == property_item[CatConst.VALUE_REQUIRED_TAG] - - return can_process - - def process_simple_transformations(self, name, properties): - """ - :type properties dict - :type name str - """ - tag_handlers = [ - self.__handle_add_new, - self.__handle_change_existing, - self.__handle_dependency_tag, - self.__handle_remove_tag - ] - # catalog has no update entries for this config group - if name not in self._properties_catalog: - return 0 - - catalog_item = self._properties_catalog[name] - for catalog_property_item in catalog_item.keys(): - catalog_options = self.options[name] if name in self.options else {} - if self.__can_handler_execute(catalog_options, catalog_property_item, catalog_item[catalog_property_item], properties): - for handler in tag_handlers: - handler(name, catalog_property_item, catalog_item[catalog_property_item], properties) - - -class PropertyMapping(object): - _mapping_list = {} - - def __init__(self, map_list=None): - if map_list is not None: - self._mapping_list = self._convert_list(map_list) - - def _convert_list(self, map_list): - return dict(zip( - map_list.keys(), - map(lambda x: x if isinstance(x, dict) else {CatConst.PROPERTY_MAP_TO: x}, map_list.values()) - )) - - def get(self, old_property_name): - """ - Get property mapping dict - :old_property_name str - :return dict - """ - if old_property_name in self._mapping_list: - return self._mapping_list[old_property_name] - - raise PropertyNotFoundException("Property %s from property mapping section not found" % old_property_name) - - def list(self): - return self._mapping_list.keys() - - def get_mapped_name(self, old_property_name): - if CatConst.PROPERTY_MAP_TO not in self.get(old_property_name): - raise MalformedPropertyDefinitionException("%s option is not set for %s property" % - (CatConst.PROPERTY_MAP_TO, old_property_name)) - return self.get(old_property_name)[CatConst.PROPERTY_MAP_TO] - - def exists(self, old_property_name): - return old_property_name in self._mapping_list - - -class ServerConfigFactory(object): - def __init__(self): - self.__observers = [] - self._server_catalogs = {} - self._load_configs() - - def subscribe(self, name, config_item): - self.__observers.append((name, config_item)) - - def _load_configs(self): - Options.logger.info('Getting latest cluster configuration from the server...') - new_configs = get_config_resp_all() - for config_item in new_configs: - if config_item in self._server_catalogs: - self.notify_observer(config_item, CatConst.ACTION_RELOAD, new_configs[config_item]) - else: - self._server_catalogs[config_item] = ServerConfig(self, config_item, new_configs[config_item]) - - def notify_observers(self, action, arg=None): - for name, config_item in self.__observers: - if config_item is not None and name in self._server_catalogs: - config_item.notify(action, arg) - - def notify_observer(self, _name, action, arg=None): - for name, config_item in self.__observers: - if config_item is not None and name == _name and name in self._server_catalogs: - config_item.notify(action, arg) - - def __str__(self): - catalogs = {} - for cfg in self._server_catalogs: - catalogs[cfg] = str(self._server_catalogs[cfg]) - - return json.dumps(catalogs) - - def get_json(self): - catalogs = {} - for cfg in self._server_catalogs: - catalogs[cfg] = self._server_catalogs[cfg].properties - - return catalogs - def get_config(self, name): - """ - Get configuration item object - :type name str - :rtype: ServerConfig - """ - if name in self._server_catalogs: - return self._server_catalogs[name] - - raise CatalogNotFoundException("Server catalog item \"%s\" not found" % name) - - def create_config(self, name): - if name not in self._server_catalogs: - self._server_catalogs[name] = ServerConfig(self, name, {CatConst.STACK_PROPERTIES: {}}) - else: - raise CatalogExistException("Config group \"%s\" already existed" % name) - - def items(self): - return self._server_catalogs.keys() - - def reload(self): - self._load_configs() - - def process_mapping_transformations(self, catalog): - """ - :type catalog UpgradeCatalog - """ - for map_item in catalog.mapping.list(): - self._process_single_map_transformation(catalog, map_item, catalog.mapping.get(map_item)) - - def _process_default_template_map_replacement(self, catalog, item): - """ - :type catalog: UpgradeCatalog - :type item: dict - """ - if CatConst.VALUE_TEMPLATE_TAG in item and CatConst.TEMPLATE_HANDLER in catalog.action_handlers and\ - CatConst.PROPERTY_DEFAULT in item and item[CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: - try: - parsed_value = catalog.action_handlers[CatConst.TEMPLATE_HANDLER]( - catalog, - catalog.tag_search_pattern.findall(item[CatConst.PROPERTY_DEFAULT]), - item[CatConst.PROPERTY_DEFAULT] - ) - item[CatConst.PROPERTY_DEFAULT] = parsed_value - except TemplateProcessingException: - pass - - def _process_property_value_transformation(self, catalog, property_map_definition, old_value): - """ - :type catalog: UpgradeCatalog - :type property_map_definition: dict - :type old_value: str - :return: str - """ - - tmp = old_value - - if CatConst.REPLACE_FROM_TAG in property_map_definition and CatConst.REPLACE_TO_TAG in property_map_definition and\ - property_map_definition[CatConst.REPLACE_TO_TAG] is not None and property_map_definition[CatConst.REPLACE_FROM_TAG] is not None: - tmp = tmp.replace(property_map_definition[CatConst.REPLACE_FROM_TAG], property_map_definition[CatConst.REPLACE_TO_TAG]) - - if CatConst.COERCE_TO_PROPERTY_TAG in property_map_definition: - if property_map_definition[CatConst.COERCE_TO_PROPERTY_TAG] == CatConst.COERCE_YAML_OPTION_TAG: - # for example c6401,c6402 into ['c6401','c6402'] - data = list(map(lambda x: "'%s'" % x.strip(), tmp.split(','))) - tmp = "[%s]" % ','.join(data) - - return tmp - - def _process_single_map_transformation(self, catalog, map_item_name, map_property_item): - """ - :type catalog UpgradeCatalog - :type map_item_name str - :type map_property_item dict - """ - old_property_name = map_item_name - - # map-from item name could be re-defined via PROPERTY_MAP_FROM property to avoid duplicate entries - if CatConst.PROPERTY_MAP_FROM in map_property_item and map_property_item[CatConst.PROPERTY_MAP_FROM] is not None: - old_property_name = map_property_item[CatConst.PROPERTY_MAP_FROM] - - new_property_name = old_property_name - - if CatConst.PROPERTY_MAP_TO in map_property_item: - new_property_name = map_property_item[CatConst.PROPERTY_MAP_TO] - - # process first required section - required_services = map_property_item[CatConst.REQUIRED_SERVICES] if CatConst.REQUIRED_SERVICES in map_property_item else None - - # process required-services tag - if required_services is not None and not is_services_exists(required_services): - return 0 - - # process template tag - self._process_default_template_map_replacement(catalog, map_property_item) - - source_cfg_group = map_property_item[CatConst.PROPERTY_FROM_CATALOG] if CatConst.PROPERTY_FROM_CATALOG in map_property_item and\ - map_property_item[CatConst.PROPERTY_FROM_CATALOG] != "" else None - target_cfg_group = map_property_item[CatConst.PROPERTY_TO_CATALOG] if CatConst.PROPERTY_TO_CATALOG in map_property_item and \ - map_property_item[CatConst.PROPERTY_TO_CATALOG] != ""else None - default_value = map_property_item[CatConst.PROPERTY_DEFAULT] if CatConst.PROPERTY_DEFAULT in map_property_item and \ - map_property_item[CatConst.PROPERTY_DEFAULT] != "" else None - - if source_cfg_group is None and target_cfg_group is None: # global scope mapping renaming - self.notify_observers(CatConst.ACTION_RENAME_PROPERTY, [old_property_name, new_property_name, - self._process_property_value_transformation, - catalog, - map_property_item - ]) - elif source_cfg_group is not None and target_cfg_group is not None: # group-to-group moving - if source_cfg_group in self._server_catalogs and target_cfg_group in self._server_catalogs: - old_cfg_group = self.get_config(source_cfg_group).properties - new_cfg_group = self.get_config(target_cfg_group).properties - - if old_property_name in old_cfg_group: - new_cfg_group[new_property_name] = self._process_property_value_transformation(catalog, map_property_item, old_cfg_group[old_property_name]) - if new_property_name != old_property_name: - del old_cfg_group[old_property_name] - elif old_property_name not in old_cfg_group and default_value is not None: - new_cfg_group[new_property_name] = default_value - - def commit(self): - self.notify_observers(CatConst.ACTION_COMMIT) - - -class ServerConfig(object): - def __init__(self, factory, name, initial_configs): - """ - Initialize configuration item - :factory ServerConfigFactory - """ - factory.subscribe(name, self) - self._configs = initial_configs - self._hash = self._calculate_hash() - self._name = name - - def _calculate_hash(self): - return hash(str(self._configs)) - - def notify(self, action, arg=None): - if action == CatConst.ACTION_RELOAD: - self._configs = arg - self._hash = self._calculate_hash() - elif action == CatConst.ACTION_COMMIT: - self._commit() - elif action == CatConst.ACTION_RENAME_PROPERTY and isinstance(arg, list) and len(arg) == 5: - self._rename_property(*arg) - - def _rename_property(self, old_name, new_name, transform_func, catalog, map_item): - """ - :type old_name: str - :type new_name: str - :type transform_func: function - :type catalog: UpgradeCatalog - :type map_item: dict - :return: - """ - if old_name in self.properties: - old_property_value = self.properties[old_name] - if transform_func is not None: - self.properties[new_name] = transform_func(catalog, map_item, old_property_value) - else: - self.properties[new_name] = old_property_value - - if old_name != new_name: - del self.properties[old_name] - - def is_attributes_exists(self): - return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs - - def __str__(self): - return json.dumps(self.properties) - - @property - def properties(self): - return self._configs[CatConst.STACK_PROPERTIES] - - @properties.setter - def properties(self, value): - self._configs[CatConst.STACK_PROPERTIES] = value - - @property - def attributes(self): - return self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] - - @attributes.setter - def attributes(self, value): - self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] = value - - def _commit(self): - if self._hash != self._calculate_hash(): - Options.logger.info("Committing changes for \"%s\" configuration group ..." % self._name) - if self.is_attributes_exists(): - update_config(self.properties, self._name, self.attributes) - else: - update_config(self.properties, self._name) - - def clear(self): - self.properties = {} - self.attributes = {} - - def merge(self, catalog_item): - """ - :type catalog_item UpgradeCatalog - """ - # handle "merged-copy" tag - config_options = catalog_item.options[self._name] if self._name in catalog_item.options else {} - clear_properties = not (CatConst.MERGED_COPY_TAG in config_options and - config_options[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG) - if clear_properties: - self.clear() - Options.logger.info("Processing configuration group: %s", self._name) - catalog_item.process_simple_transformations(self._name, self.properties) - - -def write_mapping(hostmapping): - if os.path.isfile(Options.MR_MAPPING_FILE): - os.remove(Options.MR_MAPPING_FILE) - json.dump(hostmapping, open(Options.MR_MAPPING_FILE, 'w')) - - -def read_mapping(): - if os.path.isfile(Options.MR_MAPPING_FILE): - if Options.MR_MAPPING is not None: - return Options.MR_MAPPING - else: - Options.MR_MAPPING = json.load(open(Options.MR_MAPPING_FILE)) - return Options.MR_MAPPING - else: - raise FatalException(-1, "MAPREDUCE host mapping file, mr_mapping, is not available or badly formatted. Execute " - "action save-mr-mapping. Ensure the file is present in the directory where you are " - "executing this command.") - - -def get_mr1_mapping(): - components = ["MAPREDUCE_CLIENT", "JOBTRACKER", "TASKTRACKER", "HISTORYSERVER"] - GET_URL_FORMAT = Options.CLUSTER_URL + '/services/MAPREDUCE/components/%s' - hostmapping = {} - for component in components: - hostlist = [] - structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True) - - if 'host_components' in structured_resp: - for hostcomponent in structured_resp['host_components']: - if 'HostRoles' in hostcomponent: - if 'host_name' in hostcomponent['HostRoles']: - hostlist.append(hostcomponent['HostRoles']['host_name']) - - hostmapping[component] = hostlist - write_mapping(hostmapping) - - pprint("File mr_mapping contains the host mapping for mapreduce components. This file is critical for later " - "steps.") - - -def get_YN_input(prompt, default): - yes = set(['yes', 'ye', 'y']) - no = set(['no', 'n']) - return get_choice_string_input(prompt, default, yes, no) - - -def get_choice_string_input(prompt, default, firstChoice, secondChoice): - choice = raw_input(prompt).lower() - if choice in firstChoice: - return True - elif choice in secondChoice: - return False - elif choice is "": # Just enter pressed - return default - else: - print "input not recognized, please try again: " - return get_choice_string_input(prompt, default, firstChoice, secondChoice) - - -def delete_mr(): - saved_mr_mapping = get_YN_input("Have you saved MR host mapping using action save-mr-mapping [y/n] (n)? ", False) - if not saved_mr_mapping: - raise FatalException(1, "Ensure MAPREDUCE host component mapping is saved before deleting it. Use action " - "save-mr-mapping.") - - SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/MAPREDUCE' - COMPONENT_URL_FORMAT = Options.CLUSTER_URL + '/hosts/%s/host_components/%s' - NON_CLIENTS = ["JOBTRACKER", "TASKTRACKER", "HISTORYSERVER"] - PUT_IN_DISABLED = { - "HostRoles": { - "state": "DISABLED" - } - } - - hostmapping = read_mapping() - - for key, value in hostmapping.items(): - if (key in NON_CLIENTS) and (len(value) > 0): - for host in value: - curl(COMPONENT_URL_FORMAT % (host, key), request_type="PUT", data=PUT_IN_DISABLED, - validate=True) - - curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True) - - -def get_cluster_stackname(): - VERSION_URL_FORMAT = Options.CLUSTER_URL + '?fields=Clusters/version' - - structured_resp = curl(VERSION_URL_FORMAT, validate=True, parse=True) - - if 'Clusters' in structured_resp: - if 'version' in structured_resp['Clusters']: - return structured_resp['Clusters']['version'] - - raise FatalException(-1, "Unable to get the cluster version") - - -def has_component_in_stack_def(stack_name, service_name, component_name): - STACK_COMPONENT_URL_FORMAT = Options.ROOT_URL + '/stacks2/{0}/versions/{1}/stackServices/{2}/serviceComponents/{3}' - stack, stack_version = stack_name.split('-') - - try: - curl(STACK_COMPONENT_URL_FORMAT.format(stack, stack_version, service_name, component_name), - validate=True) - return True - except FatalException: - return False - - -def add_services(): - SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/{0}' - COMPONENT_URL_FORMAT = SERVICE_URL_FORMAT + '/components/{1}' - HOST_COMPONENT_URL_FORMAT = Options.CLUSTER_URL + '/hosts/{0}/host_components/{1}' - service_comp = { - "YARN": ["NODEMANAGER", "RESOURCEMANAGER", "YARN_CLIENT"], - "MAPREDUCE2": ["HISTORYSERVER", "MAPREDUCE2_CLIENT"]} - new_old_host_map = { - "NODEMANAGER": "TASKTRACKER", - "HISTORYSERVER": "HISTORYSERVER", - "RESOURCEMANAGER": "JOBTRACKER", - "YARN_CLIENT": "MAPREDUCE_CLIENT", - "MAPREDUCE2_CLIENT": "MAPREDUCE_CLIENT"} - - stack_name = get_cluster_stackname() - stack_has_ats = has_component_in_stack_def(stack_name, "YARN", "APP_TIMELINE_SERVER") - - # if upgrading to stack > 2.1 (which has ats) - if stack_has_ats: - service_comp["YARN"].append("APP_TIMELINE_SERVER") - new_old_host_map["APP_TIMELINE_SERVER"] = "JOBTRACKER" - - hostmapping = read_mapping() - - for service in service_comp.keys(): - curl(SERVICE_URL_FORMAT.format(service), validate=True, request_type="POST") - - for component in service_comp[service]: - curl(COMPONENT_URL_FORMAT.format(service, component), - validate=True, request_type="POST") - - for host in hostmapping[new_old_host_map[component]]: - curl(HOST_COMPONENT_URL_FORMAT.format(host, component), - validate=True, request_type="POST") - - -def update_config(properties, config_type, attributes=None): - tag = "version" + str(int(time.time() * 1000)) - properties_payload = {"Clusters": {"desired_config": {"type": config_type, "tag": tag, "properties": properties}}} - if attributes is not None: - properties_payload["Clusters"]["desired_config"]["properties_attributes"] = attributes - - expect_body = config_type != "cluster-env" # ToDo: make exceptions more flexible - - curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True, soft_validation=True) - - -def build_all_options(desired_configs): - """ - Get all configs in the old-fashion way ( versions below 1.7.0 doesn't support "properties" filter ) - """ - config_url_tpl = Options.CLUSTER_URL + "/configurations?type={0}&tag={1}" - all_options = {CatConst.ITEMS_TAG: []} - for config in desired_configs: - cfg_item = curl(config_url_tpl.format(config, desired_configs[config]["tag"]), parse=True, validate=True) - if CatConst.ITEMS_TAG in cfg_item and len(cfg_item[CatConst.ITEMS_TAG]) == 1: - all_options[CatConst.ITEMS_TAG].append(cfg_item[CatConst.ITEMS_TAG][0]) - - return all_options - - -def get_config_resp_all(): - desired_configs = {} - config_all_properties_url = Options.CLUSTER_URL + "/configurations?fields=properties" - desired_configs_resp = curl(Options.CLUSTER_URL + "?fields=Clusters/desired_configs", validate=True, parse=True) - - if 'Clusters' in desired_configs_resp: - if 'desired_configs' in desired_configs_resp['Clusters']: - desired_configs_resp = desired_configs_resp['Clusters']['desired_configs'] - else: - return None - else: - return None - - if Options.isPropertyAttributesSupported(): - config_all_properties_url += ",properties_attributes" - all_options = curl(config_all_properties_url, validate=True, parse=True) - else: - all_options = build_all_options(desired_configs_resp) - - if CatConst.ITEMS_TAG in all_options: - all_options = all_options[CatConst.ITEMS_TAG] - else: - return None - - all_options = filter( - lambda x: x[CatConst.TYPE_TAG] in desired_configs_resp and x["tag"] == desired_configs_resp[x[CatConst.TYPE_TAG]][ - "tag"], - all_options) - - for item in all_options: - dc_item = {} - - if CatConst.STACK_PROPERTIES in item: # config item could not contain any property - dc_item[CatConst.STACK_PROPERTIES] = item[CatConst.STACK_PROPERTIES] - else: - dc_item[CatConst.STACK_PROPERTIES] = {} - - if CatConst.STACK_PROPERTIES_ATTRIBUTES in item: - dc_item[CatConst.STACK_PROPERTIES_ATTRIBUTES] = item[CatConst.STACK_PROPERTIES_ATTRIBUTES] - - if "tag" in item: - dc_item["tag"] = item["tag"] - - if dc_item != {}: - desired_configs[item[CatConst.TYPE_TAG]] = dc_item - - return desired_configs - - -def is_services_exists(required_services): - """ - return true, if required_services is a part of Options.SERVICES - :param required_services: list - :return: bool - """ - # sets are equal - if Options.SERVICES == set(required_services): - return True - - return set(map(lambda x: x.upper(), required_services)) < Options.SERVICES - - -def get_cluster_services(): - services_url = Options.CLUSTER_URL + '/services' - raw_services = curl(services_url, parse=True) - - # expected structure: - # items: [ {"href":"...", "ServiceInfo":{"cluster_name":"..", "service_name":".."}}, ..., ... ] - if raw_services is not None and "items" in raw_services and isinstance(raw_services["items"], list): - return list(map(lambda item: item["ServiceInfo"]["service_name"], raw_services["items"])) - - Options.logger.warning("Failed to load services list, functionality that depends on them couldn't work") - return [] - - -def get_zookeeper_quorum(): - zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, parse=True) - zoo_quorum = [] - zoo_def_port = "2181" - if Options.server_config_factory is not None and Options.ZK_OPTIONS in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.ZK_OPTIONS) - if Options.ZK_CLIENTPORT in props.properties: - zoo_def_port = props.properties[Options.ZK_CLIENTPORT] - - if "host_components" in zoo_cfg: - for item in zoo_cfg["host_components"]: - zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port)) - - return ",".join(zoo_quorum) - - -def get_tez_history_url_base(): - try: - tez_view = curl(Options.TEZ_VIEW_URL, validate=False, parse=True) - except HTTPError as e: - raise TemplateProcessingException(str(e)) - - version = "" - if "versions" in tez_view and \ - len(tez_view['versions']) > 0 and \ - "ViewVersionInfo" in tez_view['versions'][0] and \ - 'version' in tez_view['versions'][0]['ViewVersionInfo']: - version = tez_view['versions'][0]['ViewVersionInfo']['version'] - url = '{0}://{1}:{2}/#/main/views/TEZ/{3}/TEZ_CLUSTER_INSTANCE'.format(Options.API_PROTOCOL, Options.HOST, Options.API_PORT, version) - return url - -def get_kafka_listeners(): - kafka_host = "localhost" - kafka_port = "6667" - if Options.server_config_factory is not None and Options.KAFKA_BROKER_CONF in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.KAFKA_BROKER_CONF) - if Options.KAFKA_PORT in props.properties: - kafka_port = props.properties[Options.KAFKA_PORT] - - # Default kafka listeners string - kafka_listeners = "PLAINTEXT://{0}:{1}".format(kafka_host, kafka_port) - - return kafka_listeners - - -def get_ranger_xaaudit_hdfs_destination_directory(): - namenode_hostname="localhost" - namenode_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.NAMENODE), validate=False, parse=True) - if "host_components" in namenode_cfg: - namenode_hostname = namenode_cfg["host_components"][0]["HostRoles"]["host_name"] - - return "hdfs://{0}:8020/ranger/audit".format(namenode_hostname) - -def get_ranger_policymgr_external_url(): - url = "{{policymgr_mgr_url}}" - if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) - if Options.RANGER_EXTERNAL_URL in props.properties: - url = props.properties[Options.RANGER_EXTERNAL_URL] - return url - -def get_jdbc_driver(): - driver = "{{jdbc_driver}}" - if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) - if "DB_FLAVOR" in props.properties: - db = props.properties["DB_FLAVOR"] - - if db.lower() == "mysql": - driver = "com.mysql.jdbc.Driver" - elif db.lower() == "oracle": - driver = "oracle.jdbc.OracleDriver" - return driver - -def get_audit_jdbc_url(): - audit_jdbc_url = "{{audit_jdbc_url}}" - if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) - if "DB_FLAVOR" in props.properties: - xa_audit_db_flavor = props.properties["DB_FLAVOR"] - if "db_host" in props.properties: - xa_db_host = props.properties["db_host"] - if "audit_db_name" in props.properties: - xa_audit_db_name = props.properties["audit_db_name"] - - if xa_audit_db_flavor.lower() == 'mysql': - audit_jdbc_url = "jdbc:mysql://{0}/{1}".format(xa_db_host, xa_audit_db_name) - elif xa_audit_db_flavor.lower() == 'oracle': - audit_jdbc_url = "jdbc:oracle:thin:\@//{0}".format(xa_db_host) - return audit_jdbc_url - -def get_audit_db_passwd(): - audit_db_passwd = "crypted" - if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) - if "audit_db_password" in props.properties: - audit_db_passwd = props.properties['audit_db_password'] - return audit_db_passwd - -def get_audit_to_db_enabled(config_name): - audit_to_db = "true" - if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(config_name) - if "XAAUDIT.DB.IS_ENABLED" in props.properties: - audit_to_db = props.properties["XAAUDIT.DB.IS_ENABLED"] - return audit_to_db - -def get_audit_to_hdfs_enabled(config_name): - audit_to_hdfs = "false" - if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(config_name) - if "XAAUDIT.HDFS.IS_ENABLED" in props.properties: - audit_to_hdfs = props.properties["XAAUDIT.HDFS.IS_ENABLED"] - return audit_to_hdfs - -def get_hdfs_batch_filespool_dir(config_name, component): - if component == 'hdfs': - path = '/var/log/hadoop/hdfs/audit/hdfs/spool' - else: - path = '/var/log/{0}/audit/hdfs/spool'.format(component) - if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(config_name) - if "XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY" in props.properties: - path = props.properties["XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY"] - return path - - -def get_usersync_sync_source(): - ug_sync_source = 'org.apache.ranger.unixusersync.process.UnixUserGroupBuilder' - sync_source = 'unix' - if Options.server_config_factory is not None and Options.RANGER_USERSYNC in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.RANGER_USERSYNC) - if "SYNC_SOURCE" in props.properties: - sync_source = props.properties['SYNC_SOURCE'] - - if sync_source == 'ldap': - ug_sync_source = 'org.apache.ranger.ldapusersync.process.LdapUserGroupBuilder' - return ug_sync_source - -def get_audit_check(audit_type): - audit_check_flag = "false" - if Options.server_config_factory is not None and Options.RANGER_ENV in Options.server_config_factory.items(): - props = Options.server_config_factory.get_config(Options.RANGER_ENV) - audit_property = "xasecure.audit.destination.{0}".format(audit_type) - if audit_property in props.properties: - audit_check_flag = props.properties[audit_property] - - return audit_check_flag - -def get_jt_host(catalog): - """ - :type catalog: UpgradeCatalog - :rtype str - """ - if catalog.get_parsed_version()["from"] == 13: - return read_mapping()["JOBTRACKER"][0] - - return "" - - -def get_jh_host(catalog): - """ - :type catalog: UpgradeCatalog - :rtype str - """ - if catalog.get_parsed_version()["from"] == 13: - return read_mapping()["HISTORYSERVER"][0] - - return "" - -def get_ranger_host(): - ranger_config = curl(Options.COMPONENTS_FORMAT.format('RANGER_ADMIN'), validate=False, parse=True) - ranger_host_list = [] - if "host_components" in ranger_config: - for item in ranger_config["host_components"]: - ranger_host_list.append(item["HostRoles"]["host_name"]) - return ranger_host_list[0] - -def get_ranger_service_details(): - server_cfg_factory = Options.server_config_factory - server_cfg_catalog = server_cfg_factory.get_config('admin-properties') - properties_latest = server_cfg_catalog.properties - data = {} - - if properties_latest['DB_FLAVOR'].lower() == 'mysql': - data['RANGER_JDBC_DRIVER'] = 'com.mysql.jdbc.Driver' - data['RANGER_JDBC_DIALECT'] = 'org.eclipse.persistence.platform.database.MySQLPlatform' - data['RANGER_JDBC_URL'] = 'jdbc:mysql://{0}/{1}'.format(properties_latest['db_host'], properties_latest['db_name']) - data['RANGER_AUDIT_JDBC_URL'] = 'jdbc:mysql://{0}/{1}'.format(properties_latest['db_host'], properties_latest['audit_db_name']) - data['RANGER_ROOT_JDBC_URL'] = 'jdbc:mysql://{0}'.format(properties_latest['db_host']) - elif properties_latest['DB_FLAVOR'].lower() == 'oracle': - data['RANGER_JDBC_DRIVER'] = 'oracle.jdbc.OracleDriver' - data['RANGER_JDBC_DIALECT'] = 'org.eclipse.persistence.platform.database.OraclePlatform' - data['RANGER_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) - data['RANGER_AUDIT_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) - data['RANGER_ROOT_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) - - return data - -def get_hive_security_authorization_setting(): - # this pattern should be used only once, changes here mimic UpgradeCatalog210.java -> updateRangerHiveConfigs - scf = Options.server_config_factory - response = "None" - - if "hive-env" in scf.items() and "hive_security_authorization" in scf.get_config("hive-env").properties: - response = scf.get_config("hive-env").properties["hive_security_authorization"] - - old_ranger_catalog = "ranger-hive-plugin-properties" - old_ranger_setting = "ranger-hive-plugin-enabled" - hive_server_catalog = "hiveserver2-site" - hive_sec_property = "hive.security.authorization.enabled" - - if scf is not None and old_ranger_catalog in scf.items(): - cfg = scf.get_config(old_ranger_catalog) - prop = cfg.properties - if old_ranger_setting in prop and cfg.properties[old_ranger_setting].upper() == "YES": - response = "Ranger" - if hive_server_catalog in scf.items(): - hive_props = scf.get_config(hive_server_catalog).properties - hive_props[hive_sec_property] = "true" - if old_ranger_setting in prop: - del prop[old_ranger_setting] - - # workaround for buggy stack advisor - if "HIVE" in Options.SERVICES and response == "None": - if hive_server_catalog not in scf.items(): - scf.create_config(hive_server_catalog) - - scf.get_config(hive_server_catalog).properties[hive_sec_property] = "false" - - return response - - -def get_hbase_coprocessmaster_classes(): - scf = Options.server_config_factory - prop = "hbase.coprocessor.master.classes" - hbase_ranger_enabled = False - old_value = "" - if "hbase-site" in scf.items(): - if prop in scf.get_config("hbase-site").properties: - old_value = scf.get_config("hbase-site").properties[prop] - if "hbase.security.authorization" in scf.get_config("hbase-site").properties and \ - scf.get_config("hbase-site").properties["hbase.security.authorization"].upper() == "TRUE": - hbase_ranger_enabled = True - - if hbase_ranger_enabled and "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor" not in old_value: - if "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor" in old_value: - old_value = old_value.replace("com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor", - "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") - else: - val = [] if old_value.strip() == "" else old_value.split(',') - val.append("org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") - old_value = ','.join(val) - - return old_value - - -def get_rpc_scheduler_factory_class(): - if Options.PHOENIX_QUERY_SERVER in Options.ambari_server.components: - return "org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory" - else: - return "" - - -def get_hbase_rpc_controllerfactory_class(): - if Options.PHOENIX_QUERY_SERVER in Options.ambari_server.components: - return "org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory" - else: - return "" - - -def get_hbase_regionserver_wal_codec(): - prop = "phoenix_sql_enabled" - scf = Options.server_config_factory - if "hbase-env" in scf.items(): - if prop in scf.get_config("hbase-env").properties and scf.get_config("hbase-env").properties[prop].upper() == "TRUE": - return "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec" - return "org.apache.hadoop.hbase.regionserver.wal.WALCellCodec" - - -def get_hbase_coprocessor_region_classes(): - scf = Options.server_config_factory - prop = "hbase.coprocessor.region.classes" - hbase_ranger_enabled = False - old_value = "" - if "hbase-site" in scf.items(): - if prop in scf.get_config("hbase-site").properties: - old_value = scf.get_config("hbase-site").properties[prop] - if "hbase.security.authorization" in scf.get_config("hbase-site").properties and \ - scf.get_config("hbase-site").properties["hbase.security.authorization"].upper() == "TRUE": - hbase_ranger_enabled = True - - if hbase_ranger_enabled and "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor" not in old_value: - if "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor" in old_value: - old_value = old_value.replace("com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor", - "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") - else: - val = [] if old_value.strip() == "" else old_value.split(',') - val.append("org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") - old_value = ','.join(val) - - return old_value - - -def _substitute_handler(upgrade_catalog, tokens, value): - """ - Substitute handler - :param upgrade_catalog: UpgradeCatalog - :param tokens: list - :param value: str - :rtype str - """ - for token in tokens: - if token == "{JOBHISTORY_HOST}": - value = value.replace(token, get_jh_host(upgrade_catalog)) - elif token == "{RESOURCEMANAGER_HOST}": - value = value.replace(token, get_jt_host(upgrade_catalog)) - elif token == "{HBASE_REGIONSERVER_WAL_CODEC}": - value = value.replace(token, get_hbase_regionserver_wal_codec()) - elif token == "{HBASE_REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS}": - value = value.replace(token, get_rpc_scheduler_factory_class()) - elif token == "{HBASE_RPC_CONTROLLERFACTORY_CLASS}": - value = value.replace(token, get_hbase_rpc_controllerfactory_class()) - elif token == "{ZOOKEEPER_QUORUM}": - value = value.replace(token, get_zookeeper_quorum()) - elif token == "{HBASE_COPROCESS_MASTER_CLASSES}": - value = value.replace(token, get_hbase_coprocessmaster_classes()) - elif token == "{HBASE_COPROCESSOR_REGION_CLASSES}": - value = value.replace(token, get_hbase_coprocessor_region_classes()) - elif token == "{HIVE_SECURITY_AUTHORIZATION}": - value = value.replace(token, get_hive_security_authorization_setting()) - elif token == "{TEZ_HISTORY_URL_BASE}": - value = value.replace(token, get_tez_history_url_base()) - elif token == "{RANGER_JDBC_DRIVER}": - value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_DRIVER']) - elif token == "{RANGER_JDBC_URL}": - value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_URL']) - elif token == "{RANGER_AUDIT_JDBC_URL}": - value = value.replace(token, get_ranger_service_details()['RANGER_AUDIT_JDBC_URL']) - elif token == "{RANGER_HOST}": - value = value.replace(token, get_ranger_host()) - elif token == "{RANGER_JDBC_DIALECT}": - value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_DIALECT']) - elif token == "{KAFKA_LISTENERS}": - value = value.replace(token, get_kafka_listeners()) - elif token == "{RANGER_PLUGIN_HBASE_POLICY_CACHE_DIR}": - value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hbase")) - elif token == "{RANGER_PLUGIN_HDFS_POLICY_CACHE_DIR}": - value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hadoop")) - elif token == "{RANGER_PLUGIN_HIVE_POLICY_CACHE_DIR}": - value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hive")) - elif token == "{RANGER_PLUGIN_KNOX_POLICY_CACHE_DIR}": - value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_knox")) - elif token == "{RANGER_PLUGIN_STORM_POLICY_CACHE_DIR}": - value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_storm")) - elif token == "{RANGER_HBASE_KEYSTORE_CREDENTIAL_FILE}": - value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hbase")) - elif token == "{RANGER_HDFS_KEYSTORE_CREDENTIAL_FILE}": - value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hadoop")) - elif token == "{RANGER_HIVE_KEYSTORE_CREDENTIAL_FILE}": - value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hive")) - elif token == "{RANGER_KNOX_KEYSTORE_CREDENTIAL_FILE}": - value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_knox")) - elif token == "{RANGER_STORM_KEYSTORE_CREDENTIAL_FILE}": - value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_storm")) - elif token == "{XAAUDIT_HDFS_DESTINATION_DIRECTORY}": - value = value.replace(token, get_ranger_xaaudit_hdfs_destination_directory()) - elif token == "{HBASE_RANGER_REPO_NAME}": - value = value.replace(token, Options.CLUSTER_NAME+"_hbase") - elif token == "{HDFS_RANGER_REPO_NAME}": - value = value.replace(token, Options.CLUSTER_NAME+"_hadoop") - elif token == "{HIVE_RANGER_REPO_NAME}": - value = value.replace(token, Options.CLUSTER_NAME+"_hive") - elif token == "{KNOX_RANGER_REPO_NAME}": - value = value.replace(token, Options.CLUSTER_NAME+"_knox") - elif token == "{STORM_RANGER_REPO_NAME}": - value = value.replace(token, Options.CLUSTER_NAME+"_storm") - elif token == "{POLICYMGR_MGR_URL}": - value = value.replace(token, get_ranger_policymgr_external_url()) - elif token == "{HDFS_JDBC_DRIVER}": - value = value.replace(token, get_jdbc_driver()) - elif token == "{HBASE_JDBC_DRIVER}": - value = value.replace(token, get_jdbc_driver()) - elif token == "{HIVE_JDBC_DRIVER}": - value = value.replace(token, get_jdbc_driver()) - elif token == "{KNOX_JDBC_DRIVER}": - value = value.replace(token, get_jdbc_driver()) - elif token == "{STORM_JDBC_DRIVER}": - value = value.replace(token, get_jdbc_driver()) - elif token == "{HDFS_AUDIT_JDBC_URL}": - value = value.replace(token, get_audit_jdbc_url()) - elif token == "{HBASE_AUDIT_JDBC_URL}": - value = value.replace(token, get_audit_jdbc_url()) - elif token == "{HIVE_AUDIT_JDBC_URL}": - value = value.replace(token, get_audit_jdbc_url()) - elif token == "{KNOX_AUDIT_JDBC_URL}": - value = value.replace(token, get_audit_jdbc_url()) - elif token == "{STORM_AUDIT_JDBC_URL}": - value = value.replace(token, get_audit_jdbc_url()) - elif token == "{AUDIT_TO_DB_HDFS}": - value = value.replace(token, get_audit_to_db_enabled("ranger-hdfs-plugin-properties")) - elif token == "{AUDIT_TO_DB_HBASE}": - value = value.replace(token, get_audit_to_db_enabled("ranger-hbase-plugin-properties")) - elif token == "{AUDIT_TO_DB_HIVE}": - value = value.replace(token, get_audit_to_db_enabled("ranger-hive-plugin-properties")) - elif token == "{AUDIT_TO_DB_KNOX}": - value = value.replace(token, get_audit_to_db_enabled("ranger-knox-plugin-properties")) - elif token == "{AUDIT_TO_DB_STORM}": - value = value.replace(token, get_audit_to_db_enabled("ranger-storm-plugin-properties")) - elif token == "{AUDIT_TO_HDFS_HDFS}": - value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hdfs-plugin-properties")) - elif token == "{AUDIT_TO_HDFS_HIVE}": - value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hive-plugin-properties")) - elif token == "{AUDIT_TO_HDFS_HBASE}": - value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hbase-plugin-properties")) - elif token == "{AUDIT_TO_HDFS_KNOX}": - value = value.replace(token, get_audit_to_hdfs_enabled("ranger-knox-plugin-properties")) - elif token == "{AUDIT_TO_HDFS_STORM}": - value = value.replace(token, get_audit_to_hdfs_enabled("ranger-storm-plugin-properties")) - elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HDFS}": - value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hdfs-plugin-properties", "hdfs")) - elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HIVE}": - value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hive-plugin-properties", "hive")) - elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HBASE}": - value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hbase-plugin-properties", "hbase")) - elif token == "{AUDIT_HDFS_FILESPOOL_DIR_KNOX}": - value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-knox-plugin-properties", "knox")) - elif token == "{AUDIT_HDFS_FILESPOOL_DIR_STORM}": - value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-storm-plugin-properties", "storm")) - elif token == "{USERSYNC_SYNC_SOURCE}": - value = value.replace(token, get_usersync_sync_source()) - elif token == "{AUDIT_TO_DB}": - value = value.replace(token, get_audit_check("db")) - elif token == "{AUDIT_TO_HDFS}": - value = value.replace(token, get_audit_check("hdfs")) - elif token == "{RANGER_ROOT_JDBC_URL}": - value = value.replace(token, get_ranger_service_details()['RANGER_ROOT_JDBC_URL']) - - return value - - -def modify_config_item(config_type, catalog, server_config_factory): - """ - Modify configuration item - :type config_type str - :type catalog UpgradeCatalog - :type server_config_factory ServerConfigFactory - """ - - # if config group is absent on the server, we will create it - if config_type not in server_config_factory.items(): - server_config_factory.create_config(config_type) - - server_config_catalog = server_config_factory.get_config(config_type) - - server_config_catalog.merge(catalog) - - -def modify_configs(): - if len(Options.ARGS) > 1: - config_type = Options.ARGS[1] - else: - config_type = None - - catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog - catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, - Options.OPTIONS.to_stack) # get desired version of catalog - Options.stack_advisor = StackAdvisorFactory().get_instance(catalog.name, catalog.target_version) - - # load all desired configs from the server - # ToDo: implement singleton for that class - Options.server_config_factory = ServerConfigFactory() - - if catalog is None: - raise FatalException(1, "Upgrade catalog for version %s-%s not found, no configs was modified" - % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)) - - # add user-defined template processing function - catalog.add_handler(CatConst.TEMPLATE_HANDLER, _substitute_handler) - - if config_type is not None and config_type not in catalog.items: - raise FatalException("Config type %s not exists, no configs was modified" % config_type) - - if config_type is not None: - modify_config_item(config_type, catalog, Options.server_config_factory) - else: - for collection_name in catalog.items: - modify_config_item(collection_name, catalog, Options.server_config_factory) - - Options.server_config_factory.process_mapping_transformations(catalog) - - # commit changes to server, if any will be found - Options.server_config_factory.commit() - - -def backup_configs(conf_type=None): - dir = "backups_%d" % time.time() - file_pattern = "%s%s%s_%s.json" - configs = get_config_resp_all() - if configs is None: - Options.logger.error("Unexpected response from the server") - return -1 - - if conf_type is not None and conf_type in configs: - configs = {conf_type: configs[conf_type]} - - if not os.path.exists(dir): - os.mkdir(dir) - - for item in configs: - filename = file_pattern % (dir, os.path.sep, item, configs[item]["tag"]) - if os.path.exists(filename): - os.remove(filename) - - try: - with open(filename, "w") as f: - f.write(json.dumps(configs[item][CatConst.STACK_PROPERTIES], indent=4)) - Options.logger.info("Catalog \"%s\" stored to %s", item, filename) - except IOError as e: - Options.logger.error("Unable to store \"%s\": %s", item, e) - - -def install_services(): - SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/{0}' - SERVICES = ["MAPREDUCE2", "YARN"] - PUT_IN_INSTALLED = [ - { - "RequestInfo": { - "context": "Install MapReduce2" - }, - "Body": { - "ServiceInfo": { - "state": "INSTALLED" - } - } - }, - { - "RequestInfo": { - "context": "Install YARN" - }, - "Body": { - "ServiceInfo": { - "state": "INSTALLED" - } - } - } - ] - - err_retcode = 0 - err_message = "" - for index in [0, 1]: - try: - curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True, request_type="PUT", data=PUT_IN_INSTALLED[index]) - except FatalException as e: - if not e.code == 0: - err_retcode = e.code - err_message = err_message + " Error while installing " + SERVICES[index] + ". Details: " + e.message + "." - - if err_retcode != 0: - raise FatalException(err_retcode, - err_message + "(Services may already be installed or agents are not yet started.)") - - Options.OPTIONS.exit_message = "Requests has been submitted to install YARN and MAPREDUCE2. Use Ambari Web to monitor " \ - "the status of the install requests." - - -def generate_auth_header(user, password): - token = "%s:%s" % (user, password) - token = base64.encodestring(token) - return {"Authorization": "Basic %s" % token.replace('\n', '')} - - -def curl(url, tokens=None, headers=None, request_type="GET", data=None, parse=False, - validate=False, soft_validation=False): - """ - :rtype type - """ - _headers = {} - handler_chain = [] - post_req = ["POST", "PUT"] - get_req = ["GET", "DELETE"] - - print_url = Options.CURL_PRINT_ONLY is not None - write_only_print = Options.CURL_WRITE_ONLY is not None - - if request_type not in post_req + get_req: - raise IOError("Wrong request type \"%s\" passed" % request_type) - - if data is not None and isinstance(data, dict): - data = json.dumps(data) - - if tokens is not None: - _headers.update(generate_auth_header(tokens["user"], tokens["pass"])) - elif Options.API_TOKENS is not None: - _headers.update(generate_auth_header(Options.API_TOKENS["user"], Options.API_TOKENS["pass"])) - - if request_type in post_req and data is not None: - _headers["Content-Length"] = len(data) - - if headers is not None: - _headers.update(headers) - - if Options.HEADERS is not None: - _headers.update(Options.HEADERS) - - director = build_opener(*handler_chain) - if request_type in post_req: - _data = bytes(data) - req = Request(url, headers=_headers, data=_data) - else: - req = Request(url, headers=_headers) - - req.get_method = lambda: request_type - - if print_url: - if write_only_print: - if request_type in post_req: - Options.logger.info(url) - if data is not None: - Options.logger.info("POST Data: \n" + str(data)) - else: - Options.logger.info(url) - if request_type in post_req and data is not None: - Options.logger.info("POST Data: \n" + str(data)) - - code = 200 - if not (print_url and request_type in post_req): - try: - resp = director.open(req) - out = resp.read() - if isinstance(out, bytes): - out = out.decode("utf-8") - code = resp.code - except URLError as e: - Options.logger.error(str(e)) - if isinstance(e, HTTPError): - raise e - else: - raise FatalException(-1, str(e)) - else: - if not print_url: - Options.logger.info(url) - out = "{}" - - if validate and not print_url and (code > 299 or code < 200): - if soft_validation: - Options.logger.warning("Response validation failed, please check previous action result manually.") - else: - raise FatalException(code, "Response validation failed, please check previous action result manually.") - - if parse: - return json.loads(out) - else: - return out - - -def configuration_item_diff(collection_name, catalog, actual_properties_list): - """ - Merge catalog item with actual config item on the server - Diff item response: - { - "property" : name, - "catalog_item": value, - "catalog_value": value, - "actual_value": value - } - :param collection_name: - :param catalog: - :param actual_properties_list - :return: - """ - - verified_catalog = [] - catalog_properties = dict(catalog) - actual_properties = dict(actual_properties_list) - - if actual_properties is None: - verified_catalog = map(lambda x: { - "property": x, - "catalog_item": catalog_properties[x], - "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG], - "actual_value": None - }, catalog_properties.keys()) - else: - # build list of properties according to actual properties - verified_catalog = map(lambda x: { - "property": x, - "catalog_item": catalog_properties[x] if x in catalog_properties else None, - "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if x in catalog_properties else None, - "actual_value": actual_properties[x] - }, actual_properties.keys()) - - # build list of properties according to catalog properties - verified_catalog_catalog = map(lambda x: { - "property": x, - "catalog_item": catalog_properties[x], - "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in - catalog_properties[x] else None, - "actual_value": actual_properties[x] if x in actual_properties else None, - }, catalog_properties.keys()) - - # append properties, which are listened in catalog but doesn't present in the actual configuration - verified_catalog += filter(lambda x: x["property"] not in actual_properties, verified_catalog_catalog) - - return verified_catalog - - -def configuration_diff_analyze(diff_list): - report = {} - for item_key in diff_list.keys(): - property_diff_list = diff_list[item_key] - item_stat = { - "skipped": {"count": 0, "items": []}, - "ok": {"count": 0, "items": []}, - "fail": {"count": 0, "items": []}, - "total": {"count": len(property_diff_list), "items": []} - } - - def push_status(status, _property_item): - item_stat[status]["count"] += 1 - item_stat[status]["items"].append(_property_item) - - for property_item in property_diff_list: - # process properties which can be absent - - # item was removed, from actual configs according to catalog instructions - if property_item["actual_value"] is None \ - and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \ - and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG: - - push_status("ok", property_item) - - # currently skip values with template tag, as there no filter implemented - # ToDo: implement possibility to filter values without filter handler, - # ToDo: currently filtering is possible only on update-configs stage - elif property_item["actual_value"] is not None and property_item["catalog_value"] is not None \ - and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \ - and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: - - push_status("skipped", property_item) - - # item not present in actual config, but present in catalog and no remove tag is present - elif property_item["actual_value"] is None and property_item["catalog_value"] is not None: - push_status("fail", property_item) - - # property exists in actual configuration, but not described in catalog configuration - elif property_item["actual_value"] is not None and property_item["catalog_value"] is None: - push_status("skipped", property_item) - - # actual and catalog properties are equal - elif property_item["catalog_value"] == property_item["actual_value"]: - push_status("ok", property_item) - elif property_item["catalog_value"] != property_item["actual_value"]: - push_status("fail", property_item) - - report[item_key] = item_stat - return report - - -def verify_configuration(): - diff_list = {} - - if len(Options.ARGS) > 1: - config_type = Options.ARGS[1] - else: - config_type = None - - catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog - catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, - Options.OPTIONS.to_stack) # get desired version of catalog - server_configs = ServerConfigFactory() - - if catalog is None: - raise FatalException(1, "Upgrade catalog for version %s-%s not found" - % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)) - - if config_type is not None and config_type not in catalog.items.keys() and config_type not in server_configs.items(): - raise FatalException("Config type %s not exists" % config_type) - - # fetch from server all option at one time and filter only desired versions - - if config_type is not None: - diff_list[config_type] = configuration_item_diff(config_type, catalog.items[config_type], server_configs.get_config(config_type).properties) - else: - for collection_name in catalog.items.keys(): - diff_list[collection_name] = configuration_item_diff(collection_name, catalog.items[collection_name], server_configs.get_config(collection_name).properties) - - analyzed_list = configuration_diff_analyze(diff_list) - - report_file = None - if Options.REPORT_FILE is not None: - try: - report_file = open(Options.REPORT_FILE, "w") - except IOError as e: - Options.logger.error("Report file open error: %s" % e.message) - - for config_item in analyzed_list: - if analyzed_list[config_item]["fail"]["count"] != 0: - Options.logger.info( - "%s: %s missing configuration(s) - please look in the output file for the missing params" % ( - config_item, analyzed_list[config_item]["fail"]["count"] - ) - ) - if report_file is not None: - report_formatter(report_file, config_item, analyzed_list[config_item]) - else: - Options.logger.info("%s: verified" % config_item) - - if report_file is not None: - try: - report_file.close() - except IOError as e: - Options.logger.error("Report file close error: %s" % e.message) - - -def report_formatter(report_file, config_item, analyzed_list_item): - prefix = "Configuration item %s" % config_item - if analyzed_list_item["fail"]["count"] > 0: - for item in analyzed_list_item["fail"]["items"]: - report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"\n" % ( - prefix, item["property"], item["actual_value"], item["catalog_value"] - )) - - -def main(): - action_list = { # list of supported actions - Options.GET_MR_MAPPING_ACTION: get_mr1_mapping, - Options.DELETE_MR_ACTION: delete_mr, - Options.ADD_YARN_MR2_ACTION: add_services, - Options.MODIFY_CONFIG_ACTION: modify_configs, - Options.INSTALL_YARN_MR2_ACTION: install_services, - Options.BACKUP_CONFIG_ACTION: backup_configs, - Options.VERIFY_ACTION: verify_configuration - } - - parser = optparse.OptionParser(usage="usage: %prog [options] action\n Valid actions: " - + ", ".join(action_list.keys()) - + "\n update-configs accepts type, e.g. hdfs-site to update specific configs") - - parser.add_option("-n", "--printonly", - action="store_true", dest="printonly", default=False, - help="Prints all the curl commands to be executed (no post/update request will be performed)") - parser.add_option("-w", "--writeonly", - action="store_true", dest="writeonly", default=False, - help="in the combination with --printonly param will print only post/update requests") - parser.add_option("-o", "--log", dest="logfile", default=None, - help="Log file") - parser.add_option("--report", dest="report", default=None, - help="Report file output location") - - parser.add_option('--upgradeCatalog', default=None, help="Upgrade Catalog file full path", dest="upgrade_json") - parser.add_option('--fromStack', default=None, help="stack version to upgrade from", dest="from_stack") - parser.add_option('--toStack', default=None, help="stack version to upgrade to", dest="to_stack") - - parser.add_option('--hostname', default=None, help="Hostname for Ambari server", dest="hostname") - parser.add_option('--port', default='8080', help="Port number for Ambari server", dest="port") - parser.add_option('--https', default=False, action="store_true", dest="https", help="Use https protocol for connection to the server") - parser.add_option('--user', default=None, help="Ambari admin user", dest="user") - parser.add_option('--password', default=None, help="Ambari admin password", dest="password") - parser.add_option('--clustername', default=None, help="Cluster name", dest="clustername") - - (options, args) = parser.parse_args() - Options.initialize_logger(options.logfile) - options.warnings = [] - - if len(arg
<TRUNCATED>