http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_client.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_client.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_client.py new file mode 100644 index 0000000..95d1603 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_client.py @@ -0,0 +1,123 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
# Residue of the patch dump, kept verbatim -- it is the end of the license
# docstring that starts on the preceding line:
# + +"""

from resource_management import *
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions.security_commons import build_expectations, \
  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
  FILE_TYPE_XML
from hdfs import hdfs
from ambari_commons.os_family_impl import OsFamilyImpl
from ambari_commons import OSConst
from resource_management.core.exceptions import ClientComponentHasNoStatus


class HdfsClient(Script):
  """
  Command script for the HDFS client component.

  start/stop only register the parameters; status always raises because a
  client component reports no status.
  """

  def install(self, env):
    """Install the packages, then write the configuration."""
    import params
    env.set_params(params)
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    """Register the parameters and delegate to hdfs()."""
    import params
    env.set_params(params)
    hdfs()

  def start(self, env, upgrade_type=None):
    """Register the parameters; nothing else is done on start."""
    import params
    env.set_params(params)

  def stop(self, env, upgrade_type=None):
    """Register the parameters; nothing else is done on stop."""
    import params
    env.set_params(params)

  def status(self, env):
    """Always raises ClientComponentHasNoStatus."""
    raise ClientComponentHasNoStatus()


@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class HdfsClientDefault(HdfsClient):
  """HDFS client implementation registered for the default OS family."""

  def get_component_name(self):
    """Return the component name, "hadoop-client"."""
    return "hadoop-client"

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """
    Select the configuration and stack version for params.version, but only
    when a version is set and that version has the ROLLING_UPGRADE feature.
    """
    import params
    env.set_params(params)

    target_version = params.version
    if not (target_version and check_stack_feature(StackFeature.ROLLING_UPGRADE, target_version)):
      return

    conf_select.select(params.stack_name, "hadoop", target_version)
    stack_select.select("hadoop-client", target_version)

  def security_status(self, env):
    """
    Publish the security state of the client through put_structured_out().

    The state is UNSECURED unless core-site.xml declares kerberos
    authentication, the expectations validate without issues and a principal
    or keytab is configured; in that case a kinit is attempted and the state
    becomes SECURED_KERBEROS, or ERROR if anything in that attempt raises.
    """
    import status_params
    env.set_params(status_params)

    expected_values = {"hadoop.security.authentication": "kerberos",
                       "hadoop.security.authorization": "true"}
    expected_non_empty = ["hadoop.security.auth_to_local"]
    expected_readable = None

    hdfs_expectations = {}
    hdfs_expectations.update(
      build_expectations('core-site', expected_values, expected_non_empty, expected_readable))

    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
                                                 {'core-site.xml': FILE_TYPE_XML})

    kerberos_declared = (
      'core-site' in security_params
      and 'hadoop.security.authentication' in security_params['core-site']
      and security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos'
    )
    if not kerberos_declared:
      self.put_structured_out({"securityState": "UNSECURED"})
      return

    result_issues = validate_security_config_properties(security_params, hdfs_expectations)
    if result_issues:
      # At least one configuration file failed validation: report every reason.
      issues = ["Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])
                for cf in result_issues]
      self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
      self.put_structured_out({"securityState": "UNSECURED"})
      return

    # All validations passed successfully.
    if not (status_params.hdfs_user_principal or status_params.hdfs_user_keytab):
      self.put_structured_out({"securityIssuesFound": "hdfs principal and/or keytab file is not specified"})
      self.put_structured_out({"securityState": "UNSECURED"})
      return

    try:
      cached_kinit_executor(status_params.kinit_path_local,
                            status_params.hdfs_user,
                            status_params.hdfs_user_keytab,
                            status_params.hdfs_user_principal,
                            status_params.hostname,
                            status_params.tmp_dir)
      self.put_structured_out({"securityState": "SECURED_KERBEROS"})
    except Exception as e:
      self.put_structured_out({"securityState": "ERROR"})
      self.put_structured_out({"securityStateErrorInfo": str(e)})


@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class HdfsClientWindows(HdfsClient):
  """HDFS client implementation registered for the Windows Server OS family."""

  def install(self, env):
    """Install the packages, then write the configuration."""
    # NOTE(review): install_params is imported but not referenced afterwards;
    # presumably it is needed for its import side effects -- confirm.
    import install_params
    self.install_packages(env)
    self.configure(env)


if __name__ == "__main__":
  HdfsClient().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_datanode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_datanode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_datanode.py new file mode 100644 index 0000000..1a54be0 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_datanode.py @@ -0,0 +1,84 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
# Residue of the patch dump, kept verbatim -- it is the end of the license
# docstring that starts on the preceding line:
# + +"""
import os
from resource_management import *
from resource_management.libraries.functions.mounted_dirs_helper import handle_mounted_dirs
from utils import service
from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
from ambari_commons import OSConst


def create_dirs(data_dir):
  """
  Create one DataNode data directory owned by the HDFS user and group.

  Used as the callback handed to handle_mounted_dirs() by
  datanode(action="configure"). Failures are ignored (ignore_failures=True).

  :param data_dir: The directory to create
  """
  import params
  Directory(data_dir,
            create_parents = True,
            cd_access="a",
            # 0o-prefixed octal: valid on Python 2.6+ and Python 3 (plain 0755 is Python-2-only)
            mode=0o755,
            owner=params.hdfs_user,
            group=params.user_group,
            ignore_failures=True
  )

@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
def datanode(action=None):
  """
  DataNode handling for the default OS family.

  :param action: "configure" creates the domain socket dir, the data dirs and
                 the mount history file; "start"/"stop" are forwarded to
                 service(); "status" checks the DataNode pid file. Any other
                 value is a no-op.
  """
  if action == "configure":
    import params
    Directory(params.dfs_domain_socket_dir,
              create_parents = True,
              mode=0o751,
              owner=params.hdfs_user,
              group=params.user_group)

    # handle_mounted_dirs ensures that we don't create dfs data dirs which are temporary unavailable (unmounted), and intended to reside on a different mount.
    data_dir_to_mount_file_content = handle_mounted_dirs(create_dirs, params.dfs_data_dirs, params.data_dir_mount_file, params)
    # create a history file used by handle_mounted_dirs
    File(params.data_dir_mount_file,
         owner=params.hdfs_user,
         group=params.user_group,
         mode=0o644,
         content=data_dir_to_mount_file_content
    )

  elif action == "start" or action == "stop":
    import params
    service(
      action=action, name="datanode",
      user=params.hdfs_user,
      create_pid_dir=True,
      create_log_dir=True
    )
  elif action == "status":
    import status_params
    check_process_status(status_params.datanode_pid_file)


@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
def datanode(action=None):
  """
  DataNode handling for the Windows Server OS family.

  :param action: "configure" does nothing; "start"/"stop" are applied to the
                 DataNode Windows service; "status" checks that service.
  """
  if action == "configure":
    pass
  elif(action == "start" or action == "stop"):
    import params
    Service(params.datanode_win_service_name, action=action)
  elif action == "status":
    import status_params
    check_windows_service_status(status_params.datanode_win_service_name)

# Residue of the patch dump, kept verbatim:
# \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_namenode.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_namenode.py new file mode 100644 index 0000000..6e0f5dc --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_namenode.py @@ -0,0 +1,562 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
# Residue of the patch dump, kept verbatim -- it is the end of the license
# docstring that starts on the preceding line:
# + +"""
import os.path
import time

from resource_management.core import shell
from resource_management.core.source import Template
from resource_management.core.resources.system import File, Execute, Directory
from resource_management.core.resources.service import Service
from resource_management.libraries.functions import namenode_ha_utils
from resource_management.libraries.functions.decorator import retry
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.check_process_status import check_process_status
from resource_management.libraries.resources.execute_hadoop import ExecuteHadoop
from resource_management.libraries.functions import Direction
from ambari_commons import OSCheck, OSConst
from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
from utils import get_dfsadmin_base_command

if OSCheck.is_windows_family():
  from resource_management.libraries.functions.windows_service_utils import check_windows_service_status

from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger

from utils import service, safe_zkfc_op, is_previous_fs_image
from setup_ranger_hdfs import setup_ranger_hdfs, create_ranger_audit_hdfs_directories

import namenode_upgrade

def wait_for_safemode_off(hdfs_binary, afterwait_sleep=0, execute_kinit=False):
  """
  During NonRolling (aka Express Upgrade), after starting NameNode, which is still in safemode, and then starting
  all of the DataNodes, we need for NameNode to receive all of the block reports and leave safemode.
  If HA is present, then this command will run individually on each NameNode, which checks for its own address.

  A NameNode that is still in Safemode after all retries is only logged as an error; nothing is raised.

  :param hdfs_binary: passed to get_dfsadmin_base_command() to build the dfsadmin command
  :param afterwait_sleep: seconds to sleep after Safemode was reported OFF
  :param execute_kinit: when True and security is enabled, kinit as the hdfs user first
  """
  import params

  retries = 115
  sleep_seconds = 10
  sleep_minutes = int(sleep_seconds * retries / 60)

  Logger.info("Waiting up to {0} minutes for the NameNode to leave Safemode...".format(sleep_minutes))

  if params.security_enabled and execute_kinit:
    kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}")
    Execute(kinit_command, user=params.hdfs_user, logoutput=True)

  try:
    # Note, this fails if namenode_address isn't prefixed with "params."

    dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary, use_specific_namenode=True)
    is_namenode_safe_mode_off = dfsadmin_base_command + " -safemode get | grep 'Safe mode is OFF'"

    # Wait up to retries * sleep_seconds (115 * 10s, roughly 19 minutes)
    Execute(is_namenode_safe_mode_off, tries=retries, try_sleep=sleep_seconds,
            user=params.hdfs_user, logoutput=True)

    # Wait a bit more since YARN still depends on block reports coming in.
    # Also saw intermittent errors with HBASE service check if it was done too soon.
    time.sleep(afterwait_sleep)
  except Fail:
    Logger.error("The NameNode is still in Safemode. Please be careful with commands that need Safemode OFF.")

