http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py new file mode 100755 index 0000000..46bd926 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode.py @@ -0,0 +1,319 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import sys +import os +import json +import tempfile +from resource_management import * +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ + FILE_TYPE_XML +from resource_management.libraries.functions.version import compare_versions, \ + format_stack_version +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.core.exceptions import Fail +from resource_management.libraries.functions import get_klist_path + +import namenode_upgrade +from hdfs_namenode import namenode, wait_for_safemode_off +from hdfs import hdfs +import hdfs_rebalance +from utils import failover_namenode, get_dfsadmin_base_command +from ambari_commons.os_family_impl import OsFamilyImpl +from ambari_commons import OSConst + +# hashlib is supplied as of Python 2.5 as the replacement interface for md5 +# and other secure hashes. In 2.6, md5 is deprecated. Import hashlib if +# available, avoiding a deprecation warning under 2.6. Import md5 otherwise, +# preserving 2.4 compatibility. 
+try: + import hashlib + _md5 = hashlib.md5 +except ImportError: + import md5 + _md5 = md5.new + +class NameNode(Script): + + def get_component_name(self): + return "hadoop-hdfs-namenode" + + def install(self, env): + import params + + self.install_packages(env) + env.set_params(params) + #TODO we need this for HA because of manual steps + self.configure(env) + + def prepare_rolling_upgrade(self, env): + namenode_upgrade.prepare_rolling_upgrade() + + def finalize_rolling_upgrade(self, env): + namenode_upgrade.finalize_rolling_upgrade() + + def wait_for_safemode_off(self, env): + wait_for_safemode_off(30, True) + + def finalize_non_rolling_upgrade(self, env): + namenode_upgrade.finalize_upgrade("nonrolling") + + def finalize_rolling_upgrade(self, env): + namenode_upgrade.finalize_upgrade("rolling") + + def pre_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing Stack Upgrade pre-restart") + import params + env.set_params(params) + + if params.version and compare_versions(format_stack_version(params.version), '4.0.0.0') >= 0: + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-hdfs-namenode", params.version) + #Execute(format("stack-select set hadoop-hdfs-namenode {version}")) + + def start(self, env, upgrade_type=None): + import params + + env.set_params(params) + self.configure(env) + namenode(action="start", upgrade_type=upgrade_type, env=env) + + def post_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing Stack Upgrade post-restart") + import params + env.set_params(params) + + dfsadmin_base_command = get_dfsadmin_base_command('hdfs') + dfsadmin_cmd = dfsadmin_base_command + " -report -live" + Execute(dfsadmin_cmd, + user=params.hdfs_user, + tries=60, + try_sleep=10 + ) + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + + if upgrade_type == "rolling" and params.dfs_ha_enabled: + if params.dfs_ha_automatic_failover_enabled: + failover_namenode() + else: + raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart") + + namenode(action="stop", upgrade_type=upgrade_type, env=env) + + def configure(self, env): + import params + + env.set_params(params) + hdfs() + namenode(action="configure", env=env) + pass + + def status(self, env): + import status_params + + env.set_params(status_params) + check_process_status(status_params.namenode_pid_file) + pass + + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.namenode.kerberos.internal.spnego.principal', + 'dfs.namenode.keytab.file', + 'dfs.namenode.kerberos.principal'] + props_read_check = ['dfs.namenode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + {'core-site.xml': FILE_TYPE_XML, + 'hdfs-site.xml': FILE_TYPE_XML}) + if 'core-site' in security_params and 'hadoop.security.authentication' in 
security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ( 'hdfs-site' not in security_params + or 'dfs.namenode.keytab.file' not in security_params['hdfs-site'] + or 'dfs.namenode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.namenode.keytab.file'], + security_params['hdfs-site']['dfs.namenode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + self.put_structured_out({"securityState": "UNSECURED"}) + + + def decommission(self, env): + import params + + env.set_params(params) + namenode(action="decommission") + pass + + + def rebalancehdfs(self, env): + import params + env.set_params(params) + + name_node_parameters = json.loads( params.name_node_params ) + threshold = name_node_parameters['threshold'] + _print("Starting balancer with threshold = %s\n" % threshold) + + rebalance_env = {'PATH': params.hadoop_bin_dir} + + if params.security_enabled: + # Create the kerberos credentials cache (ccache) file and set it in the environment to use + # when executing HDFS rebalance command. Use the md5 hash of the combination of the principal and keytab file + # to generate a (relatively) unique cache filename so that we can use it as needed. + # TODO: params.tmp_dir=/var/lib/ambari-agent/data/tmp. However hdfs user doesn't have access to this path. 
+ # TODO: Hence using /tmp + ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest() + ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name) + rebalance_env['KRB5CCNAME'] = ccache_file_path + + # If there are no tickets in the cache or they are expired, perform a kinit, else use what + # is in the cache + klist_cmd = format("{klist_path_local} -s {ccache_file_path}") + kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}") + if os.system(klist_cmd) != 0: + Execute(kinit_cmd, user=params.hdfs_user) + + def calculateCompletePercent(first, current): + return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove + + + def startRebalancingProcess(threshold, rebalance_env): + rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}') + return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env) + + command = startRebalancingProcess(threshold, rebalance_env) + + basedir = os.path.join(env.config.basedir, 'scripts') + if(threshold == 'DEBUG'): #FIXME TODO remove this on PROD + basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator') + command = ['python','hdfs-command.py'] + + _print("Executing command %s\n" % command) + + parser = hdfs_rebalance.HdfsParser() + + def handle_new_line(line, is_stderr): + if is_stderr: + return + + _print('[balancer] %s' % (line)) + pl = parser.parseLine(line) + if pl: + res = pl.toJson() + res['completePercent'] = calculateCompletePercent(parser.initialLine, pl) + + self.put_structured_out(res) + elif parser.state == 'PROCESS_FINISHED' : + _print('[balancer] %s' % ('Process is finished' )) + self.put_structured_out({'completePercent' : 1}) + return + + Execute(command, + on_new_line = handle_new_line, + logoutput = False, + ) + + if params.security_enabled and os.path.exists(ccache_file_path): + # Delete the kerberos credentials cache (ccache) file + os.remove(ccache_file_path) + + def prepare_express_upgrade(self, env): + """ + During an Express Upgrade. + If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and + make sure that there is no "/previous" directory. + + Create a list of all the DataNodes in the cluster. + hdfs dfsadmin -report > dfs-old-report-1.log + + hdfs dfsadmin -safemode enter + hdfs dfsadmin -saveNamespace + + Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory. + + Finalize any prior HDFS upgrade, + hdfs dfsadmin -finalizeUpgrade + + Prepare for a NameNode rolling upgrade in order to not lose any data. + hdfs dfsadmin -rollingUpgrade prepare + """ + import params + Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.") + + if params.security_enabled: + kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") + Execute(kinit_command, user=params.hdfs_user, logoutput=True) + + namenode_upgrade.prepare_upgrade_check_for_previous_dir() + namenode_upgrade.prepare_upgrade_enter_safe_mode() + namenode_upgrade.prepare_upgrade_save_namespace() + namenode_upgrade.prepare_upgrade_backup_namenode_dir() + namenode_upgrade.prepare_upgrade_finalize_previous_upgrades() + + # Call -rollingUpgrade prepare + namenode_upgrade.prepare_rolling_upgrade() + +def _print(line): + sys.stdout.write(line) + sys.stdout.flush() + +if __name__ == "__main__": + NameNode().execute()
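Editor's note on the rebalancehdfs() command above: it keys the Kerberos credential cache off an md5 digest of the principal and keytab so that repeated balancer runs can reuse a still-valid ticket instead of re-running kinit every time. Below is a minimal standalone sketch of that pattern, assuming placeholder tool paths and caller-supplied principal/keytab values (none of these names come from the stack definition itself):

import hashlib
import os
import subprocess
import tempfile

def ensure_krb5_ccache(principal, keytab,
                       klist_path="/usr/bin/klist", kinit_path="/usr/bin/kinit"):
    """Return a credential cache path holding a valid ticket for principal."""
    # One cache file per principal|keytab pair, named by an md5 digest.
    digest = hashlib.md5(("%s|%s" % (principal, keytab)).encode("utf-8")).hexdigest()
    ccache = os.path.join(tempfile.gettempdir(), "hdfs_rebalance_cc_" + digest)
    # 'klist -s <cache>' exits non-zero if the cache is missing or its tickets expired.
    if subprocess.call([klist_path, "-s", ccache]) != 0:
        subprocess.check_call([kinit_path, "-c", ccache, "-kt", keytab, principal])
    return ccache

The script then launches the balancer with KRB5CCNAME pointing at the returned path and removes the cache file once the balancer exits.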
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py new file mode 100755 index 0000000..a5893b6 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_ha_state.py @@ -0,0 +1,205 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + + +from resource_management.core.logger import Logger +from resource_management.libraries.functions.default import default +from utils import get_value_from_jmx + + +class NAMENODE_STATE: + ACTIVE = "active" + STANDBY = "standby" + UNKNOWN = "unknown" + + +class NamenodeHAState: + """ + Represents the current state of the Namenode Hosts in High Availability Mode + """ + + def __init__(self): + """ + Initializes all fields by querying the Namenode state. + Raises a ValueError if unable to construct the object. + """ + import params + + self.name_service = default("/configurations/hdfs-site/dfs.nameservices", None) + if not self.name_service: + raise ValueError("Could not retrieve property dfs.nameservices") + + nn_unique_ids_key = "dfs.ha.namenodes." 
+ str(self.name_service) + # List of the nn unique ids + self.nn_unique_ids = default("/configurations/hdfs-site/" + nn_unique_ids_key, None) + if not self.nn_unique_ids: + raise ValueError("Could not retrieve property " + nn_unique_ids_key) + + self.nn_unique_ids = self.nn_unique_ids.split(",") + self.nn_unique_ids = [x.strip() for x in self.nn_unique_ids] + + policy = default("/configurations/hdfs-site/dfs.http.policy", "HTTP_ONLY") + self.encrypted = policy.upper() == "HTTPS_ONLY" + + jmx_uri_fragment = ("https" if self.encrypted else "http") + "://{0}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus" + namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}" + namenode_https_fragment = "dfs.namenode.https-address.{0}.{1}" + + # Dictionary where the key is the Namenode State (e.g., ACTIVE), and the value is a set of hostnames + self.namenode_state_to_hostnames = {} + + # Dictionary from nn unique id name to a tuple of (http address, https address) + self.nn_unique_id_to_addresses = {} + for nn_unique_id in self.nn_unique_ids: + http_key = namenode_http_fragment.format(self.name_service, nn_unique_id) + https_key = namenode_https_fragment.format(self.name_service, nn_unique_id) + + http_value = default("/configurations/hdfs-site/" + http_key, None) + https_value = default("/configurations/hdfs-site/" + https_key, None) + actual_value = https_value if self.encrypted else http_value + hostname = actual_value.split(":")[0].strip() if actual_value and ":" in actual_value else None + + self.nn_unique_id_to_addresses[nn_unique_id] = (http_value, https_value) + try: + if not hostname: + raise Exception("Could not retrieve hostname from address " + actual_value) + + jmx_uri = jmx_uri_fragment.format(actual_value) + state = get_value_from_jmx(jmx_uri, "State") + + if not state: + raise Exception("Could not retrieve Namenode state from URL " + jmx_uri) + + state = state.lower() + + if state not in [NAMENODE_STATE.ACTIVE, NAMENODE_STATE.STANDBY]: + state = NAMENODE_STATE.UNKNOWN + + if state in self.namenode_state_to_hostnames: + self.namenode_state_to_hostnames[state].add(hostname) + else: + hostnames = set([hostname, ]) + self.namenode_state_to_hostnames[state] = hostnames + except: + Logger.error("Could not get namenode state for " + nn_unique_id) + + def __str__(self): + return "Namenode HA State: {\n" + \ + ("IDs: %s\n" % ", ".join(self.nn_unique_ids)) + \ + ("Addresses: %s\n" % str(self.nn_unique_id_to_addresses)) + \ + ("States: %s\n" % str(self.namenode_state_to_hostnames)) + \ + ("Encrypted: %s\n" % str(self.encrypted)) + \ + ("Healthy: %s\n" % str(self.is_healthy())) + \ + "}" + + def is_encrypted(self): + """ + :return: Returns a bool indicating if HTTPS is enabled + """ + return self.encrypted + + def get_nn_unique_ids(self): + """ + :return Returns a list of the nn unique ids + """ + return self.nn_unique_ids + + def get_nn_unique_id_to_addresses(self): + """ + :return Returns a dictionary where the key is the nn unique id, and the value is a tuple of (http address, https address) + Each address is of the form, hostname:port + """ + return self.nn_unique_id_to_addresses + + def get_address_for_nn_id(self, id): + """ + :param id: Namenode ID + :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given namenode id. 
+ """ + if id in self.nn_unique_id_to_addresses: + addresses = self.nn_unique_id_to_addresses[id] + if addresses and len(addresses) == 2: + return addresses[1] if self.encrypted else addresses[0] + return None + + def get_address_for_host(self, hostname): + """ + :param hostname: Host name + :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given host. + """ + for id, addresses in self.nn_unique_id_to_addresses.iteritems(): + if addresses and len(addresses) == 2: + if ":" in addresses[0]: + nn_hostname = addresses[0].split(":")[0].strip() + if nn_hostname == hostname: + # Found the host + return addresses[1] if self.encrypted else addresses[0] + return None + + def get_namenode_state_to_hostnames(self): + """ + :return Return a dictionary where the key is a member of NAMENODE_STATE, and the value is a set of hostnames. + """ + return self.namenode_state_to_hostnames + + def get_address(self, namenode_state): + """ + @param namenode_state: Member of NAMENODE_STATE + :return Get the address that corresponds to the first host with the given state + """ + hosts = self.namenode_state_to_hostnames[namenode_state] if namenode_state in self.namenode_state_to_hostnames else [] + if hosts and len(hosts) > 0: + hostname = list(hosts)[0] + return self.get_address_for_host(hostname) + return None + + def is_active(self, host_name): + """ + :param host_name: Host name + :return: Return True if this is the active NameNode, otherwise, False. + """ + return self._is_in_state(host_name, NAMENODE_STATE.ACTIVE) + + def is_standby(self, host_name): + """ + :param host_name: Host name + :return: Return True if this is the standby NameNode, otherwise, False. + """ + return self._is_in_state(host_name, NAMENODE_STATE.STANDBY) + + def _is_in_state(self, host_name, state): + """ + :param host_name: Host name + :param state: State to check + :return: Return True if this NameNode is in the specified state, otherwise, False. + """ + mapping = self.get_namenode_state_to_hostnames() + if state in mapping: + hosts_in_state = mapping[state] + if hosts_in_state is not None and len(hosts_in_state) == 1 and next(iter(hosts_in_state)).lower() == host_name.lower(): + return True + return False + + def is_healthy(self): + """ + :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist. + """ + active_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.ACTIVE] if NAMENODE_STATE.ACTIVE in self.namenode_state_to_hostnames else [] + standby_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.STANDBY] if NAMENODE_STATE.STANDBY in self.namenode_state_to_hostnames else [] + return len(active_hosts) == 1 and len(standby_hosts) == 1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py new file mode 100755 index 0000000..5969fcf --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/namenode_upgrade.py @@ -0,0 +1,262 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import re +import os + +from resource_management.core.logger import Logger +from resource_management.core.resources.system import Execute +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.default import default +from resource_management.core.shell import call +from resource_management.core.shell import as_user +from resource_management.core.exceptions import Fail +from resource_management.libraries.functions import Direction, SafeMode +from resource_management.libraries.functions import get_unique_id_and_date +from resource_management.core.exceptions import Fail +from utils import get_dfsadmin_base_command +from namenode_ha_state import NamenodeHAState + +safemode_to_instruction = {SafeMode.ON: "enter", + SafeMode.OFF: "leave"} + +def prepare_upgrade_check_for_previous_dir(): + """ + During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up some data. + Check that there is no "previous" folder inside the NameNode Name Dir. + """ + import params + + if params.dfs_ha_enabled: + namenode_ha = NamenodeHAState() + if namenode_ha.is_active(params.hostname): + Logger.info("NameNode High Availability is enabled and this is the Active NameNode.") + + problematic_previous_namenode_dirs = set() + nn_name_dirs = params.dfs_name_dir.split(',') + for nn_dir in nn_name_dirs: + if os.path.isdir(nn_dir): + # Check for a previous folder, which is not allowed. + previous_dir = os.path.join(nn_dir, "previous") + if os.path.isdir(previous_dir): + problematic_previous_namenode_dirs.add(previous_dir) + + if len(problematic_previous_namenode_dirs) > 0: + message = 'WARNING. The following NameNode Name Dir(s) have a "previous" folder from an older version.\n' \ + 'Please back it up first, and then delete it, OR Finalize (E.g., "hdfs dfsadmin -finalizeUpgrade").\n' \ + 'NameNode Name Dir(s): {0}\n' \ + '***** Then, retry this step. *****'.format(", ".join(problematic_previous_namenode_dirs)) + Logger.error(message) + raise Fail(message) + +def prepare_upgrade_enter_safe_mode(): + """ + During a NonRolling (aka Express Upgrade), preparing the NameNode requires first entering Safemode. + """ + import params + + dfsadmin_base_command = get_dfsadmin_base_command('hdfs') + safe_mode_enter_cmd = dfsadmin_base_command + " -safemode enter" + try: + # Safe to call if already in Safe Mode + desired_state = SafeMode.ON + safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, desired_state, params.dfs_ha_enabled) + Logger.info("Transition successful: {0}, original state: {1}".format(str(safemode_transition_successful), str(original_state))) + if not safemode_transition_successful: + raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." 
% str(desired_state)) + except Exception, e: + message = "Could not enter safemode. Error: {0}. As the HDFS user, call this command: {1}".format(str(e), safe_mode_enter_cmd) + Logger.error(message) + raise Fail(message) + +def prepare_upgrade_save_namespace(): + """ + During a NonRolling (aka Express Upgrade), preparing the NameNode requires saving the namespace. + """ + import params + + dfsadmin_base_command = get_dfsadmin_base_command('hdfs') + save_namespace_cmd = dfsadmin_base_command + " -saveNamespace" + try: + Logger.info("Checkpoint the current namespace.") + as_user(save_namespace_cmd, params.hdfs_user, env={'PATH': params.hadoop_bin_dir}) + except Exception, e: + message = format("Could not save the NameSpace. As the HDFS user, call this command: {save_namespace_cmd}") + Logger.error(message) + raise Fail(message) + +def prepare_upgrade_backup_namenode_dir(): + """ + During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up the NameNode Name Dirs. + """ + import params + + i = 0 + failed_paths = [] + nn_name_dirs = params.dfs_name_dir.split(',') + backup_destination_root_dir = "/tmp/upgrades/{0}".format(params.stack_version_unformatted) + if len(nn_name_dirs) > 0: + Logger.info("Backup the NameNode name directory's CURRENT folder.") + for nn_dir in nn_name_dirs: + i += 1 + namenode_current_image = os.path.join(nn_dir, "current") + unique = get_unique_id_and_date() + "_" + str(i) + # Note that /tmp may not be writeable. + backup_current_folder = "{0}/namenode_{1}/".format(backup_destination_root_dir, unique) + + if os.path.isdir(namenode_current_image) and not os.path.isdir(backup_current_folder): + try: + os.makedirs(backup_current_folder) + Execute(('cp', '-ar', namenode_current_image, backup_current_folder), + sudo=True + ) + except Exception, e: + failed_paths.append(namenode_current_image) + if len(failed_paths) > 0: + Logger.error("Could not backup the NameNode Name Dir(s) to {0}, make sure that the destination path is " + "writeable and copy the directories on your own. Directories: {1}".format(backup_destination_root_dir, + ", ".join(failed_paths))) + +def prepare_upgrade_finalize_previous_upgrades(): + """ + During a NonRolling (aka Express Upgrade), preparing the NameNode requires Finalizing any upgrades that are in progress. + """ + import params + + dfsadmin_base_command = get_dfsadmin_base_command('hdfs') + finalize_command = dfsadmin_base_command + " -rollingUpgrade finalize" + try: + Logger.info("Attempt to Finalize if there are any in-progress upgrades. " + "This will return 255 if no upgrades are in progress.") + code, out = shell.checked_call(finalize_command, logoutput=True, user=params.hdfs_user) + if out: + expected_substring = "there is no rolling upgrade in progress" + if expected_substring not in out.lower(): + Logger.warning('Finalize command did not contain substring: %s' % expected_substring) + else: + Logger.warning("Finalize command did not return any output.") + except Exception, e: + Logger.warning("Ensure no upgrades are in progress.") + + +def reach_safemode_state(user, safemode_state, in_ha): + """ + Enter or leave safemode for the Namenode. + @param user: user to perform action as + @param safemode_state: Desired state of ON or OFF + @param in_ha: bool indicating if Namenode High Availability is enabled + @:return Returns a tuple of (transition success, original state). 
If no change is needed, the indicator of + success will be True + """ + Logger.info("Prepare to transition into safemode state %s" % safemode_state) + import params + original_state = SafeMode.UNKNOWN + + hostname = params.hostname + safemode_check = format("su - {user} -c 'hdfs dfsadmin -safemode get'") + + grep_pattern = format("Safe mode is {safemode_state} in {hostname}") if in_ha else format("Safe mode is {safemode_state}") + safemode_check_with_grep = format("su - {user} -c 'hdfs dfsadmin -safemode get | grep \"{grep_pattern}\"'") + code, out = call(safemode_check) + Logger.info("Command: %s\nCode: %d." % (safemode_check, code)) + if code == 0 and out is not None: + Logger.info(out) + re_pattern = r"Safe mode is (\S*) in " + hostname.replace(".", "\\.") if in_ha else r"Safe mode is (\S*)" + m = re.search(re_pattern, out, re.IGNORECASE) + if m and len(m.groups()) >= 1: + original_state = m.group(1).upper() + + if original_state == safemode_state: + return (True, original_state) + else: + # Make a transition + command = "hdfs dfsadmin -safemode %s" % (safemode_to_instruction[safemode_state]) + Execute(command, + user=user, + logoutput=True, + path=[params.hadoop_bin_dir]) + + code, out = call(safemode_check_with_grep) + Logger.info("Command: %s\nCode: %d. Out: %s" % (safemode_check_with_grep, code, out)) + if code == 0: + return (True, original_state) + return (False, original_state) + + +def prepare_rolling_upgrade(): + """ + Perform either an upgrade or a downgrade. + + Rolling Upgrade for HDFS Namenode requires the following. + 0. Namenode must be up + 1. Leave safemode if the safemode status is not OFF + 2. Execute a rolling upgrade "prepare" + 3. Execute a rolling upgrade "query" + """ + import params + + if not params.upgrade_direction or params.upgrade_direction not in [Direction.UPGRADE, Direction.DOWNGRADE]: + raise Fail("Could not retrieve upgrade direction: %s" % str(params.upgrade_direction)) + Logger.info(format("Performing a(n) {params.upgrade_direction} of HDFS")) + + if params.security_enabled: + Execute(format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")) + + + if params.upgrade_direction == Direction.UPGRADE: + if params.dfs_ha_enabled: + Logger.info('High Availability is enabled, must leave safemode before calling "-rollingUpgrade prepare"') + desired_state = SafeMode.OFF + safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, SafeMode.OFF, True) + if not safemode_transition_successful: + raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(SafeMode.OFF)) + + prepare = "hdfs dfsadmin -rollingUpgrade prepare" + query = "hdfs dfsadmin -rollingUpgrade query" + Execute(prepare, + user=params.hdfs_user, + logoutput=True) + Execute(query, + user=params.hdfs_user, + logoutput=True) + elif params.upgrade_direction == Direction.DOWNGRADE: + pass + +def finalize_upgrade(upgrade_type): + """ + Finalize the Namenode upgrade, at which point it cannot be downgraded. 
+ :param upgrade_type rolling or nonrolling + """ + Logger.info("Executing Rolling Upgrade finalize") + import params + + if params.security_enabled: + Execute(format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")) + + finalize_cmd = "hdfs dfsadmin -rollingUpgrade finalize" + query_cmd = "hdfs dfsadmin -rollingUpgrade query" + + Execute(query_cmd, + user=params.hdfs_user, + logoutput=True) + Execute(finalize_cmd, + user=params.hdfs_user, + logoutput=True) + Execute(query_cmd, + user=params.hdfs_user, + logoutput=True) http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py new file mode 100755 index 0000000..ff4778a --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/nfsgateway.py @@ -0,0 +1,138 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +""" + +from resource_management.libraries.script import Script +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ + FILE_TYPE_XML +from hdfs_nfsgateway import nfsgateway +from hdfs import hdfs +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions.version import compare_versions, format_stack_version + + +class NFSGateway(Script): + + def get_component_name(self): + return "hadoop-hdfs-nfs3" + + def install(self, env): + import params + + env.set_params(params) + + self.install_packages(env) + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + env.set_params(params) + + if Script.is_stack_greater_or_equal('4.1.0.0'): + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-hdfs-nfs3", params.version) + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + + self.configure(env) + nfsgateway(action="start") + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + + nfsgateway(action="stop") + + def configure(self, env): + import params + + env.set_params(params) + hdfs() + nfsgateway(action="configure") + + def status(self, env): + import status_params + + env.set_params(status_params) + + check_process_status(status_params.nfsgateway_pid_file) + + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['nfs.keytab.file', + 'nfs.kerberos.principal'] + props_read_check = ['nfs.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + {'core-site.xml': FILE_TYPE_XML, + 'hdfs-site.xml': FILE_TYPE_XML}) + if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'nfs.keytab.file' not in security_params['hdfs-site'] or + 'nfs.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['nfs.keytab.file'], + security_params['hdfs-site'][ + 'nfs.kerberos.principal'], + status_params.hostname, + 
status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + self.put_structured_out({"securityState": "UNSECURED"}) + +if __name__ == "__main__": + NFSGateway().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py new file mode 100755 index 0000000..1f76f80 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/params.py @@ -0,0 +1,326 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +""" + +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.version import format_stack_version, compare_versions +from resource_management.libraries.functions.default import default +from resource_management import * +import status_params +import utils +import os +import itertools +import re + +config = Script.get_config() +tmp_dir = Script.get_tmp_dir() + +stack_name = default("/hostLevelParams/stack_name", None) +upgrade_direction = default("/commandParams/upgrade_direction", None) + +stack_version_unformatted = str(config['hostLevelParams']['stack_version']) +stack_version = format_stack_version(stack_version_unformatted) + +# New Cluster Stack Version that is defined during the RESTART of a Rolling Upgrade +version = default("/commandParams/version", None) + +security_enabled = config['configurations']['cluster-env']['security_enabled'] +hdfs_user = status_params.hdfs_user +root_user = "root" +hadoop_pid_dir_prefix = status_params.hadoop_pid_dir_prefix + +# Some datanode settings +dfs_dn_addr = default('/configurations/hdfs-site/dfs.datanode.address', None) +dfs_dn_http_addr = default('/configurations/hdfs-site/dfs.datanode.http.address', None) +dfs_dn_https_addr = default('/configurations/hdfs-site/dfs.datanode.https.address', None) +dfs_http_policy = default('/configurations/hdfs-site/dfs.http.policy', None) +dfs_dn_ipc_address = config['configurations']['hdfs-site']['dfs.datanode.ipc.address'] +secure_dn_ports_are_in_use = False + +#hadoop params +mapreduce_libs_path = "/usr/iop/current/hadoop-mapreduce-client/*" +hadoop_libexec_dir = stack_select.get_hadoop_dir("libexec") +hadoop_bin = stack_select.get_hadoop_dir("sbin") +hadoop_bin_dir = stack_select.get_hadoop_dir("bin") +hadoop_home = "/usr/iop/current/hadoop-client" +if not security_enabled: + hadoop_secure_dn_user = '""' +else: + dfs_dn_port = utils.get_port(dfs_dn_addr) + dfs_dn_http_port = utils.get_port(dfs_dn_http_addr) + dfs_dn_https_port = utils.get_port(dfs_dn_https_addr) + # We try to avoid inability to start datanode as a plain user due to usage of root-owned ports + if dfs_http_policy == "HTTPS_ONLY": + secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_https_port) + elif dfs_http_policy == "HTTP_AND_HTTPS": + secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port) or utils.is_secure_port(dfs_dn_https_port) + else: # params.dfs_http_policy == "HTTP_ONLY" or not defined: + secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port) + if secure_dn_ports_are_in_use: + hadoop_secure_dn_user = hdfs_user + else: + hadoop_secure_dn_user = '""' + +hadoop_conf_dir = conf_select.get_hadoop_conf_dir() +hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure") +hadoop_conf_empty_dir = "/etc/hadoop/conf.empty" +limits_conf_dir = "/etc/security/limits.d" +hadoop_lib_home = stack_select.get_hadoop_dir("lib") +ambari_libs_dir = "/var/lib/ambari-agent/lib" + +#snappy +create_lib_snappy_symlinks = False +snappy_so = "libsnappy.so" +so_target_dir_x86 = format("{hadoop_lib_home}/native/Linux-i386-32") +so_target_dir_x64 = format("{hadoop_lib_home}/native/Linux-amd64-64") +so_target_x86 = format("{so_target_dir_x86}/{snappy_so}") +so_target_x64 = format("{so_target_dir_x64}/{snappy_so}") +so_src_dir_x86 = format("{hadoop_home}/lib") +so_src_dir_x64 = format("{hadoop_home}/lib/native") +so_src_x86 = 
format("{so_src_dir_x86}/{snappy_so}") +so_src_x64 = format("{so_src_dir_x64}/{snappy_so}") + +execute_path = os.environ['PATH'] + os.pathsep + hadoop_bin_dir +ulimit_cmd = "ulimit -c unlimited ; " + +#security params +smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab'] +hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] +falcon_user = config['configurations']['falcon-env']['falcon_user'] + +#exclude file +hdfs_exclude_file = default("/clusterHostInfo/decom_dn_hosts", []) +exclude_file_path = config['configurations']['hdfs-site']['dfs.hosts.exclude'] +update_exclude_file_only = default("/commandParams/update_exclude_file_only",False) + +klist_path_local = functions.get_klist_path() +kinit_path_local = functions.get_kinit_path() +#hosts +hostname = config["hostname"] +rm_host = default("/clusterHostInfo/rm_host", []) +slave_hosts = default("/clusterHostInfo/slave_hosts", []) +oozie_servers = default("/clusterHostInfo/oozie_server", []) +hcat_server_hosts = default("/clusterHostInfo/webhcat_server_host", []) +hive_server_host = default("/clusterHostInfo/hive_server_host", []) +hbase_master_hosts = default("/clusterHostInfo/hbase_master_hosts", []) +hs_host = default("/clusterHostInfo/hs_host", []) +jtnode_host = default("/clusterHostInfo/jtnode_host", []) +namenode_host = default("/clusterHostInfo/namenode_host", []) +nm_host = default("/clusterHostInfo/nm_host", []) +ganglia_server_hosts = default("/clusterHostInfo/ganglia_server_host", []) +journalnode_hosts = default("/clusterHostInfo/journalnode_hosts", []) +zkfc_hosts = default("/clusterHostInfo/zkfc_hosts", []) +falcon_host = default("/clusterHostInfo/falcon_server_hosts", []) + +has_ganglia_server = not len(ganglia_server_hosts) == 0 +has_namenodes = not len(namenode_host) == 0 +has_jobtracker = not len(jtnode_host) == 0 +has_resourcemanager = not len(rm_host) == 0 +has_histroryserver = not len(hs_host) == 0 +has_hbase_masters = not len(hbase_master_hosts) == 0 +has_slaves = not len(slave_hosts) == 0 +has_oozie_server = not len(oozie_servers) == 0 +has_hcat_server_host = not len(hcat_server_hosts) == 0 +has_hive_server_host = not len(hive_server_host) == 0 +has_journalnode_hosts = not len(journalnode_hosts) == 0 +has_zkfc_hosts = not len(zkfc_hosts) == 0 +has_falcon_host = not len(falcon_host) == 0 + + +is_namenode_master = hostname in namenode_host +is_jtnode_master = hostname in jtnode_host +is_rmnode_master = hostname in rm_host +is_hsnode_master = hostname in hs_host +is_hbase_master = hostname in hbase_master_hosts +is_slave = hostname in slave_hosts + +if has_ganglia_server: + ganglia_server_host = ganglia_server_hosts[0] + +#users and groups +yarn_user = config['configurations']['yarn-env']['yarn_user'] +hbase_user = config['configurations']['hbase-env']['hbase_user'] +oozie_user = config['configurations']['oozie-env']['oozie_user'] +webhcat_user = config['configurations']['hive-env']['hcat_user'] +hcat_user = config['configurations']['hive-env']['hcat_user'] +hive_user = config['configurations']['hive-env']['hive_user'] +smoke_user = config['configurations']['cluster-env']['smokeuser'] +smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name'] +mapred_user = config['configurations']['mapred-env']['mapred_user'] +hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None) + +user_group = config['configurations']['cluster-env']['user_group'] +root_group = "root" +proxyuser_group = 
config['configurations']['hadoop-env']['proxyuser_group'] + +#hadoop params +hdfs_log_dir_prefix = config['configurations']['hadoop-env']['hdfs_log_dir_prefix'] +hadoop_root_logger = config['configurations']['hadoop-env']['hadoop_root_logger'] + +dfs_domain_socket_path = config['configurations']['hdfs-site']['dfs.domain.socket.path'] +dfs_domain_socket_dir = os.path.dirname(dfs_domain_socket_path) + +jn_edits_dir = config['configurations']['hdfs-site']['dfs.journalnode.edits.dir'] + +dfs_name_dir = config['configurations']['hdfs-site']['dfs.namenode.name.dir'] + +namenode_dirs_created_stub_dir = format("{hdfs_log_dir_prefix}/{hdfs_user}") +namenode_dirs_stub_filename = "namenode_dirs_created" + +smoke_hdfs_user_dir = format("/user/{smoke_user}") +smoke_hdfs_user_mode = 0770 + + +hdfs_namenode_formatted_mark_suffix = "/namenode-formatted/" +namenode_formatted_old_mark_dirs = ["/var/run/hadoop/hdfs/namenode-formatted", + format("{hadoop_pid_dir_prefix}/hdfs/namenode/formatted"), + "/var/lib/hdfs/namenode/formatted"] +dfs_name_dirs = dfs_name_dir.split(",") +namenode_formatted_mark_dirs = [] +for dn_dir in dfs_name_dirs: + tmp_mark_dir = format("{dn_dir}{hdfs_namenode_formatted_mark_suffix}") + namenode_formatted_mark_dirs.append(tmp_mark_dir) + +# Use the namenode RPC address if configured, otherwise, fallback to the default file system +namenode_address = None +if 'dfs.namenode.rpc-address' in config['configurations']['hdfs-site']: + namenode_rpcaddress = config['configurations']['hdfs-site']['dfs.namenode.rpc-address'] + namenode_address = format("hdfs://{namenode_rpcaddress}") +else: + namenode_address = config['configurations']['core-site']['fs.defaultFS'] + +fs_checkpoint_dirs = default("/configurations/hdfs-site/dfs.namenode.checkpoint.dir", "").split(',') + +dfs_data_dirs = config['configurations']['hdfs-site']['dfs.datanode.data.dir'] + +data_dir_mount_file = config['configurations']['hadoop-env']['dfs.datanode.data.dir.mount.file'] + +# HDFS High Availability properties +dfs_ha_enabled = False +dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.nameservices", None) +dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None) +dfs_ha_automatic_failover_enabled = default("/configurations/hdfs-site/dfs.ha.automatic-failover.enabled", False) + +# hostname of the active HDFS HA Namenode (only used when HA is enabled) +dfs_ha_namenode_active = default("/configurations/hadoop-env/dfs_ha_initial_namenode_active", None) +# hostname of the standby HDFS HA Namenode (only used when HA is enabled) +dfs_ha_namenode_standby = default("/configurations/hadoop-env/dfs_ha_initial_namenode_standby", None) + +namenode_id = None +namenode_rpc = None + +dfs_ha_namemodes_ids_list = [] +other_namenode_id = None + +if dfs_ha_namenode_ids: + dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",") + dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list) + if dfs_ha_namenode_ids_array_len > 1: + dfs_ha_enabled = True +if dfs_ha_enabled: + for nn_id in dfs_ha_namemodes_ids_list: + nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')] + if hostname in nn_host: + namenode_id = nn_id + namenode_rpc = nn_host + +if dfs_http_policy is not None and dfs_http_policy.upper() == "HTTPS_ONLY": + https_only = True + journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.https-address', None) +else: + https_only = False + journalnode_address = 
default('/configurations/hdfs-site/dfs.journalnode.http-address', None) + +if journalnode_address: + journalnode_port = journalnode_address.split(":")[1] + + +if security_enabled: + _dn_principal_name = config['configurations']['hdfs-site']['dfs.datanode.kerberos.principal'] + _dn_keytab = config['configurations']['hdfs-site']['dfs.datanode.keytab.file'] + _dn_principal_name = _dn_principal_name.replace('_HOST',hostname.lower()) + + dn_kinit_cmd = format("{kinit_path_local} -kt {_dn_keytab} {_dn_principal_name};") + + _nn_principal_name = config['configurations']['hdfs-site']['dfs.namenode.kerberos.principal'] + _nn_keytab = config['configurations']['hdfs-site']['dfs.namenode.keytab.file'] + _nn_principal_name = _nn_principal_name.replace('_HOST',hostname.lower()) + + nn_kinit_cmd = format("{kinit_path_local} -kt {_nn_keytab} {_nn_principal_name};") + + _jn_principal_name = default("/configurations/hdfs-site/dfs.journalnode.kerberos.principal", None) + if _jn_principal_name: + _jn_principal_name = _jn_principal_name.replace('_HOST', hostname.lower()) + _jn_keytab = default("/configurations/hdfs-site/dfs.journalnode.keytab.file", None) + jn_kinit_cmd = format("{kinit_path_local} -kt {_jn_keytab} {_jn_principal_name};") +else: + dn_kinit_cmd = "" + nn_kinit_cmd = "" + jn_kinit_cmd = "" + + +hdfs_site = config['configurations']['hdfs-site'] +default_fs = config['configurations']['core-site']['fs.defaultFS'] + +import functools +#create partial functions with common arguments for every HdfsDirectory call +#to create hdfs directory we need to call params.HdfsDirectory in code +HdfsResource = functools.partial( + HdfsResource, + user=hdfs_user, + security_enabled = security_enabled, + keytab = hdfs_user_keytab, + kinit_path_local = kinit_path_local, + hadoop_bin_dir = hadoop_bin_dir, + hadoop_conf_dir = hadoop_conf_dir, + principal_name = hdfs_principal_name, + hdfs_site = hdfs_site, + default_fs = default_fs +) + +io_compression_codecs = config['configurations']['core-site']['io.compression.codecs'] +if not "com.hadoop.compression.lzo" in io_compression_codecs: + exclude_packages = ["lzo", "hadoop-lzo", "hadoop-lzo-native", "liblzo2-2"] +else: + exclude_packages = [] +name_node_params = default("/commandParams/namenode", None) + +#hadoop params +hadoop_env_sh_template = config['configurations']['hadoop-env']['content'] + +#hadoop-env.sh +java_home = config['hostLevelParams']['java_home'] +java_version = int(config['hostLevelParams']['java_version']) + +jsvc_path = "/usr/lib/bigtop-utils" + +hadoop_heapsize = config['configurations']['hadoop-env']['hadoop_heapsize'] +namenode_heapsize = config['configurations']['hadoop-env']['namenode_heapsize'] +namenode_opt_newsize = config['configurations']['hadoop-env']['namenode_opt_newsize'] +namenode_opt_maxnewsize = config['configurations']['hadoop-env']['namenode_opt_maxnewsize'] +namenode_opt_permsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_permsize","128m") +namenode_opt_maxpermsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_maxpermsize","256m") + +jtnode_opt_newsize = "200m" +jtnode_opt_maxnewsize = "200m" +jtnode_heapsize = "1024m" +ttnode_heapsize = "1024m" + +dtnode_heapsize = config['configurations']['hadoop-env']['dtnode_heapsize'] +mapred_pid_dir_prefix = default("/configurations/mapred-env/mapred_pid_dir_prefix","/var/run/hadoop-mapreduce") +mapred_log_dir_prefix = default("/configurations/mapred-env/mapred_log_dir_prefix","/var/log/hadoop-mapreduce") 
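Editor's note on params.py above: whether the DataNode must start through the secure (root/jsvc) path is decided by checking, for the active dfs.http.policy, whether any configured DataNode port is a privileged port. The helpers it calls, utils.get_port and utils.is_secure_port, live in utils.py, which is not part of this diff; the sketch below only illustrates the behaviour the logic assumes (a privileged port being one below 1024) and is not the shipped implementation.

def get_port(address):
    """Extract the integer port from a 'host:port' string, or None if absent."""
    if not address or ":" not in address:
        return None
    try:
        return int(address.split(":")[-1])
    except ValueError:
        return None

def is_secure_port(port):
    """Privileged (root-only) ports are those below 1024."""
    return port is not None and port < 1024

Under this reading, with dfs.http.policy=HTTP_AND_HTTPS a single privileged port among the data, HTTP, and HTTPS addresses is enough to set hadoop_secure_dn_user to the HDFS user.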
http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py new file mode 100755 index 0000000..ffbe658 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/service_check.py @@ -0,0 +1,119 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +from resource_management import * + + +class HdfsServiceCheck(Script): + def service_check(self, env): + import params + + env.set_params(params) + unique = functions.get_unique_id_and_date() + dir = '/tmp' + tmp_file = format("{dir}/{unique}") + + safemode_command = "dfsadmin -safemode get | grep OFF" + + create_dir_cmd = format("fs -mkdir {dir}") + chmod_command = format("fs -chmod 777 {dir}") + test_dir_exists = as_user(format("{hadoop_bin_dir}/hadoop --config {hadoop_conf_dir} fs -test -e {dir}"), params.hdfs_user) + cleanup_cmd = format("fs -rm {tmp_file}") + #cleanup put below to handle retries; if retrying there wil be a stale file + #that needs cleanup; exit code is fn of second command + create_file_cmd = format( + "{cleanup_cmd}; hadoop --config {hadoop_conf_dir} fs -put /etc/passwd {tmp_file}") + test_cmd = format("fs -test -e {tmp_file}") + if params.security_enabled: + Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"), + user=params.hdfs_user + ) + ExecuteHadoop(safemode_command, + user=params.hdfs_user, + logoutput=True, + conf_dir=params.hadoop_conf_dir, + try_sleep=3, + tries=20, + bin_dir=params.hadoop_bin_dir + ) + ExecuteHadoop(create_dir_cmd, + user=params.hdfs_user, + logoutput=True, + not_if=test_dir_exists, + conf_dir=params.hadoop_conf_dir, + try_sleep=3, + tries=5, + bin_dir=params.hadoop_bin_dir + ) + ExecuteHadoop(chmod_command, + user=params.hdfs_user, + logoutput=True, + conf_dir=params.hadoop_conf_dir, + try_sleep=3, + tries=5, + bin_dir=params.hadoop_bin_dir + ) + ExecuteHadoop(create_file_cmd, + user=params.hdfs_user, + logoutput=True, + conf_dir=params.hadoop_conf_dir, + try_sleep=3, + tries=5, + bin_dir=params.hadoop_bin_dir + ) + ExecuteHadoop(test_cmd, + user=params.hdfs_user, + logoutput=True, + conf_dir=params.hadoop_conf_dir, + try_sleep=3, + tries=5, + bin_dir=params.hadoop_bin_dir + ) + if params.has_journalnode_hosts: + journalnode_port = params.journalnode_port + checkWebUIFileName = "checkWebUI.py" + checkWebUIFilePath = format("{tmp_dir}/{checkWebUIFileName}") + comma_sep_jn_hosts = ",".join(params.journalnode_hosts) + 
checkWebUICmd = format("python {checkWebUIFilePath} -m {comma_sep_jn_hosts} -p {journalnode_port} -s {https_only}") + File(checkWebUIFilePath, + content=StaticFile(checkWebUIFileName), + mode=0775) + + Execute(checkWebUICmd, + logoutput=True, + try_sleep=3, + tries=5, + user=params.smoke_user + ) + + if params.is_namenode_master: + if params.has_zkfc_hosts: + pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}") + pid_file = format("{pid_dir}/hadoop-{hdfs_user}-zkfc.pid") + check_zkfc_process_cmd = format( + "ls {pid_file} >/dev/null 2>&1 && ps -p `cat {pid_file}` >/dev/null 2>&1") + Execute(check_zkfc_process_cmd, + logoutput=True, + try_sleep=3, + tries=5 + ) + + +if __name__ == "__main__": + HdfsServiceCheck().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py new file mode 100755 index 0000000..4224a9e --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/snamenode.py @@ -0,0 +1,142 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +""" + +from resource_management import * +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions.version import compare_versions, format_stack_version +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ + FILE_TYPE_XML +from resource_management.core.logger import Logger + +from hdfs_snamenode import snamenode +from hdfs import hdfs + +class SNameNode(Script): + + def get_component_name(self): + return "hadoop-hdfs-secondarynamenode" + + def install(self, env): + import params + + env.set_params(params) + + self.install_packages(env) + + def pre_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing Stack Upgrade pre-restart") + import params + env.set_params(params) + + if params.version and compare_versions(format_stack_version(params.version), '4.0.0.0') >= 0: + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-hdfs-secondarynamenode", params.version) + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + + self.configure(env) + snamenode(action="start") + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + + snamenode(action="stop") + + def configure(self, env): + import params + + env.set_params(params) + hdfs() + snamenode(action="configure") + + def status(self, env): + import status_params + + env.set_params(status_params) + + check_process_status(status_params.snamenode_pid_file) + + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.secondary.namenode.kerberos.internal.spnego.principal', + 'dfs.secondary.namenode.keytab.file', + 'dfs.secondary.namenode.kerberos.principal'] + props_read_check = ['dfs.secondary.namenode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + {'core-site.xml': FILE_TYPE_XML, + 'hdfs-site.xml': FILE_TYPE_XML}) + + if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'dfs.secondary.namenode.keytab.file' not in security_params['hdfs-site'] or + 'dfs.secondary.namenode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + 
cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.secondary.namenode.keytab.file'], + security_params['hdfs-site'][ + 'dfs.secondary.namenode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + self.put_structured_out({"securityState": "UNSECURED"}) + + +if __name__ == "__main__": + SNameNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py new file mode 100755 index 0000000..f1abf08 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/status_params.py @@ -0,0 +1,42 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
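A note on the security_status() pattern used in snamenode.py above (and repeated in the other HDFS scripts): expected Kerberos properties are declared with build_expectations(), the on-disk *-site.xml files are read back with get_params_from_filesystem(), and any mismatch downgrades the reported state to UNSECURED. A much-simplified, self-contained sketch of that comparison step follows; the dict shapes are a simplification invented for illustration, not the exact structures returned by resource_management:

# Simplified illustration of the expectation-vs-filesystem comparison that
# security_status() performs. Keys and shapes are invented for the example.
def validate(security_params, expectations):
    issues = {}
    for config_file, checks in expectations.items():
        actual = security_params.get(config_file, {})
        for prop, expected in (checks.get("value_checks") or {}).items():
            if actual.get(prop) != expected:
                issues[config_file] = "%s is not set to %s" % (prop, expected)
        for prop in (checks.get("empty_checks") or []):
            if not actual.get(prop):
                issues[config_file] = "%s is missing or empty" % prop
    return issues

expectations = {
    "core-site": {"value_checks": {"hadoop.security.authentication": "kerberos"},
                  "empty_checks": ["hadoop.security.auth_to_local"]},
}
security_params = {"core-site": {"hadoop.security.authentication": "simple",
                                 "hadoop.security.auth_to_local": "DEFAULT"}}
print(validate(security_params, expectations))
# -> {'core-site': 'hadoop.security.authentication is not set to kerberos'}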
+ +""" + +from resource_management import * +from resource_management.libraries.functions import conf_select + +config = Script.get_config() + +hadoop_pid_dir_prefix = config['configurations']['hadoop-env']['hadoop_pid_dir_prefix'] +hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] +hadoop_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}") +datanode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-datanode.pid") +namenode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-namenode.pid") +snamenode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-secondarynamenode.pid") +journalnode_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-journalnode.pid") +zkfc_pid_file = format("{hadoop_pid_dir}/hadoop-{hdfs_user}-zkfc.pid") +nfsgateway_pid_file = format("{hadoop_pid_dir_prefix}/root/hadoop_privileged_nfs3.pid") + +# Security related/required params +hostname = config['hostname'] +security_enabled = config['configurations']['cluster-env']['security_enabled'] +hdfs_user_principal = config['configurations']['hadoop-env']['hdfs_principal_name'] +hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] +hadoop_conf_dir = conf_select.get_hadoop_conf_dir() +kinit_path_local = functions.get_kinit_path() +tmp_dir = Script.get_tmp_dir() http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py new file mode 100755 index 0000000..5cd1735 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/services/HDFS/package/scripts/utils.py @@ -0,0 +1,357 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" +import os +import re +import urllib2 +import json + +from resource_management import * +from resource_management.libraries.functions.format import format +from resource_management.core.shell import call, checked_call +from resource_management.core.exceptions import ComponentIsNotRunning +from resource_management.libraries.functions.curl_krb_request import curl_krb_request + +from zkfc_slave import ZkfcSlave + +def safe_zkfc_op(action, env): + """ + Idempotent operation on the zkfc process to either start or stop it. 
+ :param action: start or stop + :param env: environment + """ + zkfc = None + if action == "start": + try: + zkfc = ZkfcSlave() + zkfc.status(env) + except ComponentIsNotRunning: + if zkfc: + zkfc.start(env) + + if action == "stop": + try: + zkfc = ZkfcSlave() + zkfc.status(env) + except ComponentIsNotRunning: + pass + else: + if zkfc: + zkfc.stop(env) + + +def failover_namenode(): + """ + Failover the primary namenode by killing zkfc if it exists on this host (assuming this host is the primary). + """ + import params + check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}") + code, out = call(check_service_cmd, logoutput=True, user=params.hdfs_user) + + state = "unknown" + if code == 0 and out: + state = "active" if "active" in out else ("standby" if "standby" in out else state) + Logger.info("Namenode service state: %s" % state) + + if state == "active": + Logger.info("Rolling Upgrade - Initiating namenode failover by killing zkfc on active namenode") + + # Forcefully kill ZKFC on this host to initiate a failover + # If ZKFC is already dead, then potentially this node can still be the active one. + was_zkfc_killed = kill_zkfc(params.hdfs_user) + + # Wait until it transitions to standby + check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby") + + # process may already be down. try one time, then proceed + code, out = call(check_standby_cmd, user=params.hdfs_user, logoutput=True) + Logger.info(format("Rolling Upgrade - check for standby returned {code}")) + + if code == 255 and out: + Logger.info("Rolling Upgrade - namenode is already down") + else: + if was_zkfc_killed: + # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover. + Execute(check_standby_cmd, + user=params.hdfs_user, + tries=50, + try_sleep=6, + logoutput=True) + + else: + Logger.info("Rolling Upgrade - Host %s is the standby namenode." % str(params.hostname)) + + +def kill_zkfc(zkfc_user): + """ + There are two potential methods for failing over the namenode, especially during a Rolling Upgrade. + Option 1. Kill zkfc on primary namenode provided that the secondary is up and has zkfc running on it. + Option 2. Silent failover (not supported as of IOP 4.0.0.0) + :param zkfc_user: User that started the ZKFC process. + :return: Return True if ZKFC was killed, otherwise, false. + """ + import params + if params.dfs_ha_enabled: + zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user) + if zkfc_pid_file: + check_process = format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1") + code, out = call(check_process) + if code == 0: + Logger.debug("ZKFC is running and will be killed to initiate namenode failover.") + kill_command = format("{check_process} && kill -9 `cat {zkfc_pid_file}` > /dev/null 2>&1") + Execute(kill_command) + Execute(format("rm -f {zkfc_pid_file}")) + return True + return False + + +def get_service_pid_file(name, user): + """ + Get the pid file path that was used to start the service by the user. + :param name: Service name + :param user: User that started the service. 
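On failover_namenode() above: the decision to kill ZKFC hinges on classifying the output of "hdfs haadmin -getServiceState". A standalone sketch of that classification step follows; subprocess is used here only for illustration, whereas the real script goes through resource_management's call() so that retries, logging and user switching stay consistent with the rest of Ambari:

# Sketch of the state-detection step in failover_namenode(); not the Ambari API.
import subprocess


def namenode_ha_state(namenode_id):
    """Return 'active', 'standby' or 'unknown' for the given NameNode id."""
    cmd = ["hdfs", "haadmin", "-getServiceState", namenode_id]
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError:
        return "unknown"                       # hdfs binary not on PATH
    out = proc.communicate()[0].decode("utf-8", "replace")
    if proc.returncode != 0:
        return "unknown"
    if "active" in out:
        return "active"
    if "standby" in out:
        return "standby"
    return "unknown"

# namenode_ha_state("nn1")  ->  "active" | "standby" | "unknown"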
+ :return: PID file path + """ + import params + pid_dir = format("{hadoop_pid_dir_prefix}/{user}") + pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid") + return pid_file + + +def service(action=None, name=None, user=None, options="", create_pid_dir=False, + create_log_dir=False): + """ + :param action: Either "start" or "stop" + :param name: Component name, e.g., "namenode", "datanode", "secondarynamenode", "zkfc" + :param user: User to run the command as + :param options: Additional options to pass to command as a string + :param create_pid_dir: Create PID directory + :param create_log_dir: Crate log file directory + """ + import params + + options = options if options else "" + pid_dir = format("{hadoop_pid_dir_prefix}/{user}") + pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid") + hadoop_env_exports = { + 'HADOOP_LIBEXEC_DIR': params.hadoop_libexec_dir + } + log_dir = format("{hdfs_log_dir_prefix}/{user}") + + # NFS GATEWAY is always started by root using jsvc due to rpcbind bugs + # on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542 + if name == "nfs3" : + pid_file = format("{pid_dir}/hadoop_privileged_nfs3.pid") + custom_export = { + 'HADOOP_PRIVILEGED_NFS_USER': params.hdfs_user, + 'HADOOP_PRIVILEGED_NFS_PID_DIR': pid_dir, + 'HADOOP_PRIVILEGED_NFS_LOG_DIR': log_dir + } + hadoop_env_exports.update(custom_export) + + check_process = format( + "ls {pid_file} >/dev/null 2>&1 &&" + " ps -p `cat {pid_file}` >/dev/null 2>&1") + + # on STOP directories shouldn't be created + # since during stop still old dirs are used (which were created during previous start) + if action != "stop": + if name == "nfs3": + Directory(params.hadoop_pid_dir_prefix, + mode=0755, + owner=params.root_user, + group=params.root_group + ) + else: + Directory(params.hadoop_pid_dir_prefix, + mode=0755, + owner=params.hdfs_user, + group=params.user_group + ) + if create_pid_dir: + Directory(pid_dir, + owner=user, + create_parents=True) + if create_log_dir: + if name == "nfs3": + Directory(log_dir, + mode=0775, + owner=params.root_user, + group=params.user_group) + else: + Directory(log_dir, + owner=user, + create_parents=True) + + if params.security_enabled and name == "datanode": + ## The directory where pid files are stored in the secure data environment. + hadoop_secure_dn_pid_dir = format("{hadoop_pid_dir_prefix}/{hdfs_user}") + hadoop_secure_dn_pid_file = format("{hadoop_secure_dn_pid_dir}/hadoop_secure_dn.pid") + + if params.secure_dn_ports_are_in_use: + user = "root" + pid_file = format( + "{hadoop_pid_dir_prefix}/{hdfs_user}/hadoop-{hdfs_user}-{name}.pid") + + if action == 'stop' and os.path.isfile(hadoop_secure_dn_pid_file): + # We need special handling for this case to handle the situation + # when we configure non-root secure DN and then restart it + # to handle new configs. 
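A worked example of what service() above composes for a NameNode start, with assumed locations (params.py supplies the real ones); the final Execute() additionally wraps the command with su/sudo and the hadoop-env exports shown above:

# Illustration only: paths are assumptions, not values from params.py.
hadoop_bin = "/usr/iop/current/hadoop-client/sbin"   # assumption
hadoop_conf_dir = "/etc/hadoop/conf"                 # assumption
pid_file = "/var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid"

check_process = ("ls %s >/dev/null 2>&1 && ps -p `cat %s` >/dev/null 2>&1"
                 % (pid_file, pid_file))
daemon_cmd = "%s/hadoop-daemon.sh --config %s start namenode" % (hadoop_bin, hadoop_conf_dir)
print(daemon_cmd)

# In the real resource graph, File(pid_file, action="delete", not_if=check_process)
# removes a stale pid file first, and Execute(daemon_cmd, not_if=check_process)
# makes the start idempotent: the daemon is launched only when the liveness probe fails.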
Otherwise we will not be able to stop + # a running instance + user = "root" + + try: + check_process_status(hadoop_secure_dn_pid_file) + + custom_export = { + 'HADOOP_SECURE_DN_USER': params.hdfs_user + } + hadoop_env_exports.update(custom_export) + + except ComponentIsNotRunning: + pass + + hadoop_daemon = format("{hadoop_bin}/hadoop-daemon.sh") + + if user == "root": + cmd = [hadoop_daemon, "--config", params.hadoop_conf_dir, action, name] + if options: + cmd += [options, ] + daemon_cmd = as_sudo(cmd) + else: + cmd = format("{ulimit_cmd} {hadoop_daemon} --config {hadoop_conf_dir} {action} {name}") + if options: + cmd += " " + options + daemon_cmd = as_user(cmd, user) + + service_is_up = check_process if action == "start" else None + #remove pid file from dead process + File(pid_file, + action="delete", + not_if=check_process + ) + Execute(daemon_cmd, + not_if=service_is_up, + environment=hadoop_env_exports + ) + + if action == "stop": + File(pid_file, + action="delete", + ) + + +def get_value_from_jmx(qry, property): + try: + response = urllib2.urlopen(qry) + data = response.read() + if data: + data_dict = json.loads(data) + return data_dict["beans"][0][property] + except: + return None + +def get_jmx_data(nn_address, modeler_type, metric, encrypted=False, security_enabled=False): + """ + :param nn_address: Namenode Address, e.g., host:port, ** MAY ** be preceded with "http://" or "https://" already. + If not preceded, will use the encrypted param to determine. + :param modeler_type: Modeler type to query using startswith function + :param metric: Metric to return + :return: Return an object representation of the metric, or None if it does not exist + """ + if not nn_address or not modeler_type or not metric: + return None + + nn_address = nn_address.strip() + if not nn_address.startswith("http"): + nn_address = ("https://" if encrypted else "http://") + nn_address + if not nn_address.endswith("/"): + nn_address = nn_address + "/" + + nn_address = nn_address + "jmx" + Logger.info("Retrieve modeler: %s, metric: %s from JMX endpoint %s" % (modeler_type, metric, nn_address)) + + if security_enabled: + import params + data, error_msg, time_millis = curl_krb_request(params.tmp_dir, params.smoke_user_keytab, params.smokeuser_principal, nn_address, + "jn_upgrade", params.kinit_path_local, False, None, params.smoke_user) + else: + data = urllib2.urlopen(nn_address).read() + my_data = None + if data: + data_dict = json.loads(data) + if data_dict: + for el in data_dict['beans']: + if el is not None and el['modelerType'] is not None and el['modelerType'].startswith(modeler_type): + if metric in el: + my_data = el[metric] + if my_data: + my_data = json.loads(str(my_data)) + break + return my_data + +def get_port(address): + """ + Extracts port from the address like 0.0.0.0:1019 + """ + if address is None: + return None + m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address) + if m is not None and len(m.groups()) >= 2: + return int(m.group(2)) + else: + return None + + +def is_secure_port(port): + """ + Returns True if port is root-owned at *nix systems + """ + if port is not None: + return port < 1024 + else: + return False + +def get_dfsadmin_base_command(hdfs_binary, use_specific_namenode = False): + """ + Get the dfsadmin base command constructed using hdfs_binary path and passing namenode address as explicit -fs argument + :param hdfs_binary: path to hdfs binary to use + :param use_specific_namenode: flag if set and Namenode HA is enabled, then the dfsadmin command will use + 
current namenode's address + :return: the constructed dfsadmin base command + """ + import params + dfsadmin_base_command = "" + if params.dfs_ha_enabled and use_specific_namenode: + dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs hdfs://{params.namenode_rpc}") + else: + dfsadmin_base_command = format("{hdfs_binary} dfsadmin -fs {params.namenode_address}") + return dfsadmin_base_command + +def is_previous_fs_image(): + """ + Return true if there's a previous folder in the HDFS namenode directories. + """ + import params + if params.dfs_name_dir: + nn_name_dirs = params.dfs_name_dir.split(',') + for nn_dir in nn_name_dirs: + prev_dir = os.path.join(nn_dir, "previous") + if os.path.isdir(prev_dir): + return True + return False
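To close out utils.py, a worked expansion of get_dfsadmin_base_command() with illustrative addresses standing in for params.namenode_address and params.namenode_rpc (neither value is taken from the stack defaults):

# Worked expansion only; addresses below are made up for the example.
hdfs_binary = "hdfs"
namenode_address = "hdfs://mycluster"          # assumed HA nameservice URI
namenode_rpc = "nn1.example.com:8020"          # assumed current NameNode RPC address

# Non-HA, or use_specific_namenode=False:
print("%s dfsadmin -fs %s" % (hdfs_binary, namenode_address))
# -> hdfs dfsadmin -fs hdfs://mycluster

# HA with use_specific_namenode=True:
print("%s dfsadmin -fs hdfs://%s" % (hdfs_binary, namenode_rpc))
# -> hdfs dfsadmin -fs hdfs://nn1.example.com:8020

For the port helpers defined above, get_port("0.0.0.0:1019") yields 1019 and is_secure_port(1019) is True; that is, ports below 1024 are treated as root-owned on *nix systems.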