http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode_upgrade.py deleted file mode 100644 index 7585107..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/journalnode_upgrade.py +++ /dev/null @@ -1,152 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -import time - -from resource_management.core.logger import Logger -from resource_management.core.resources.system import Execute -from resource_management.libraries.functions.default import default -from resource_management.core.exceptions import Fail -import utils -from resource_management.libraries.functions.jmx import get_value_from_jmx -import namenode_ha_state -from namenode_ha_state import NAMENODE_STATE, NamenodeHAState -from utils import get_dfsadmin_base_command - - -def post_upgrade_check(): - """ - Ensure all journal nodes are up and quorum is established during Rolling Upgrade. - :return: - """ - import params - Logger.info("Ensuring Journalnode quorum is established") - - if params.security_enabled: - # We establish HDFS identity instead of JN Kerberos identity - # since this is an administrative HDFS call that requires the HDFS administrator user to perform. - Execute(params.hdfs_kinit_cmd, user=params.hdfs_user) - - time.sleep(5) - hdfs_roll_edits() - time.sleep(5) - - all_journal_node_hosts = default("/clusterHostInfo/journalnode_hosts", []) - - if len(all_journal_node_hosts) < 3: - raise Fail("Need at least 3 Journalnodes to maintain a quorum") - - try: - namenode_ha = namenode_ha_state.NamenodeHAState() - except ValueError, err: - raise Fail("Could not retrieve Namenode HA addresses. Error: " + str(err)) - - Logger.info(str(namenode_ha)) - nn_address = namenode_ha.get_address(NAMENODE_STATE.ACTIVE) - - nn_data = utils.get_jmx_data(nn_address, 'org.apache.hadoop.hdfs.server.namenode.FSNamesystem', 'JournalTransactionInfo', - namenode_ha.is_encrypted(), params.security_enabled) - if not nn_data: - raise Fail("Could not retrieve JournalTransactionInfo from JMX") - - try: - last_txn_id = int(nn_data['LastAppliedOrWrittenTxId']) - success = ensure_jns_have_new_txn(all_journal_node_hosts, last_txn_id) - - if not success: - raise Fail("Could not ensure that all Journal nodes have a new log transaction id") - except KeyError: - raise Fail("JournalTransactionInfo does not have key LastAppliedOrWrittenTxId from JMX info") - - -def hdfs_roll_edits(): - """ - HDFS_CLIENT needs to be a dependency of JOURNALNODE - Roll the logs so that Namenode will be able to connect to the Journalnode. - Must kinit before calling this command. - """ - import params - - # TODO, this will need to be doc'ed since existing clusters will need HDFS_CLIENT on all JOURNALNODE hosts - dfsadmin_base_command = get_dfsadmin_base_command('hdfs') - command = dfsadmin_base_command + ' -rollEdits' - Execute(command, user=params.hdfs_user, tries=1) - - -def ensure_jns_have_new_txn(nodelist, last_txn_id): - """ - :param nodelist: List of Journalnodes - :param last_txn_id: Integer of last transaction id - :return: Return true on success, false otherwise - """ - import params - - jn_uri = default("/configurations/hdfs-site/dfs.namenode.shared.edits.dir", None) - - if jn_uri is None: - raise Fail("No JournalNode URI found at hdfs-site/dfs.namenode.shared.edits.dir") - - nodes = [] - for node in nodelist: - if node in jn_uri: - nodes.append(node) - - num_of_jns = len(nodes) - actual_txn_ids = {} - jns_updated = 0 - - if params.journalnode_address is None: - raise Fail("Could not retrieve JournalNode address") - - if params.journalnode_port is None: - raise Fail("Could not retrieve JournalNode port") - - time_out_secs = 3 * 60 - step_time_secs = 10 - iterations = int(time_out_secs/step_time_secs) - - protocol = "https" if params.https_only else "http" - - Logger.info("Checking if all JournalNodes are updated.") - for i in range(iterations): - Logger.info('Try %d out of %d' % (i+1, iterations)) - for node in nodes: - # if all JNS are updated break - if jns_updated == num_of_jns: - Logger.info("All journal nodes are updated") - return True - - # JN already meets condition, skip it - if node in actual_txn_ids and actual_txn_ids[node] and actual_txn_ids[node] >= last_txn_id: - continue - - url = '%s://%s:%s' % (protocol, node, params.journalnode_port) - data = utils.get_jmx_data(url, 'Journal-', 'LastWrittenTxId', params.https_only, params.security_enabled) - if data: - actual_txn_ids[node] = int(data) - if actual_txn_ids[node] >= last_txn_id: - Logger.info("JournalNode %s has a higher transaction id: %s" % (node, str(data))) - jns_updated += 1 - else: - Logger.info("JournalNode %s is still on transaction id: %s" % (node, str(data))) - - Logger.info("Sleeping for %d secs" % step_time_secs) - time.sleep(step_time_secs) - - return jns_updated == num_of_jns \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode.py deleted file mode 100644 index 86f68e5..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode.py +++ /dev/null @@ -1,424 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -import sys -import os -import time -import json -import tempfile -from datetime import datetime -import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. - -from resource_management.libraries.script.script import Script -from resource_management.core.resources.system import Execute, File -from resource_management.core import shell -from resource_management.libraries.functions import conf_select -from resource_management.libraries.functions import stack_select -from resource_management.libraries.functions import Direction -from resource_management.libraries.functions import StackFeature -from resource_management.libraries.functions.stack_features import check_stack_feature -from resource_management.libraries.functions.format import format -from resource_management.libraries.functions.security_commons import build_expectations, \ - cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ - FILE_TYPE_XML - -from resource_management.core.exceptions import Fail -from resource_management.core.shell import as_user -from resource_management.core.logger import Logger - - -from ambari_commons.os_family_impl import OsFamilyImpl -from ambari_commons import OSConst - - -import namenode_upgrade -from hdfs_namenode import namenode, wait_for_safemode_off -from hdfs import hdfs -import hdfs_rebalance -from utils import initiate_safe_zkfc_failover, get_hdfs_binary, get_dfsadmin_base_command - - - -# hashlib is supplied as of Python 2.5 as the replacement interface for md5 -# and other secure hashes. In 2.6, md5 is deprecated. Import hashlib if -# available, avoiding a deprecation warning under 2.6. Import md5 otherwise, -# preserving 2.4 compatibility. -try: - import hashlib - _md5 = hashlib.md5 -except ImportError: - import md5 - _md5 = md5.new - -class NameNode(Script): - - def get_component_name(self): - return "hadoop-hdfs-namenode" - - def get_hdfs_binary(self): - """ - Get the name or path to the hdfs binary depending on the component name. - """ - component_name = self.get_component_name() - return get_hdfs_binary(component_name) - - def install(self, env): - import params - env.set_params(params) - self.install_packages(env) - #TODO we need this for HA because of manual steps - self.configure(env) - - def configure(self, env): - import params - env.set_params(params) - hdfs("namenode") - hdfs_binary = self.get_hdfs_binary() - namenode(action="configure", hdfs_binary=hdfs_binary, env=env) - - def start(self, env, upgrade_type=None): - import params - env.set_params(params) - self.configure(env) - hdfs_binary = self.get_hdfs_binary() - namenode(action="start", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, - upgrade_suspended=params.upgrade_suspended, env=env) - - # after starting NN in an upgrade, touch the marker file - if upgrade_type is not None: - # place a file on the system indicating that we've submitting the command that - # instructs NN that it is now part of an upgrade - namenode_upgrade.create_upgrade_marker() - - def stop(self, env, upgrade_type=None): - import params - env.set_params(params) - hdfs_binary = self.get_hdfs_binary() - if upgrade_type == "rolling" and params.dfs_ha_enabled: - if params.dfs_ha_automatic_failover_enabled: - initiate_safe_zkfc_failover() - else: - raise Fail("Rolling Upgrade - dfs.ha.automatic-failover.enabled must be enabled to perform a rolling restart") - namenode(action="stop", hdfs_binary=hdfs_binary, upgrade_type=upgrade_type, env=env) - - def status(self, env): - import status_params - env.set_params(status_params) - namenode(action="status", env=env) - - def decommission(self, env): - import params - env.set_params(params) - hdfs_binary = self.get_hdfs_binary() - namenode(action="decommission", hdfs_binary=hdfs_binary) - -@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) -class NameNodeDefault(NameNode): - - def restore_snapshot(self, env): - """ - Restore the snapshot during a Downgrade. - """ - print "TODO AMBARI-12698" - pass - - def prepare_express_upgrade(self, env): - """ - During an Express Upgrade. - If in HA, on the Active NameNode only, examine the directory dfs.namenode.name.dir and - make sure that there is no "/previous" directory. - - Create a list of all the DataNodes in the cluster. - hdfs dfsadmin -report > dfs-old-report-1.log - - hdfs dfsadmin -safemode enter - hdfs dfsadmin -saveNamespace - - Copy the checkpoint files located in ${dfs.namenode.name.dir}/current into a backup directory. - - Finalize any prior HDFS upgrade, - hdfs dfsadmin -finalizeUpgrade - - Prepare for a NameNode rolling upgrade in order to not lose any data. - hdfs dfsadmin -rollingUpgrade prepare - """ - import params - Logger.info("Preparing the NameNodes for a NonRolling (aka Express) Upgrade.") - - if params.security_enabled: - kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") - Execute(kinit_command, user=params.hdfs_user, logoutput=True) - - hdfs_binary = self.get_hdfs_binary() - namenode_upgrade.prepare_upgrade_check_for_previous_dir() - namenode_upgrade.prepare_upgrade_enter_safe_mode(hdfs_binary) - namenode_upgrade.prepare_upgrade_save_namespace(hdfs_binary) - namenode_upgrade.prepare_upgrade_backup_namenode_dir() - namenode_upgrade.prepare_upgrade_finalize_previous_upgrades(hdfs_binary) - - # Call -rollingUpgrade prepare - namenode_upgrade.prepare_rolling_upgrade(hdfs_binary) - - def prepare_rolling_upgrade(self, env): - hfds_binary = self.get_hdfs_binary() - namenode_upgrade.prepare_rolling_upgrade(hfds_binary) - - def wait_for_safemode_off(self, env): - wait_for_safemode_off(self.get_hdfs_binary(), 30, True) - - def finalize_non_rolling_upgrade(self, env): - hfds_binary = self.get_hdfs_binary() - namenode_upgrade.finalize_upgrade("nonrolling", hfds_binary) - - def finalize_rolling_upgrade(self, env): - hfds_binary = self.get_hdfs_binary() - namenode_upgrade.finalize_upgrade("rolling", hfds_binary) - - def pre_upgrade_restart(self, env, upgrade_type=None): - Logger.info("Executing Stack Upgrade pre-restart") - import params - env.set_params(params) - - if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): - # When downgrading an Express Upgrade, the first thing we do is to revert the symlinks. - # Therefore, we cannot call this code in that scenario. - call_if = [("rolling", "upgrade"), ("rolling", "downgrade"), ("nonrolling", "upgrade")] - for e in call_if: - if (upgrade_type, params.upgrade_direction) == e: - conf_select.select(params.stack_name, "hadoop", params.version) - stack_select.select("hadoop-hdfs-namenode", params.version) - - def post_upgrade_restart(self, env, upgrade_type=None): - Logger.info("Executing Stack Upgrade post-restart") - import params - env.set_params(params) - - hdfs_binary = self.get_hdfs_binary() - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - dfsadmin_cmd = dfsadmin_base_command + " -report -live" - Execute(dfsadmin_cmd, - user=params.hdfs_user, - tries=60, - try_sleep=10 - ) - - def security_status(self, env): - import status_params - - env.set_params(status_params) - props_value_check = {"hadoop.security.authentication": "kerberos", - "hadoop.security.authorization": "true"} - props_empty_check = ["hadoop.security.auth_to_local"] - props_read_check = None - core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, - props_read_check) - props_value_check = None - props_empty_check = ['dfs.namenode.kerberos.internal.spnego.principal', - 'dfs.namenode.keytab.file', - 'dfs.namenode.kerberos.principal'] - props_read_check = ['dfs.namenode.keytab.file'] - hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, - props_read_check) - - hdfs_expectations = {} - hdfs_expectations.update(core_site_expectations) - hdfs_expectations.update(hdfs_site_expectations) - - security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, - {'core-site.xml': FILE_TYPE_XML, - 'hdfs-site.xml': FILE_TYPE_XML}) - if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ - security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': - result_issues = validate_security_config_properties(security_params, hdfs_expectations) - if not result_issues: # If all validations passed successfully - try: - # Double check the dict before calling execute - if ( 'hdfs-site' not in security_params - or 'dfs.namenode.keytab.file' not in security_params['hdfs-site'] - or 'dfs.namenode.kerberos.principal' not in security_params['hdfs-site']): - self.put_structured_out({"securityState": "UNSECURED"}) - self.put_structured_out( - {"securityIssuesFound": "Keytab file or principal are not set property."}) - return - cached_kinit_executor(status_params.kinit_path_local, - status_params.hdfs_user, - security_params['hdfs-site']['dfs.namenode.keytab.file'], - security_params['hdfs-site']['dfs.namenode.kerberos.principal'], - status_params.hostname, - status_params.tmp_dir) - self.put_structured_out({"securityState": "SECURED_KERBEROS"}) - except Exception as e: - self.put_structured_out({"securityState": "ERROR"}) - self.put_structured_out({"securityStateErrorInfo": str(e)}) - else: - issues = [] - for cf in result_issues: - issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) - self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) - self.put_structured_out({"securityState": "UNSECURED"}) - else: - self.put_structured_out({"securityState": "UNSECURED"}) - - def rebalancehdfs(self, env): - import params - env.set_params(params) - - name_node_parameters = json.loads( params.name_node_params ) - threshold = name_node_parameters['threshold'] - _print("Starting balancer with threshold = %s\n" % threshold) - - rebalance_env = {'PATH': params.hadoop_bin_dir} - - if params.security_enabled: - # Create the kerberos credentials cache (ccache) file and set it in the environment to use - # when executing HDFS rebalance command. Use the md5 hash of the combination of the principal and keytab file - # to generate a (relatively) unique cache filename so that we can use it as needed. - # TODO: params.tmp_dir=/var/lib/ambari-agent/tmp. However hdfs user doesn't have access to this path. - # TODO: Hence using /tmp - ccache_file_name = "hdfs_rebalance_cc_" + _md5(format("{hdfs_principal_name}|{hdfs_user_keytab}")).hexdigest() - ccache_file_path = os.path.join(tempfile.gettempdir(), ccache_file_name) - rebalance_env['KRB5CCNAME'] = ccache_file_path - - # If there are no tickets in the cache or they are expired, perform a kinit, else use what - # is in the cache - klist_cmd = format("{klist_path_local} -s {ccache_file_path}") - kinit_cmd = format("{kinit_path_local} -c {ccache_file_path} -kt {hdfs_user_keytab} {hdfs_principal_name}") - if shell.call(klist_cmd, user=params.hdfs_user)[0] != 0: - Execute(kinit_cmd, user=params.hdfs_user) - - def calculateCompletePercent(first, current): - # avoid division by zero - try: - division_result = current.bytesLeftToMove/first.bytesLeftToMove - except ZeroDivisionError: - Logger.warning("Division by zero. Bytes Left To Move = {0}. Return 1.0".format(first.bytesLeftToMove)) - return 1.0 - return 1.0 - division_result - - - def startRebalancingProcess(threshold, rebalance_env): - rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}') - return as_user(rebalanceCommand, params.hdfs_user, env=rebalance_env) - - command = startRebalancingProcess(threshold, rebalance_env) - - basedir = os.path.join(env.config.basedir, 'scripts') - if(threshold == 'DEBUG'): #FIXME TODO remove this on PROD - basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator') - command = ['ambari-python-wrap','hdfs-command.py'] - - _print("Executing command %s\n" % command) - - parser = hdfs_rebalance.HdfsParser() - - def handle_new_line(line, is_stderr): - if is_stderr: - return - - _print('[balancer] %s' % (line)) - pl = parser.parseLine(line) - if pl: - res = pl.toJson() - res['completePercent'] = calculateCompletePercent(parser.initialLine, pl) - - self.put_structured_out(res) - elif parser.state == 'PROCESS_FINISED' : - _print('[balancer] %s' % ('Process is finished' )) - self.put_structured_out({'completePercent' : 1}) - return - - Execute(command, - on_new_line = handle_new_line, - logoutput = False, - ) - - if params.security_enabled: - # Delete the kerberos credentials cache (ccache) file - File(ccache_file_path, - action = "delete", - ) - - def get_log_folder(self): - import params - return params.hdfs_log_dir - - def get_user(self): - import params - return params.hdfs_user - - def get_pid_files(self): - import status_params - return [status_params.namenode_pid_file] - -@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) -class NameNodeWindows(NameNode): - def install(self, env): - import install_params - self.install_packages(env) - #TODO we need this for HA because of manual steps - self.configure(env) - - def rebalancehdfs(self, env): - from ambari_commons.os_windows import UserHelper, run_os_command_impersonated - import params - env.set_params(params) - - hdfs_username, hdfs_domain = UserHelper.parse_user_name(params.hdfs_user, ".") - - name_node_parameters = json.loads( params.name_node_params ) - threshold = name_node_parameters['threshold'] - _print("Starting balancer with threshold = %s\n" % threshold) - - def calculateCompletePercent(first, current): - return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove - - def startRebalancingProcess(threshold): - rebalanceCommand = 'hdfs balancer -threshold %s' % threshold - return ['cmd', '/C', rebalanceCommand] - - command = startRebalancingProcess(threshold) - basedir = os.path.join(env.config.basedir, 'scripts') - - _print("Executing command %s\n" % command) - - parser = hdfs_rebalance.HdfsParser() - returncode, stdout, err = run_os_command_impersonated(' '.join(command), hdfs_username, Script.get_password(params.hdfs_user), hdfs_domain) - - for line in stdout.split('\n'): - _print('[balancer] %s %s' % (str(datetime.now()), line )) - pl = parser.parseLine(line) - if pl: - res = pl.toJson() - res['completePercent'] = calculateCompletePercent(parser.initialLine, pl) - - self.put_structured_out(res) - elif parser.state == 'PROCESS_FINISED' : - _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' )) - self.put_structured_out({'completePercent' : 1}) - break - - if returncode != None and returncode != 0: - raise Fail('Hdfs rebalance process exited with error. See the log output') - -def _print(line): - sys.stdout.write(line) - sys.stdout.flush() - -if __name__ == "__main__": - NameNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_ha_state.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_ha_state.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_ha_state.py deleted file mode 100644 index 259af2e..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_ha_state.py +++ /dev/null @@ -1,219 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -from resource_management.core import shell -from resource_management.core.logger import Logger -from resource_management.libraries.functions.default import default -from resource_management.libraries.functions.jmx import get_value_from_jmx - - -class NAMENODE_STATE: - ACTIVE = "active" - STANDBY = "standby" - UNKNOWN = "unknown" - - -class NamenodeHAState: - """ - Represents the current state of the Namenode Hosts in High Availability Mode - """ - - def __init__(self): - """ - Initializes all fields by querying the Namenode state. - Raises a ValueError if unable to construct the object. - """ - import params - - self.name_service = default('/configurations/hdfs-site/dfs.internal.nameservices', None) - if self.name_service is None: - self.name_service = default('/configurations/hdfs-site/dfs.nameservices', None) - - if not self.name_service: - raise ValueError("Could not retrieve property dfs.nameservices or dfs.internal.nameservices") - - nn_unique_ids_key = "dfs.ha.namenodes." + str(self.name_service) - # List of the nn unique ids - self.nn_unique_ids = default("/configurations/hdfs-site/" + nn_unique_ids_key, None) - if not self.nn_unique_ids: - raise ValueError("Could not retrieve property " + nn_unique_ids_key) - - self.nn_unique_ids = self.nn_unique_ids.split(",") - self.nn_unique_ids = [x.strip() for x in self.nn_unique_ids] - - policy = default("/configurations/hdfs-site/dfs.http.policy", "HTTP_ONLY") - self.encrypted = policy.upper() == "HTTPS_ONLY" - - jmx_uri_fragment = ("https" if self.encrypted else "http") + "://{0}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem" - namenode_http_fragment = "dfs.namenode.http-address.{0}.{1}" - namenode_https_fragment = "dfs.namenode.https-address.{0}.{1}" - - # Dictionary where the key is the Namenode State (e.g., ACTIVE), and the value is a set of hostnames - self.namenode_state_to_hostnames = {} - - # Dictionary from nn unique id name to a tuple of (http address, https address) - self.nn_unique_id_to_addresses = {} - for nn_unique_id in self.nn_unique_ids: - http_key = namenode_http_fragment.format(self.name_service, nn_unique_id) - https_key = namenode_https_fragment.format(self.name_service, nn_unique_id) - - http_value = default("/configurations/hdfs-site/" + http_key, None) - https_value = default("/configurations/hdfs-site/" + https_key, None) - actual_value = https_value if self.encrypted else http_value - hostname = actual_value.split(":")[0].strip() if actual_value and ":" in actual_value else None - - self.nn_unique_id_to_addresses[nn_unique_id] = (http_value, https_value) - try: - if not hostname: - raise Exception("Could not retrieve hostname from address " + actual_value) - - jmx_uri = jmx_uri_fragment.format(actual_value) - state = get_value_from_jmx(jmx_uri, "tag.HAState", params.security_enabled, params.hdfs_user, params.is_https_enabled) - - # If JMX parsing failed - if not state: - run_user = default("/configurations/hadoop-env/hdfs_user", "hdfs") - check_service_cmd = "hdfs haadmin -ns {dfs_ha_nameservices} -getServiceState {0}".format(nn_unique_id) - code, out = shell.call(check_service_cmd, logoutput=True, user=run_user) - if code == 0 and out: - if NAMENODE_STATE.STANDBY in out: - state = NAMENODE_STATE.STANDBY - elif NAMENODE_STATE.ACTIVE in out: - state = NAMENODE_STATE.ACTIVE - - if not state: - raise Exception("Could not retrieve Namenode state from URL " + jmx_uri) - - state = state.lower() - - if state not in [NAMENODE_STATE.ACTIVE, NAMENODE_STATE.STANDBY]: - state = NAMENODE_STATE.UNKNOWN - - if state in self.namenode_state_to_hostnames: - self.namenode_state_to_hostnames[state].add(hostname) - else: - hostnames = set([hostname, ]) - self.namenode_state_to_hostnames[state] = hostnames - except: - Logger.error("Could not get namenode state for " + nn_unique_id) - - def __str__(self): - return "Namenode HA State: {\n" + \ - ("IDs: %s\n" % ", ".join(self.nn_unique_ids)) + \ - ("Addresses: %s\n" % str(self.nn_unique_id_to_addresses)) + \ - ("States: %s\n" % str(self.namenode_state_to_hostnames)) + \ - ("Encrypted: %s\n" % str(self.encrypted)) + \ - ("Healthy: %s\n" % str(self.is_healthy())) + \ - "}" - - def is_encrypted(self): - """ - :return: Returns a bool indicating if HTTPS is enabled - """ - return self.encrypted - - def get_nn_unique_ids(self): - """ - :return Returns a list of the nn unique ids - """ - return self.nn_unique_ids - - def get_nn_unique_id_to_addresses(self): - """ - :return Returns a dictionary where the key is the nn unique id, and the value is a tuple of (http address, https address) - Each address is of the form, hostname:port - """ - return self.nn_unique_id_to_addresses - - def get_address_for_nn_id(self, id): - """ - :param id: Namenode ID - :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given namenode id. - """ - if id in self.nn_unique_id_to_addresses: - addresses = self.nn_unique_id_to_addresses[id] - if addresses and len(addresses) == 2: - return addresses[1] if self.encrypted else addresses[0] - return None - - def get_address_for_host(self, hostname): - """ - :param hostname: Host name - :return: Returns the appropriate address (HTTP if no encryption, HTTPS otherwise) for the given host. - """ - for id, addresses in self.nn_unique_id_to_addresses.iteritems(): - if addresses and len(addresses) == 2: - if ":" in addresses[0]: - nn_hostname = addresses[0].split(":")[0].strip() - if nn_hostname == hostname: - # Found the host - return addresses[1] if self.encrypted else addresses[0] - return None - - def get_namenode_state_to_hostnames(self): - """ - :return Return a dictionary where the key is a member of NAMENODE_STATE, and the value is a set of hostnames. - """ - return self.namenode_state_to_hostnames - - def get_address(self, namenode_state): - """ - @param namenode_state: Member of NAMENODE_STATE - :return Get the address that corresponds to the first host with the given state - """ - hosts = self.namenode_state_to_hostnames[namenode_state] if namenode_state in self.namenode_state_to_hostnames else [] - if hosts and len(hosts) > 0: - hostname = list(hosts)[0] - return self.get_address_for_host(hostname) - return None - - def is_active(self, host_name): - """ - :param host_name: Host name - :return: Return True if this is the active NameNode, otherwise, False. - """ - return self._is_in_state(host_name, NAMENODE_STATE.ACTIVE) - - def is_standby(self, host_name): - """ - :param host_name: Host name - :return: Return True if this is the standby NameNode, otherwise, False. - """ - return self._is_in_state(host_name, NAMENODE_STATE.STANDBY) - - def _is_in_state(self, host_name, state): - """ - :param host_name: Host name - :param state: State to check - :return: Return True if this NameNode is in the specified state, otherwise, False. - """ - mapping = self.get_namenode_state_to_hostnames() - if state in mapping: - hosts_in_state = mapping[state] - if hosts_in_state is not None and len(hosts_in_state) == 1 and next(iter(hosts_in_state)).lower() == host_name.lower(): - return True - return False - - def is_healthy(self): - """ - :return: Returns a bool indicating if exactly one ACTIVE and one STANDBY host exist. - """ - active_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.ACTIVE] if NAMENODE_STATE.ACTIVE in self.namenode_state_to_hostnames else [] - standby_hosts = self.namenode_state_to_hostnames[NAMENODE_STATE.STANDBY] if NAMENODE_STATE.STANDBY in self.namenode_state_to_hostnames else [] - return len(active_hosts) == 1 and len(standby_hosts) == 1 http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_upgrade.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_upgrade.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_upgrade.py deleted file mode 100644 index f683dcc..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/namenode_upgrade.py +++ /dev/null @@ -1,322 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" -import re -import os - -from resource_management.core.logger import Logger -from resource_management.core.resources.system import Execute -from resource_management.core.resources.system import File -from resource_management.core import shell -from resource_management.core.shell import as_user -from resource_management.core.exceptions import Fail -from resource_management.libraries.functions.format import format -from resource_management.libraries.functions import get_unique_id_and_date -from resource_management.libraries.functions import Direction, SafeMode -from utils import get_dfsadmin_base_command - -from namenode_ha_state import NamenodeHAState - - -safemode_to_instruction = {SafeMode.ON: "enter", - SafeMode.OFF: "leave"} - -NAMENODE_UPGRADE_IN_PROGRESS_MARKER_FILE = "namenode-upgrade-in-progress" - -def prepare_upgrade_check_for_previous_dir(): - """ - During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up some data. - Check that there is no "previous" folder inside the NameNode Name Dir. - """ - import params - - if params.dfs_ha_enabled: - namenode_ha = NamenodeHAState() - if namenode_ha.is_active(params.hostname): - Logger.info("NameNode High Availability is enabled and this is the Active NameNode.") - - problematic_previous_namenode_dirs = set() - nn_name_dirs = params.dfs_name_dir.split(',') - for nn_dir in nn_name_dirs: - if os.path.isdir(nn_dir): - # Check for a previous folder, which is not allowed. - previous_dir = os.path.join(nn_dir, "previous") - if os.path.isdir(previous_dir): - problematic_previous_namenode_dirs.add(previous_dir) - - if len(problematic_previous_namenode_dirs) > 0: - message = 'WARNING. The following NameNode Name Dir(s) have a "previous" folder from an older version.\n' \ - 'Please back it up first, and then delete it, OR Finalize (E.g., "hdfs dfsadmin -finalizeUpgrade").\n' \ - 'NameNode Name Dir(s): {0}\n' \ - '***** Then, retry this step. *****'.format(", ".join(problematic_previous_namenode_dirs)) - Logger.error(message) - raise Fail(message) - -def prepare_upgrade_enter_safe_mode(hdfs_binary): - """ - During a NonRolling (aka Express Upgrade), preparing the NameNode requires first entering Safemode. - :param hdfs_binary: name/path of the HDFS binary to use - """ - import params - - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - safe_mode_enter_cmd = dfsadmin_base_command + " -safemode enter" - try: - # Safe to call if already in Safe Mode - desired_state = SafeMode.ON - safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, desired_state, params.dfs_ha_enabled, hdfs_binary) - Logger.info("Transition successful: {0}, original state: {1}".format(str(safemode_transition_successful), str(original_state))) - if not safemode_transition_successful: - raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(desired_state)) - except Exception, e: - message = "Could not enter safemode. Error: {0}. As the HDFS user, call this command: {1}".format(str(e), safe_mode_enter_cmd) - Logger.error(message) - raise Fail(message) - -def prepare_upgrade_save_namespace(hdfs_binary): - """ - During a NonRolling (aka Express Upgrade), preparing the NameNode requires saving the namespace. - :param hdfs_binary: name/path of the HDFS binary to use - """ - import params - - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - save_namespace_cmd = dfsadmin_base_command + " -saveNamespace" - try: - Logger.info("Checkpoint the current namespace.") - as_user(save_namespace_cmd, params.hdfs_user, env={'PATH': params.hadoop_bin_dir}) - except Exception, e: - message = format("Could not save the NameSpace. As the HDFS user, call this command: {save_namespace_cmd}") - Logger.error(message) - raise Fail(message) - -def prepare_upgrade_backup_namenode_dir(): - """ - During a NonRolling (aka Express Upgrade), preparing the NameNode requires backing up the NameNode Name Dirs. - """ - import params - - i = 0 - failed_paths = [] - nn_name_dirs = params.dfs_name_dir.split(',') - backup_destination_root_dir = "{0}/{1}".format(params.namenode_backup_dir, params.stack_version_unformatted) - if len(nn_name_dirs) > 0: - Logger.info("Backup the NameNode name directory's CURRENT folder.") - for nn_dir in nn_name_dirs: - i += 1 - namenode_current_image = os.path.join(nn_dir, "current") - unique = get_unique_id_and_date() + "_" + str(i) - # Note that /tmp may not be writeable. - backup_current_folder = "{0}/namenode_{1}/".format(backup_destination_root_dir, unique) - - if os.path.isdir(namenode_current_image) and not os.path.isdir(backup_current_folder): - try: - os.makedirs(backup_current_folder) - Execute(('cp', '-ar', namenode_current_image, backup_current_folder), - sudo=True - ) - except Exception, e: - failed_paths.append(namenode_current_image) - if len(failed_paths) > 0: - Logger.error("Could not backup the NameNode Name Dir(s) to {0}, make sure that the destination path is " - "writeable and copy the directories on your own. Directories: {1}".format(backup_destination_root_dir, - ", ".join(failed_paths))) - -def prepare_upgrade_finalize_previous_upgrades(hdfs_binary): - """ - During a NonRolling (aka Express Upgrade), preparing the NameNode requires Finalizing any upgrades that are in progress. - :param hdfs_binary: name/path of the HDFS binary to use - """ - import params - - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - finalize_command = dfsadmin_base_command + " -rollingUpgrade finalize" - try: - Logger.info("Attempt to Finalize if there are any in-progress upgrades. " - "This will return 255 if no upgrades are in progress.") - code, out = shell.checked_call(finalize_command, logoutput=True, user=params.hdfs_user) - if out: - expected_substring = "there is no rolling upgrade in progress" - if expected_substring not in out.lower(): - Logger.warning('Finalize command did not contain substring: %s' % expected_substring) - else: - Logger.warning("Finalize command did not return any output.") - except Exception, e: - Logger.warning("Ensure no upgrades are in progress.") - -def reach_safemode_state(user, safemode_state, in_ha, hdfs_binary): - """ - Enter or leave safemode for the Namenode. - :param user: user to perform action as - :param safemode_state: Desired state of ON or OFF - :param in_ha: bool indicating if Namenode High Availability is enabled - :param hdfs_binary: name/path of the HDFS binary to use - :return: Returns a tuple of (transition success, original state). If no change is needed, the indicator of - success will be True - """ - Logger.info("Prepare to transition into safemode state %s" % safemode_state) - import params - original_state = SafeMode.UNKNOWN - - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - safemode_base_command = dfsadmin_base_command + " -safemode " - safemode_check_cmd = safemode_base_command + " get" - - grep_pattern = format("Safe mode is {safemode_state}") - safemode_check_with_grep = format("{safemode_check_cmd} | grep '{grep_pattern}'") - - code, out = shell.call(safemode_check_cmd, user=user, logoutput=True) - Logger.info("Command: %s\nCode: %d." % (safemode_check_cmd, code)) - if code == 0 and out is not None: - Logger.info(out) - re_pattern = r"Safe mode is (\S*)" - Logger.info("Pattern to search: {0}".format(re_pattern)) - m = re.search(re_pattern, out, re.IGNORECASE) - if m and len(m.groups()) >= 1: - original_state = m.group(1).upper() - - if original_state == safemode_state: - return (True, original_state) - else: - # Make a transition - command = safemode_base_command + safemode_to_instruction[safemode_state] - Execute(command, - user=user, - logoutput=True, - path=[params.hadoop_bin_dir]) - - code, out = shell.call(safemode_check_with_grep, user=user) - Logger.info("Command: %s\nCode: %d. Out: %s" % (safemode_check_with_grep, code, out)) - if code == 0: - return (True, original_state) - return (False, original_state) - - -def prepare_rolling_upgrade(hdfs_binary): - """ - This can be called during either Rolling Upgrade or Express Upgrade (aka nonrolling) - - Rolling Upgrade for HDFS Namenode requires the following. - 0. Namenode must be up - 1. If HA: leave safemode if the safemode status is not OFF - 2. Execute a rolling upgrade "prepare" - 3. Execute a rolling upgrade "query" - :param hdfs_binary: name/path of the HDFS binary to use - """ - import params - - if not params.upgrade_direction or params.upgrade_direction not in [Direction.UPGRADE, Direction.DOWNGRADE]: - raise Fail("Could not retrieve upgrade direction: %s" % str(params.upgrade_direction)) - Logger.info(format("Performing a(n) {params.upgrade_direction} of HDFS")) - - if params.security_enabled: - kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") - Execute(kinit_command, user=params.hdfs_user, logoutput=True) - - if params.upgrade_direction == Direction.UPGRADE: - if params.dfs_ha_enabled: - Logger.info('High Availability is enabled, must leave safemode before calling "-rollingUpgrade prepare"') - desired_state = SafeMode.OFF - safemode_transition_successful, original_state = reach_safemode_state(params.hdfs_user, desired_state, True, hdfs_binary) - if not safemode_transition_successful: - raise Fail("Could not transition to safemode state %s. Please check logs to make sure namenode is up." % str(desired_state)) - - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - prepare = dfsadmin_base_command + " -rollingUpgrade prepare" - query = dfsadmin_base_command + " -rollingUpgrade query" - Execute(prepare, - user=params.hdfs_user, - logoutput=True) - Execute(query, - user=params.hdfs_user, - logoutput=True) - -def finalize_upgrade(upgrade_type, hdfs_binary): - """ - Finalize the Namenode upgrade, at which point it cannot be downgraded. - :param upgrade_type rolling or nonrolling - :param hdfs_binary: name/path of the HDFS binary to use - """ - Logger.info("Executing Rolling Upgrade finalize") - import params - - if params.security_enabled: - kinit_command = format("{params.kinit_path_local} -kt {params.hdfs_user_keytab} {params.hdfs_principal_name}") - Execute(kinit_command, user=params.hdfs_user, logoutput=True) - - dfsadmin_base_command = get_dfsadmin_base_command(hdfs_binary) - finalize_cmd = dfsadmin_base_command + " -rollingUpgrade finalize" - query_cmd = dfsadmin_base_command + " -rollingUpgrade query" - - Execute(query_cmd, - user=params.hdfs_user, - logoutput=True) - Execute(finalize_cmd, - user=params.hdfs_user, - logoutput=True) - Execute(query_cmd, - user=params.hdfs_user, - logoutput=True) - - # upgrade is finalized; remove the upgrade marker - delete_upgrade_marker() - - -def get_upgrade_in_progress_marker(): - """ - Gets the full path of the file which indicates that NameNode has begun its stack upgrade. - :return: - """ - from resource_management.libraries.script.script import Script - return os.path.join(Script.get_tmp_dir(), NAMENODE_UPGRADE_IN_PROGRESS_MARKER_FILE) - - -def create_upgrade_marker(): - """ - Creates the marker file indicating that NameNode has begun participating in a stack upgrade. - If the file already exists, nothing will be done. This will silently log exceptions on failure. - :return: - """ - # create the marker file which indicates - try: - namenode_upgrade_in_progress_marker = get_upgrade_in_progress_marker() - if not os.path.isfile(namenode_upgrade_in_progress_marker): - File(namenode_upgrade_in_progress_marker) - except: - Logger.warning("Unable to create NameNode upgrade marker file {0}".format(namenode_upgrade_in_progress_marker)) - - -def delete_upgrade_marker(): - """ - Removes the marker file indicating that NameNode has begun participating in a stack upgrade. - If the file does not exist, then nothing will be done. - Failure to remove this file could cause problems with restarts in the future. That's why - checking to see if there is a suspended upgrade is also advised. This function will raise - an exception if the file can't be removed. - :return: - """ - # create the marker file which indicates - try: - namenode_upgrade_in_progress_marker = get_upgrade_in_progress_marker() - if os.path.isfile(namenode_upgrade_in_progress_marker): - File(namenode_upgrade_in_progress_marker, action='delete') - except: - error_message = "Unable to remove NameNode upgrade marker file {0}".format(namenode_upgrade_in_progress_marker) - Logger.error(error_message) - raise Fail(error_message) - http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/nfsgateway.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/nfsgateway.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/nfsgateway.py deleted file mode 100644 index 7ba1f96..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/nfsgateway.py +++ /dev/null @@ -1,151 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -from resource_management.libraries.script import Script -from resource_management.libraries.functions.check_process_status import check_process_status -from resource_management.libraries.functions.security_commons import build_expectations, \ - cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ - FILE_TYPE_XML -from hdfs_nfsgateway import nfsgateway -from hdfs import hdfs -from resource_management.libraries.functions import conf_select -from resource_management.libraries.functions import stack_select -from resource_management.libraries.functions import StackFeature -from resource_management.libraries.functions.stack_features import check_stack_feature - - -class NFSGateway(Script): - - def get_component_name(self): - return "hadoop-hdfs-nfs3" - - def install(self, env): - import params - - env.set_params(params) - - self.install_packages(env) - - def pre_upgrade_restart(self, env, upgrade_type=None): - import params - env.set_params(params) - - if params.stack_version_formatted and check_stack_feature(StackFeature.NFS, params.stack_version_formatted): - conf_select.select(params.stack_name, "hadoop", params.version) - stack_select.select("hadoop-hdfs-nfs3", params.version) - - def start(self, env, upgrade_type=None): - import params - env.set_params(params) - - self.configure(env) - nfsgateway(action="start") - - def stop(self, env, upgrade_type=None): - import params - env.set_params(params) - - nfsgateway(action="stop") - - def configure(self, env): - import params - - env.set_params(params) - hdfs() - nfsgateway(action="configure") - - def status(self, env): - import status_params - - env.set_params(status_params) - - check_process_status(status_params.nfsgateway_pid_file) - - def security_status(self, env): - import status_params - - env.set_params(status_params) - props_value_check = {"hadoop.security.authentication": "kerberos", - "hadoop.security.authorization": "true"} - props_empty_check = ["hadoop.security.auth_to_local"] - props_read_check = None - core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, - props_read_check) - props_value_check = None - props_empty_check = ['nfs.keytab.file', - 'nfs.kerberos.principal'] - props_read_check = ['nfs.keytab.file'] - hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, - props_read_check) - - hdfs_expectations = {} - hdfs_expectations.update(core_site_expectations) - hdfs_expectations.update(hdfs_site_expectations) - - security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, - {'core-site.xml': FILE_TYPE_XML, - 'hdfs-site.xml': FILE_TYPE_XML}) - if 'core-site' in security_params and 'hadoop.security.authentication' in security_params['core-site'] and \ - security_params['core-site']['hadoop.security.authentication'].lower() == 'kerberos': - result_issues = validate_security_config_properties(security_params, hdfs_expectations) - if not result_issues: # If all validations passed successfully - try: - # Double check the dict before calling execute - if ('hdfs-site' not in security_params or - 'nfs.keytab.file' not in security_params['hdfs-site'] or - 'nfs.kerberos.principal' not in security_params['hdfs-site']): - self.put_structured_out({"securityState": "UNSECURED"}) - self.put_structured_out( - {"securityIssuesFound": "Keytab file or principal are not set property."}) - return - - cached_kinit_executor(status_params.kinit_path_local, - status_params.hdfs_user, - security_params['hdfs-site']['nfs.keytab.file'], - security_params['hdfs-site'][ - 'nfs.kerberos.principal'], - status_params.hostname, - status_params.tmp_dir) - self.put_structured_out({"securityState": "SECURED_KERBEROS"}) - except Exception as e: - self.put_structured_out({"securityState": "ERROR"}) - self.put_structured_out({"securityStateErrorInfo": str(e)}) - else: - issues = [] - for cf in result_issues: - issues.append("Configuration file %s did not pass the validation. Reason: %s" % (cf, result_issues[cf])) - self.put_structured_out({"securityIssuesFound": ". ".join(issues)}) - self.put_structured_out({"securityState": "UNSECURED"}) - else: - self.put_structured_out({"securityState": "UNSECURED"}) - - def get_log_folder(self): - import params - return params.hdfs_log_dir - - def get_user(self): - import params - return params.hdfs_user - - def get_pid_files(self): - import status_params - return [status_params.nfsgateway_pid_file] - -if __name__ == "__main__": - NFSGateway().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params.py deleted file mode 100644 index 25231f9..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params.py +++ /dev/null @@ -1,28 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" -from ambari_commons import OSCheck -from resource_management.libraries.functions.default import default - -if OSCheck.is_windows_family(): - from params_windows import * -else: - from params_linux import * - -nfsgateway_heapsize = config['configurations']['hadoop-env']['nfsgateway_heapsize'] -retryAble = default("/commandParams/command_retry_enabled", False) http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_linux.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_linux.py deleted file mode 100644 index 55544e0..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_linux.py +++ /dev/null @@ -1,527 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -import status_params -import utils -import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set. -import os -import re - -from ambari_commons.os_check import OSCheck - -from resource_management.libraries.functions import conf_select -from resource_management.libraries.functions import stack_select -from resource_management.libraries.functions import StackFeature -from resource_management.libraries.functions.stack_features import check_stack_feature -from resource_management.libraries.functions.stack_features import get_stack_feature_version -from resource_management.libraries.functions import format -from resource_management.libraries.functions.version import format_stack_version -from resource_management.libraries.functions.default import default -from resource_management.libraries.functions.expect import expect -from resource_management.libraries.functions import get_klist_path -from resource_management.libraries.functions import get_kinit_path -from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources -from resource_management.libraries.script.script import Script -from resource_management.libraries.resources.hdfs_resource import HdfsResource -from resource_management.libraries.functions.format_jvm_option import format_jvm_option -from resource_management.libraries.functions.get_lzo_packages import get_lzo_packages -from resource_management.libraries.functions.hdfs_utils import is_https_enabled_in_hdfs -from resource_management.libraries.functions import is_empty - - -config = Script.get_config() -tmp_dir = Script.get_tmp_dir() - -stack_name = status_params.stack_name -stack_root = Script.get_stack_root() -upgrade_direction = default("/commandParams/upgrade_direction", None) -stack_version_unformatted = config['hostLevelParams']['stack_version'] -stack_version_formatted = format_stack_version(stack_version_unformatted) -agent_stack_retry_on_unavailability = config['hostLevelParams']['agent_stack_retry_on_unavailability'] -agent_stack_retry_count = expect("/hostLevelParams/agent_stack_retry_count", int) - -# there is a stack upgrade which has not yet been finalized; it's currently suspended -upgrade_suspended = default("roleParams/upgrade_suspended", False) - -# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade -version = default("/commandParams/version", None) - -# The desired role is only available during a Non-Rolling Upgrade in HA. -# The server calculates which of the two NameNodes will be the active, and the other the standby since they -# are started using different commands. -desired_namenode_role = default("/commandParams/desired_namenode_role", None) - -# get the correct version to use for checking stack features -version_for_stack_feature_checks = get_stack_feature_version(config) - -stack_supports_ranger_kerberos = check_stack_feature(StackFeature.RANGER_KERBEROS_SUPPORT, version_for_stack_feature_checks) -stack_supports_ranger_audit_db = check_stack_feature(StackFeature.RANGER_AUDIT_DB_SUPPORT, version_for_stack_feature_checks) - -security_enabled = config['configurations']['cluster-env']['security_enabled'] -hdfs_user = status_params.hdfs_user -root_user = "root" -hadoop_pid_dir_prefix = status_params.hadoop_pid_dir_prefix -namenode_pid_file = status_params.namenode_pid_file -zkfc_pid_file = status_params.zkfc_pid_file -datanode_pid_file = status_params.datanode_pid_file - -# Some datanode settings -dfs_dn_addr = default('/configurations/hdfs-site/dfs.datanode.address', None) -dfs_dn_http_addr = default('/configurations/hdfs-site/dfs.datanode.http.address', None) -dfs_dn_https_addr = default('/configurations/hdfs-site/dfs.datanode.https.address', None) -dfs_http_policy = default('/configurations/hdfs-site/dfs.http.policy', None) -dfs_dn_ipc_address = config['configurations']['hdfs-site']['dfs.datanode.ipc.address'] -secure_dn_ports_are_in_use = False - -hdfs_tmp_dir = config['configurations']['hadoop-env']['hdfs_tmp_dir'] -namenode_backup_dir = default("/configurations/hadoop-env/namenode_backup_dir", "/tmp/upgrades") - -# hadoop default parameters -mapreduce_libs_path = "/usr/lib/hadoop-mapreduce/*" -hadoop_libexec_dir = stack_select.get_hadoop_dir("libexec") -hadoop_bin = stack_select.get_hadoop_dir("sbin") -hadoop_bin_dir = stack_select.get_hadoop_dir("bin") -hadoop_home = stack_select.get_hadoop_dir("home") -hadoop_secure_dn_user = hdfs_user -hadoop_conf_dir = conf_select.get_hadoop_conf_dir() -hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure") -hadoop_lib_home = stack_select.get_hadoop_dir("lib") - -# hadoop parameters for stacks that support rolling_upgrade -if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted): - mapreduce_libs_path = format("{stack_root}/current/hadoop-mapreduce-client/*") - - if not security_enabled: - hadoop_secure_dn_user = '""' - else: - dfs_dn_port = utils.get_port(dfs_dn_addr) - dfs_dn_http_port = utils.get_port(dfs_dn_http_addr) - dfs_dn_https_port = utils.get_port(dfs_dn_https_addr) - # We try to avoid inability to start datanode as a plain user due to usage of root-owned ports - if dfs_http_policy == "HTTPS_ONLY": - secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_https_port) - elif dfs_http_policy == "HTTP_AND_HTTPS": - secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port) or utils.is_secure_port(dfs_dn_https_port) - else: # params.dfs_http_policy == "HTTP_ONLY" or not defined: - secure_dn_ports_are_in_use = utils.is_secure_port(dfs_dn_port) or utils.is_secure_port(dfs_dn_http_port) - if secure_dn_ports_are_in_use: - hadoop_secure_dn_user = hdfs_user - else: - hadoop_secure_dn_user = '""' - -ambari_libs_dir = "/var/lib/ambari-agent/lib" -limits_conf_dir = "/etc/security/limits.d" - -hdfs_user_nofile_limit = default("/configurations/hadoop-env/hdfs_user_nofile_limit", "128000") -hdfs_user_nproc_limit = default("/configurations/hadoop-env/hdfs_user_nproc_limit", "65536") - -create_lib_snappy_symlinks = check_stack_feature(StackFeature.SNAPPY, stack_version_formatted) -jsvc_path = "/usr/lib/bigtop-utils" - -execute_path = os.environ['PATH'] + os.pathsep + hadoop_bin_dir -ulimit_cmd = "ulimit -c unlimited ; " - -snappy_so = "libsnappy.so" -so_target_dir_x86 = format("{hadoop_lib_home}/native/Linux-i386-32") -so_target_dir_x64 = format("{hadoop_lib_home}/native/Linux-amd64-64") -so_target_x86 = format("{so_target_dir_x86}/{snappy_so}") -so_target_x64 = format("{so_target_dir_x64}/{snappy_so}") -so_src_dir_x86 = format("{hadoop_home}/lib") -so_src_dir_x64 = format("{hadoop_home}/lib64") -so_src_x86 = format("{so_src_dir_x86}/{snappy_so}") -so_src_x64 = format("{so_src_dir_x64}/{snappy_so}") - -#security params -smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab'] -hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] -falcon_user = config['configurations']['falcon-env']['falcon_user'] - -#exclude file -hdfs_exclude_file = default("/clusterHostInfo/decom_dn_hosts", []) -exclude_file_path = config['configurations']['hdfs-site']['dfs.hosts.exclude'] -update_exclude_file_only = default("/commandParams/update_exclude_file_only",False) -command_phase = default("/commandParams/phase","") - -klist_path_local = get_klist_path(default('/configurations/kerberos-env/executable_search_paths', None)) -kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) -#hosts -hostname = config["hostname"] -rm_host = default("/clusterHostInfo/rm_host", []) -slave_hosts = default("/clusterHostInfo/slave_hosts", []) -oozie_servers = default("/clusterHostInfo/oozie_server", []) -hcat_server_hosts = default("/clusterHostInfo/webhcat_server_host", []) -hive_server_host = default("/clusterHostInfo/hive_server_host", []) -hbase_master_hosts = default("/clusterHostInfo/hbase_master_hosts", []) -hs_host = default("/clusterHostInfo/hs_host", []) -jtnode_host = default("/clusterHostInfo/jtnode_host", []) -namenode_host = default("/clusterHostInfo/namenode_host", []) -nm_host = default("/clusterHostInfo/nm_host", []) -ganglia_server_hosts = default("/clusterHostInfo/ganglia_server_host", []) -journalnode_hosts = default("/clusterHostInfo/journalnode_hosts", []) -zkfc_hosts = default("/clusterHostInfo/zkfc_hosts", []) -falcon_host = default("/clusterHostInfo/falcon_server_hosts", []) - -has_ganglia_server = not len(ganglia_server_hosts) == 0 -has_namenodes = not len(namenode_host) == 0 -has_jobtracker = not len(jtnode_host) == 0 -has_resourcemanager = not len(rm_host) == 0 -has_histroryserver = not len(hs_host) == 0 -has_hbase_masters = not len(hbase_master_hosts) == 0 -has_slaves = not len(slave_hosts) == 0 -has_oozie_server = not len(oozie_servers) == 0 -has_hcat_server_host = not len(hcat_server_hosts) == 0 -has_hive_server_host = not len(hive_server_host) == 0 -has_journalnode_hosts = not len(journalnode_hosts) == 0 -has_zkfc_hosts = not len(zkfc_hosts) == 0 -has_falcon_host = not len(falcon_host) == 0 - - -is_namenode_master = hostname in namenode_host -is_jtnode_master = hostname in jtnode_host -is_rmnode_master = hostname in rm_host -is_hsnode_master = hostname in hs_host -is_hbase_master = hostname in hbase_master_hosts -is_slave = hostname in slave_hosts - -if has_ganglia_server: - ganglia_server_host = ganglia_server_hosts[0] - -#users and groups -yarn_user = config['configurations']['yarn-env']['yarn_user'] -hbase_user = config['configurations']['hbase-env']['hbase_user'] -oozie_user = config['configurations']['oozie-env']['oozie_user'] -webhcat_user = config['configurations']['hive-env']['webhcat_user'] -hive_user = config['configurations']['hive-env']['hive_user'] -smoke_user = config['configurations']['cluster-env']['smokeuser'] -smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name'] -mapred_user = config['configurations']['mapred-env']['mapred_user'] -hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None) - -user_group = config['configurations']['cluster-env']['user_group'] -root_group = "root" -proxyuser_group = config['configurations']['hadoop-env']['proxyuser_group'] - -#hadoop params -hdfs_log_dir_prefix = config['configurations']['hadoop-env']['hdfs_log_dir_prefix'] -hadoop_root_logger = config['configurations']['hadoop-env']['hadoop_root_logger'] -nfs_file_dump_dir = config['configurations']['hdfs-site']['nfs.file.dump.dir'] - -dfs_domain_socket_path = config['configurations']['hdfs-site']['dfs.domain.socket.path'] -dfs_domain_socket_dir = os.path.dirname(dfs_domain_socket_path) - -jn_edits_dir = config['configurations']['hdfs-site']['dfs.journalnode.edits.dir'] - -dfs_name_dir = config['configurations']['hdfs-site']['dfs.namenode.name.dir'] - -hdfs_log_dir = format("{hdfs_log_dir_prefix}/{hdfs_user}") -namenode_dirs_created_stub_dir = hdfs_log_dir -namenode_dirs_stub_filename = "namenode_dirs_created" - -smoke_hdfs_user_dir = format("/user/{smoke_user}") -smoke_hdfs_user_mode = 0770 - -hdfs_namenode_format_disabled = default("/configurations/cluster-env/hdfs_namenode_format_disabled", False) -hdfs_namenode_formatted_mark_suffix = "/namenode-formatted/" -hdfs_namenode_bootstrapped_mark_suffix = "/namenode-bootstrapped/" -namenode_formatted_old_mark_dirs = ["/var/run/hadoop/hdfs/namenode-formatted", - format("{hadoop_pid_dir_prefix}/hdfs/namenode/formatted"), - "/var/lib/hdfs/namenode/formatted"] -dfs_name_dirs = dfs_name_dir.split(",") -namenode_formatted_mark_dirs = [] -namenode_bootstrapped_mark_dirs = [] -for dn_dir in dfs_name_dirs: - tmp_format_mark_dir = format("{dn_dir}{hdfs_namenode_formatted_mark_suffix}") - tmp_bootstrap_mark_dir = format("{dn_dir}{hdfs_namenode_bootstrapped_mark_suffix}") - namenode_formatted_mark_dirs.append(tmp_format_mark_dir) - namenode_bootstrapped_mark_dirs.append(tmp_bootstrap_mark_dir) - -# Use the namenode RPC address if configured, otherwise, fallback to the default file system -namenode_address = None -if 'dfs.namenode.rpc-address' in config['configurations']['hdfs-site']: - namenode_rpcaddress = config['configurations']['hdfs-site']['dfs.namenode.rpc-address'] - namenode_address = format("hdfs://{namenode_rpcaddress}") -else: - namenode_address = config['configurations']['core-site']['fs.defaultFS'] - -fs_checkpoint_dirs = default("/configurations/hdfs-site/dfs.namenode.checkpoint.dir", "").split(',') - -dfs_data_dirs = config['configurations']['hdfs-site']['dfs.datanode.data.dir'] - -data_dir_mount_file = "/var/lib/ambari-agent/data/datanode/dfs_data_dir_mount.hist" - -# HDFS High Availability properties -dfs_ha_enabled = False -dfs_ha_nameservices = default('/configurations/hdfs-site/dfs.internal.nameservices', None) -if dfs_ha_nameservices is None: - dfs_ha_nameservices = default('/configurations/hdfs-site/dfs.nameservices', None) -dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None) -dfs_ha_automatic_failover_enabled = default("/configurations/hdfs-site/dfs.ha.automatic-failover.enabled", False) - -# hostname of the active HDFS HA Namenode (only used when HA is enabled) -dfs_ha_namenode_active = default("/configurations/hadoop-env/dfs_ha_initial_namenode_active", None) -# hostname of the standby HDFS HA Namenode (only used when HA is enabled) -dfs_ha_namenode_standby = default("/configurations/hadoop-env/dfs_ha_initial_namenode_standby", None) - -# Values for the current Host -namenode_id = None -namenode_rpc = None - -dfs_ha_namemodes_ids_list = [] -other_namenode_id = None - -if dfs_ha_namenode_ids: - dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",") - dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list) - if dfs_ha_namenode_ids_array_len > 1: - dfs_ha_enabled = True -if dfs_ha_enabled: - for nn_id in dfs_ha_namemodes_ids_list: - nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')] - if hostname in nn_host: - namenode_id = nn_id - namenode_rpc = nn_host - # With HA enabled namenode_address is recomputed - namenode_address = format('hdfs://{dfs_ha_nameservices}') - - # Calculate the namenode id of the other namenode. This is needed during RU to initiate an HA failover using ZKFC. - if namenode_id is not None and len(dfs_ha_namemodes_ids_list) == 2: - other_namenode_id = list(set(dfs_ha_namemodes_ids_list) - set([namenode_id]))[0] - - -if dfs_http_policy is not None and dfs_http_policy.upper() == "HTTPS_ONLY": - https_only = True - journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.https-address', None) -else: - https_only = False - journalnode_address = default('/configurations/hdfs-site/dfs.journalnode.http-address', None) - -if journalnode_address: - journalnode_port = journalnode_address.split(":")[1] - - -if security_enabled: - dn_principal_name = config['configurations']['hdfs-site']['dfs.datanode.kerberos.principal'] - dn_keytab = config['configurations']['hdfs-site']['dfs.datanode.keytab.file'] - dn_principal_name = dn_principal_name.replace('_HOST',hostname.lower()) - - dn_kinit_cmd = format("{kinit_path_local} -kt {dn_keytab} {dn_principal_name};") - - nn_principal_name = config['configurations']['hdfs-site']['dfs.namenode.kerberos.principal'] - nn_keytab = config['configurations']['hdfs-site']['dfs.namenode.keytab.file'] - nn_principal_name = nn_principal_name.replace('_HOST',hostname.lower()) - - nn_kinit_cmd = format("{kinit_path_local} -kt {nn_keytab} {nn_principal_name};") - - jn_principal_name = default("/configurations/hdfs-site/dfs.journalnode.kerberos.principal", None) - if jn_principal_name: - jn_principal_name = jn_principal_name.replace('_HOST', hostname.lower()) - jn_keytab = default("/configurations/hdfs-site/dfs.journalnode.keytab.file", None) - hdfs_kinit_cmd = format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name};") -else: - dn_kinit_cmd = "" - nn_kinit_cmd = "" - hdfs_kinit_cmd = "" - -hdfs_site = config['configurations']['hdfs-site'] -default_fs = config['configurations']['core-site']['fs.defaultFS'] - -dfs_type = default("/commandParams/dfs_type", "") - -import functools -#create partial functions with common arguments for every HdfsResource call -#to create/delete/copyfromlocal hdfs directories/files we need to call params.HdfsResource in code -HdfsResource = functools.partial( - HdfsResource, - user=hdfs_user, - hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore", - security_enabled = security_enabled, - keytab = hdfs_user_keytab, - kinit_path_local = kinit_path_local, - hadoop_bin_dir = hadoop_bin_dir, - hadoop_conf_dir = hadoop_conf_dir, - principal_name = hdfs_principal_name, - hdfs_site = hdfs_site, - default_fs = default_fs, - immutable_paths = get_not_managed_resources(), - dfs_type = dfs_type -) - - -# The logic for LZO also exists in OOZIE's params.py -io_compression_codecs = default("/configurations/core-site/io.compression.codecs", None) -lzo_enabled = io_compression_codecs is not None and "com.hadoop.compression.lzo" in io_compression_codecs.lower() -lzo_packages = get_lzo_packages(stack_version_unformatted) - -name_node_params = default("/commandParams/namenode", None) - -java_home = config['hostLevelParams']['java_home'] -java_version = expect("/hostLevelParams/java_version", int) - -hadoop_heapsize = config['configurations']['hadoop-env']['hadoop_heapsize'] -namenode_heapsize = config['configurations']['hadoop-env']['namenode_heapsize'] -namenode_opt_newsize = config['configurations']['hadoop-env']['namenode_opt_newsize'] -namenode_opt_maxnewsize = config['configurations']['hadoop-env']['namenode_opt_maxnewsize'] -namenode_opt_permsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_permsize","128m") -namenode_opt_maxpermsize = format_jvm_option("/configurations/hadoop-env/namenode_opt_maxpermsize","256m") - -jtnode_opt_newsize = "200m" -jtnode_opt_maxnewsize = "200m" -jtnode_heapsize = "1024m" -ttnode_heapsize = "1024m" - -dtnode_heapsize = config['configurations']['hadoop-env']['dtnode_heapsize'] -mapred_pid_dir_prefix = default("/configurations/mapred-env/mapred_pid_dir_prefix","/var/run/hadoop-mapreduce") -mapred_log_dir_prefix = default("/configurations/mapred-env/mapred_log_dir_prefix","/var/log/hadoop-mapreduce") - -# ranger host -ranger_admin_hosts = default("/clusterHostInfo/ranger_admin_hosts", []) -has_ranger_admin = not len(ranger_admin_hosts) == 0 -xml_configurations_supported = config['configurations']['ranger-env']['xml_configurations_supported'] -ambari_server_hostname = config['clusterHostInfo']['ambari_server_host'][0] - -#ranger hdfs properties -policymgr_mgr_url = config['configurations']['admin-properties']['policymgr_external_url'] -if 'admin-properties' in config['configurations'] and 'policymgr_external_url' in config['configurations']['admin-properties'] and policymgr_mgr_url.endswith('/'): - policymgr_mgr_url = policymgr_mgr_url.rstrip('/') -xa_audit_db_name = default('/configurations/admin-properties/audit_db_name', 'ranger_audits') -xa_audit_db_user = default('/configurations/admin-properties/audit_db_user', 'rangerlogger') -xa_db_host = config['configurations']['admin-properties']['db_host'] -repo_name = str(config['clusterName']) + '_hadoop' - -hadoop_security_authentication = config['configurations']['core-site']['hadoop.security.authentication'] -hadoop_security_authorization = config['configurations']['core-site']['hadoop.security.authorization'] -fs_default_name = config['configurations']['core-site']['fs.defaultFS'] -hadoop_security_auth_to_local = config['configurations']['core-site']['hadoop.security.auth_to_local'] -hadoop_rpc_protection = config['configurations']['ranger-hdfs-plugin-properties']['hadoop.rpc.protection'] -common_name_for_certificate = config['configurations']['ranger-hdfs-plugin-properties']['common.name.for.certificate'] - -repo_config_username = config['configurations']['ranger-hdfs-plugin-properties']['REPOSITORY_CONFIG_USERNAME'] - -if security_enabled: - sn_principal_name = default("/configurations/hdfs-site/dfs.secondary.namenode.kerberos.principal", "nn/_h...@example.com") - sn_principal_name = sn_principal_name.replace('_HOST',hostname.lower()) - -ranger_env = config['configurations']['ranger-env'] -ranger_plugin_properties = config['configurations']['ranger-hdfs-plugin-properties'] -policy_user = config['configurations']['ranger-hdfs-plugin-properties']['policy_user'] - -#For curl command in ranger plugin to get db connector -jdk_location = config['hostLevelParams']['jdk_location'] -java_share_dir = '/usr/share/java' - -is_https_enabled = is_https_enabled_in_hdfs(config['configurations']['hdfs-site']['dfs.http.policy'], - config['configurations']['hdfs-site']['dfs.https.enable']) - -if has_ranger_admin: - enable_ranger_hdfs = (config['configurations']['ranger-hdfs-plugin-properties']['ranger-hdfs-plugin-enabled'].lower() == 'yes') - xa_audit_db_password = '' - if not is_empty(config['configurations']['admin-properties']['audit_db_password']) and stack_supports_ranger_audit_db: - xa_audit_db_password = unicode(config['configurations']['admin-properties']['audit_db_password']) - repo_config_password = unicode(config['configurations']['ranger-hdfs-plugin-properties']['REPOSITORY_CONFIG_PASSWORD']) - xa_audit_db_flavor = (config['configurations']['admin-properties']['DB_FLAVOR']).lower() - previous_jdbc_jar_name = None - - if stack_supports_ranger_audit_db: - - if xa_audit_db_flavor == 'mysql': - jdbc_jar_name = default("/hostLevelParams/custom_mysql_jdbc_name", None) - previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_mysql_jdbc_name", None) - audit_jdbc_url = format('jdbc:mysql://{xa_db_host}/{xa_audit_db_name}') - jdbc_driver = "com.mysql.jdbc.Driver" - elif xa_audit_db_flavor == 'oracle': - jdbc_jar_name = default("/hostLevelParams/custom_oracle_jdbc_name", None) - previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_oracle_jdbc_name", None) - colon_count = xa_db_host.count(':') - if colon_count == 2 or colon_count == 0: - audit_jdbc_url = format('jdbc:oracle:thin:@{xa_db_host}') - else: - audit_jdbc_url = format('jdbc:oracle:thin:@//{xa_db_host}') - jdbc_driver = "oracle.jdbc.OracleDriver" - elif xa_audit_db_flavor == 'postgres': - jdbc_jar_name = default("/hostLevelParams/custom_postgres_jdbc_name", None) - previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_postgres_jdbc_name", None) - audit_jdbc_url = format('jdbc:postgresql://{xa_db_host}/{xa_audit_db_name}') - jdbc_driver = "org.postgresql.Driver" - elif xa_audit_db_flavor == 'mssql': - jdbc_jar_name = default("/hostLevelParams/custom_mssql_jdbc_name", None) - previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_mssql_jdbc_name", None) - audit_jdbc_url = format('jdbc:sqlserver://{xa_db_host};databaseName={xa_audit_db_name}') - jdbc_driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver" - elif xa_audit_db_flavor == 'sqla': - jdbc_jar_name = default("/hostLevelParams/custom_sqlanywhere_jdbc_name", None) - previous_jdbc_jar_name = default("/hostLevelParams/previous_custom_sqlanywhere_jdbc_name", None) - audit_jdbc_url = format('jdbc:sqlanywhere:database={xa_audit_db_name};host={xa_db_host}') - jdbc_driver = "sap.jdbc4.sqlanywhere.IDriver" - - downloaded_custom_connector = format("{tmp_dir}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None - driver_curl_source = format("{jdk_location}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None - driver_curl_target = format("{hadoop_lib_home}/{jdbc_jar_name}") if stack_supports_ranger_audit_db else None - previous_jdbc_jar = format("{hadoop_lib_home}/{previous_jdbc_jar_name}") if stack_supports_ranger_audit_db else None - - sql_connector_jar = '' - - hdfs_ranger_plugin_config = { - 'username': repo_config_username, - 'password': repo_config_password, - 'hadoop.security.authentication': hadoop_security_authentication, - 'hadoop.security.authorization': hadoop_security_authorization, - 'fs.default.name': fs_default_name, - 'hadoop.security.auth_to_local': hadoop_security_auth_to_local, - 'hadoop.rpc.protection': hadoop_rpc_protection, - 'commonNameForCertificate': common_name_for_certificate, - 'dfs.datanode.kerberos.principal': dn_principal_name if security_enabled else '', - 'dfs.namenode.kerberos.principal': nn_principal_name if security_enabled else '', - 'dfs.secondary.namenode.kerberos.principal': sn_principal_name if security_enabled else '' - } - - hdfs_ranger_plugin_repo = { - 'isActive': 'true', - 'config': json.dumps(hdfs_ranger_plugin_config), - 'description': 'hdfs repo', - 'name': repo_name, - 'repositoryType': 'hdfs', - 'assetType': '1' - } - if stack_supports_ranger_kerberos and security_enabled: - hdfs_ranger_plugin_config['policy.download.auth.users'] = hdfs_user - hdfs_ranger_plugin_config['tag.download.auth.users'] = hdfs_user - - if stack_supports_ranger_kerberos: - hdfs_ranger_plugin_config['ambari.service.check.user'] = policy_user - - hdfs_ranger_plugin_repo = { - 'isEnabled': 'true', - 'configs': hdfs_ranger_plugin_config, - 'description': 'hdfs repo', - 'name': repo_name, - 'type': 'hdfs' - } - - xa_audit_db_is_enabled = False - ranger_audit_solr_urls = config['configurations']['ranger-admin-site']['ranger.audit.solr.urls'] - if xml_configurations_supported and stack_supports_ranger_audit_db: - xa_audit_db_is_enabled = config['configurations']['ranger-hdfs-audit']['xasecure.audit.destination.db'] - xa_audit_hdfs_is_enabled = config['configurations']['ranger-hdfs-audit']['xasecure.audit.destination.hdfs'] if xml_configurations_supported else None - ssl_keystore_password = unicode(config['configurations']['ranger-hdfs-policymgr-ssl']['xasecure.policymgr.clientssl.keystore.password']) if xml_configurations_supported else None - ssl_truststore_password = unicode(config['configurations']['ranger-hdfs-policymgr-ssl']['xasecure.policymgr.clientssl.truststore.password']) if xml_configurations_supported else None - credential_file = format('/etc/ranger/{repo_name}/cred.jceks') if xml_configurations_supported else None - - #For SQLA explicitly disable audit to DB for Ranger - if xa_audit_db_flavor == 'sqla': - xa_audit_db_is_enabled = False http://git-wip-us.apache.org/repos/asf/ambari/blob/7df6bba4/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_windows.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_windows.py b/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_windows.py deleted file mode 100644 index 70d95a6..0000000 --- a/ambari-server/src/main/resources/common-services/HDFS/3.0.0/package/scripts/params_windows.py +++ /dev/null @@ -1,79 +0,0 @@ -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -import os - -#Used in subsequent imports from params -from resource_management.libraries.script.script import Script -from resource_management.libraries.functions.default import default -from resource_management.libraries.functions.format import format -from install_params import exclude_packages -from status_params import * - -config = Script.get_config() -hadoop_conf_dir = None -hbase_conf_dir = None -hadoop_home = None -try: - hadoop_conf_dir = os.environ["HADOOP_CONF_DIR"] - hbase_conf_dir = os.environ["HBASE_CONF_DIR"] - hadoop_home = os.environ["HADOOP_HOME"] -except: - pass -#directories & files -dfs_name_dir = config['configurations']['hdfs-site']['dfs.namenode.name.dir'] -fs_checkpoint_dir = config['configurations']['hdfs-site']['dfs.namenode.checkpoint.dir'] -dfs_data_dir = config['configurations']['hdfs-site']['dfs.datanode.data.dir'] -#decomission -hdfs_exclude_file = default("/clusterHostInfo/decom_dn_hosts", []) -exclude_file_path = config['configurations']['hdfs-site']['dfs.hosts.exclude'] -# HDFS High Availability properties -dfs_ha_enabled = False -dfs_ha_nameservices = default("/configurations/hdfs-site/dfs.internal.nameservices", None) -dfs_ha_namenode_ids = default(format("/configurations/hdfs-site/dfs.ha.namenodes.{dfs_ha_nameservices}"), None) - -namenode_id = None -namenode_rpc = None -hostname = config["hostname"] -if dfs_ha_namenode_ids: - dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",") - dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list) - if dfs_ha_namenode_ids_array_len > 1: - dfs_ha_enabled = True -if dfs_ha_enabled: - for nn_id in dfs_ha_namemodes_ids_list: - nn_host = config['configurations']['hdfs-site'][format('dfs.namenode.rpc-address.{dfs_ha_nameservices}.{nn_id}')] - if hostname in nn_host: - namenode_id = nn_id - namenode_rpc = nn_host - -hadoop_user = config["configurations"]["cluster-env"]["hadoop.user.name"] -hdfs_user = hadoop_user - -grep_exe = "findstr" - -name_node_params = default("/commandParams/namenode", None) - -service_map = { - "datanode" : datanode_win_service_name, - "journalnode" : journalnode_win_service_name, - "namenode" : namenode_win_service_name, - "secondarynamenode" : snamenode_win_service_name, - "zkfc_slave": zkfc_win_service_name -}