@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None,
             upgrade_suspended=False, env=None):
  """
  NameNode handling for the default OS family.

  :param action: one of "configure", "start", "stop", "status", "decommission" (required)
  :param hdfs_binary: required for "start" and "stop"; used for the Safemode check
  :param do_format: on "start", format the NameNode unless formatting is disabled in params
  :param upgrade_type: None, "rolling" or "nonrolling"; selects the start options and
                       whether Safemode is waited for
  :param upgrade_suspended: on a plain start, pass the rolling upgrade flag if the
                            upgrade marker file exists
  :param env: forwarded to safe_zkfc_op() during a rolling upgrade with HA
  :raises Fail: if action is None, if hdfs_binary is missing for start/stop, or if the
                standby NameNode could not be bootstrapped
  """

  if action is None:
    raise Fail('"action" parameter is required for function namenode().')

  if action in ["start", "stop"] and hdfs_binary is None:
    raise Fail('"hdfs_binary" parameter is required for function namenode().')

  if action == "configure":
    import params
    #we need this directory to be present before any action(HA manual steps for
    #additional namenode)
    create_name_dirs(params.dfs_name_dir)
  elif action == "start":
    Logger.info("Called service {0} with upgrade_type: {1}".format(action, str(upgrade_type)))
    setup_ranger_hdfs(upgrade_type=upgrade_type)
    import params
    if do_format and not params.hdfs_namenode_format_disabled:
      format_namenode()

    File(params.exclude_file_path,
         content=Template("exclude_hosts_list.j2"),
         owner=params.hdfs_user,
         group=params.user_group
    )

    if params.dfs_ha_enabled and \
      params.dfs_ha_namenode_standby is not None and \
      params.hostname == params.dfs_ha_namenode_standby:
        # if the current host is the standby NameNode in an HA deployment
        # run the bootstrap command, to start the NameNode in standby mode
        # this requires that the active NameNode is already up and running,
        # so this execute should be re-tried upon failure, up to a timeout
        success = bootstrap_standby_namenode(params)
        if not success:
          raise Fail("Could not bootstrap standby namenode")

    if upgrade_type == "rolling" and params.dfs_ha_enabled:
      # Most likely, ZKFC is up since RU will initiate the failover command. However, if that failed, it would have tried
      # to kill ZKFC manually, so we need to start it if not already running.
      safe_zkfc_op(action, env)

    options = ""
    if upgrade_type == "rolling":
      if params.upgrade_direction == Direction.UPGRADE:
        options = "-rollingUpgrade started"
      elif params.upgrade_direction == Direction.DOWNGRADE:
        options = "-rollingUpgrade downgrade"
    elif upgrade_type == "nonrolling":
      is_previous_image_dir = is_previous_fs_image()
      Logger.info("Previous file system image dir present is {0}".format(str(is_previous_image_dir)))

      if params.upgrade_direction == Direction.UPGRADE:
        options = "-rollingUpgrade started"
      elif params.upgrade_direction == Direction.DOWNGRADE:
        options = "-rollingUpgrade downgrade"
    elif upgrade_type is None and upgrade_suspended is True:
      # the rollingUpgrade flag must be passed in during a suspended upgrade when starting NN
      if os.path.exists(namenode_upgrade.get_upgrade_in_progress_marker()):
        options = "-rollingUpgrade started"
      else:
        Logger.info("The NameNode upgrade marker file {0} does not exist, yet an upgrade is currently suspended. "
                    "Assuming that the upgrade of NameNode has not occurred yet.".format(namenode_upgrade.get_upgrade_in_progress_marker()))

    Logger.info("Options for start command are: {0}".format(options))

    service(
      action="start",
      name="namenode",
      user=params.hdfs_user,
      options=options,
      create_pid_dir=True,
      create_log_dir=True
    )

    if params.security_enabled:
      Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
              user = params.hdfs_user)

    # ___Scenario___________|_Expected safemode state__|_Wait for safemode OFF____|
    # no-HA                 | ON -> OFF                | Yes                      |
    # HA and active         | ON -> OFF                | Yes                      |
    # HA and standby        | no change                | No                       |
    # RU with HA on active  | ON -> OFF                | Yes                      |
    # RU with HA on standby | ON -> OFF                | Yes                      |
    # EU with HA on active  | ON -> OFF                | No                       |
    # EU with HA on standby | ON -> OFF                | No                       |
    # EU non-HA             | ON -> OFF                | No                       |

    # because we do things like create directories after starting NN,
    # the vast majority of the time this should be True - it should only
    # be False if this is HA and we are the Standby NN
    ensure_safemode_off = True

    # True if this is the only NameNode (non-HA) or if its the Active one in HA
    is_active_namenode = True

    if params.dfs_ha_enabled:
      Logger.info("Waiting for the NameNode to broadcast whether it is Active or Standby...")

      if is_this_namenode_active() is False:
        # we are the STANDBY NN
        is_active_namenode = False

        # we are the STANDBY NN and this restart is not part of an upgrade
        if upgrade_type is None:
          ensure_safemode_off = False


    # During an Express Upgrade, NameNode will not leave SafeMode until the DataNodes are started,
    # so always disable the Safemode check
    if upgrade_type == "nonrolling":
      ensure_safemode_off = False

    # some informative logging separate from the above logic to keep things a little cleaner
    if ensure_safemode_off:
      Logger.info("Waiting for this NameNode to leave Safemode due to the following conditions: HA: {0}, isActive: {1}, upgradeType: {2}".format(
        params.dfs_ha_enabled, is_active_namenode, upgrade_type))
    else:
      Logger.info("Skipping Safemode check due to the following conditions: HA: {0}, isActive: {1}, upgradeType: {2}".format(
        params.dfs_ha_enabled, is_active_namenode, upgrade_type))


    # wait for Safemode to end
    if ensure_safemode_off:
      wait_for_safemode_off(hdfs_binary)

    # Always run this on the "Active" NN unless Safemode has been ignored
    # in the case where safemode was ignored (like during an express upgrade), then
    # NN will be in SafeMode and cannot have directories created
    if is_active_namenode and ensure_safemode_off:
      create_hdfs_directories()
      create_ranger_audit_hdfs_directories()
    else:
      Logger.info("Skipping creation of HDFS directories since this is either not the Active NameNode or we did not wait for Safemode to finish.")

  elif action == "stop":
    import params
    service(
      action="stop", name="namenode",
      user=params.hdfs_user
    )
  elif action == "status":
    import status_params
    check_process_status(status_params.namenode_pid_file)
  elif action == "decommission":
    decommission()

@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
def namenode(action=None, hdfs_binary=None, do_format=True, upgrade_type=None,
             upgrade_suspended=False, env=None):
  """
  NameNode handling for the Windows Server OS family.

  Same signature as the default implementation; do_format, upgrade_type,
  upgrade_suspended and env are not used here.

  :raises Fail: if action is None or hdfs_binary is missing for start/stop
  """

  if action is None:
    raise Fail('"action" parameter is required for function namenode().')

  if action in ["start", "stop"] and hdfs_binary is None:
    raise Fail('"hdfs_binary" parameter is required for function namenode().')

  if action == "configure":
    pass
  elif action == "start":
    import params
    #TODO: Replace with format_namenode()
    namenode_format_marker = os.path.join(params.hadoop_conf_dir,"NN_FORMATTED")
    if not os.path.exists(namenode_format_marker):
      hadoop_cmd = "cmd /C %s" % (os.path.join(params.hadoop_home, "bin", "hadoop.cmd"))
      Execute("%s namenode -format" % (hadoop_cmd))
      # touch the marker file so the format is not repeated on the next start
      open(namenode_format_marker, 'a').close()
    Service(params.namenode_win_service_name, action=action)
  elif action == "stop":
    import params
    Service(params.namenode_win_service_name, action=action)
  elif action == "status":
    import status_params
    check_windows_service_status(status_params.namenode_win_service_name)
  elif action == "decommission":
    decommission()

def create_name_dirs(directories):
  """
  Create the NameNode name directories.

  :param directories: comma-separated string of directory paths
  """
  import params

  dirs = directories.split(",")
  Directory(dirs,
            # 0o-prefixed octal: valid on Python 2.6+ and Python 3 (plain 0755 is Python-2-only)
            mode=0o755,
            owner=params.hdfs_user,
            group=params.user_group,
            create_parents = True,
            cd_access="a",
  )


