http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_checkpoint_time.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_checkpoint_time.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_checkpoint_time.py deleted file mode 100644 index 2a29d1a..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_checkpoint_time.py +++ /dev/null @@ -1,223 +0,0 @@ -#!/usr/bin/env python - -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import time -import urllib2 -import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. -import logging -import traceback - -from resource_management.libraries.functions.curl_krb_request import curl_krb_request -from resource_management.core.environment import Environment - -LABEL = 'Last Checkpoint: [{h} hours, {m} minutes, {tx} transactions]' - -NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}' -NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}' -NN_HTTP_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}' -NN_CHECKPOINT_TX_KEY = '{{hdfs-site/dfs.namenode.checkpoint.txns}}' -NN_CHECKPOINT_PERIOD_KEY = '{{hdfs-site/dfs.namenode.checkpoint.period}}' - -PERCENT_WARNING_KEY = 'checkpoint.time.warning.threshold' -PERCENT_WARNING_DEFAULT = 200 - -PERCENT_CRITICAL_KEY = 'checkpoint.time.critical.threshold' -PERCENT_CRITICAL_DEFAULT = 200 - -CHECKPOINT_TX_DEFAULT = 1000000 -CHECKPOINT_PERIOD_DEFAULT = 21600 - -CONNECTION_TIMEOUT_KEY = 'connection.timeout' -CONNECTION_TIMEOUT_DEFAULT = 5.0 - -KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}' -KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}' -SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' -SMOKEUSER_KEY = "{{cluster-env/smokeuser}}" -EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}' - -logger = logging.getLogger('ambari_alerts') - -def get_tokens(): - """ - Returns a tuple of tokens in the format {{site/property}} that will be used - to build the dictionary passed into execute - """ - return (NN_HTTP_ADDRESS_KEY, NN_HTTPS_ADDRESS_KEY, NN_HTTP_POLICY_KEY, EXECUTABLE_SEARCH_PATHS, - NN_CHECKPOINT_TX_KEY, NN_CHECKPOINT_PERIOD_KEY, KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY, SMOKEUSER_KEY) - - -def execute(configurations={}, parameters={}, host_name=None): - """ - Returns a tuple containing the result code and a pre-formatted result label - - Keyword arguments: - configurations (dictionary): a mapping of configuration 
key to value - parameters (dictionary): a mapping of script parameter key to value - host_name (string): the name of this host where the alert is running - """ - - if configurations is None: - return (('UNKNOWN', ['There were no configurations supplied to the script.'])) - - uri = None - scheme = 'http' - http_uri = None - https_uri = None - http_policy = 'HTTP_ONLY' - percent_warning = PERCENT_WARNING - percent_critical = PERCENT_CRITICAL - checkpoint_tx = CHECKPOINT_TX_DEFAULT - checkpoint_period = CHECKPOINT_PERIOD_DEFAULT - - if NN_HTTP_ADDRESS_KEY in configurations: - http_uri = configurations[NN_HTTP_ADDRESS_KEY] - - if NN_HTTPS_ADDRESS_KEY in configurations: - https_uri = configurations[NN_HTTPS_ADDRESS_KEY] - - if NN_HTTP_POLICY_KEY in configurations: - http_policy = configurations[NN_HTTP_POLICY_KEY] - - if NN_CHECKPOINT_TX_KEY in configurations: - checkpoint_tx = configurations[NN_CHECKPOINT_TX_KEY] - - if NN_CHECKPOINT_PERIOD_KEY in configurations: - checkpoint_period = configurations[NN_CHECKPOINT_PERIOD_KEY] - - if SMOKEUSER_KEY in configurations: - smokeuser = configurations[SMOKEUSER_KEY] - - executable_paths = None - if EXECUTABLE_SEARCH_PATHS in configurations: - executable_paths = configurations[EXECUTABLE_SEARCH_PATHS] - - security_enabled = False - if SECURITY_ENABLED_KEY in configurations: - security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE' - - kerberos_keytab = None - if KERBEROS_KEYTAB in configurations: - kerberos_keytab = configurations[KERBEROS_KEYTAB] - - kerberos_principal = None - if KERBEROS_PRINCIPAL in configurations: - kerberos_principal = configurations[KERBEROS_PRINCIPAL] - kerberos_principal = kerberos_principal.replace('_HOST', host_name) - - # parse script arguments - connection_timeout = CONNECTION_TIMEOUT_DEFAULT - if CONNECTION_TIMEOUT_KEY in parameters: - connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY]) - - percent_warning = PERCENT_WARNING_DEFAULT - if PERCENT_WARNING_KEY in parameters: - percent_warning = float(parameters[PERCENT_WARNING_KEY]) * 100 - - percent_critical = PERCENT_CRITICAL_DEFAULT - if PERCENT_CRITICAL_KEY in parameters: - percent_critical = float(parameters[PERCENT_CRITICAL_KEY]) * 100 - - # determine the right URI and whether to use SSL - uri = http_uri - if http_policy == 'HTTPS_ONLY': - scheme = 'https' - - if https_uri is not None: - uri = https_uri - - current_time = int(round(time.time() * 1000)) - - last_checkpoint_time_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem".format(scheme,uri) - journal_transaction_info_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo".format(scheme,uri) - - # start out assuming an OK status - label = None - result_code = "OK" - - try: - if kerberos_principal is not None and kerberos_keytab is not None and security_enabled: - env = Environment.get_instance() - - # curl requires an integer timeout - curl_connection_timeout = int(connection_timeout) - - last_checkpoint_time_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, kerberos_keytab, - kerberos_principal, last_checkpoint_time_qry,"checkpoint_time_alert", executable_paths, False, - "NameNode Last Checkpoint", smokeuser, connection_timeout=curl_connection_timeout) - - last_checkpoint_time_response_json = json.loads(last_checkpoint_time_response) - last_checkpoint_time = int(last_checkpoint_time_response_json["beans"][0]["LastCheckpointTime"]) - - journal_transaction_info_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, kerberos_keytab, 
- kerberos_principal, journal_transaction_info_qry,"checkpoint_time_alert", executable_paths, - False, "NameNode Last Checkpoint", smokeuser, connection_timeout=curl_connection_timeout) - - journal_transaction_info_response_json = json.loads(journal_transaction_info_response) - journal_transaction_info = journal_transaction_info_response_json["beans"][0]["JournalTransactionInfo"] - else: - last_checkpoint_time = int(get_value_from_jmx(last_checkpoint_time_qry, - "LastCheckpointTime", connection_timeout)) - - journal_transaction_info = get_value_from_jmx(journal_transaction_info_qry, - "JournalTransactionInfo", connection_timeout) - - journal_transaction_info_dict = json.loads(journal_transaction_info) - - last_tx = int(journal_transaction_info_dict['LastAppliedOrWrittenTxId']) - most_recent_tx = int(journal_transaction_info_dict['MostRecentCheckpointTxId']) - transaction_difference = last_tx - most_recent_tx - - delta = (current_time - last_checkpoint_time)/1000 - - label = LABEL.format(h=get_time(delta)['h'], m=get_time(delta)['m'], tx=transaction_difference) - - if (transaction_difference > int(checkpoint_tx)) and (float(delta) / int(checkpoint_period)*100 >= int(percent_critical)): - result_code = 'CRITICAL' - elif (transaction_difference > int(checkpoint_tx)) and (float(delta) / int(checkpoint_period)*100 >= int(percent_warning)): - result_code = 'WARNING' - - except: - label = traceback.format_exc() - result_code = 'UNKNOWN' - - return ((result_code, [label])) - -def get_time(delta): - h = int(delta/3600) - m = int((delta % 3600)/60) - return {'h':h, 'm':m} - - -def get_value_from_jmx(query, jmx_property, connection_timeout): - response = None - - try: - response = urllib2.urlopen(query, timeout=connection_timeout) - data = response.read() - - data_dict = json.loads(data) - return data_dict["beans"][0][jmx_property] - finally: - if response is not None: - try: - response.close() - except: - pass
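[Editor's note] For readers skimming the deleted alert above, the short sketch below isolates the threshold arithmetic alert_checkpoint_time.py applies after it has read LastCheckpointTime (FSNamesystem bean) and JournalTransactionInfo (NameNodeInfo bean) from the NameNode JMX endpoint. It is an illustrative, self-contained summary only, not part of the removed script: the function name and the sample numbers are hypothetical, while the default values mirror the constants defined in the file (checkpoint period 21600 s, 1,000,000 transactions, warning/critical thresholds of 200 percent).

# Illustrative sketch, not part of the deleted Ambari script.
import time

def classify_checkpoint_lag(last_checkpoint_time_ms, last_tx, most_recent_checkpoint_tx,
                            checkpoint_period_s=21600, checkpoint_tx=1000000,
                            percent_warning=200, percent_critical=200,
                            now_ms=None):
    """Return 'OK', 'WARNING' or 'CRITICAL' for NameNode checkpoint staleness."""
    if now_ms is None:
        now_ms = int(round(time.time() * 1000))

    # Seconds since the last checkpoint and transactions written since then.
    delta_s = (now_ms - last_checkpoint_time_ms) / 1000.0
    uncheckpointed_tx = last_tx - most_recent_checkpoint_tx

    # Fraction of the configured checkpoint period that has elapsed, in percent.
    period_used_pct = delta_s / checkpoint_period_s * 100.0

    # Both conditions must hold, as in the deleted alert: enough un-checkpointed
    # transactions AND the checkpoint period exceeded by the configured percentage.
    if uncheckpointed_tx > checkpoint_tx and period_used_pct >= percent_critical:
        return 'CRITICAL'
    if uncheckpointed_tx > checkpoint_tx and period_used_pct >= percent_warning:
        return 'WARNING'
    return 'OK'

# Hypothetical usage: a checkpoint taken 13 hours ago with 1,500,000
# un-checkpointed transactions exceeds both defaults and returns 'CRITICAL'.
# classify_checkpoint_lag(int(time.time() * 1000) - 13 * 3600 * 1000, 2500000, 1000000)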
http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py deleted file mode 100644 index 606633e..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_datanode_unmounted_data_dir.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python - -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import os -import logging - -from resource_management.libraries.functions import file_system -from resource_management.libraries.functions import dfs_datanode_helper - -RESULT_STATE_OK = 'OK' -RESULT_STATE_WARNING = 'WARNING' -RESULT_STATE_CRITICAL = 'CRITICAL' -RESULT_STATE_UNKNOWN = 'UNKNOWN' - -DFS_DATA_DIR = '{{hdfs-site/dfs.datanode.data.dir}}' -DATA_STORAGE_TAGS = ['[DISK]','[SSD]','[RAM_DISK]','[ARCHIVE]'] -DATA_DIR_MOUNT_FILE = "/var/lib/ambari-agent/data/datanode/dfs_data_dir_mount.hist" - -logger = logging.getLogger() - - -def get_tokens(): - """ - Returns a tuple of tokens in the format {{site/property}} that will be used - to build the dictionary passed into execute - """ - return (DFS_DATA_DIR, DATA_DIR_MOUNT_FILE) - - -def execute(configurations={}, parameters={}, host_name=None): - """ - Returns a tuple containing the result code and a pre-formatted result label - - Keyword arguments: - configurations (dictionary): a mapping of configuration key to value - parameters (dictionary): a mapping of script parameter key to value - host_name (string): the name of this host where the alert is running - """ - warnings = [] - errors = [] - - if configurations is None: - return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.']) - - # Check required properties - if DFS_DATA_DIR not in configurations: - return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(DFS_DATA_DIR)]) - - dfs_data_dir = configurations[DFS_DATA_DIR] - - if dfs_data_dir is None: - return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script and the value is null'.format(DFS_DATA_DIR)]) - - data_dir_mount_file_exists = True - # This follows symlinks and will return False for a broken link (even in the middle of the linked list) - if not os.path.exists(DATA_DIR_MOUNT_FILE): - data_dir_mount_file_exists = False - warnings.append("File not found, {0} .".format(DATA_DIR_MOUNT_FILE)) - - valid_data_dirs = set() # data dirs that have been normalized - 
data_dirs_not_exist = set() # data dirs that do not exist - data_dirs_unknown = set() # data dirs for which could not determine mount - data_dirs_on_root = set() # set of data dirs that are on root mount - data_dirs_on_mount = set() # set of data dirs that are mounted on a device - data_dirs_unmounted = [] # list of data dirs that are known to have become unmounted - - for data_dir in dfs_data_dir.split(","): - if data_dir is None or data_dir.strip() == "": - continue - data_dir = data_dir.strip() - # filter out data storage tags - for tag in DATA_STORAGE_TAGS: - if data_dir.startswith(tag): - data_dir = data_dir.replace(tag, "") - continue - valid_data_dirs.add(data_dir) - - # Sort the data dirs, which is needed for deterministic behavior when running the unit tests. - valid_data_dirs = sorted(valid_data_dirs) - for data_dir in valid_data_dirs: - # This follows symlinks and will return False for a broken link (even in the middle of the linked list) - if os.path.isdir(data_dir): - curr_mount_point = file_system.get_mount_point_for_dir(data_dir) - curr_mount_point = curr_mount_point.strip() if curr_mount_point else curr_mount_point - - if curr_mount_point is not None and curr_mount_point != "": - if curr_mount_point == "/": - data_dirs_on_root.add(data_dir) - else: - data_dirs_on_mount.add(data_dir) - else: - data_dirs_unknown.add(data_dir) - else: - data_dirs_not_exist.add(data_dir) - - # To keep the messages consistent for all hosts, sort the sets into lists - valid_data_dirs = sorted(valid_data_dirs) - data_dirs_not_exist = sorted(data_dirs_not_exist) - data_dirs_unknown = sorted(data_dirs_unknown) - data_dirs_on_root = sorted(data_dirs_on_root) - - if data_dirs_not_exist: - errors.append("Data dir(s) not found: {0} .".format(", ".join(data_dirs_not_exist))) - - if data_dirs_unknown: - errors.append("Cannot find mount point for data dir(s): {0} .".format(", ".join(data_dirs_unknown))) - - if data_dir_mount_file_exists: - # Make a precise determination on which data dirs have become unmounted. - - class Params: - def __init__(self, mount_file): - self.data_dir_mount_file = mount_file - params = Params(DATA_DIR_MOUNT_FILE) - - # This dictionary contains the expected values of <data_dir, mount_point> - # Hence, we only need to analyze the data dirs that are currently on the root partition - # and report an error if they were expected to be on a mount. - # - # If one of the data dirs is not present in the file, it means that DataNode has not been restarted after - # the configuration was changed on the server, so we cannot make any assertions about it. - expected_data_dir_to_mount = dfs_datanode_helper.get_data_dir_to_mount_from_file(params) - for data_dir in data_dirs_on_root: - if data_dir in expected_data_dir_to_mount and expected_data_dir_to_mount[data_dir] != "/": - data_dirs_unmounted.append(data_dir) - - if len(data_dirs_unmounted) > 0: - errors.append("Detected data dir(s) that became unmounted and are now writing to the root partition: {0} .".format(", ".join(data_dirs_unmounted))) - else: - # Couldn't make guarantees about the expected value of mount points, so rely on this strategy that is likely to work. - # It will report false positives (aka false alarms) if the user actually intended to have - # 1+ data dirs on a mount and 1+ data dirs on the root partition. 
- if len(data_dirs_on_mount) >= 1 and len(data_dirs_on_root) >= 1: - errors.append("Detected at least one data dir on a mount point, but these are writing to the root partition: {0} .".format(", ".join(data_dirs_on_root))) - - # Determine the status based on warnings and errors. - if len(errors) == 0: - status = RESULT_STATE_OK - messages = [] - - # Check for warnings - if len(warnings) > 0: - status = RESULT_STATE_WARNING - messages += warnings - - if len(valid_data_dirs) > 0: - messages.append("Data dir(s) are fine, {0} .".format(", ".join(valid_data_dirs))) - else: - messages.append("No data dirs to analyze.") - - return (status, ["\n".join(messages)]) - else: - # Report errors - return (RESULT_STATE_CRITICAL, ["\n".join(errors)]) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_ha_namenode_health.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_ha_namenode_health.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_ha_namenode_health.py deleted file mode 100644 index 13c113e..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_ha_namenode_health.py +++ /dev/null @@ -1,261 +0,0 @@ -#!/usr/bin/env python - -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import urllib2 -import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. 
-import logging -import traceback - -from resource_management.libraries.functions.curl_krb_request import curl_krb_request -from resource_management.core.environment import Environment - -RESULT_STATE_OK = 'OK' -RESULT_STATE_CRITICAL = 'CRITICAL' -RESULT_STATE_UNKNOWN = 'UNKNOWN' -RESULT_STATE_SKIPPED = 'SKIPPED' - -HDFS_NN_STATE_ACTIVE = 'active' -HDFS_NN_STATE_STANDBY = 'standby' - -HDFS_SITE_KEY = '{{hdfs-site}}' -NAMESERVICE_KEY = '{{hdfs-site/dfs.nameservices}}' -NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}' -NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}' -DFS_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}' - -KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}' -KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}' -SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' -SMOKEUSER_KEY = '{{cluster-env/smokeuser}}' -EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}' - -CONNECTION_TIMEOUT_KEY = 'connection.timeout' -CONNECTION_TIMEOUT_DEFAULT = 5.0 - -LOGGER_EXCEPTION_MESSAGE = "[Alert] NameNode High Availability Health on {0} fails:" -logger = logging.getLogger('ambari_alerts') - -def get_tokens(): - """ - Returns a tuple of tokens in the format {{site/property}} that will be used - to build the dictionary passed into execute - """ - return (HDFS_SITE_KEY, NAMESERVICE_KEY, NN_HTTP_ADDRESS_KEY, EXECUTABLE_SEARCH_PATHS, - NN_HTTPS_ADDRESS_KEY, DFS_POLICY_KEY, SMOKEUSER_KEY, KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY) - - -def execute(configurations={}, parameters={}, host_name=None): - """ - Returns a tuple containing the result code and a pre-formatted result label - - Keyword arguments: - configurations (dictionary): a mapping of configuration key to value - parameters (dictionary): a mapping of script parameter key to value - host_name (string): the name of this host where the alert is running - """ - if configurations is None: - return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.']) - - # if not in HA mode, then SKIP - if not NAMESERVICE_KEY in configurations: - return (RESULT_STATE_SKIPPED, ['NameNode HA is not enabled']) - - # hdfs-site is required - if not HDFS_SITE_KEY in configurations: - return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)]) - - if SMOKEUSER_KEY in configurations: - smokeuser = configurations[SMOKEUSER_KEY] - - executable_paths = None - if EXECUTABLE_SEARCH_PATHS in configurations: - executable_paths = configurations[EXECUTABLE_SEARCH_PATHS] - - # parse script arguments - connection_timeout = CONNECTION_TIMEOUT_DEFAULT - if CONNECTION_TIMEOUT_KEY in parameters: - connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY]) - - security_enabled = False - if SECURITY_ENABLED_KEY in configurations: - security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE' - - kerberos_keytab = None - if KERBEROS_KEYTAB in configurations: - kerberos_keytab = configurations[KERBEROS_KEYTAB] - - kerberos_principal = None - if KERBEROS_PRINCIPAL in configurations: - kerberos_principal = configurations[KERBEROS_PRINCIPAL] - kerberos_principal = kerberos_principal.replace('_HOST', host_name) - - - # determine whether or not SSL is enabled - is_ssl_enabled = False - if DFS_POLICY_KEY in configurations: - dfs_policy = configurations[DFS_POLICY_KEY] - if dfs_policy == "HTTPS_ONLY": - is_ssl_enabled = True - - name_service = configurations[NAMESERVICE_KEY] - hdfs_site = 
configurations[HDFS_SITE_KEY] - - # look for dfs.ha.namenodes.foo - nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service - if not nn_unique_ids_key in hdfs_site: - return (RESULT_STATE_UNKNOWN, ['Unable to find unique namenode alias key {0}'.format(nn_unique_ids_key)]) - - namenode_http_fragment = 'dfs.namenode.http-address.{0}.{1}' - jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=*" - - if is_ssl_enabled: - namenode_http_fragment = 'dfs.namenode.https-address.{0}.{1}' - jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=*" - - - active_namenodes = [] - standby_namenodes = [] - unknown_namenodes = [] - - # now we have something like 'nn1,nn2,nn3,nn4' - # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id] - # ie dfs.namenode.http-address.hacluster.nn1 - nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',') - for nn_unique_id in nn_unique_ids: - key = namenode_http_fragment.format(name_service,nn_unique_id) - - if key in hdfs_site: - # use str() to ensure that unicode strings do not have the u' in them - value = str(hdfs_site[key]) - - try: - jmx_uri = jmx_uri_fragment.format(value) - if kerberos_principal is not None and kerberos_keytab is not None and security_enabled: - env = Environment.get_instance() - - # curl requires an integer timeout - curl_connection_timeout = int(connection_timeout) - - state_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, - kerberos_keytab, kerberos_principal, jmx_uri,"ha_nn_health", executable_paths, False, - "NameNode High Availability Health", smokeuser, connection_timeout=curl_connection_timeout) - - state = _get_ha_state_from_json(state_response) - else: - state_response = get_jmx(jmx_uri, connection_timeout) - state = _get_ha_state_from_json(state_response) - - if state == HDFS_NN_STATE_ACTIVE: - active_namenodes.append(value) - elif state == HDFS_NN_STATE_STANDBY: - standby_namenodes.append(value) - else: - unknown_namenodes.append(value) - except: - logger.exception(LOGGER_EXCEPTION_MESSAGE.format(host_name)) - unknown_namenodes.append(value) - - # now that the request is done, determine if this host is the host that - # should report the status of the HA topology - is_active_namenode = False - for active_namenode in active_namenodes: - if active_namenode.startswith(host_name): - is_active_namenode = True - - # there's only one scenario here; there is exactly 1 active and 1 standby - is_topology_healthy = len(active_namenodes) == 1 and len(standby_namenodes) == 1 - - result_label = 'Active{0}, Standby{1}, Unknown{2}'.format(str(active_namenodes), - str(standby_namenodes), str(unknown_namenodes)) - - # Healthy Topology: - # - Active NN reports the alert, standby does not - # - # Unhealthy Topology: - # - Report the alert if this is the first named host - # - Report the alert if not the first named host, but the other host - # could not report its status - if is_topology_healthy: - if is_active_namenode is True: - return (RESULT_STATE_OK, [result_label]) - else: - return (RESULT_STATE_SKIPPED, ['Another host will report this alert']) - else: - # dfs.namenode.rpc-address.service.alias is guaranteed in HA mode - first_listed_host_key = 'dfs.namenode.rpc-address.{0}.{1}'.format( - name_service, nn_unique_ids[0]) - - first_listed_host = '' - if first_listed_host_key in hdfs_site: - first_listed_host = hdfs_site[first_listed_host_key] - - is_first_listed_host = False - if first_listed_host.startswith(host_name): - is_first_listed_host = True - - if is_first_listed_host: - return 
(RESULT_STATE_CRITICAL, [result_label]) - else: - # not the first listed host, but the first host might be in the unknown - return (RESULT_STATE_SKIPPED, ['Another host will report this alert']) - - -def get_jmx(query, connection_timeout): - response = None - - try: - response = urllib2.urlopen(query, timeout=connection_timeout) - json_data = response.read() - return json_data - finally: - if response is not None: - try: - response.close() - except: - pass - - -def _get_ha_state_from_json(string_json): - """ - Searches through the specified JSON string looking for HA state - enumerations. - :param string_json: the string JSON - :return: the value of the HA state (active, standby, etc) - """ - json_data = json.loads(string_json) - jmx_beans = json_data["beans"] - - - for jmx_bean in jmx_beans: - if "name" not in jmx_bean: - continue - - jmx_bean_name = jmx_bean["name"] - if jmx_bean_name == "Hadoop:service=NameNode,name=NameNodeStatus" and "State" in jmx_bean: - return jmx_bean["State"] - - - for jmx_bean in jmx_beans: - if "name" not in jmx_bean: - continue - - jmx_bean_name = jmx_bean["name"] - if jmx_bean_name == "Hadoop:service=NameNode,name=FSNamesystem": - return jmx_bean["tag.HAState"] http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_upgrade_finalized.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_upgrade_finalized.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_upgrade_finalized.py deleted file mode 100644 index 6e8945c..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/alerts/alert_upgrade_finalized.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python - -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import urllib2 -import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. 
-import logging -import traceback - -from resource_management.libraries.functions.curl_krb_request import curl_krb_request, CONNECTION_TIMEOUT_DEFAULT -from resource_management.core.environment import Environment - -NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}' -NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}' -NN_HTTP_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}' - -KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}' -KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}' -SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' -SMOKEUSER_KEY = "{{cluster-env/smokeuser}}" -EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}' -logger = logging.getLogger('ambari_alerts') - -def get_tokens(): - """ - Returns a tuple of tokens in the format {{site/property}} that will be used - to build the dictionary passed into execute - - :rtype tuple - """ - return (NN_HTTP_ADDRESS_KEY, NN_HTTPS_ADDRESS_KEY, NN_HTTP_POLICY_KEY, EXECUTABLE_SEARCH_PATHS, - KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY, SMOKEUSER_KEY) - - -def execute(configurations={}, parameters={}, host_name=None): - """ - Returns a tuple containing the result code and a pre-formatted result label - - Keyword arguments: - configurations : a mapping of configuration key to value - parameters : a mapping of script parameter key to value - host_name : the name of this host where the alert is running - - :type configurations dict - :type parameters dict - :type host_name str - """ - - if configurations is None: - return (('UNKNOWN', ['There were no configurations supplied to the script.'])) - - uri = None - scheme = 'http' - http_uri = None - https_uri = None - http_policy = 'HTTP_ONLY' - - if NN_HTTP_ADDRESS_KEY in configurations: - http_uri = configurations[NN_HTTP_ADDRESS_KEY] - - if NN_HTTPS_ADDRESS_KEY in configurations: - https_uri = configurations[NN_HTTPS_ADDRESS_KEY] - - if NN_HTTP_POLICY_KEY in configurations: - http_policy = configurations[NN_HTTP_POLICY_KEY] - - if SMOKEUSER_KEY in configurations: - smokeuser = configurations[SMOKEUSER_KEY] - - executable_paths = None - if EXECUTABLE_SEARCH_PATHS in configurations: - executable_paths = configurations[EXECUTABLE_SEARCH_PATHS] - - security_enabled = False - if SECURITY_ENABLED_KEY in configurations: - security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE' - - kerberos_keytab = None - if KERBEROS_KEYTAB in configurations: - kerberos_keytab = configurations[KERBEROS_KEYTAB] - - kerberos_principal = None - if KERBEROS_PRINCIPAL in configurations: - kerberos_principal = configurations[KERBEROS_PRINCIPAL] - kerberos_principal = kerberos_principal.replace('_HOST', host_name) - - # determine the right URI and whether to use SSL - uri = http_uri - if http_policy == 'HTTPS_ONLY': - scheme = 'https' - - if https_uri is not None: - uri = https_uri - - upgrade_finalized_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo".format(scheme, uri) - - # start out assuming an OK status - label = None - result_code = "OK" - - try: - if kerberos_principal is not None and kerberos_keytab is not None and security_enabled: - env = Environment.get_instance() - - last_checkpoint_time_response, error_msg, time_millis = curl_krb_request( - env.tmp_dir, kerberos_keytab, - kerberos_principal, upgrade_finalized_qry, "upgrade_finalized_state", executable_paths, False, - "HDFS Upgrade Finalized State", smokeuser - ) - - upgrade_finalized_response_json = 
json.loads(last_checkpoint_time_response) - upgrade_finalized = bool(upgrade_finalized_response_json["beans"][0]["UpgradeFinalized"]) - - else: - upgrade_finalized = bool(get_value_from_jmx(upgrade_finalized_qry, - "UpgradeFinalized")) - - if upgrade_finalized: - label = "HDFS cluster is not in the upgrade state" - result_code = 'OK' - else: - label = "HDFS cluster is not finalized" - result_code = 'CRITICAL' - - except: - label = traceback.format_exc() - result_code = 'UNKNOWN' - - return ((result_code, [label])) - -def get_value_from_jmx(query, jmx_property): - """ - Read property from the jxm endpoint - - :param query: jmx uri path - :param jmx_property: property name to read - :return: jmx property value - - :type query str - :type jmx_property str - """ - response = None - - try: - response = urllib2.urlopen(query, timeout=int(CONNECTION_TIMEOUT_DEFAULT)) - data = response.read() - - data_dict = json.loads(data) - return data_dict["beans"][0][jmx_property] - finally: - if response is not None: - try: - response.close() - except: - pass http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkForFormat.sh ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkForFormat.sh b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkForFormat.sh deleted file mode 100644 index e6fd87d..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkForFormat.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env bash -# -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# - -export hdfs_user=$1 -shift -export conf_dir=$1 -shift -export bin_dir=$1 -shift -export old_mark_dir=$1 -shift -export mark_dir=$1 -shift -export name_dirs=$* - -export EXIT_CODE=0 -export command="namenode -format" -export list_of_non_empty_dirs="" - -mark_file=/var/run/hadoop/hdfs/namenode-formatted -if [[ -f ${mark_file} ]] ; then - rm -f ${mark_file} - mkdir -p ${mark_dir} -fi - -if [[ -d $old_mark_dir ]] ; then - mv ${old_mark_dir} ${mark_dir} -fi - -if [[ ! -d $mark_dir ]] ; then - for dir in `echo $name_dirs | tr ',' ' '` ; do - echo "NameNode Dirname = $dir" - cmd="ls $dir | wc -l | grep -q ^0$" - eval $cmd - if [[ $? -ne 0 ]] ; then - (( EXIT_CODE = $EXIT_CODE + 1 )) - list_of_non_empty_dirs="$list_of_non_empty_dirs $dir" - fi - done - - if [[ $EXIT_CODE == 0 ]] ; then - su -s /bin/bash - ${hdfs_user} -c "export PATH=$PATH:${bin_dir} ; yes Y | hdfs --config ${conf_dir} ${command}" - (( EXIT_CODE = $EXIT_CODE | $? )) - else - echo "ERROR: Namenode directory(s) is non empty. Will not format the namenode. 
List of non-empty namenode dirs ${list_of_non_empty_dirs}" - fi -else - echo "${mark_dir} exists. Namenode DFS already formatted" -fi - -exit $EXIT_CODE - http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkWebUI.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkWebUI.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkWebUI.py deleted file mode 100644 index aa60ffc..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/files/checkWebUI.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' - -import optparse -import httplib - -# -# Main. -# -def main(): - parser = optparse.OptionParser(usage="usage: %prog [options] component ") - parser.add_option("-m", "--hosts", dest="hosts", help="Comma separated hosts list for WEB UI to check it availability") - parser.add_option("-p", "--port", dest="port", help="Port of WEB UI to check it availability") - parser.add_option("-s", "--https", dest="https", help="\"True\" if value of dfs.http.policy is \"HTTPS_ONLY\"") - - (options, args) = parser.parse_args() - - hosts = options.hosts.split(',') - port = options.port - https = options.https - - for host in hosts: - try: - conn = httplib.HTTPConnection(host, port) if not https.lower() == "true" else httplib.HTTPSConnection(host, port) - # This can be modified to get a partial url part to be sent with request - conn.request("GET", "/") - httpCode = conn.getresponse().status - conn.close() - except Exception: - httpCode = 404 - - if httpCode != 200: - print "Cannot access WEB UI on: http://" + host + ":" + port if not https.lower() == "true" else "Cannot access WEB UI on: https://" + host + ":" + port - exit(1) - -if __name__ == "__main__": - main() http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/__init__.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/__init__.py deleted file mode 100644 index 35de4bb..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -Ambari Agent - -""" http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py deleted file mode 100644 index 0cce48c..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/balancer-emulator/hdfs-command.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python - -''' -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -''' -import time -import sys -from threading import Thread - - -def write_function(path, handle, interval): - with open(path) as f: - for line in f: - handle.write(line) - handle.flush() - time.sleep(interval) - -thread = Thread(target = write_function, args = ('balancer.log', sys.stdout, 1.5)) -thread.start() - -threaderr = Thread(target = write_function, args = ('balancer-err.log', sys.stderr, 1.5 * 0.023)) -threaderr.start() - -thread.join() - - -def rebalancer_out(): - write_function('balancer.log', sys.stdout) - -def rebalancer_err(): - write_function('balancer-err.log', sys.stdout) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode.py deleted file mode 100644 index 1645938..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode.py +++ /dev/null @@ -1,157 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" -import datanode_upgrade -from hdfs_datanode import datanode -from resource_management.libraries.functions import conf_select -from resource_management.libraries.functions import iop_select -from resource_management import * -from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version -from resource_management.libraries.functions.security_commons import build_expectations, \ - cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, FILE_TYPE_XML -from hdfs import hdfs -from utils import get_hdfs_binary - -class DataNode(Script): - - def get_hdfs_binary(self): - """ - Get the name or path to the hdfs binary depending on the stack and version. - """ - import params - stack_to_comp = self.get_stack_to_component() - if params.stack_name in stack_to_comp: - return get_hdfs_binary(stack_to_comp[params.stack_name]) - return "hdfs" - def install(self, env): - import params - self.install_packages(env, params.exclude_packages) - env.set_params(params) - - def configure(self, env): - import params - env.set_params(params) - hdfs("datanode") - datanode(action="configure") - - def start(self, env, upgrade_type=None): - import params - env.set_params(params) - self.configure(env) - datanode(action="start") - - def stop(self, env, upgrade_type=None): - import params - env.set_params(params) - # pre-upgrade steps shutdown the datanode, so there's no need to call - - hdfs_binary = self.get_hdfs_binary() - if upgrade_type == "rolling": - stopped = datanode_upgrade.pre_rolling_upgrade_shutdown(hdfs_binary) - if not stopped: - datanode(action="stop") - else: - datanode(action="stop") - - def status(self, env): - import status_params - env.set_params(status_params) - check_process_status(status_params.datanode_pid_file) - - def get_stack_to_component(self): - return {"BigInsights": "hadoop-hdfs-datanode"} - - def pre_upgrade_restart(self, env, upgrade_type=None): - Logger.info("Executing DataNode Stack Upgrade pre-restart") - import params - env.set_params(params) - if params.version and compare_versions(format_hdp_stack_version(params.version), '4.0.0.0') >= 0: - conf_select.select(params.stack_name, "hadoop", params.version) - iop_select.select("hadoop-hdfs-datanode", params.version) - #Execute(format("iop-select set hadoop-hdfs-datanode {version}")) - - - def post_upgrade_restart(self, env, upgrade_type=None): - Logger.info("Executing DataNode Stack Upgrade post-restart") - import params - env.set_params(params) - hdfs_binary = self.get_hdfs_binary() - # ensure the DataNode has started and rejoined the cluster - datanode_upgrade.post_upgrade_check(hdfs_binary) - - def security_status(self, env): - import status_params - - env.set_params(status_params) - props_value_check = {"hadoop.security.authentication": "kerberos", - "hadoop.security.authorization": "true"} - 
props_empty_check = ["hadoop.security.auth_to_local"] - props_read_check = None - core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, - props_read_check) - props_value_check = None - props_empty_check = ['dfs.datanode.keytab.file', - 'dfs.datanode.kerberos.principal'] - props_read_check = ['dfs.datanode.keytab.file'] - hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, - props_read_check) - - hdfs_expectations = {} - hdfs_expectations.update(core_site_expectations) - hdfs_expectations.update(hdfs_site_expectations) - - security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, - {'core-site.xml': FILE_TYPE_XML, - 'hdfs-site.xml': FILE_TYPE_XML}) - - if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ - security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': - result_issues = validate_security_config_properties(security_params, hdfs_expectations) - if not result_issues: # If all validations passed successfully - try: - # Double check the dict before calling execute - if ('hdfs-site' not in security_params or - 'dfs.datanode.keytab.file' not in security_params['hdfs-site'] or - 'dfs.datanode.kerberos.principal' not in security_params['hdfs-site']): - self.put_structured_out({"securityState": "UNSECURED"}) - self.put_structured_out( - {"securityIssuesFound": "Keytab file or principal are not set property."}) - return - - cached_kinit_executor(status_params.kinit_path_local, - status_params.hdfs_user, - security_params['hdfs-site']['dfs.datanode.keytab.file'], - security_params['hdfs-site']['dfs.datanode.kerberos.principal'], - status_params.hostname, - status_params.tmp_dir) - self.put_structured_out({"securityState": "SECURED_KERBEROS"}) - except Exception as e: - self.put_structured_out({"securityState": "ERROR"}) - self.put_structured_out({"securityStateErrorInfo": str(e)}) - else: - issues = [] - for cf in result_issues: - issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) - self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) - self.put_structured_out({"securityState": "UNSECURED"}) - else: - self.put_structured_out({"securityState": "UNSECURED"}) - - -if __name__ == "__main__": - DataNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode_upgrade.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode_upgrade.py deleted file mode 100644 index 8f36001..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/datanode_upgrade.py +++ /dev/null @@ -1,141 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" -import re - -from resource_management.core.logger import Logger -from resource_management.core.exceptions import Fail -from resource_management.core.resources.system import Execute -from resource_management.core import shell -from resource_management.libraries.functions import format -from resource_management.libraries.functions.decorator import retry -from utils import get_dfsadmin_base_command - - -def pre_rolling_upgrade_shutdown(hdfs_binary): - """ - Runs the "shutdownDatanode {ipc_address} upgrade" command to shutdown the - DataNode in preparation for an upgrade. This will then periodically check - "getDatanodeInfo" to ensure the DataNode has shutdown correctly. - This function will obtain the Kerberos ticket if security is enabled. - :param hdfs_binary: name/path of the HDFS binary to use - :return: Return True if ran ok (even with errors), and False if need to stop the datanode forcefully. - """ - import params - - Logger.info('DataNode executing "shutdownDatanode" command in preparation for upgrade...') - if params.security_enabled: - Execute(params.dn_kinit_cmd, user = params.hdfs_user) - - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - command = format('{dfsadmin_base_command} -shutdownDatanode {dfs_dn_ipc_address} upgrade') - - code, output = shell.call(command, user=params.hdfs_user) - if code == 0: - # verify that the datanode is down - _check_datanode_shutdown(hdfs_binary) - else: - # Due to bug HDFS-7533, DataNode may not always shutdown during stack upgrade, and it is necessary to kill it. - if output is not None and re.search("Shutdown already in progress", output): - Logger.error("Due to a known issue in DataNode, the command {0} did not work, so will need to shutdown the datanode forcefully.".format(command)) - return False - return True - - -def post_upgrade_check(hdfs_binary): - """ - Verifies that the DataNode has rejoined the cluster. This function will - obtain the Kerberos ticket if security is enabled. - :param hdfs_binary: name/path of the HDFS binary to use - :return: - """ - import params - - Logger.info("Checking that the DataNode has rejoined the cluster after upgrade...") - if params.security_enabled: - Execute(params.dn_kinit_cmd, user=params.hdfs_user) - - # verify that the datanode has started and rejoined the HDFS cluster - _check_datanode_startup(hdfs_binary) - - -@retry(times=24, sleep_time=5, err_class=Fail) -def _check_datanode_shutdown(hdfs_binary): - """ - Checks that a DataNode is down by running "hdfs dfsamin getDatanodeInfo" - several times, pausing in between runs. Once the DataNode stops responding - this method will return, otherwise it will raise a Fail(...) and retry - automatically. - The stack defaults for retrying for HDFS are also way too slow for this - command; they are set to wait about 45 seconds between client retries. As - a result, a single execution of dfsadmin will take 45 seconds to retry and - the DataNode may be marked as dead, causing problems with HBase. - https://issues.apache.org/jira/browse/HDFS-8510 tracks reducing the - times for ipc.client.connect.retry.interval. 
In the meantime, override them - here, but only for RU. - :param hdfs_binary: name/path of the HDFS binary to use - :return: - """ - import params - - # override stock retry timeouts since after 30 seconds, the datanode is - # marked as dead and can affect HBase during RU - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - command = format('{dfsadmin_base_command} -D ipc.client.connect.max.retries=5 -D ipc.client.connect.retry.interval=1000 -getDatanodeInfo {dfs_dn_ipc_address}') - - try: - Execute(command, user=params.hdfs_user, tries=1) - except: - Logger.info("DataNode has successfully shutdown for upgrade.") - return - - Logger.info("DataNode has not shutdown.") - raise Fail('DataNode has not shutdown.') - - -@retry(times=12, sleep_time=10, err_class=Fail) -def _check_datanode_startup(hdfs_binary): - """ - Checks that a DataNode is reported as being alive via the - "hdfs dfsadmin -fs {namenode_address} -report -live" command. Once the DataNode is found to be - alive this method will return, otherwise it will raise a Fail(...) and retry - automatically. - :param hdfs_binary: name/path of the HDFS binary to use - :return: - """ - import params - import socket - - try: - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - command = dfsadmin_base_command + ' -report -live' - return_code, hdfs_output = shell.call(command, user=params.hdfs_user) - except: - raise Fail('Unable to determine if the DataNode has started after upgrade.') - - if return_code == 0: - hostname = params.hostname.lower() - hostname_ip = socket.gethostbyname(params.hostname.lower()) - if hostname in hdfs_output.lower() or hostname_ip in hdfs_output.lower(): - Logger.info("DataNode {0} reports that it has rejoined the cluster.".format(params.hostname)) - return - else: - raise Fail("DataNode {0} was not found in the list of live DataNodes".format(params.hostname)) - - # return_code is not 0, fail - raise Fail("Unable to determine if the DataNode has started after upgrade (result code {0})".format(str(return_code))) http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs.py deleted file mode 100644 index fbee92a..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs.py +++ /dev/null @@ -1,131 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
- -Ambari Agent - -""" - -from resource_management import * -import sys -import os - - -def hdfs(name=None): - import params - - if params.create_lib_snappy_symlinks: - install_snappy() - - # On some OS this folder could be not exists, so we will create it before pushing there files - Directory(params.limits_conf_dir, - recursive=True, - owner='root', - group='root' - ) - - File(os.path.join(params.limits_conf_dir, 'hdfs.conf'), - owner='root', - group='root', - mode=0644, - content=Template("hdfs.conf.j2") - ) - - if params.security_enabled: - tc_mode = 0644 - tc_owner = "root" - else: - tc_mode = None - tc_owner = params.hdfs_user - - if "hadoop-policy" in params.config['configurations']: - XmlConfig("hadoop-policy.xml", - conf_dir=params.hadoop_conf_dir, - configurations=params.config['configurations']['hadoop-policy'], - configuration_attributes=params.config['configuration_attributes']['hadoop-policy'], - owner=params.hdfs_user, - group=params.user_group - ) - - if "ssl-client" in params.config['configurations']: - XmlConfig("ssl-client.xml", - conf_dir=params.hadoop_conf_dir, - configurations=params.config['configurations']['ssl-client'], - configuration_attributes=params.config['configuration_attributes']['ssl-client'], - owner=params.hdfs_user, - group=params.user_group - ) - - Directory(params.hadoop_conf_secure_dir, - recursive=True, - owner='root', - group=params.user_group, - cd_access='a', - ) - - XmlConfig("ssl-client.xml", - conf_dir=params.hadoop_conf_secure_dir, - configurations=params.config['configurations']['ssl-client'], - configuration_attributes=params.config['configuration_attributes']['ssl-client'], - owner=params.hdfs_user, - group=params.user_group - ) - - if "ssl-server" in params.config['configurations']: - XmlConfig("ssl-server.xml", - conf_dir=params.hadoop_conf_dir, - configurations=params.config['configurations']['ssl-server'], - configuration_attributes=params.config['configuration_attributes']['ssl-server'], - owner=params.hdfs_user, - group=params.user_group - ) - - XmlConfig("hdfs-site.xml", - conf_dir=params.hadoop_conf_dir, - configurations=params.config['configurations']['hdfs-site'], - configuration_attributes=params.config['configuration_attributes']['hdfs-site'], - owner=params.hdfs_user, - group=params.user_group - ) - - XmlConfig("core-site.xml", - conf_dir=params.hadoop_conf_dir, - configurations=params.config['configurations']['core-site'], - configuration_attributes=params.config['configuration_attributes']['core-site'], - owner=params.hdfs_user, - group=params.user_group, - mode=0644 - ) - - File(os.path.join(params.hadoop_conf_dir, 'slaves'), - owner=tc_owner, - content=Template("slaves.j2") - ) - - if params.lzo_enabled and len(params.lzo_packages) > 0: - Package(params.lzo_packages) - -def install_snappy(): - import params - Directory([params.so_target_dir_x86, params.so_target_dir_x64], - recursive=True, - ) - Link(params.so_target_x86, - to=params.so_src_x86, - ) - Link(params.so_target_x64, - to=params.so_src_x64, - ) http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_client.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_client.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_client.py deleted file mode 100644 index c3b667e..0000000 --- 
a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_client.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -from resource_management import * -from resource_management.libraries.functions import conf_select -from resource_management.libraries.functions import iop_select -from resource_management.libraries.functions.security_commons import build_expectations, \ - cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ - FILE_TYPE_XML -from hdfs import hdfs -from utils import service - - -class HdfsClient(Script): - - def get_stack_to_component(self): - return {"BigInsights": "hadoop-client"} - - def install(self, env): - import params - - self.install_packages(env, params.exclude_packages) - env.set_params(params) - self.configure(env) - - def pre_upgrade_restart(self, env, upgrade_type=None): - import params - env.set_params(params) - if params.version and compare_versions(format_hdp_stack_version(params.version), '4.0.0.0') >= 0: - conf_select.select(params.stack_name, "hadoop", params.version) - conf_select.link_configs("hadoop", params.version, True) - iop_select.select("hadoop-client", params.version) - #Execute(format("iop-select set hadoop-client {version}")) - - def start(self, env, upgrade_type=None): - import params - - env.set_params(params) - - def stop(self, env, upgrade_type=None): - import params - - env.set_params(params) - - def status(self, env): - raise ClientComponentHasNoStatus() - - def configure(self, env): - import params - env.set_params(params) - hdfs() - - def security_status(self, env): - import status_params - env.set_params(status_params) - - props_value_check = {"hadoop.security.authentication": "kerberos", - "hadoop.security.authorization": "true"} - props_empty_check = ["hadoop.security.auth_to_local"] - props_read_check = None - core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, - props_read_check) - hdfs_expectations ={} - hdfs_expectations.update(core_site_expectations) - - security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, - {'core-site.xml': FILE_TYPE_XML}) - - if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ - security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': - result_issues = validate_security_config_properties(security_params, hdfs_expectations) - if not result_issues: # If all validations passed successfully - if status_params.hdfs_user_principal or status_params.hdfs_user_keytab: - try: - cached_kinit_executor(status_params.kinit_path_local, - status_params.hdfs_user, - status_params.hdfs_user_keytab, - status_params.hdfs_user_principal, - 
status_params.hostname, - status_params.tmp_dir) - self.put_structured_out({"securityState": "SECURED_KERBEROS"}) - except Exception as e: - self.put_structured_out({"securityState": "ERROR"}) - self.put_structured_out({"securityStateErrorInfo": str(e)}) - else: - self.put_structured_out({"securityIssuesFound": "hdfs principal and/or keytab file is not specified"}) - self.put_structured_out({"securityState": "UNSECURED"}) - else: - issues = [] - for cf in result_issues: - issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) - self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) - self.put_structured_out({"securityState": "UNSECURED"}) - - else: - self.put_structured_out({"securityState": "UNSECURED"}) - -if __name__ == "__main__": - HdfsClient().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/44e21f8e/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_datanode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_datanode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_datanode.py deleted file mode 100644 index 79750bd..0000000 --- a/ambari-server/src/main/resources/stacks/BigInsights/4.2/services/HDFS/package/scripts/hdfs_datanode.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
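As an aside, the property-expectation check that security_status() performs on core-site.xml boils down to comparing parsed values against required ones and flagging keys that must not be empty. A minimal sketch, under the assumption that the configuration has already been parsed into a plain dict; all names here are illustrative:

def validate_expectations(site_config, value_checks, non_empty_keys):
    """Return a list of readable issues; an empty list means the site passed."""
    issues = []
    for key, expected in value_checks.items():
        actual = site_config.get(key)
        if actual is None or str(actual).lower() != str(expected).lower():
            issues.append("%s is %r, expected %r" % (key, actual, expected))
    for key in non_empty_keys:
        if not site_config.get(key):
            issues.append("%s must not be empty" % key)
    return issues

core_site = {"hadoop.security.authentication": "kerberos",
             "hadoop.security.authorization": "true",
             "hadoop.security.auth_to_local": "DEFAULT"}
problems = validate_expectations(
    core_site,
    {"hadoop.security.authentication": "kerberos",
     "hadoop.security.authorization": "true"},
    ["hadoop.security.auth_to_local"])
state = "SECURED_KERBEROS" if not problems else "UNSECURED"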
- -""" -import os -from resource_management import * -from resource_management.libraries.functions.dfs_datanode_helper import handle_dfs_data_dir -from utils import service - - -def create_dirs(data_dir, params): - """ - :param data_dir: The directory to create - :param params: parameters - """ - Directory(data_dir, - recursive=True, - cd_access="a", - mode=0755, - owner=params.hdfs_user, - group=params.user_group, - ignore_failures=True - ) - - -def datanode(action=None): - import params - if action == "configure": - Directory(params.dfs_domain_socket_dir, - recursive=True, - mode=0751, - owner=params.hdfs_user, - group=params.user_group) - - if not os.path.isdir(os.path.dirname(params.data_dir_mount_file)): - Directory(os.path.dirname(params.data_dir_mount_file), - recursive=True, - mode=0755, - owner=params.hdfs_user, - group=params.user_group) - - data_dir_to_mount_file_content = handle_dfs_data_dir(create_dirs, params) - File(params.data_dir_mount_file, - owner=params.hdfs_user, - group=params.user_group, - mode=0644, - content=data_dir_to_mount_file_content - ) - - elif action == "start" or action == "stop": - Directory(params.hadoop_pid_dir_prefix, - mode=0755, - owner=params.hdfs_user, - group=params.user_group - ) - service( - action=action, name="datanode", - user=params.hdfs_user, - create_pid_dir=True, - create_log_dir=True - ) - elif action == "status": - import status_params - check_process_status(status_params.datanode_pid_file) - -