http://git-wip-us.apache.org/repos/asf/ambari/blob/ab4b864c/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_checkpoint_time.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_checkpoint_time.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_checkpoint_time.py
deleted file mode 100644
index 26127c3..0000000
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_checkpoint_time.py
+++ /dev/null
@@ -1,255 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import time
-import urllib2
-import ambari_simplejson as json # simplejson is much faster than the Python 2.6 json module and has the same function set.
-import logging
-import traceback
-
-from resource_management.libraries.functions.namenode_ha_utils import get_all_namenode_addresses
-from resource_management.libraries.functions.curl_krb_request import curl_krb_request
-from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS
-from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER
-from resource_management.core.environment import Environment
-
-LABEL = 'Last Checkpoint: [{h} hours, {m} minutes, {tx} transactions]'
-HDFS_SITE_KEY = '{{hdfs-site}}'
-
-RESULT_STATE_UNKNOWN = 'UNKNOWN'
-RESULT_STATE_SKIPPED = 'SKIPPED'
-
-NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}'
-NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}'
-NN_HTTP_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}'
-NN_CHECKPOINT_TX_KEY = '{{hdfs-site/dfs.namenode.checkpoint.txns}}'
-NN_CHECKPOINT_PERIOD_KEY = '{{hdfs-site/dfs.namenode.checkpoint.period}}'
-
-PERCENT_WARNING_KEY = 'checkpoint.time.warning.threshold'
-PERCENT_WARNING_DEFAULT = 200
-
-PERCENT_CRITICAL_KEY = 'checkpoint.time.critical.threshold'
-PERCENT_CRITICAL_DEFAULT = 200
-
-CHECKPOINT_TX_MULTIPLIER_WARNING_KEY = 'checkpoint.txns.multiplier.warning.threshold'
-CHECKPOINT_TX_MULTIPLIER_WARNING_DEFAULT = 2
-
-CHECKPOINT_TX_MULTIPLIER_CRITICAL_KEY = 'checkpoint.txns.multiplier.critical.threshold'
-CHECKPOINT_TX_MULTIPLIER_CRITICAL_DEFAULT = 4
-
-CHECKPOINT_TX_DEFAULT = 1000000
-CHECKPOINT_PERIOD_DEFAULT = 21600
-
-CONNECTION_TIMEOUT_KEY = 'connection.timeout'
-CONNECTION_TIMEOUT_DEFAULT = 5.0
-
-KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}'
-KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}'
-SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
-SMOKEUSER_KEY = "{{cluster-env/smokeuser}}"
-EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}'
-
-logger = logging.getLogger('ambari_alerts')
-
-def get_tokens():
-  """
-  Returns a tuple of tokens in the format {{site/property}} that will be used
-  to build the dictionary passed into execute
-  """
-  return (HDFS_SITE_KEY, NN_HTTP_ADDRESS_KEY, NN_HTTPS_ADDRESS_KEY, NN_HTTP_POLICY_KEY, EXECUTABLE_SEARCH_PATHS,
-          NN_CHECKPOINT_TX_KEY, NN_CHECKPOINT_PERIOD_KEY, KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY, SMOKEUSER_KEY)
-
-
-def execute(configurations={}, parameters={}, host_name=None):
-  """
-  Returns a tuple containing the result code and a pre-formatted result label
-
-  Keyword arguments:
-  configurations (dictionary): a mapping of configuration key to value
-  parameters (dictionary): a mapping of script parameter key to value
-  host_name (string): the name of this host where the alert is running
-  """
-
-  if configurations is None:
-    return (('UNKNOWN', ['There were no configurations supplied to the script.']))
-
-  uri = None
-  scheme = 'http'
-  http_uri = None
-  https_uri = None
-  http_policy = 'HTTP_ONLY'
-  checkpoint_tx = CHECKPOINT_TX_DEFAULT
-  checkpoint_period = CHECKPOINT_PERIOD_DEFAULT
-
-  # hdfs-site is required
-  if not HDFS_SITE_KEY in configurations:
-    return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)])
-
-  if NN_HTTP_POLICY_KEY in configurations:
-    http_policy = configurations[NN_HTTP_POLICY_KEY]
-
-  if NN_CHECKPOINT_TX_KEY in configurations:
-    checkpoint_tx = configurations[NN_CHECKPOINT_TX_KEY]
-
-  if NN_CHECKPOINT_PERIOD_KEY in configurations:
-    checkpoint_period = configurations[NN_CHECKPOINT_PERIOD_KEY]
-
-  if SMOKEUSER_KEY in configurations:
-    smokeuser = configurations[SMOKEUSER_KEY]
-
-  executable_paths = None
-  if EXECUTABLE_SEARCH_PATHS in configurations:
-    executable_paths = configurations[EXECUTABLE_SEARCH_PATHS]
-
-  security_enabled = False
-  if SECURITY_ENABLED_KEY in configurations:
-    security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE'
-
-  kerberos_keytab = None
-  if KERBEROS_KEYTAB in configurations:
-    kerberos_keytab = configurations[KERBEROS_KEYTAB]
-
-  kerberos_principal = None
-  if KERBEROS_PRINCIPAL in configurations:
-    kerberos_principal = configurations[KERBEROS_PRINCIPAL]
-    kerberos_principal = kerberos_principal.replace('_HOST', host_name)
-
-  # parse script arguments
-  connection_timeout = CONNECTION_TIMEOUT_DEFAULT
-  if CONNECTION_TIMEOUT_KEY in parameters:
-    connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY])
-
-  percent_warning = PERCENT_WARNING_DEFAULT
-  if PERCENT_WARNING_KEY in parameters:
-    percent_warning = float(parameters[PERCENT_WARNING_KEY])
-
-  percent_critical = PERCENT_CRITICAL_DEFAULT
-  if PERCENT_CRITICAL_KEY in parameters:
-    percent_critical = float(parameters[PERCENT_CRITICAL_KEY])
-
-  checkpoint_txn_multiplier_warning = CHECKPOINT_TX_MULTIPLIER_WARNING_DEFAULT
-  if CHECKPOINT_TX_MULTIPLIER_WARNING_KEY in parameters:
-    checkpoint_txn_multiplier_warning = float(parameters[CHECKPOINT_TX_MULTIPLIER_WARNING_KEY])
-
-  checkpoint_txn_multiplier_critical = CHECKPOINT_TX_MULTIPLIER_CRITICAL_DEFAULT
-  if CHECKPOINT_TX_MULTIPLIER_CRITICAL_KEY in parameters:
-    checkpoint_txn_multiplier_critical = float(parameters[CHECKPOINT_TX_MULTIPLIER_CRITICAL_KEY])
-
-  kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS)
-
-  # determine the right URI and whether to use SSL
-  hdfs_site = configurations[HDFS_SITE_KEY]
-
-  scheme = "https" if http_policy == "HTTPS_ONLY" else "http"
-
-  nn_addresses = get_all_namenode_addresses(hdfs_site)
-  for nn_address in nn_addresses:
-    if nn_address.startswith(host_name + ":"):
-      uri = nn_address
-      break
-  if not uri:
-    return (RESULT_STATE_SKIPPED, ['NameNode on host {0} not found (namenode addresses = {1})'.format(host_name, ', '.join(nn_addresses))])
-
-  current_time = int(round(time.time() * 1000))
-
-  last_checkpoint_time_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem".format(scheme, uri)
-  journal_transaction_info_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo".format(scheme, uri)
-
-  # start out assuming an OK status
-  label = None
-  result_code = "OK"
-
-  try:
-    if kerberos_principal is not None and kerberos_keytab is not None and security_enabled:
-      env = Environment.get_instance()
-
-      # curl requires an integer timeout
-      curl_connection_timeout = int(connection_timeout)
-
-      last_checkpoint_time_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, kerberos_keytab,
-        kerberos_principal, last_checkpoint_time_qry, "checkpoint_time_alert", executable_paths, False,
-        "NameNode Last Checkpoint", smokeuser, connection_timeout=curl_connection_timeout,
-        kinit_timer_ms = kinit_timer_ms)
-
-      last_checkpoint_time_response_json = json.loads(last_checkpoint_time_response)
-      last_checkpoint_time = int(last_checkpoint_time_response_json["beans"][0]["LastCheckpointTime"])
-
-      journal_transaction_info_response, error_msg, time_millis = curl_krb_request(env.tmp_dir, kerberos_keytab,
-        kerberos_principal, journal_transaction_info_qry, "checkpoint_time_alert", executable_paths,
-        False, "NameNode Last Checkpoint", smokeuser, connection_timeout=curl_connection_timeout,
-        kinit_timer_ms = kinit_timer_ms)
-
-      journal_transaction_info_response_json = json.loads(journal_transaction_info_response)
-      journal_transaction_info = journal_transaction_info_response_json["beans"][0]["JournalTransactionInfo"]
-    else:
-      last_checkpoint_time = int(get_value_from_jmx(last_checkpoint_time_qry,
-        "LastCheckpointTime", connection_timeout))
-
-      journal_transaction_info = get_value_from_jmx(journal_transaction_info_qry,
-        "JournalTransactionInfo", connection_timeout)
-
-    journal_transaction_info_dict = json.loads(journal_transaction_info)
-
-    last_tx = int(journal_transaction_info_dict['LastAppliedOrWrittenTxId'])
-    most_recent_tx = int(journal_transaction_info_dict['MostRecentCheckpointTxId'])
-    transaction_difference = last_tx - most_recent_tx
-
-    delta = (current_time - last_checkpoint_time)/1000
-
-    label = LABEL.format(h=get_time(delta)['h'], m=get_time(delta)['m'], tx=transaction_difference)
-
-    is_checkpoint_txn_warning = transaction_difference > checkpoint_txn_multiplier_warning * int(checkpoint_tx)
-    is_checkpoint_txn_critical = transaction_difference > checkpoint_txn_multiplier_critical * int(checkpoint_tx)
-
-    # Either too many uncommitted transactions or a checkpoint that has been
-    # missed for too long, as decided by the thresholds, raises the alert
-    if is_checkpoint_txn_critical or (float(delta) / int(checkpoint_period)*100 >= int(percent_critical)):
-      logger.debug('Raising critical alert: transaction_difference = {0}, checkpoint_tx = {1}'.format(transaction_difference, checkpoint_tx))
-      result_code = 'CRITICAL'
-    elif is_checkpoint_txn_warning or (float(delta) / int(checkpoint_period)*100 >= int(percent_warning)):
-      logger.debug('Raising warning alert: transaction_difference = {0}, checkpoint_tx = {1}'.format(transaction_difference, checkpoint_tx))
-      result_code = 'WARNING'
-
-  except:
-    label = traceback.format_exc()
-    result_code = 'UNKNOWN'
-
-  return ((result_code, [label]))
-
-def get_time(delta):
-  h = int(delta/3600)
-  m = int((delta % 3600)/60)
-  return {'h':h, 'm':m}
-
-
-def get_value_from_jmx(query, jmx_property, connection_timeout):
-  response = None
-
-  try:
-    response = urllib2.urlopen(query, timeout=connection_timeout)
-    data = response.read()
-    data_dict = json.loads(data)
-    return data_dict["beans"][0][jmx_property]
-  finally:
-    if response is not None:
-      try:
-        response.close()
-      except:
-        pass
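The alert above raises WARNING or CRITICAL when either the uncommitted-transaction count or the time elapsed since the last checkpoint crosses its threshold. The following standalone sketch (not part of Ambari) reproduces that threshold arithmetic with hard-coded sample values in place of the JMX lookups; the defaults mirror the constants in the deleted script, and the function name is illustrative only.

  # Standalone sketch of the checkpoint alert's decision logic; the values
  # that would normally come from JMX are passed in directly.
  def checkpoint_state(seconds_since_checkpoint, txns_since_checkpoint,
                       checkpoint_period=21600, checkpoint_txns=1000000,
                       pct_warning=200, pct_critical=200,
                       txn_mult_warning=2, txn_mult_critical=4):
    # how much of the configured checkpoint period has elapsed, in percent
    period_pct = float(seconds_since_checkpoint) / checkpoint_period * 100

    if txns_since_checkpoint > txn_mult_critical * checkpoint_txns or period_pct >= pct_critical:
      return 'CRITICAL'
    if txns_since_checkpoint > txn_mult_warning * checkpoint_txns or period_pct >= pct_warning:
      return 'WARNING'
    return 'OK'

  print checkpoint_state(13 * 3600, 50000)   # 13h is ~217% of the 6h period -> CRITICAL
  print checkpoint_state(3600, 2500000)      # txns exceed 2x the configured limit -> WARNING
  print checkpoint_state(3600, 50000)        # -> OK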
http://git-wip-us.apache.org/repos/asf/ambari/blob/ab4b864c/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_datanode_unmounted_data_dir.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_datanode_unmounted_data_dir.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_datanode_unmounted_data_dir.py
deleted file mode 100644
index 765831d..0000000
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_datanode_unmounted_data_dir.py
+++ /dev/null
@@ -1,177 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import os
-import logging
-import urlparse
-
-from resource_management.libraries.functions import file_system
-from resource_management.libraries.functions import mounted_dirs_helper
-
-RESULT_STATE_OK = 'OK'
-RESULT_STATE_WARNING = 'WARNING'
-RESULT_STATE_CRITICAL = 'CRITICAL'
-RESULT_STATE_UNKNOWN = 'UNKNOWN'
-
-DFS_DATA_DIR = '{{hdfs-site/dfs.datanode.data.dir}}'
-DATA_STORAGE_TAGS = ['[DISK]','[SSD]','[RAM_DISK]','[ARCHIVE]']
-DATA_DIR_MOUNT_FILE = "/var/lib/ambari-agent/data/datanode/dfs_data_dir_mount.hist"
-
-logger = logging.getLogger()
-
-
-def get_tokens():
-  """
-  Returns a tuple of tokens in the format {{site/property}} that will be used
-  to build the dictionary passed into execute
-  """
-  return (DFS_DATA_DIR, DATA_DIR_MOUNT_FILE)
-
-
-def execute(configurations={}, parameters={}, host_name=None):
-  """
-  Returns a tuple containing the result code and a pre-formatted result label
-
-  Keyword arguments:
-  configurations (dictionary): a mapping of configuration key to value
-  parameters (dictionary): a mapping of script parameter key to value
-  host_name (string): the name of this host where the alert is running
-
-  DataNode directories can be of the following formats and each needs to be supported:
-    /grid/dn/archive0
-    [SSD]/grid/dn/archive0
-    [ARCHIVE]file:///grid/dn/archive0
-  """
-  warnings = []
-  errors = []
-
-  if configurations is None:
-    return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.'])
-
-  # Check required properties
-  if DFS_DATA_DIR not in configurations:
-    return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(DFS_DATA_DIR)])
-
-  dfs_data_dir = configurations[DFS_DATA_DIR]
-
-  if dfs_data_dir is None:
-    return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script and the value is null'.format(DFS_DATA_DIR)])
-
-  # This follows symlinks and will return False for a broken link (even in the middle of the linked list)
-  data_dir_mount_file_exists = True
-  if not os.path.exists(DATA_DIR_MOUNT_FILE):
-    data_dir_mount_file_exists = False
-    warnings.append("{0} was not found.".format(DATA_DIR_MOUNT_FILE))
-
-  normalized_data_dirs = set()  # data dirs that have been normalized
-  data_dirs_not_exist = set()   # data dirs that do not exist
-  data_dirs_unknown = set()     # data dirs for which the mount could not be determined
-  data_dirs_on_root = set()     # data dirs that are on the root mount
-  data_dirs_on_mount = set()    # data dirs that are mounted on a device
-  data_dirs_unmounted = []      # data dirs that are known to have become unmounted
-
-  # transform each data directory into something that we can use
-  for data_dir in dfs_data_dir.split(","):
-    if data_dir is None or data_dir.strip() == "":
-      continue
-
-    data_dir = data_dir.strip()
-
-    # filter out data storage tags
-    for tag in DATA_STORAGE_TAGS:
-      if data_dir.startswith(tag):
-        data_dir = data_dir.replace(tag, "")
-        continue
-
-    # parse the path in case it contains a URI scheme
-    data_dir = urlparse.urlparse(data_dir).path
-
-    normalized_data_dirs.add(data_dir)
-
-  # Sort the data dirs, which is needed for deterministic behavior when running the unit tests.
-  normalized_data_dirs = sorted(normalized_data_dirs)
-  for data_dir in normalized_data_dirs:
-    # This follows symlinks and will return False for a broken link (even in the middle of the linked list)
-    if os.path.isdir(data_dir):
-      curr_mount_point = file_system.get_mount_point_for_dir(data_dir)
-      curr_mount_point = curr_mount_point.strip() if curr_mount_point else curr_mount_point
-
-      if curr_mount_point is not None and curr_mount_point != "":
-        if curr_mount_point == "/":
-          data_dirs_on_root.add(data_dir)
-        else:
-          data_dirs_on_mount.add(data_dir)
-      else:
-        data_dirs_unknown.add(data_dir)
-    else:
-      data_dirs_not_exist.add(data_dir)
-
-  # To keep the messages consistent for all hosts, sort the sets into lists
-  normalized_data_dirs = sorted(normalized_data_dirs)
-  data_dirs_not_exist = sorted(data_dirs_not_exist)
-  data_dirs_unknown = sorted(data_dirs_unknown)
-  data_dirs_on_root = sorted(data_dirs_on_root)
-
-  if data_dirs_not_exist:
-    errors.append("The following data dir(s) were not found: {0}\n".format("\n".join(data_dirs_not_exist)))
-
-  if data_dirs_unknown:
-    errors.append("Cannot find the mount point for the following data dir(s):\n{0}".format("\n".join(data_dirs_unknown)))
-
-  if data_dir_mount_file_exists:
-    # This dictionary contains the expected values of <data_dir, mount_point>
-    # Hence, we only need to analyze the data dirs that are currently on the root partition
-    # and report an error if they were expected to be on a mount.
-    #
-    # If one of the data dirs is not present in the file, it means that the DataNode has not been restarted after
-    # the configuration was changed on the server, so we cannot make any assertions about it.
-    expected_data_dir_to_mount = mounted_dirs_helper.get_dir_to_mount_from_file(DATA_DIR_MOUNT_FILE)
-    for data_dir in data_dirs_on_root:
-      if data_dir in expected_data_dir_to_mount and expected_data_dir_to_mount[data_dir] != "/":
-        data_dirs_unmounted.append(data_dir)
-
-    if len(data_dirs_unmounted) > 0:
-      errors.append("Detected data dir(s) that became unmounted and are now writing to the root partition:\n{0}".format("\n".join(data_dirs_unmounted)))
-  else:
-    # Couldn't make guarantees about the expected value of mount points, so rely on this strategy that is likely to work.
-    # It will report false positives (aka false alarms) if the user actually intended to have
-    # 1+ data dirs on a mount and 1+ data dirs on the root partition.
-    if len(data_dirs_on_mount) >= 1 and len(data_dirs_on_root) >= 1:
-      errors.append("Detected at least one data dir on a mount point, but these are writing to the root partition:\n{0}".format("\n".join(data_dirs_on_root)))
-
-  # Determine the status based on warnings and errors.
-  if len(errors) == 0:
-    status = RESULT_STATE_OK
-    messages = []
-
-    # Check for warnings
-    if len(warnings) > 0:
-      status = RESULT_STATE_WARNING
-      messages += warnings
-
-    if len(normalized_data_dirs) > 0:
-      messages.append("The following data dir(s) are valid:\n{0}".format("\n".join(normalized_data_dirs)))
-    else:
-      messages.append("There are no data directories to analyze.")
-
-    return (status, ["\n".join(messages)])
-  else:
-    # Report errors
-    return (RESULT_STATE_CRITICAL, ["\n".join(errors)])
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ab4b864c/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_ha_namenode_health.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_ha_namenode_health.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_ha_namenode_health.py
deleted file mode 100644
index 28b3f22..0000000
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_ha_namenode_health.py
+++ /dev/null
@@ -1,243 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import urllib2
-import ambari_simplejson as json # simplejson is much faster than the Python 2.6 json module and has the same function set.
-import logging
-
-from resource_management.libraries.functions.curl_krb_request import curl_krb_request
-from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS
-from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER
-from resource_management.core.environment import Environment
-
-RESULT_STATE_OK = 'OK'
-RESULT_STATE_CRITICAL = 'CRITICAL'
-RESULT_STATE_UNKNOWN = 'UNKNOWN'
-RESULT_STATE_SKIPPED = 'SKIPPED'
-
-HDFS_NN_STATE_ACTIVE = 'active'
-HDFS_NN_STATE_STANDBY = 'standby'
-
-HDFS_SITE_KEY = '{{hdfs-site}}'
-NAMESERVICE_KEY = '{{hdfs-site/dfs.internal.nameservices}}'
-NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}'
-NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}'
-DFS_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}'
-
-KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}'
-KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}'
-SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
-SMOKEUSER_KEY = '{{cluster-env/smokeuser}}'
-EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}'
-INADDR_ANY = '0.0.0.0'
-NAMENODE_HTTP_FRAGMENT = 'dfs.namenode.http-address.{0}.{1}'
-NAMENODE_HTTPS_FRAGMENT = 'dfs.namenode.https-address.{0}.{1}'
-NAMENODE_RPC_FRAGMENT = 'dfs.namenode.rpc-address.{0}.{1}'
-
-CONNECTION_TIMEOUT_KEY = 'connection.timeout'
-CONNECTION_TIMEOUT_DEFAULT = 5.0
-
-LOGGER_EXCEPTION_MESSAGE = "[Alert] NameNode High Availability Health on {0} fails:"
-logger = logging.getLogger('ambari_alerts')
-
-def get_tokens():
-  """
-  Returns a tuple of tokens in the format {{site/property}} that will be used
-  to build the dictionary passed into execute
-  """
-  return (HDFS_SITE_KEY, NAMESERVICE_KEY, NN_HTTP_ADDRESS_KEY, EXECUTABLE_SEARCH_PATHS,
-          NN_HTTPS_ADDRESS_KEY, DFS_POLICY_KEY, SMOKEUSER_KEY, KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY)
-
-
-def execute(configurations={}, parameters={}, host_name=None):
-  """
-  Returns a tuple containing the result code and a pre-formatted result label
-
-  Keyword arguments:
-  configurations (dictionary): a mapping of configuration key to value
-  parameters (dictionary): a mapping of script parameter key to value
-  host_name (string): the name of this host where the alert is running
-  """
-  if configurations is None:
-    return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.'])
-
-  # if not in HA mode, then SKIP
-  if not NAMESERVICE_KEY in configurations:
-    return (RESULT_STATE_SKIPPED, ['NameNode HA is not enabled'])
-
-  # hdfs-site is required
-  if not HDFS_SITE_KEY in configurations:
-    return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)])
-
-  if SMOKEUSER_KEY in configurations:
-    smokeuser = configurations[SMOKEUSER_KEY]
-
-  executable_paths = None
-  if EXECUTABLE_SEARCH_PATHS in configurations:
-    executable_paths = configurations[EXECUTABLE_SEARCH_PATHS]
-
-  # parse script arguments
-  connection_timeout = CONNECTION_TIMEOUT_DEFAULT
-  if CONNECTION_TIMEOUT_KEY in parameters:
-    connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY])
-
-  security_enabled = False
-  if SECURITY_ENABLED_KEY in configurations:
-    security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE'
-
-  kerberos_keytab = None
-  if KERBEROS_KEYTAB in configurations:
-    kerberos_keytab = configurations[KERBEROS_KEYTAB]
-
-  kerberos_principal = None
-  if KERBEROS_PRINCIPAL in configurations:
-    kerberos_principal = configurations[KERBEROS_PRINCIPAL]
-    kerberos_principal = kerberos_principal.replace('_HOST', host_name)
-
-  kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS)
-
-  # determine whether or not SSL is enabled
-  is_ssl_enabled = False
-  if DFS_POLICY_KEY in configurations:
-    dfs_policy = configurations[DFS_POLICY_KEY]
-    if dfs_policy == "HTTPS_ONLY":
-      is_ssl_enabled = True
-
-  name_service = configurations[NAMESERVICE_KEY]
-  hdfs_site = configurations[HDFS_SITE_KEY]
-
-  # look for dfs.ha.namenodes.foo
-  nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service
-  if not nn_unique_ids_key in hdfs_site:
-    return (RESULT_STATE_UNKNOWN, ['Unable to find unique namenode alias key {0}'.format(nn_unique_ids_key)])
-
-  namenode_http_fragment = NAMENODE_HTTP_FRAGMENT
-  jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=*"
-
-  if is_ssl_enabled:
-    namenode_http_fragment = NAMENODE_HTTPS_FRAGMENT
-    jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=*"
-
-
-  active_namenodes = []
-  standby_namenodes = []
-  unknown_namenodes = []
-
-  # now we have something like 'nn1,nn2,nn3,nn4'
-  # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id]
-  # ie dfs.namenode.http-address.hacluster.nn1
-  nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',')
-  for nn_unique_id in nn_unique_ids:
-    key = namenode_http_fragment.format(name_service, nn_unique_id)
-    rpc_key = NAMENODE_RPC_FRAGMENT.format(name_service, nn_unique_id)
-
-    if key in hdfs_site:
-      # use str() to ensure that unicode strings do not have the u' in them
-      value = str(hdfs_site[key])
-      if INADDR_ANY in value and rpc_key in hdfs_site:
-        rpc_value = str(hdfs_site[rpc_key])
-        if INADDR_ANY not in rpc_value:
-          rpc_host = rpc_value.split(":")[0]
-          value = value.replace(INADDR_ANY, rpc_host)
-
-      try:
-        jmx_uri = jmx_uri_fragment.format(value)
-        if kerberos_principal is not None and kerberos_keytab is not None and security_enabled:
-          env = Environment.get_instance()
-
-          # curl requires an integer timeout
-          curl_connection_timeout = int(connection_timeout)
-
-          state_response, error_msg, time_millis = curl_krb_request(env.tmp_dir,
-            kerberos_keytab, kerberos_principal, jmx_uri, "ha_nn_health", executable_paths, False,
-            "NameNode High Availability Health", smokeuser, connection_timeout=curl_connection_timeout,
-            kinit_timer_ms = kinit_timer_ms)
-
-          state = _get_ha_state_from_json(state_response)
-        else:
-          state_response = get_jmx(jmx_uri, connection_timeout)
-          state = _get_ha_state_from_json(state_response)
-
-        if state == HDFS_NN_STATE_ACTIVE:
-          active_namenodes.append(value)
-        elif state == HDFS_NN_STATE_STANDBY:
-          standby_namenodes.append(value)
-        else:
-          unknown_namenodes.append(value)
-      except:
-        logger.exception(LOGGER_EXCEPTION_MESSAGE.format(host_name))
-        unknown_namenodes.append(value)
-
-  # there is only one healthy scenario: exactly 1 active and 1 standby
-  is_topology_healthy = len(active_namenodes) == 1 and len(standby_namenodes) == 1
-
-  result_label = 'Active{0}, Standby{1}, Unknown{2}'.format(str(active_namenodes),
-    str(standby_namenodes), str(unknown_namenodes))
-
-  if is_topology_healthy:
-    # there is exactly 1 active and 1 standby NN
-    return (RESULT_STATE_OK, [result_label])
-  else:
-    # any other scenario
-    return (RESULT_STATE_CRITICAL, [result_label])
-
-
-def get_jmx(query, connection_timeout):
-  response = None
-
-  try:
-    response = urllib2.urlopen(query, timeout=connection_timeout)
-    json_data = response.read()
-    return json_data
-  finally:
-    if response is not None:
-      try:
-        response.close()
-      except:
-        pass
-
-
-def _get_ha_state_from_json(string_json):
-  """
-  Searches through the specified JSON string looking for HA state
-  enumerations.
-  :param string_json: the string JSON
-  :return: the value of the HA state (active, standby, etc)
-  """
-  json_data = json.loads(string_json)
-  jmx_beans = json_data["beans"]
-
-  # look for NameNodeStatus-State first
-  for jmx_bean in jmx_beans:
-    if "name" not in jmx_bean:
-      continue
-
-    jmx_bean_name = jmx_bean["name"]
-    if jmx_bean_name == "Hadoop:service=NameNode,name=NameNodeStatus" and "State" in jmx_bean:
-      return jmx_bean["State"]
-
-  # look for FSNamesystem-tag.HAState last
-  for jmx_bean in jmx_beans:
-    if "name" not in jmx_bean:
-      continue
-
-    jmx_bean_name = jmx_bean["name"]
-    if jmx_bean_name == "Hadoop:service=NameNode,name=FSNamesystem":
-      return jmx_bean["tag.HAState"]
http://git-wip-us.apache.org/repos/asf/ambari/blob/ab4b864c/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_metrics_deviation.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_metrics_deviation.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_metrics_deviation.py
deleted file mode 100644
index 8a06f56..0000000
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_metrics_deviation.py
+++ /dev/null
@@ -1,470 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-import httplib
-import locale
-import json
-import logging
-import urllib
-import time
-import urllib2
-
-from resource_management import Environment
-from ambari_commons.aggregate_functions import sample_standard_deviation, mean
-
-from resource_management.libraries.functions.curl_krb_request import curl_krb_request
-from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS
-from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER
-from ambari_commons.ambari_metrics_helper import select_metric_collector_for_sink
-
-
-RESULT_STATE_OK = 'OK'
-RESULT_STATE_CRITICAL = 'CRITICAL'
-RESULT_STATE_WARNING = 'WARNING'
-RESULT_STATE_UNKNOWN = 'UNKNOWN'
-RESULT_STATE_SKIPPED = 'SKIPPED'
-
-HDFS_NN_STATE_ACTIVE = 'active'
-HDFS_NN_STATE_STANDBY = 'standby'
-
-HDFS_SITE_KEY = '{{hdfs-site}}'
-NAMESERVICE_KEY = '{{hdfs-site/dfs.internal.nameservices}}'
-NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}'
-NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}'
-DFS_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}'
-
-KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}'
-KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}'
-SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
-SMOKEUSER_KEY = '{{cluster-env/smokeuser}}'
-EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}'
-
-METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY = '{{ams-site/timeline.metrics.service.webapp.address}}'
-METRICS_COLLECTOR_VIP_HOST_KEY = '{{cluster-env/metrics_collector_vip_host}}'
-METRICS_COLLECTOR_VIP_PORT_KEY = '{{cluster-env/metrics_collector_vip_port}}'
-
-CONNECTION_TIMEOUT_KEY = 'connection.timeout'
-CONNECTION_TIMEOUT_DEFAULT = 5.0
-
-MERGE_HA_METRICS_PARAM_KEY = 'mergeHaMetrics'
-MERGE_HA_METRICS_PARAM_DEFAULT = False
-METRIC_NAME_PARAM_KEY = 'metricName'
-METRIC_NAME_PARAM_DEFAULT = ''
-METRIC_UNITS_PARAM_KEY = 'metric.units'
-METRIC_UNITS_DEFAULT = ''
-APP_ID_PARAM_KEY = 'appId'
-APP_ID_PARAM_DEFAULT = 'NAMENODE'
-
-# the interval to check the metric (should be cast to int but could be a float)
-INTERVAL_PARAM_KEY = 'interval'
-INTERVAL_PARAM_DEFAULT = 60
-
-# the default threshold to trigger a CRITICAL (should be cast to int but could be a float)
-DEVIATION_CRITICAL_THRESHOLD_KEY = 'metric.deviation.critical.threshold'
-DEVIATION_CRITICAL_THRESHOLD_DEFAULT = 10
-
-# the default threshold to trigger a WARNING (should be cast to int but could be a float)
-DEVIATION_WARNING_THRESHOLD_KEY = 'metric.deviation.warning.threshold'
-DEVIATION_WARNING_THRESHOLD_DEFAULT = 5
-NAMENODE_SERVICE_RPC_PORT_KEY = ''
-
-MINIMUM_VALUE_THRESHOLD_KEY = 'minimumValue'
-
-AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s"
-
-# e.g. The variance for this alert is 27MB which is 27% of the 100MB average (20MB is the limit)
-DEVIATION_THRESHOLD_MESSAGE = "The variance for this alert is {0}{1} which is {2:.0f}% of the {3}{4} average ({5}{6} is the limit)"
-
-# e.g. The variance for this alert is 15MB which is within 20% of the 904ms average (20MB is the limit)
-DEVIATION_OK_MESSAGE = "The variance for this alert is {0}{1} which is within {2:.0f}% of the {3}{4} average ({5}{6} is the limit)"
-
-logger = logging.getLogger()
-
-def get_tokens():
-  """
-  Returns a tuple of tokens in the format {{site/property}} that will be used
-  to build the dictionary passed into execute
-  """
-  return (HDFS_SITE_KEY, NAMESERVICE_KEY, NN_HTTP_ADDRESS_KEY, DFS_POLICY_KEY,
-          EXECUTABLE_SEARCH_PATHS, NN_HTTPS_ADDRESS_KEY, SMOKEUSER_KEY,
-          KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY,
-          METRICS_COLLECTOR_VIP_HOST_KEY, METRICS_COLLECTOR_VIP_PORT_KEY,
-          METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)
-
-def execute(configurations={}, parameters={}, host_name=None):
-  """
-  Returns a tuple containing the result code and a pre-formatted result label
-
-  Keyword arguments:
-  configurations : a mapping of configuration key to value
-  parameters : a mapping of script parameter key to value
-  host_name : the name of this host where the alert is running
-
-  :type configurations dict
-  :type parameters dict
-  :type host_name str
-  """
-  hostnames = host_name
-  current_time = int(time.time()) * 1000
-
-  # parse script arguments
-  connection_timeout = CONNECTION_TIMEOUT_DEFAULT
-  if CONNECTION_TIMEOUT_KEY in parameters:
-    connection_timeout = float(parameters[CONNECTION_TIMEOUT_KEY])
-
-  merge_ha_metrics = MERGE_HA_METRICS_PARAM_DEFAULT
-  if MERGE_HA_METRICS_PARAM_KEY in parameters:
-    merge_ha_metrics = parameters[MERGE_HA_METRICS_PARAM_KEY].lower() == 'true'
-
-  metric_name = METRIC_NAME_PARAM_DEFAULT
-  if METRIC_NAME_PARAM_KEY in parameters:
-    metric_name = parameters[METRIC_NAME_PARAM_KEY]
-
-  metric_units = METRIC_UNITS_DEFAULT
-  if METRIC_UNITS_PARAM_KEY in parameters:
-    metric_units = parameters[METRIC_UNITS_PARAM_KEY]
-
-  app_id = APP_ID_PARAM_DEFAULT
-  if APP_ID_PARAM_KEY in parameters:
-    app_id = parameters[APP_ID_PARAM_KEY]
-
-  interval = INTERVAL_PARAM_DEFAULT
-  if INTERVAL_PARAM_KEY in parameters:
-    interval = _coerce_to_integer(parameters[INTERVAL_PARAM_KEY])
-
-  warning_threshold = DEVIATION_WARNING_THRESHOLD_DEFAULT
-  if DEVIATION_WARNING_THRESHOLD_KEY in parameters:
-    warning_threshold = _coerce_to_integer(parameters[DEVIATION_WARNING_THRESHOLD_KEY])
-
-  critical_threshold = DEVIATION_CRITICAL_THRESHOLD_DEFAULT
-  if DEVIATION_CRITICAL_THRESHOLD_KEY in parameters:
-    critical_threshold = _coerce_to_integer(parameters[DEVIATION_CRITICAL_THRESHOLD_KEY])
-
-  minimum_value_threshold = None
-  if MINIMUM_VALUE_THRESHOLD_KEY in parameters:
-    minimum_value_threshold = _coerce_to_integer(parameters[MINIMUM_VALUE_THRESHOLD_KEY])
-
-  # parse configuration
-  if configurations is None:
-    return (RESULT_STATE_UNKNOWN, ['There were no configurations supplied to the script.'])
-
-  # hdfs-site is required
-  if not HDFS_SITE_KEY in configurations:
-    return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)])
-
-  if METRICS_COLLECTOR_VIP_HOST_KEY in configurations and METRICS_COLLECTOR_VIP_PORT_KEY in configurations:
-    collector_host = configurations[METRICS_COLLECTOR_VIP_HOST_KEY]
-    collector_port = int(configurations[METRICS_COLLECTOR_VIP_PORT_KEY])
-  else:
-    # ams-site/timeline.metrics.service.webapp.address is required
-    if not METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY in configurations:
-      return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY)])
-    else:
-      collector_webapp_address = configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY].split(":")
-      if valid_collector_webapp_address(collector_webapp_address):
-        collector_host = select_metric_collector_for_sink(app_id.lower())
-        collector_port = int(collector_webapp_address[1])
-      else:
-        return (RESULT_STATE_UNKNOWN, ['{0} value should be set as "fqdn_hostname:port", but set to {1}'.format(
-          METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY, configurations[METRICS_COLLECTOR_WEBAPP_ADDRESS_KEY])])
-
-  namenode_service_rpc_address = None
-  # hdfs-site is required
-  if not HDFS_SITE_KEY in configurations:
-    return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)])
-
-  hdfs_site = configurations[HDFS_SITE_KEY]
-
-  if 'dfs.namenode.servicerpc-address' in hdfs_site:
-    namenode_service_rpc_address = hdfs_site['dfs.namenode.servicerpc-address']
-
-  # if this is a NameNode alert and the cluster is in HA mode
-  if NAMESERVICE_KEY in configurations and app_id.lower() == 'namenode':
-    # hdfs-site is required
-    if not HDFS_SITE_KEY in configurations:
-      return (RESULT_STATE_UNKNOWN, ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)])
-
-    if SMOKEUSER_KEY in configurations:
-      smokeuser = configurations[SMOKEUSER_KEY]
-
-    executable_paths = None
-    if EXECUTABLE_SEARCH_PATHS in configurations:
-      executable_paths = configurations[EXECUTABLE_SEARCH_PATHS]
-
-    # parse script arguments
-    security_enabled = False
-    if SECURITY_ENABLED_KEY in configurations:
-      security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE'
-
-    kerberos_keytab = None
-    if KERBEROS_KEYTAB in configurations:
-      kerberos_keytab = configurations[KERBEROS_KEYTAB]
-
-    kerberos_principal = None
-    if KERBEROS_PRINCIPAL in configurations:
-      kerberos_principal = configurations[KERBEROS_PRINCIPAL]
-      kerberos_principal = kerberos_principal.replace('_HOST', host_name)
-
-    # determine whether or not SSL is enabled
-    is_ssl_enabled = False
-    if DFS_POLICY_KEY in configurations:
-      dfs_policy = configurations[DFS_POLICY_KEY]
-      if dfs_policy == "HTTPS_ONLY":
-        is_ssl_enabled = True
-
-    kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS)
-
-    name_service = configurations[NAMESERVICE_KEY]
-
-    # look for dfs.ha.namenodes.foo
-    nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service
-    if not nn_unique_ids_key in hdfs_site:
-      return (RESULT_STATE_UNKNOWN, ['Unable to find unique NameNode alias key {0}'.format(nn_unique_ids_key)])
-
-    namenode_http_fragment = 'dfs.namenode.http-address.{0}.{1}'
-    jmx_uri_fragment = "http://{0}/jmx?qry=Hadoop:service=NameNode,name=*"
-
-    if is_ssl_enabled:
-      namenode_http_fragment = 'dfs.namenode.https-address.{0}.{1}'
-      jmx_uri_fragment = "https://{0}/jmx?qry=Hadoop:service=NameNode,name=*"
-
-    # now we have something like 'nn1,nn2,nn3,nn4'
-    # turn it into dfs.namenode.[property].[dfs.nameservices].[nn_unique_id]
-    # ie dfs.namenode.http-address.hacluster.nn1
-    namenodes = []
-    active_namenodes = []
-    nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',')
-    for nn_unique_id in nn_unique_ids:
-      key = namenode_http_fragment.format(name_service, nn_unique_id)
-
-      if key in hdfs_site:
-        # use str() to ensure that unicode strings do not have the u' in them
-        value = str(hdfs_site[key])
-        namenode = str(hdfs_site[key]).split(":")[0]
-
-        namenodes.append(namenode)
-        try:
-          jmx_uri = jmx_uri_fragment.format(value)
-          if kerberos_principal is not None and kerberos_keytab is not None and security_enabled:
-            env = Environment.get_instance()
-
-            # curl requires an integer timeout
-            curl_connection_timeout = int(connection_timeout)
-            state_response, error_msg, time_millis = curl_krb_request(env.tmp_dir,
-              kerberos_keytab, kerberos_principal, jmx_uri, "ha_nn_health", executable_paths, False,
-              "NameNode High Availability Health", smokeuser, connection_timeout=curl_connection_timeout,
-              kinit_timer_ms = kinit_timer_ms)
-
-            state = _get_ha_state_from_json(state_response)
-          else:
-            state_response = get_jmx(jmx_uri, connection_timeout)
-            state = _get_ha_state_from_json(state_response)
-
-          if state == HDFS_NN_STATE_ACTIVE:
-            active_namenodes.append(namenode)
-
-            # only check the active NN
-            nn_service_rpc_address_key = 'dfs.namenode.servicerpc-address.{0}.{1}'.format(name_service, nn_unique_id)
-            if nn_service_rpc_address_key in hdfs_site:
-              namenode_service_rpc_address = hdfs_site[nn_service_rpc_address_key]
-          pass
-        except:
-          logger.exception("Unable to determine the active NameNode")
-          pass
-
-    if merge_ha_metrics:
-      hostnames = ",".join(namenodes)
-      # run only on the active NN; no need to run the same requests from the standby
-      if host_name not in active_namenodes:
-        return (RESULT_STATE_SKIPPED, ['This alert will be reported by another host.'])
-      pass
-
-  # skip the service RPC alert if the port is not enabled
-  if not namenode_service_rpc_address and 'rpc.rpc.datanode' in metric_name:
-    return (RESULT_STATE_SKIPPED, ['Service RPC port is not enabled.'])
-
-  get_metrics_parameters = {
-    "metricNames": metric_name,
-    "appId": app_id,
-    "hostname": hostnames,
-    "startTime": current_time - interval * 60 * 1000,
-    "endTime": current_time,
-    "grouped": "true",
-  }
-
-  encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters)
-
-  try:
-    conn = httplib.HTTPConnection(collector_host, int(collector_port),
-                                  timeout=connection_timeout)
-    conn.request("GET", AMS_METRICS_GET_URL % encoded_get_metrics_parameters)
-    response = conn.getresponse()
-    data = response.read()
-    conn.close()
-  except Exception:
-    return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."])
-
-  if response.status != 200:
-    return (RESULT_STATE_UNKNOWN, ["Unable to retrieve metrics from the Ambari Metrics service."])
-
-  data_json = json.loads(data)
-  metrics = []
-  # the standard deviation will be large for multiple hosts,
-  # e.g. if host1 reports small local values but host2 reports large local values
-  for metrics_data in data_json["metrics"]:
-    metrics += metrics_data["metrics"].values()
-  pass
-
-  if not metrics or len(metrics) < 2:
-    number_of_data_points = len(metrics) if metrics else 0
-    return (RESULT_STATE_SKIPPED, ["There are not enough data points to calculate the standard deviation ({0} sampled)".format(
-      number_of_data_points)])
-
-  minimum_value_multiplier = 1
-  if 'dfs.FSNamesystem.CapacityUsed' in metric_name:
-    minimum_value_multiplier = 1024 * 1024  # MB to bytes
-  elif 'rpc.rpc.datanode' in metric_name or 'rpc.rpc.client' in metric_name:
-    minimum_value_multiplier = 1000  # seconds to millis
-
-  if minimum_value_threshold:
-    # filter out points below the minimum threshold
-    metrics = [metric for metric in metrics if metric > (minimum_value_threshold * minimum_value_multiplier)]
-    if len(metrics) < 2:
-      return (RESULT_STATE_OK, ['There were no data points above the minimum threshold of {0} seconds'.format(minimum_value_threshold)])
-
-  mean_value = mean(metrics)
-  stddev = sample_standard_deviation(metrics)
-
-  try:
-    deviation_percent = stddev / float(mean_value) * 100
-  except ZeroDivisionError:
-    # should not be a case for this alert
-    return (RESULT_STATE_SKIPPED, ["Unable to calculate the standard deviation because the mean value is 0"])
-
-  # log the AMS request
-  if logger.isEnabledFor(logging.DEBUG):
-    logger.debug("""
-    AMS request parameters - {0}
-    AMS response - {1}
-    Mean - {2}
-    Standard deviation - {3}
-    Percentage standard deviation - {4}
-    """.format(encoded_get_metrics_parameters, data_json, mean_value, stddev, deviation_percent))
-
-  mean_value_localized = locale.format("%.0f", mean_value, grouping=True)
-
-  variance_value = (deviation_percent / 100.0) * mean_value
-  variance_value_localized = locale.format("%.0f", variance_value, grouping=True)
-
-  # check for CRITICAL status
-  if deviation_percent > critical_threshold:
-    threshold_value = ((critical_threshold / 100.0) * mean_value)
-    threshold_value_localized = locale.format("%.0f", threshold_value, grouping=True)
-
-    message = DEVIATION_THRESHOLD_MESSAGE.format(variance_value_localized, metric_units, deviation_percent,
-      mean_value_localized, metric_units, threshold_value_localized, metric_units)
-
-    return (RESULT_STATE_CRITICAL, [message])
-
-  # check for WARNING status
-  if deviation_percent > warning_threshold:
-    threshold_value = ((warning_threshold / 100.0) * mean_value)
-    threshold_value_localized = locale.format("%.0f", threshold_value, grouping = True)
-
-    message = DEVIATION_THRESHOLD_MESSAGE.format(variance_value_localized, metric_units, deviation_percent,
-      mean_value_localized, metric_units, threshold_value_localized, metric_units)
-
-    return (RESULT_STATE_WARNING, [message])
-
-  # return OK status; use the warning threshold as the value to compare against
-  threshold_value = ((warning_threshold / 100.0) * mean_value)
-  threshold_value_localized = locale.format("%.0f", threshold_value, grouping = True)
-
-  message = DEVIATION_OK_MESSAGE.format(variance_value_localized, metric_units, warning_threshold,
-    mean_value_localized, metric_units, threshold_value_localized, metric_units)
-
-  return (RESULT_STATE_OK, [message])
-
-
-def valid_collector_webapp_address(webapp_address):
-  if len(webapp_address) == 2 \
-    and webapp_address[0] != '127.0.0.1' \
-    and webapp_address[1].isdigit():
-    return True
-
-  return False
-
-
-def get_jmx(query, connection_timeout):
-  response = None
-
-  try:
-    response = urllib2.urlopen(query, timeout=connection_timeout)
-    json_data = response.read()
-    return json_data
-  except Exception:
-    return {"beans": {}}
-  finally:
-    if response is not None:
-      try:
-        response.close()
-      except:
-        pass
-
-def _get_ha_state_from_json(string_json):
-  """
-  Searches through the specified JSON string looking for HA state
-  enumerations.
-  :param string_json: the string JSON
-  :return: the value of the HA state (active, standby, etc)
-  """
-  json_data = json.loads(string_json)
-  jmx_beans = json_data["beans"]
-
-  # look for NameNodeStatus-State first
-  for jmx_bean in jmx_beans:
-    if "name" not in jmx_bean:
-      continue
-
-    jmx_bean_name = jmx_bean["name"]
-    if jmx_bean_name == "Hadoop:service=NameNode,name=NameNodeStatus" and "State" in jmx_bean:
-      return jmx_bean["State"]
-
-  # look for FSNamesystem-tag.HAState last
-  for jmx_bean in jmx_beans:
-    if "name" not in jmx_bean:
-      continue
-
-    jmx_bean_name = jmx_bean["name"]
-    if jmx_bean_name == "Hadoop:service=NameNode,name=FSNamesystem":
-      return jmx_bean["tag.HAState"]
-
-
-def _coerce_to_integer(value):
-  """
-  Attempts to correctly coerce a value to an integer. For the case of an integer or a float,
-  this will essentially either NOOP or return a truncated value. If the parameter is a string,
-  then it will first attempt to be coerced from an integer, and failing that, a float.
-  :param value: the value to coerce
-  :return: the coerced value as an integer
-  """
-  try:
-    return int(value)
-  except ValueError:
-    return int(float(value))
http://git-wip-us.apache.org/repos/asf/ambari/blob/ab4b864c/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_upgrade_finalized.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_upgrade_finalized.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_upgrade_finalized.py
deleted file mode 100644
index 427f1d1..0000000
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/alerts/alert_upgrade_finalized.py
+++ /dev/null
@@ -1,179 +0,0 @@
-#!/usr/bin/env python
-
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-"""
-
-import urllib2
-import ambari_simplejson as json # simplejson is much faster than the Python 2.6 json module and has the same function set.
-import logging
-import traceback
-
-from resource_management.libraries.functions.curl_krb_request import curl_krb_request
-from resource_management.libraries.functions.curl_krb_request import DEFAULT_KERBEROS_KINIT_TIMER_MS
-from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER
-from resource_management.libraries.functions.curl_krb_request import CONNECTION_TIMEOUT_DEFAULT
-from resource_management.core.environment import Environment
-from resource_management.libraries.functions.namenode_ha_utils import get_all_namenode_addresses
-
-NN_HTTP_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.http-address}}'
-NN_HTTPS_ADDRESS_KEY = '{{hdfs-site/dfs.namenode.https-address}}'
-NN_HTTP_POLICY_KEY = '{{hdfs-site/dfs.http.policy}}'
-
-HDFS_SITE_KEY = '{{hdfs-site}}'
-KERBEROS_KEYTAB = '{{hdfs-site/dfs.web.authentication.kerberos.keytab}}'
-KERBEROS_PRINCIPAL = '{{hdfs-site/dfs.web.authentication.kerberos.principal}}'
-SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
-SMOKEUSER_KEY = "{{cluster-env/smokeuser}}"
-EXECUTABLE_SEARCH_PATHS = '{{kerberos-env/executable_search_paths}}'
-logger = logging.getLogger('ambari_alerts')
-
-def get_tokens():
-  """
-  Returns a tuple of tokens in the format {{site/property}} that will be used
-  to build the dictionary passed into execute
-
-  :rtype tuple
-  """
-  return (HDFS_SITE_KEY, NN_HTTP_ADDRESS_KEY, NN_HTTPS_ADDRESS_KEY, NN_HTTP_POLICY_KEY, EXECUTABLE_SEARCH_PATHS,
-          KERBEROS_KEYTAB, KERBEROS_PRINCIPAL, SECURITY_ENABLED_KEY, SMOKEUSER_KEY)
-
-
-def execute(configurations={}, parameters={}, host_name=None):
-  """
-  Returns a tuple containing the result code and a pre-formatted result label
-
-  Keyword arguments:
-  configurations : a mapping of configuration key to value
-  parameters : a mapping of script parameter key to value
-  host_name : the name of this host where the alert is running
-
-  :type configurations dict
-  :type parameters dict
-  :type host_name str
-  """
-
-  if configurations is None:
-    return (('UNKNOWN', ['There were no configurations supplied to the script.']))
-
-  uri = None
-  http_policy = 'HTTP_ONLY'
-
-  # hdfs-site is required
-  if not HDFS_SITE_KEY in configurations:
-    return 'SKIPPED', ['{0} is a required parameter for the script'.format(HDFS_SITE_KEY)]
-
-  if NN_HTTP_POLICY_KEY in configurations:
-    http_policy = configurations[NN_HTTP_POLICY_KEY]
-
-  if SMOKEUSER_KEY in configurations:
-    smokeuser = configurations[SMOKEUSER_KEY]
-
-  executable_paths = None
-  if EXECUTABLE_SEARCH_PATHS in configurations:
-    executable_paths = configurations[EXECUTABLE_SEARCH_PATHS]
-
-  security_enabled = False
-  if SECURITY_ENABLED_KEY in configurations:
-    security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE'
-
-  kerberos_keytab = None
-  if KERBEROS_KEYTAB in configurations:
-    kerberos_keytab = configurations[KERBEROS_KEYTAB]
-
-  kerberos_principal = None
-  if KERBEROS_PRINCIPAL in configurations:
-    kerberos_principal = configurations[KERBEROS_PRINCIPAL]
-    kerberos_principal = kerberos_principal.replace('_HOST', host_name)
-
-  kinit_timer_ms = parameters.get(KERBEROS_KINIT_TIMER_PARAMETER, DEFAULT_KERBEROS_KINIT_TIMER_MS)
-
-  # determine the right URI and whether to use SSL
-  hdfs_site = configurations[HDFS_SITE_KEY]
-
-  scheme = "https" if http_policy == "HTTPS_ONLY" else "http"
-
-  nn_addresses = get_all_namenode_addresses(hdfs_site)
-  for nn_address in nn_addresses:
-    if nn_address.startswith(host_name + ":") or nn_address == host_name:
-      uri = nn_address
-      break
-  if not uri:
-    return 'SKIPPED', [
-      'NameNode on host {0} not found (namenode addresses = {1})'.format(host_name, ', '.join(nn_addresses))]
-
-  upgrade_finalized_qry = "{0}://{1}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo".format(scheme, uri)
-
-  # start out assuming an OK status
-  label = None
-  result_code = "OK"
-
-  try:
-    if kerberos_principal is not None and kerberos_keytab is not None and security_enabled:
-      env = Environment.get_instance()
-
-      upgrade_finalized_response, error_msg, time_millis = curl_krb_request(
-        env.tmp_dir, kerberos_keytab,
-        kerberos_principal, upgrade_finalized_qry, "upgrade_finalized_state", executable_paths, False,
-        "HDFS Upgrade Finalized State", smokeuser, kinit_timer_ms = kinit_timer_ms
-      )
-
-      upgrade_finalized_response_json = json.loads(upgrade_finalized_response)
-      upgrade_finalized = bool(upgrade_finalized_response_json["beans"][0]["UpgradeFinalized"])
-
-    else:
-      upgrade_finalized = bool(get_value_from_jmx(upgrade_finalized_qry,
-                                                  "UpgradeFinalized"))
-
-    if upgrade_finalized:
-      label = "HDFS cluster is not in the upgrade state"
-      result_code = 'OK'
-    else:
-      label = "HDFS cluster upgrade is not finalized"
-      result_code = 'CRITICAL'
-
-  except:
-    label = traceback.format_exc()
-    result_code = 'UNKNOWN'
-
-  return ((result_code, [label]))
-
-def get_value_from_jmx(query, jmx_property):
-  """
-  Read a property from the JMX endpoint
-
-  :param query: jmx uri path
-  :param jmx_property: property name to read
-  :return: jmx property value
-
-  :type query str
-  :type jmx_property str
-  """
-  response = None
-
-  try:
-    response = urllib2.urlopen(query, timeout=int(CONNECTION_TIMEOUT_DEFAULT))
-    data = response.read()
-
-    data_dict = json.loads(data)
-    return data_dict["beans"][0][jmx_property]
-  finally:
-    if response is not None:
-      try:
-        response.close()
-      except:
-        pass
http://git-wip-us.apache.org/repos/asf/ambari/blob/ab4b864c/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/files/checkWebUI.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/files/checkWebUI.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/files/checkWebUI.py
deleted file mode 100644
index ddeb116..0000000
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/files/checkWebUI.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#!/usr/bin/env python
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-import optparse
-import httplib
-import socket
-import ssl
-
-class TLS1HTTPSConnection(httplib.HTTPSConnection):
-  """
-  Some Python builds do not negotiate SSLv3 correctly even though they try to
-  use it, so this connection class forces the protocol to TLSv1.
-  """
-  def __init__(self, host, port, **kwargs):
-    httplib.HTTPSConnection.__init__(self, host, port, **kwargs)
-
-  def connect(self):
-    sock = socket.create_connection((self.host, self.port), self.timeout)
-    if getattr(self, '_tunnel_host', None):
-      self.sock = sock
-      self._tunnel()
-    self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, ssl_version=ssl.PROTOCOL_TLSv1)
-
-def make_connection(host, port, https):
-  try:
-    conn = httplib.HTTPConnection(host, port) if not https else httplib.HTTPSConnection(host, port)
-    conn.request("GET", "/")
-    return conn.getresponse().status
-  except ssl.SSLError:
-    # got an SSL error; retry with the TLSv1 protocol, maybe it will work
-    try:
-      tls1_conn = TLS1HTTPSConnection(host, port)
-      tls1_conn.request("GET", "/")
-      return tls1_conn.getresponse().status
-    except Exception as e:
-      print e
-    finally:
-      tls1_conn.close()
-  except Exception as e:
-    print e
-  finally:
-    conn.close()
-#
-# Main.
-#
-def main():
-  parser = optparse.OptionParser(usage="usage: %prog [options] component ")
-  parser.add_option("-m", "--hosts", dest="hosts", help="Comma separated list of hosts whose WEB UI availability should be checked")
-  parser.add_option("-p", "--port", dest="port", help="Port of the WEB UI to check")
-  parser.add_option("-s", "--https", dest="https", help="\"True\" if value of dfs.http.policy is \"HTTPS_ONLY\"")
-
-  (options, args) = parser.parse_args()
-
-  hosts = options.hosts.split(',')
-  port = options.port
-  https = options.https
-
-  for host in hosts:
-    httpCode = make_connection(host, port, https.lower() == "true")
-
-    if httpCode != 200:
-      print "Cannot access WEB UI on: http://" + host + ":" + port if not https.lower() == "true" else "Cannot access WEB UI on: https://" + host + ":" + port
-      exit(1)
-
-if __name__ == "__main__":
-  main()
http://git-wip-us.apache.org/repos/asf/ambari/blob/ab4b864c/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/__init__.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/__init__.py
deleted file mode 100644
index 35de4bb..0000000
--- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
-Ambari Agent
-
-"""
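All of the alert modules deleted above share the same contract: get_tokens() declares the {{site/property}} tokens the script needs, the agent resolves them into the configurations dictionary passed to execute(), and execute() returns a (result_code, [label]) tuple. A hypothetical Python 2 harness for exercising one of these modules outside the agent could look like the sketch below; the run_alert helper, the host name, and the sample configuration values are assumptions for illustration, not part of Ambari.

  # Hypothetical test harness; the agent normally resolves get_tokens()
  # against live cluster configuration before invoking execute().
  import imp

  def run_alert(script_path, configurations, parameters=None,
                host_name='c6401.ambari.apache.org'):
    alert = imp.load_source('alert_under_test', script_path)
    # every token the module declares should be resolved in configurations
    missing = [t for t in alert.get_tokens() if t not in configurations]
    if missing:
      print 'unresolved tokens: ' + ', '.join(missing)
    return alert.execute(configurations, parameters or {}, host_name)

  # Illustrative invocation; the path and values are made up:
  # result_code, labels = run_alert('alert_ha_namenode_health.py',
  #   {'{{hdfs-site}}': {}, '{{cluster-env/security_enabled}}': 'false'})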