def create_hdfs_directories():
  """
  Queue the HDFS tmp dir and the smoke user's dir for creation, then run the
  queued HdfsResource operations with action="execute".
  """
  import params

  params.HdfsResource(params.hdfs_tmp_dir,
                       type="directory",
                       action="create_on_execute",
                       owner=params.hdfs_user,
                       mode=0o777,
  )
  params.HdfsResource(params.smoke_hdfs_user_dir,
                       type="directory",
                       action="create_on_execute",
                       owner=params.smoke_user,
                       mode=params.smoke_hdfs_user_mode,
  )
  params.HdfsResource(None,
                      action="execute",
  )

def format_namenode(force=None):
  """
  Format the NameNode if needed and create the "formatted" mark directories.

  Without HA the local NameNode is formatted; with HA only the host configured
  as the active NameNode is. Unless force is set, formatting is skipped when
  is_namenode_formatted() reports True.

  :param force: when truthy, run 'namenode -format' unconditionally (no mark dirs are created)
  """
  import params

  mark_dir = params.namenode_formatted_mark_dirs
  # referenced by name inside the format() templates below - do not rename
  hadoop_conf_dir = params.hadoop_conf_dir

  if not params.dfs_ha_enabled:
    if force:
      ExecuteHadoop('namenode -format',
                    bin_dir=params.hadoop_bin_dir,
                    conf_dir=hadoop_conf_dir)
    else:
      if not is_namenode_formatted(params):
        Execute(format("hdfs --config {hadoop_conf_dir} namenode -format -nonInteractive"),
                user = params.hdfs_user,
                path = [params.hadoop_bin_dir]
        )
        for m_dir in mark_dir:
          Directory(m_dir,
            create_parents = True
          )
  else:
    if params.dfs_ha_namenode_active is not None and \
       params.hostname == params.dfs_ha_namenode_active:
      # check and run the format command in the HA deployment scenario
      # only format the "active" namenode in an HA deployment
      if force:
        ExecuteHadoop('namenode -format',
                      bin_dir=params.hadoop_bin_dir,
                      conf_dir=hadoop_conf_dir)
      else:
        nn_name_dirs = params.dfs_name_dir.split(',')
        if not is_namenode_formatted(params):
          try:
            Execute(format("hdfs --config {hadoop_conf_dir} namenode -format -nonInteractive"),
                    user = params.hdfs_user,
                    path = [params.hadoop_bin_dir]
            )
          except Fail:
            # We need to clean-up mark directories, so we can re-run format next time.
            for nn_name_dir in nn_name_dirs:
              Execute(format("rm -rf {nn_name_dir}/*"),
                      user = params.hdfs_user,
              )
            raise
          for m_dir in mark_dir:
            Directory(m_dir,
              create_parents = True
            )

def is_namenode_formatted(params):
  """
  Decide whether the NameNode must be treated as already formatted.

  Returns True when a mark dir exists, when an old-style marker was found (it
  is migrated to the new mark dirs and deleted), when a name dir cannot be
  listed, or when a name dir has content. Returns False only if every name
  dir can be listed and is empty.

  :param params: the params module (mark dirs, old mark dirs, dfs_name_dir)
  """
  old_mark_dirs = params.namenode_formatted_old_mark_dirs
  mark_dirs = params.namenode_formatted_mark_dirs
  nn_name_dirs = params.dfs_name_dir.split(',')
  marked = False
  # Check if name directories have been marked as formatted
  for mark_dir in mark_dirs:
    if os.path.isdir(mark_dir):
      marked = True
      Logger.info(format("{mark_dir} exists. Namenode DFS already formatted"))

  # Ensure that all mark dirs created for all name directories
  if marked:
    for mark_dir in mark_dirs:
      Directory(mark_dir,
        create_parents = True
      )
    return marked

  # Move all old format markers to new place
  for old_mark_dir in old_mark_dirs:
    if os.path.isdir(old_mark_dir):
      for mark_dir in mark_dirs:
        Execute(('cp', '-ar', old_mark_dir, mark_dir),
                sudo = True
        )
        marked = True
      Directory(old_mark_dir,
        action = "delete"
      )
    elif os.path.isfile(old_mark_dir):
      for mark_dir in mark_dirs:
        Directory(mark_dir,
                  create_parents = True,
        )
      Directory(old_mark_dir,
        action = "delete"
      )
      marked = True

  if marked:
    return True

  # Check if name dirs are not empty
  for name_dir in nn_name_dirs:
    code, out = shell.call(("ls", name_dir))
    dir_exists_and_valid = bool(not code)

    if not dir_exists_and_valid: # situations if disk exists but is crashed at the moment (ls: reading directory ...: Input/output error)
      Logger.info(format("NameNode will not be formatted because the directory {name_dir} is missing or cannot be checked for content. {out}"))
      return True

    try:
      Execute(format("ls {name_dir} | wc -l | grep -q ^0$"),
      )
    except Fail:
      Logger.info(format("NameNode will not be formatted since {name_dir} exists and contains content"))
      return True

  return False

@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
def decommission():
  """
  Rewrite the exclude file and, unless params.update_exclude_file_only is
  set, kinit and ask the NameNode to refresh its node list.
  """
  import params

  hdfs_user = params.hdfs_user
  conf_dir = params.hadoop_conf_dir
  user_group = params.user_group
  nn_kinit_cmd = params.nn_kinit_cmd

  File(params.exclude_file_path,
       content=Template("exclude_hosts_list.j2"),
       owner=hdfs_user,
       group=user_group
  )

  if not params.update_exclude_file_only:
    Execute(nn_kinit_cmd,
            user=hdfs_user
    )

    if params.dfs_ha_enabled:
      # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
      # need to execute each command scoped to a particular namenode
      nn_refresh_cmd = format('dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
    else:
      nn_refresh_cmd = format('dfsadmin -fs {namenode_address} -refreshNodes')
    ExecuteHadoop(nn_refresh_cmd,
                  user=hdfs_user,
                  conf_dir=conf_dir,
                  bin_dir=params.hadoop_bin_dir)

@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
def decommission():
  """
  Windows variant: rewrite the exclude file and ask the NameNode to refresh
  its node list through 'cmd /c hadoop dfsadmin'.
  """
  import params
  hdfs_user = params.hdfs_user

  File(params.exclude_file_path,
       content=Template("exclude_hosts_list.j2"),
       owner=hdfs_user
  )

  if params.dfs_ha_enabled:
    # due to a bug in hdfs, refreshNodes will not run on both namenodes so we
    # need to execute each command scoped to a particular namenode
    nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs hdfs://{namenode_rpc} -refreshNodes')
  else:
    nn_refresh_cmd = format('cmd /c hadoop dfsadmin -fs {namenode_address} -refreshNodes')
  Execute(nn_refresh_cmd, user=hdfs_user)


def bootstrap_standby_namenode(params, use_path=False):
  """
  Run 'hdfs namenode -bootstrapStandby', retrying up to 50 times.

  Exit code 0 (bootstrapped) and 5 (already bootstrapped) both count as
  success; on success the bootstrap mark dirs are created. Any exception is
  logged and leads to the current result being returned instead of raised.

  :param params: the params module
  :param use_path: prefix the hdfs command with params.hadoop_bin_dir
  :return: True if the standby NameNode is bootstrapped, False otherwise
  """
  mark_dirs = params.namenode_bootstrapped_mark_dirs
  # referenced by name inside the format() templates below - do not rename
  bin_path = os.path.join(params.hadoop_bin_dir, '') if use_path else ""
  # initialised before the try so it is always bound when the result is evaluated
  bootstrapped = False
  try:
    iterations = 50
    bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive")
    # Blue print based deployments start both NN in parallel and occasionally
    # the first attempt to bootstrap may fail. Depending on how it fails the
    # second attempt may not succeed (e.g. it may find the folder and decide that
    # bootstrap succeeded). The solution is to call with -force option but only
    # during initial start
    if params.command_phase == "INITIAL_START":
      # force bootstrap in INITIAL_START phase
      bootstrap_cmd = format("{bin_path}hdfs namenode -bootstrapStandby -nonInteractive -force")
    elif is_namenode_bootstrapped(params):
      # Once out of INITIAL_START phase bootstrap only if we couldnt bootstrap during cluster deployment
      return True
    Logger.info("Boostrapping standby namenode: %s" % (bootstrap_cmd))
    for i in range(iterations):
      Logger.info('Try %d out of %d' % (i+1, iterations))
      code, out = shell.call(bootstrap_cmd, logoutput=False, user=params.hdfs_user)
      if code == 0:
        Logger.info("Standby namenode bootstrapped successfully")
        bootstrapped = True
        break
      elif code == 5:
        Logger.info("Standby namenode already bootstrapped")
        bootstrapped = True
        break
      else:
        Logger.warning('Bootstrap standby namenode failed with %d error code. Will retry' % (code))
  except Exception as ex:
    Logger.error('Bootstrap standby namenode threw an exception. Reason %s' %(str(ex)))
  if bootstrapped:
    for mark_dir in mark_dirs:
      Directory(mark_dir,
                create_parents = True
      )
  return bootstrapped

def is_namenode_bootstrapped(params):
  """
  Return True if any bootstrap mark dir exists; in that case all mark dirs
  are (re)created so that every one of them is present.

  :param params: the params module
  """
  mark_dirs = params.namenode_bootstrapped_mark_dirs
  marked = False
  # Check if name directories have been marked as bootstrapped
  for mark_dir in mark_dirs:
    if os.path.isdir(mark_dir):
      marked = True
      Logger.info(format("{mark_dir} exists. Standby Namenode already bootstrapped"))
      break

  # Ensure that all mark dirs created for all name directories
  if marked:
    for mark_dir in mark_dirs:
      Directory(mark_dir,
                create_parents = True
      )

  return marked


@retry(times=5, sleep_time=5, backoff_factor=2, err_class=Fail)
def is_this_namenode_active():
  """
  Gets whether the current NameNode is Active. This function will wait until the NameNode is
  listed as being either Active or Standby before returning a value. This is to ensure
  that if the other NameNode is Active, we ensure that this NameNode has fully loaded and
  registered in the event that the other NameNode is going to be restarted. This prevents
  a situation where we detect the other NameNode as Active before this NameNode has fully booted.
  If the other Active NameNode is then restarted, there can be a loss of service if this
  NameNode has not entered Standby.

  :return: True if this NameNode is listed as active, False if it is listed as standby
  :raises Fail: if it is listed as neither (the retry decorator then calls the function again)
  """
  import params

  # returns ([('nn1', 'c6401.ambari.apache.org:50070')], [('nn2', 'c6402.ambari.apache.org:50070')], [])
  #                  0                                           1                                   2
  # or
  # returns ([], [('nn1', 'c6401.ambari.apache.org:50070')], [('nn2', 'c6402.ambari.apache.org:50070')], [])
  #          0                                              1                                             2
  #
  namenode_states = namenode_ha_utils.get_namenode_states(params.hdfs_site, params.security_enabled,
    params.hdfs_user, times=5, sleep_time=5, backoff_factor=2)

  # e.g. [('nn1', 'c6401.ambari.apache.org:50070')] - may be empty
  active_namenodes = namenode_states[0]

  # e.g. [('nn2', 'c6402.ambari.apache.org:50070')] - may be empty
  standby_namenodes = namenode_states[1]

  # check to see if this is the active NameNode
  for entry in active_namenodes:
    if params.namenode_id in entry:
      return True

  # if this is not the active NameNode, then we must wait for it to register as standby
  for entry in standby_namenodes:
    if params.namenode_id in entry:
      return False

  # at this point, this NameNode is neither active nor standby - we must wait to ensure it
  # enters at least one of these roles before returning a verdict - the annotation will catch
  # this failure and retry the function automatically
  raise Fail(format("The NameNode {namenode_id} is not listed as Active or Standby, waiting..."))

# Residue of the patch dump, kept verbatim (end-of-file marker of this file, then the
# diff header and the opening of the license docstring of the next file in the patch):
# \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_nfsgateway.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_nfsgateway.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_nfsgateway.py new file mode 100644 index 0000000..672312a --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_nfsgateway.py @@ -0,0 +1,75 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
# Residue of the patch dump, kept verbatim -- it is the end of the license
# docstring that starts on the preceding line:
# + +"""

from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.core.resources import Directory
from resource_management.core import shell
from utils import service
import subprocess,os

# NFS GATEWAY is always started by root using jsvc due to rpcbind bugs
# on Linux such as CentOS6.2. https://bugzilla.redhat.com/show_bug.cgi?id=731542

def _pgrep_exit_code(process_name):
  """Return the exit code of 'pgrep <process_name>' (0 means pgrep found a match)."""
  exit_code, _ = shell.call("pgrep %s" % process_name)
  return exit_code


def prepare_rpcbind():
  """
  Prepare the host for the NFS gateway.

  Stops the native NFS server if pgrep finds nfsd, then starts rpcbind and
  portmap if pgrep finds neither of them.

  Raises Fail if nfsd is still found after the stop commands, or if both
  service start commands return a non-zero code.
  """
  Logger.info("check if native nfs server is running")
  if _pgrep_exit_code("nfsd") == 0:
    Logger.info("native nfs server is running. shutting it down...")
    # shutdown nfs
    shell.call("service nfs stop")
    shell.call("service nfs-kernel-server stop")
    Logger.info("check if the native nfs server is down...")
    if _pgrep_exit_code("nfsd") == 0:
      raise Fail("Failed to shutdown native nfs service")

  Logger.info("check if rpcbind or portmap is running")
  rpcbind_code = _pgrep_exit_code("rpcbind")
  portmap_code = _pgrep_exit_code("portmap")

  neither_running = rpcbind_code != 0 and portmap_code != 0
  if neither_running:
    Logger.info("no portmap or rpcbind running. starting one...")
    # both start commands are always issued; one succeeding is enough
    rpcbind_start, _ = shell.call(("service", "rpcbind", "start"), sudo=True)
    portmap_start, _ = shell.call(("service", "portmap", "start"), sudo=True)
    if rpcbind_start != 0 and portmap_start != 0:
      raise Fail("Failed to start rpcbind or portmap")

  Logger.info("now we are ready to start nfs gateway")


def nfsgateway(action=None, format=False):
  """
  NFS gateway handling.

  "configure" creates the NFS file dump directory; "start" first runs
  prepare_rpcbind(); "start" and "stop" are forwarded to service() for the
  "nfs3" daemon as params.root_user. Any other action is a no-op.

  The format argument is accepted but not used in this function.
  """
  import params

  starting = action == "start"
  if starting:
    prepare_rpcbind()

  if action == "configure":
    Directory(params.nfs_file_dump_dir,
              owner = params.hdfs_user,
              group = params.user_group,
    )
    return

  if action in ("start", "stop"):
    service(
      action=action,
      name="nfs3",
      user=params.root_user,
      create_pid_dir=True,
      create_log_dir=True
    )

# Residue of the patch dump, kept verbatim (diff header and the opening of the
# license docstring of the next file in the patch):
# http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_rebalance.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_rebalance.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_rebalance.py new file mode 100644 index 0000000..1dc545e --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/hdfs_rebalance.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
import re


class AmbariException(Exception):
  """
  Raised when a line of balancer output cannot be parsed.

  This module raised the name without ever defining or importing it, so every
  error path used to end in a NameError instead of a readable failure.
  """
  pass


class HdfsParser():
  """
  Line-by-line state machine over the output of the HDFS balancer.

  state       -- None until a recognised line is seen, then 'PROCESS_STARTED',
                 'PROGRESS' or 'PROCESS_FINISED'.
  initialLine -- the first progress row that was parsed (None until then).
  """
  def __init__(self):
    self.initialLine = None
    self.state = None

  def parseLine(self, line):
    """
    Feed one output line to the parser.

    Returns the parsed HdfsLine for a progress row, None for every other line.
    Raises AmbariException when a progress row cannot be converted.
    """
    hdfsLine = HdfsLine()
    lineType, matcher = hdfsLine.recognizeType(line)
    if lineType == HdfsLine.LineType.HeaderStart:
      self.state = 'PROCESS_STARTED'
    elif lineType == HdfsLine.LineType.Progress:
      self.state = 'PROGRESS'
      hdfsLine.parseProgressLog(line, matcher)
      if self.initialLine is None:
        self.initialLine = hdfsLine
      return hdfsLine
    elif lineType == HdfsLine.LineType.ProgressEnd:
      # NOTE: the misspelling is deliberate - callers compare against this
      # exact string, so it must not be "corrected" here alone.
      self.state = 'PROCESS_FINISED'
    return None


class HdfsLine():
  """
  One row of the balancer progress table, with sizes both as text and bytes.
  """

  class LineType:
    HeaderStart, Progress, ProgressEnd, Unknown = range(4)

  MEMORY_SUFFIX = ['B','KB','MB','GB','TB','PB','EB']
  # Template for one "<number> <unit>" column; the three %d are filled with the
  # column index. The separator is a literal '.' or ',' (it used to be an
  # unescaped '.', i.e. any character). Group count is kept unchanged.
  MEMORY_PATTERN = r'(?P<memmult_%d>(?P<memory_%d>(\d+)([.,])?(\d+)?) (?P<mult_%d>' + "|".join(MEMORY_SUFFIX) + '))'

  # Column titles are separated by whitespace, hence \s+ (the previous \w+
  # could never match a real header line).
  HEADER_BEGIN_PATTERN = re.compile(r'Time Stamp\s+Iteration#\s+Bytes Already Moved\s+Bytes Left To Move\s+Bytes Being Moved')
  PROGRESS_PATTERN = re.compile(
    r"(?P<date>.*?)\s+" +
    r"(?P<iteration>\d+)\s+" +
    MEMORY_PATTERN % (1,1,1) + r"\s+" +
    MEMORY_PATTERN % (2,2,2) + r"\s+" +
    MEMORY_PATTERN % (3,3,3)
  )
  PROGRESS_END_PATTERN = re.compile(r'The cluster is balanced\. Exiting\.\.\.')

  def __init__(self):
    self.date = None
    self.iteration = None
    self.bytesAlreadyMoved = None
    self.bytesLeftToMove = None
    self.bytesBeingMoved = None
    self.bytesAlreadyMovedStr = None
    self.bytesLeftToMoveStr = None
    self.bytesBeingMovedStr = None

  def recognizeType(self, line):
    """
    Classify a line. Returns (LineType value, match object), or
    (LineType.Unknown, None) when no pattern matches at the start of the line.
    """
    for (lineType, pattern) in (
      (HdfsLine.LineType.HeaderStart, self.HEADER_BEGIN_PATTERN),
      (HdfsLine.LineType.Progress, self.PROGRESS_PATTERN),
      (HdfsLine.LineType.ProgressEnd, self.PROGRESS_END_PATTERN)
    ):
      m = pattern.match(line)
      if m:
        return lineType, m
    return HdfsLine.LineType.Unknown, None

  def parseProgressLog(self, line, m):
    '''
    Parse the line of 'hdfs rebalancer' output. The example output being parsed:

    Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
    Jul 28, 2014 5:01:49 PM           0                  0 B             5.74 GB            9.79 GB
    Jul 28, 2014 5:03:00 PM           1                  0 B             5.58 GB            9.79 GB

    line -- the raw output line
    m    -- match of PROGRESS_PATTERN against line, or None to match here

    Throws AmbariException in case of parsing errors
    '''
    if m is None:
      m = self.PROGRESS_PATTERN.match(line)
    if not m:
      raise AmbariException("Failed to parse line [%s]" % line)

    self.date = m.group('date')
    self.iteration = int(m.group('iteration'))

    self.bytesAlreadyMoved = self.parseMemory(m.group('memory_1'), m.group('mult_1'))
    self.bytesLeftToMove = self.parseMemory(m.group('memory_2'), m.group('mult_2'))
    self.bytesBeingMoved = self.parseMemory(m.group('memory_3'), m.group('mult_3'))

    self.bytesAlreadyMovedStr = m.group('memmult_1')
    self.bytesLeftToMoveStr = m.group('memmult_2')
    self.bytesBeingMovedStr = m.group('memmult_3')

  def parseMemory(self, memorySize, multiplier_type):
    """
    Convert a size such as ('5.74', 'GB') into a float number of bytes.

    A ',' decimal separator is accepted, since the progress pattern lets it
    through. Raises AmbariException for an unknown unit or a non-numeric size.
    """
    try:
      factor = self.MEMORY_SUFFIX.index(multiplier_type)
      value = float(str(memorySize).replace(',', '.'))
    except ValueError:
      raise AmbariException("Failed to parse memory value [%s %s]" % (memorySize, multiplier_type))

    return value * (1024 ** factor)

  def toJson(self):
    """Return the row as a plain dict ready for structured output."""
    return {
            'timeStamp' : self.date,
            'iteration' : self.iteration,

            'dataMoved': self.bytesAlreadyMovedStr,
            'dataLeft' : self.bytesLeftToMoveStr,
            'dataBeingMoved': self.bytesBeingMovedStr,

            'bytesMoved': self.bytesAlreadyMoved,
            'bytesLeft' : self.bytesLeftToMove,
            'bytesBeingMoved': self.bytesBeingMoved,
          }

  def __str__(self):
    return "[ date=%s,iteration=%d, bytesAlreadyMoved=%d, bytesLeftToMove=%d, bytesBeingMoved=%d]"%(self.date, self.iteration, self.bytesAlreadyMoved, self.bytesLeftToMove, self.bytesBeingMoved)
from resource_management import *
from utils import service
from ambari_commons.os_family_impl import OsFamilyImpl, OsFamilyFuncImpl
from ambari_commons import OSConst

@OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT)
def snamenode(action=None, format=False):
  """
  Default-OS implementation of the Secondary NameNode actions.

  action -- "configure" creates every checkpoint directory and writes the
            exclude-hosts file; "start"/"stop" drive the secondarynamenode
            service as the hdfs user; "status" checks the pid file.
  format -- accepted but not used here.
  """
  if action == "configure":
    import params
    for checkpoint_dir in params.fs_checkpoint_dirs:
      Directory(checkpoint_dir,
                create_parents=True,
                cd_access="a",
                mode=0o755,  # same value as the legacy literal 0755
                owner=params.hdfs_user,
                group=params.user_group)
    File(params.exclude_file_path,
         content=Template("exclude_hosts_list.j2"),
         owner=params.hdfs_user,
         group=params.user_group)
  elif action in ("start", "stop"):
    import params
    service(
      action=action,
      name="secondarynamenode",
      user=params.hdfs_user,
      create_pid_dir=True,
      create_log_dir=True
    )
  elif action == "status":
    import status_params
    check_process_status(status_params.snamenode_pid_file)


@OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY)
def snamenode(action=None, format=False):
  """
  Windows implementation of the Secondary NameNode actions.

  "configure" is a no-op; "start"/"stop" are forwarded to the Windows service;
  "status" checks the Windows service state. format is not used.
  """
  if action in ("start", "stop"):
    import params
    Service(params.snamenode_win_service_name, action=action)
  elif action == "status":
    import status_params
    check_windows_service_status(status_params.snamenode_win_service_name)
  # "configure" (and anything else) needs no work on Windows
from ambari_commons import OSCheck

# These parameters are supposed to be referenced at installation time, before the Hadoop environment variables have been set
if OSCheck.is_windows_family():
  # nothing is excluded on Windows
  exclude_packages = []
else:
  from resource_management.libraries.functions.default import default
  from resource_management.libraries.script.script import Script

  _config = Script.get_config()
  stack_version_unformatted = str(_config['hostLevelParams']['stack_version'])

  # The logic for LZO also exists in OOZIE's params.py
  io_compression_codecs = default("/configurations/core-site/io.compression.codecs", None)
  if io_compression_codecs is None:
    lzo_enabled = False
  else:
    lzo_enabled = "com.hadoop.compression.lzo" in io_compression_codecs.lower()
  lzo_packages = ["lzo", "hadoop-lzo", "hadoop-lzo-native"]

  # the LZO packages are skipped unless the codec is configured in core-site
  exclude_packages = [] if lzo_enabled else list(lzo_packages)
b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/journalnode.py new file mode 100644 index 0000000..9c5a124 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/journalnode.py @@ -0,0 +1,198 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
from resource_management import *
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions.format import format
from resource_management.libraries.functions.security_commons import build_expectations, \
  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
  FILE_TYPE_XML

from utils import service
from hdfs import hdfs
import journalnode_upgrade
from ambari_commons.os_family_impl import OsFamilyImpl
from ambari_commons import OSConst

class JournalNode(Script):
  """Base JournalNode script; the OS-specific subclasses add the lifecycle."""

  def install(self, env):
    import params
    env.set_params(params)
    self.install_packages(env)

@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class JournalNodeDefault(JournalNode):
  """JournalNode lifecycle for the default OS family."""

  def get_component_name(self):
    return "hadoop-hdfs-journalnode"

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Point the conf and stack selections at params.version before a restart."""
    Logger.info("Executing Stack Upgrade pre-restart")
    import params
    env.set_params(params)

    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      conf_select.select(params.stack_name, "hadoop", params.version)
      stack_select.select("hadoop-hdfs-journalnode", params.version)

  def start(self, env, upgrade_type=None):
    import params

    env.set_params(params)
    self.configure(env)
    service(
      action="start", name="journalnode", user=params.hdfs_user,
      create_pid_dir=True,
      create_log_dir=True
    )

  def post_upgrade_restart(self, env, upgrade_type=None):
    """Run the JournalNode quorum check after a restart; skipped for "nonrolling"."""
    if upgrade_type == "nonrolling":
      return

    Logger.info("Executing Stack Upgrade post-restart")
    import params
    env.set_params(params)
    journalnode_upgrade.post_upgrade_check()

  def stop(self, env, upgrade_type=None):
    import params

    env.set_params(params)
    service(
      action="stop", name="journalnode", user=params.hdfs_user,
      create_pid_dir=True,
      create_log_dir=True
    )

  def configure(self, env):
    import params

    Directory(params.jn_edits_dir,
              create_parents = True,
              cd_access="a",
              owner=params.hdfs_user,
              group=params.user_group
    )
    env.set_params(params)
    hdfs()

  def status(self, env):
    import status_params

    env.set_params(status_params)
    check_process_status(status_params.journalnode_pid_file)

  def security_status(self, env):
    """
    Report the Kerberos state of this JournalNode through structured output.

    Emits securityState UNSECURED, SECURED_KERBEROS or ERROR, plus
    securityIssuesFound / securityStateErrorInfo where applicable.
    """
    import status_params

    env.set_params(status_params)
    props_value_check = {"hadoop.security.authentication": "kerberos",
                         "hadoop.security.authorization": "true"}
    props_empty_check = ["hadoop.security.auth_to_local"]
    props_read_check = None
    core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
                                                props_read_check)

    props_value_check = None
    props_empty_check = ['dfs.journalnode.keytab.file',
                         'dfs.journalnode.kerberos.principal']
    props_read_check = ['dfs.journalnode.keytab.file']
    hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check,
                                                props_read_check)

    hdfs_expectations = {}
    hdfs_expectations.update(hdfs_site_expectations)
    hdfs_expectations.update(core_site_expectations)

    # hdfs-site.xml has to be loaded as well: the expectations above and the
    # keytab/principal lookups below all read from it. Loading core-site.xml
    # alone meant the 'hdfs-site' section was never present.
    security_params = get_params_from_filesystem(status_params.hadoop_conf_dir,
                                                 {'core-site.xml': FILE_TYPE_XML,
                                                  'hdfs-site.xml': FILE_TYPE_XML})
    if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \
        security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos':
      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
      if not result_issues:  # If all validations passed successfully
        try:
          # Double check the dict before calling execute. The keytab property is
          # the same 'dfs.journalnode.keytab.file' that was validated above.
          if ('hdfs-site' not in security_params or
                  'dfs.journalnode.keytab.file' not in security_params['hdfs-site'] or
                  'dfs.journalnode.kerberos.principal' not in security_params['hdfs-site']):
            self.put_structured_out({"securityState": "UNSECURED"})
            self.put_structured_out(
              {"securityIssuesFound": "Keytab file or principal are not set property."})
            return

          cached_kinit_executor(status_params.kinit_path_local,
                                status_params.hdfs_user,
                                security_params['hdfs-site']['dfs.journalnode.keytab.file'],
                                security_params['hdfs-site']['dfs.journalnode.kerberos.principal'],
                                status_params.hostname,
                                status_params.tmp_dir)
          self.put_structured_out({"securityState": "SECURED_KERBEROS"})
        except Exception as e:
          self.put_structured_out({"securityState": "ERROR"})
          self.put_structured_out({"securityStateErrorInfo": str(e)})
      else:
        issues = []
        for cf in result_issues:
          issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf]))
        self.put_structured_out({"securityIssuesFound": ". ".join(issues)})
        self.put_structured_out({"securityState": "UNSECURED"})
    else:
      self.put_structured_out({"securityState": "UNSECURED"})

  def get_log_folder(self):
    import params
    return params.hdfs_log_dir

  def get_user(self):
    import params
    return params.hdfs_user

@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class JournalNodeWindows(JournalNode):
  """JournalNode lifecycle on Windows, driven through the Windows service."""

  def install(self, env):
    import install_params
    self.install_packages(env)

  def start(self, env):
    import params
    self.configure(env)
    Service(params.journalnode_win_service_name, action="start")

  def stop(self, env):
    import params
    Service(params.journalnode_win_service_name, action="stop")

  def configure(self, env):
    import params
    env.set_params(params)
    hdfs("journalnode")

  def status(self, env):
    import status_params
    env.set_params(status_params)
    check_windows_service_status(status_params.journalnode_win_service_name)

if __name__ == "__main__":
  JournalNode().execute()
http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/journalnode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/journalnode_upgrade.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/journalnode_upgrade.py new file mode 100644 index 0000000..7585107 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/journalnode_upgrade.py @@ -0,0 +1,152 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
import time

from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.default import default
from resource_management.core.exceptions import Fail
import utils
from resource_management.libraries.functions.jmx import get_value_from_jmx
import namenode_ha_state
from namenode_ha_state import NAMENODE_STATE, NamenodeHAState
from utils import get_dfsadmin_base_command


def post_upgrade_check():
  """
  Ensure all journal nodes are up and quorum is established during Rolling Upgrade.

  Raises Fail when fewer than 3 JournalNodes are known, when the NameNode HA
  state or its JMX transaction info cannot be read, or when the JournalNodes
  do not reach the NameNode's last transaction id in time.
  :return:
  """
  import params
  Logger.info("Ensuring Journalnode quorum is established")

  if params.security_enabled:
    # We establish HDFS identity instead of JN Kerberos identity
    # since this is an administrative HDFS call that requires the HDFS administrator user to perform.
    Execute(params.hdfs_kinit_cmd, user=params.hdfs_user)

  time.sleep(5)
  hdfs_roll_edits()
  time.sleep(5)

  all_journal_node_hosts = default("/clusterHostInfo/journalnode_hosts", [])

  if len(all_journal_node_hosts) < 3:
    raise Fail("Need at least 3 Journalnodes to maintain a quorum")

  try:
    namenode_ha = namenode_ha_state.NamenodeHAState()
  except ValueError as err:
    raise Fail("Could not retrieve Namenode HA addresses. Error: " + str(err))

  Logger.info(str(namenode_ha))
  nn_address = namenode_ha.get_address(NAMENODE_STATE.ACTIVE)

  nn_data = utils.get_jmx_data(nn_address, 'org.apache.hadoop.hdfs.server.namenode.FSNamesystem', 'JournalTransactionInfo',
                               namenode_ha.is_encrypted(), params.security_enabled)
  if not nn_data:
    raise Fail("Could not retrieve JournalTransactionInfo from JMX")

  # Only the lookup is guarded, so a KeyError raised further down is not
  # misreported as a missing JMX key.
  try:
    last_txn_id = int(nn_data['LastAppliedOrWrittenTxId'])
  except KeyError:
    raise Fail("JournalTransactionInfo does not have key LastAppliedOrWrittenTxId from JMX info")

  if not ensure_jns_have_new_txn(all_journal_node_hosts, last_txn_id):
    raise Fail("Could not ensure that all Journal nodes have a new log transaction id")


def hdfs_roll_edits():
  """
  HDFS_CLIENT needs to be a dependency of JOURNALNODE
  Roll the logs so that Namenode will be able to connect to the Journalnode.
  Must kinit before calling this command.
  """
  import params

  # TODO, this will need to be doc'ed since existing clusters will need HDFS_CLIENT on all JOURNALNODE hosts
  dfsadmin_base_command = get_dfsadmin_base_command('hdfs')
  command = dfsadmin_base_command + ' -rollEdits'
  Execute(command, user=params.hdfs_user, tries=1)


def ensure_jns_have_new_txn(nodelist, last_txn_id):
  """
  Poll the JournalNodes over JMX until each one reports a LastWrittenTxId of
  at least last_txn_id, for up to 3 minutes in 10 second steps.

  Only hosts from nodelist that also appear in
  hdfs-site/dfs.namenode.shared.edits.dir are polled.

  :param nodelist: List of Journalnodes
  :param last_txn_id: Integer of last transaction id
  :return: Return true on success, false otherwise
  """
  import params

  jn_uri = default("/configurations/hdfs-site/dfs.namenode.shared.edits.dir", None)

  if jn_uri is None:
    raise Fail("No JournalNode URI found at hdfs-site/dfs.namenode.shared.edits.dir")

  if params.journalnode_address is None:
    raise Fail("Could not retrieve JournalNode address")

  if params.journalnode_port is None:
    raise Fail("Could not retrieve JournalNode port")

  # JournalNodes that have not caught up yet
  pending = [node for node in nodelist if node in jn_uri]

  time_out_secs = 3 * 60
  step_time_secs = 10
  iterations = int(time_out_secs/step_time_secs)

  protocol = "https" if params.https_only else "http"

  Logger.info("Checking if all JournalNodes are updated.")
  for i in range(iterations):
    Logger.info('Try %d out of %d' % (i+1, iterations))

    still_pending = []
    for node in pending:
      url = '%s://%s:%s' % (protocol, node, params.journalnode_port)
      data = utils.get_jmx_data(url, 'Journal-', 'LastWrittenTxId', params.https_only, params.security_enabled)
      if data and int(data) >= last_txn_id:
        Logger.info("JournalNode %s has a higher transaction id: %s" % (node, str(data)))
        continue
      if data:
        Logger.info("JournalNode %s is still on transaction id: %s" % (node, str(data)))
      still_pending.append(node)
    pending = still_pending

    # Checked right after polling, so success (or an empty node list) returns
    # at once instead of after one more sleep.
    if not pending:
      Logger.info("All journal nodes are updated")
      return True

    if i + 1 < iterations:
      Logger.info("Sleeping for %d secs" % step_time_secs)
      time.sleep(step_time_secs)

  return False
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import sys +import os +import time +import json +import tempfile +from datetime import datetime +import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. + +from resource_management import Script +from resource_management.core.resources.system import Execute, File +from resource_management.core import shell +from resource_management.libraries.functions import conf_select +from resource_management.libraries.functions import stack_select +from resource_management.libraries.functions import Direction +from resource_management.libraries.functions import StackFeature +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ + FILE_TYPE_XML + +from resource_management.core.exceptions import Fail +from resource_management.core.shell import as_user +from resource_management.core.logger import Logger + + +from ambari_commons.os_family_impl import OsFamilyImpl +from ambari_commons import OSConst + + +import namenode_upgrade +from hdfs_namenode import namenode, 
wait_for_safemode_off +from hdfs import hdfs +import hdfs_rebalance +from utils import initiate_safe_zkfc_failover, get_hdfs_binary, get_dfsadmin_base_command + + + +# hashlib is supplied as of Python 2.5 as the replacement interface for md5 +# and other secure hashes. In 2.6, md5 is deprecated. Import hashlib if +# available, avoiding a deprecation warning under 2.6. Import md5 otherwise, +# preserving 2.4 compatibility. +try: + import hashlib + _md5 = hashlib.md5 +except ImportError: + import md5 + _md5 = md5.new + +class NameNode(Script): + + def get_component_name(self): + return "hadoop-hdfs-namenode" + + def get_hdfs_binary(self): + """ + Get the name or path to the hdfs binary depending on the component name. + """ + component_name = self.get_component_name() + return get_hdfs_binary(component_name) + + def install(self, env): + import params + env.set_params(params) + self.install_packages(env) + #TODO we need this for HA because of manual steps + self.configure(env) + + def configure(self, env): + import params + env.set_params(params) + hdfs("namenode") + hdfs_binary = self.get_hdfs_binary() + namenode(action="configure", hdfs_binary=hdfs_binary, env=env) + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + self.configure(env) + hdfs_binary = self.get_hdfs_binary() + namenode(action="start", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, + upgrade_suspended=params.upgrade_suspended, env=env) + + # after starting NN in an upgrade, touch the marker file + if upgrade_type is not None: + # place a file on the system indicating that we've submitting the command that + # instructs NN that it is now part of an upgrade + namenode_upgrade.create_upgrade_marker() + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + hdfs_binary = self.get_hdfs_binary() + if upgrade_type == "rolling" and params.dfs_ha_enabled: + if params.dfs_ha_automatic_failover_enabled: + initiate_safe_zkfc_failover() + 
else: + raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart") + namenode(action="stop", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env) + + def status(self, env): + import status_params + env.set_params(status_params) + namenode(action="status", env=env) + + def decommission(self, env): + import params + env.set_params(params) + hdfs_binary = self.get_hdfs_binary() + namenode(action="decommission", hdfs_binary=hdfs_binary) + +@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) +class NameNodeDefault(NameNode): + + def restore_snapshot(self, env): + """ + Restore the snapshot during a Downgrade. + """ + print "TODO AMBARI-12698" + pass + + def prepare_express_upgrade(self, env): + """ + During an Express Upgrade. + If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and + make sure that there is no "/previous" directory. + + Create a list of all the DataNodes in the cluster. + hdfs dfsadmin -report > dfs-old-report-1.log + + hdfs dfsadmin -safemode enter + hdfs dfsadmin -saveNamespace + + Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory. + + Finalize any prior HDFS upgrade, + hdfs dfsadmin -finalizeUpgrade + + Prepare for a NameNode rolling upgrade in order to not lose any data. 
+ hdfs dfsadmin -rollingUpgrade prepare + """ + import params + Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.") + + if params.security_enabled: + kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") + Execute(kinit_command, user=params.hdfs_user, logoutput=True) + + hdfs_binary = self.get_hdfs_binary() + namenode_upgrade.prepare_upgrade_check_for_previous_dir() + namenode_upgrade.prepare_upgrade_enter_safe_mode(hdfs_binary) + namenode_upgrade.prepare_upgrade_save_namespace(hdfs_binary) + namenode_upgrade.prepare_upgrade_backup_namenode_dir() + namenode_upgrade.prepare_upgrade_finalize_previous_upgrades(hdfs_binary) + + # Call -rollingUpgrade prepare + namenode_upgrade.prepare_rolling_upgrade(hdfs_binary) + + def prepare_rolling_upgrade(self, env): + hfds_binary = self.get_hdfs_binary() + namenode_upgrade.prepare_rolling_upgrade(hfds_binary) + + def wait_for_safemode_off(self, env): + wait_for_safemode_off(self.get_hdfs_binary(), 30, True) + + def finalize_non_rolling_upgrade(self, env): + hfds_binary = self.get_hdfs_binary() + namenode_upgrade.finalize_upgrade("nonrolling", hfds_binary) + + def finalize_rolling_upgrade(self, env): + hfds_binary = self.get_hdfs_binary() + namenode_upgrade.finalize_upgrade("rolling", hfds_binary) + + def pre_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing Stack Upgrade pre-restart") + import params + env.set_params(params) + + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + # When downgrading an Express Upgrade, the first thing we do is to revert the symlinks. + # Therefore, we cannot call this code in that scenario. 
+ call_if = [("rolling", "upgrade"), ("rolling", "downgrade"), ("nonrolling", "upgrade")] + for e in call_if: + if (upgrade_type, params.upgrade_direction) == e: + conf_select.select(params.stack_name, "hadoop", params.version) + stack_select.select("hadoop-hdfs-namenode", params.version) + + def post_upgrade_restart(self, env, upgrade_type=None): + Logger.info("Executing Stack Upgrade post-restart") + import params + env.set_params(params) + + hdfs_binary = self.get_hdfs_binary() + dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) + dfsadmin_cmd = dfsadmin_base_command + " -report -live" + Execute(dfsadmin_cmd, + user=params.hdfs_user, + tries=60, + try_sleep=10 + ) + + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.namenode.kerberos.internal.spnego.principal', + 'dfs.namenode.keytab.file', + 'dfs.namenode.kerberos.principal'] + props_read_check = ['dfs.namenode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + {'core-site.xml': FILE_TYPE_XML, + 'hdfs-site.xml': FILE_TYPE_XML}) + if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ + security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + 
if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ( 'hdfs-site' not in security_params + or 'dfs.namenode.keytab.file' not in security_params['hdfs-site'] + or 'dfs.namenode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.namenode.keytab.file'], + security_params['hdfs-site']['dfs.namenode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = [] + for cf in result_issues: + issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) + self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + self.put_structured_out({"securityState": "UNSECURED"}) + + def rebalancehdfs(self, env): + import params + env.set_params(params) + + name_node_parameters = json.loads( params.name_node_params ) + threshold = name_node_parameters['threshold'] + _print("Starting balancer with threshold = %s\n" % threshold) + + rebalance_env = {'PATH': params.hadoop_bin_dir} + + if params.security_enabled: + # Create the kerberos credentials cache (ccache) file and set it in the environment to use + # when executing HDFS rebalance command. Use the md5 hash of the combination of the principal and keytab file + # to generate a (relatively) unique cache filename so that we can use it as needed. + # TODO: params.tmp_dir=/var/lib/ambari-agent/tmp. 
However hdfs user doesn't have access to this path. + # TODO: Hence using /tmp + ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest() + ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name) + rebalance_env['KRB5CCNAME'] = ccache_file_path + + # If there are no tickets in the cache or they are expired, perform a kinit, else use what + # is in the cache + klist_cmd = format("{klist_path_local} -s {ccache_file_path}") + kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}") + if shell.call(klist_cmd, user=params.hdfs_user)[0] != 0: + Execute(kinit_cmd, user=params.hdfs_user) + + def calculateCompletePercent(first, current): + # avoid division by zero + try: + division_result = current.bytesLeftToMove/first.bytesLeftToMove + except ZeroDivisionError: + Logger.warning("Division by zero. Bytes Left To Move = {0}. Return 1.0".format(first.bytesLeftToMove)) + return 1.0 + return 1.0 - division_result + + + def startRebalancingProcess(threshold, rebalance_env): + rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}') + return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env) + + command = startRebalancingProcess(threshold, rebalance_env) + + basedir = os.path.join(env.config.basedir, 'scripts') + if(threshold == 'DEBUG'): #FIXME TODO remove this on PROD + basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator') + command = ['ambari-python-wrap','hdfs-command.py'] + + _print("Executing command %s\n" % command) + + parser = hdfs_rebalance.HdfsParser() + + def handle_new_line(line, is_stderr): + if is_stderr: + return + + _print('[balancer] %s' % (line)) + pl = parser.parseLine(line) + if pl: + res = pl.toJson() + res['completePercent'] = calculateCompletePercent(parser.initialLine, pl) + + self.put_structured_out(res) + elif parser.state == 'PROCESS_FINISED' : + _print('[balancer] 
%s' % ('Process is finished' )) + self.put_structured_out({'completePercent' : 1}) + return + + Execute(command, + on_new_line = handle_new_line, + logoutput = False, + ) + + if params.security_enabled: + # Delete the kerberos credentials cache (ccache) file + File(ccache_file_path, + action = "delete", + ) + + def get_log_folder(self): + import params + return params.hdfs_log_dir + + def get_user(self): + import params + return params.hdfs_user + +@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) +class NameNodeWindows(NameNode): + def install(self, env): + import install_params + self.install_packages(env) + #TODO we need this for HA because of manual steps + self.configure(env) + + def rebalancehdfs(self, env): + from ambari_commons.os_windows import UserHelper, run_os_command_impersonated + import params + env.set_params(params) + + hdfs_username, hdfs_domain = UserHelper.parse_user_name(params.hdfs_user, ".") + + name_node_parameters = json.loads( params.name_node_params ) + threshold = name_node_parameters['threshold'] + _print("Starting balancer with threshold = %s\n" % threshold) + + def calculateCompletePercent(first, current): + return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove + + def startRebalancingProcess(threshold): + rebalanceCommand = 'hdfs balancer -threshold %s' % threshold + return ['cmd', '/C', rebalanceCommand] + + command = startRebalancingProcess(threshold) + basedir = os.path.join(env.config.basedir, 'scripts') + + _print("Executing command %s\n" % command) + + parser = hdfs_rebalance.HdfsParser() + returncode, stdout, err = run_os_command_impersonated(' '.join(command), hdfs_username, Script.get_password(params.hdfs_user), hdfs_domain) + + for line in stdout.split('\n'): + _print('[balancer] %s %s' % (str(datetime.now()), line )) + pl = parser.parseLine(line) + if pl: + res = pl.toJson() + res['completePercent'] = calculateCompletePercent(parser.initialLine, pl) + + self.put_structured_out(res) + elif parser.state == 
'PROCESS_FINISED' : + _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' )) + self.put_structured_out({'completePercent' : 1}) + break + + if returncode != None and returncode != 0: + raise Fail('Hdfs rebalance process exited with error. See the log output') + +def _print(line): + sys.stdout.write(line) + sys.stdout.flush() + +if __name__ == "__main__": + NameNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/8f051fc5/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/namenode_ha_state.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/namenode_ha_state.py b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/namenode_ha_state.py new file mode 100644 index 0000000..259af2e --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.2.5/services/HDFS/package/scripts/namenode_ha_state.py @@ -0,0 +1,219 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
class NAMENODE_STATE:
  """
  String constants for the HA states tracked by NamenodeHAState.
  """
  ACTIVE = "active"
  STANDBY = "standby"
  UNKNOWN = "unknown"


class NamenodeHAState:
  """
  Represents the current state of the Namenode Hosts in High Availability Mode
  """

  def __init__(self):
    """
    Initializes all fields by querying the Namenode state.
    Raises a ValueError if unable to construct the object.

    A NameNode whose state cannot be determined is logged and left out of the
    state-to-hostnames mapping; it does not abort construction.
    """
    # Imported at call time, next to params: these names are only needed while
    # querying the cluster, so the accessors below do not depend on them.
    import params
    from resource_management.core import shell
    from resource_management.core.logger import Logger
    from resource_management.libraries.functions.default import default
    from resource_management.libraries.functions.jmx import get_value_from_jmx

    self.name_service = default('/configurations/hdfs-site/dfs.internal.nameservices', None)
    if self.name_service is None:
      self.name_service = default('/configurations/hdfs-site/dfs.nameservices', None)

    if not self.name_service:
      raise ValueError("Could not retrieve property dfs.nameservices or dfs.internal.nameservices")

    nn_unique_ids_key = "dfs.ha.namenodes." + str(self.name_service)
    # List of the nn unique ids
    self.nn_unique_ids = default("/configurations/hdfs-site/" + nn_unique_ids_key, None)
    if not self.nn_unique_ids:
      raise ValueError("Could not retrieve property " + nn_unique_ids_key)

    self.nn_unique_ids = [x.strip() for x in self.nn_unique_ids.split(",")]

    policy = default("/configurations/hdfs-site/dfs.http.policy", "HTTP_ONLY")
    self.encrypted = policy.upper() == "HTTPS_ONLY"

    jmx_uri_fragment = ("https" if self.encrypted else "http") + "://{0}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem"
    namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}"
    namenode_https_fragment = "dfs.namenode.https-address.{0}.{1}"

    # Dictionary where the key is the Namenode State (e.g., ACTIVE), and the value is a set of hostnames
    self.namenode_state_to_hostnames = {}

    # Dictionary from nn unique id name to a tuple of (http address, https address)
    self.nn_unique_id_to_addresses = {}
    for nn_unique_id in self.nn_unique_ids:
      http_key = namenode_http_fragment.format(self.name_service, nn_unique_id)
      https_key = namenode_https_fragment.format(self.name_service, nn_unique_id)

      http_value = default("/configurations/hdfs-site/" + http_key, None)
      https_value = default("/configurations/hdfs-site/" + https_key, None)
      actual_value = https_value if self.encrypted else http_value
      hostname = self._hostname_of(actual_value)

      self.nn_unique_id_to_addresses[nn_unique_id] = (http_value, https_value)
      try:
        if not hostname:
          # str() keeps the message intact when the address is None.
          raise Exception("Could not retrieve hostname from address " + str(actual_value))

        jmx_uri = jmx_uri_fragment.format(actual_value)
        state = get_value_from_jmx(jmx_uri, "tag.HAState", params.security_enabled, params.hdfs_user, params.is_https_enabled)

        # If JMX parsing failed
        if not state:
          run_user = default("/configurations/hadoop-env/hdfs_user", "hdfs")
          # Both values are passed positionally: str.format() raises KeyError
          # for a named placeholder that is not supplied as a keyword.
          check_service_cmd = "hdfs haadmin -ns {0} -getServiceState {1}".format(self.name_service, nn_unique_id)
          code, out = shell.call(check_service_cmd, logoutput=True, user=run_user)
          if code == 0 and out:
            if NAMENODE_STATE.STANDBY in out:
              state = NAMENODE_STATE.STANDBY
            elif NAMENODE_STATE.ACTIVE in out:
              state = NAMENODE_STATE.ACTIVE

        if not state:
          raise Exception("Could not retrieve Namenode state from URL " + jmx_uri)

        state = state.lower()

        if state not in (NAMENODE_STATE.ACTIVE, NAMENODE_STATE.STANDBY):
          state = NAMENODE_STATE.UNKNOWN

        self.namenode_state_to_hostnames.setdefault(state, set()).add(hostname)
      except Exception as e:
        # Best effort per NameNode: record the cause and carry on with the rest.
        Logger.error("Could not get namenode state for " + nn_unique_id + ": " + str(e))

  @staticmethod
  def _hostname_of(address):
    """
    :param address: Address of the form hostname:port, or None
    :return: Returns the stripped hostname part, or None if the address is empty or contains no ":"
    """
    if address and ":" in address:
      return address.split(":")[0].strip()
    return None

  def __str__(self):
    return "".join([
      "Namenode HA State: {\n",
      "IDs: %s\n" % ", ".join(self.nn_unique_ids),
      "Addresses: %s\n" % str(self.nn_unique_id_to_addresses),
      "States: %s\n" % str(self.namenode_state_to_hostnames),
      "Encrypted: %s\n" % str(self.encrypted),
      "Healthy: %s\n" % str(self.is_healthy()),
      "}",
    ])

  def is_encrypted(self):
    """
    :return: Returns a bool indicating if HTTPS is enabled
    """
    return self.encrypted

  def get_nn_unique_ids(self):
    """
    :return Returns a list of the nn unique ids
    """
    return self.nn_unique_ids

  def get_nn_unique_id_to_addresses(self):
    """
    :return Returns a dictionary where the key is the nn unique id, and the value is a tuple of (http address, https address)
    Each address is of the form, hostname:port
    """
    return self.nn_unique_id_to_addresses

  def get_address_for_nn_id(self, id):
    """
    :param id: Namenode ID
    :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given namenode id,
    or None if the id is not known.
    """
    addresses = self.nn_unique_id_to_addresses.get(id)
    if addresses and len(addresses) == 2:
      return addresses[1] if self.encrypted else addresses[0]
    return None

  def get_address_for_host(self, hostname):
    """
    :param hostname: Host name
    :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given host,
    or None if no namenode address belongs to that host.
    """
    # items()/values() exist on both Python 2 and 3; iteritems() is Python 2 only.
    for addresses in self.nn_unique_id_to_addresses.values():
      if not addresses or len(addresses) != 2:
        continue
      # The host is read from the HTTP address; when that one is not configured
      # the HTTPS address is used instead of failing on a None value.
      probe = addresses[0] if addresses[0] else addresses[1]
      nn_hostname = self._hostname_of(probe)
      if nn_hostname is not None and nn_hostname == hostname:
        # Found the host
        return addresses[1] if self.encrypted else addresses[0]
    return None

  def get_namenode_state_to_hostnames(self):
    """
    :return Return a dictionary where the key is a member of NAMENODE_STATE, and the value is a set of hostnames.
    """
    return self.namenode_state_to_hostnames

  def get_address(self, namenode_state):
    """
    @param namenode_state: Member of NAMENODE_STATE
    :return Get the address that corresponds to the first host with the given state, or None if there is no such host
    """
    hosts = self.namenode_state_to_hostnames.get(namenode_state)
    if hosts:
      return self.get_address_for_host(next(iter(hosts)))
    return None

  def is_active(self, host_name):
    """
    :param host_name: Host name
    :return: Return True if this is the active NameNode, otherwise, False.
    """
    return self._is_in_state(host_name, NAMENODE_STATE.ACTIVE)

  def is_standby(self, host_name):
    """
    :param host_name: Host name
    :return: Return True if this is the standby NameNode, otherwise, False.
    """
    return self._is_in_state(host_name, NAMENODE_STATE.STANDBY)

  def _is_in_state(self, host_name, state):
    """
    :param host_name: Host name
    :param state: State to check
    :return: Return True if this NameNode is in the specified state, otherwise, False.
    The match is case-insensitive and only succeeds when exactly one host is in that state.
    """
    hosts_in_state = self.get_namenode_state_to_hostnames().get(state)
    if hosts_in_state is None or len(hosts_in_state) != 1:
      return False
    return next(iter(hosts_in_state)).lower() == host_name.lower()

  def is_healthy(self):
    """
    :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist.
    """
    active_hosts = self.namenode_state_to_hostnames.get(NAMENODE_STATE.ACTIVE, ())
    standby_hosts = self.namenode_state_to_hostnames.get(NAMENODE_STATE.STANDBY, ())
    return len(active_hosts) == 1 and len(standby_hosts) == 1
