Repository: ambari Updated Branches: refs/heads/trunk 1e393e6fc -> 3f1d3dfac
AMBARI-8477. HDFS service components should indicate security state. (robert levas via jaimin) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/3f1d3dfa Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/3f1d3dfa Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/3f1d3dfa Branch: refs/heads/trunk Commit: 3f1d3dfac12e6d870c60d583af7ba1bdeb54a546 Parents: 1e393e6 Author: Jaimin Jetly <[email protected]> Authored: Tue Dec 23 13:15:52 2014 -0800 Committer: Jaimin Jetly <[email protected]> Committed: Tue Dec 23 13:15:52 2014 -0800 ---------------------------------------------------------------------- .../resource_management/TestSecurityCommons.py | 271 +++++++++++++++++++ .../libraries/functions/security_commons.py | 184 +++++++++++++ .../HDFS/2.1.0.2.0/package/scripts/datanode.py | 59 +++- .../2.1.0.2.0/package/scripts/hdfs_client.py | 44 +++ .../2.1.0.2.0/package/scripts/journalnode.py | 59 +++- .../HDFS/2.1.0.2.0/package/scripts/namenode.py | 91 +++++-- .../HDFS/2.1.0.2.0/package/scripts/snamenode.py | 58 ++++ .../2.1.0.2.0/package/scripts/status_params.py | 9 + .../2.1.0.2.0/package/scripts/zkfc_slave.py | 48 +++- 9 files changed, 804 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-agent/src/test/python/resource_management/TestSecurityCommons.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/resource_management/TestSecurityCommons.py b/ambari-agent/src/test/python/resource_management/TestSecurityCommons.py new file mode 100644 index 0000000..e14f3b1 --- /dev/null +++ b/ambari-agent/src/test/python/resource_management/TestSecurityCommons.py @@ -0,0 +1,271 @@ +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +from mock.mock import patch, MagicMock, Mock +from unittest import TestCase +from resource_management.libraries.functions.security_commons import * +from datetime import datetime, timedelta +from tempfile import gettempdir + +class TestSecurityCommons(TestCase): + @patch('os.path.isfile') + def test_validate_security_config_properties(self, os_path_isfile_mock): + + # Testing with correct values_checks + params = {} + params["config_file"] = {} + params["config_file"]["property1"] = {"firstCase"} + params["config_file"]["property2"] = {"secondCase"} + params["config_file"]["property3"] = {"thirdCase"} + params["config_file"]["property4"] = {"fourthCase"} + params["config_file"]["property5"] = {"fifthCase"} + params["config_file"]["property6"] = {"sixthCase"} + + configuration_rules = {} + configuration_rules["config_file"] = {} + configuration_rules["config_file"]["value_checks"] = {} + configuration_rules["config_file"]["value_checks"]["property1"] = {"firstCase"} + configuration_rules["config_file"]["value_checks"]["property2"] = {"secondCase"} + + self.assertEquals(not validate_security_config_properties(params, configuration_rules), + True) # issues is empty + + #Testing with correct empty_checks + configuration_rules["config_file"]["empty_checks"] = ["property3", "property4"] + + self.assertEquals(not 
validate_security_config_properties(params, configuration_rules), + True) # issues is empty + + # Testing with correct read_checks + configuration_rules["config_file"]["read_checks"] = ["property5", "property6"] + + os_path_isfile_mock.return_value = True + self.assertEquals(not validate_security_config_properties(params, configuration_rules), + True) # issues is empty + + # Testing with wrong values_checks + configuration_rules["config_file"]["value_checks"]["property1"] = {"failCase"} + configuration_rules["config_file"]["value_checks"]["property2"] = {"failCase2"} + + self.assertEquals(not validate_security_config_properties(params, configuration_rules), + False) # Doesn't return an empty issues + + configuration_rules["config_file"]["value_checks"]["property1"] = {"firstCase"} + configuration_rules["config_file"]["value_checks"]["property2"] = {"secondCase"} + + # Testing with a property which doesn't exist in params + configuration_rules["config_file"]["empty_checks"].append("property7") + + self.assertEquals(not validate_security_config_properties(params, configuration_rules), + False) # Doesn't return an empty list + + configuration_rules["config_file"]["empty_checks"].remove("property7") + + # Testing with a property which doesn't exist in params + configuration_rules["config_file"]["read_checks"].append("property8") + + self.assertEquals(not validate_security_config_properties(params, configuration_rules), + False) # Doesn't return an empty list + + configuration_rules["config_file"]["read_checks"].remove("property8") + + #Testing empty_checks and read_checks with an empty params[config_file][property] + params["config_file"]["property1"] = {""} + params["config_file"]["property2"] = {""} + params["config_file"]["property3"] = {""} + params["config_file"]["property4"] = {""} + params["config_file"]["property5"] = {""} + params["config_file"]["property6"] = {""} + + self.assertEquals(not validate_security_config_properties(params, configuration_rules), + 
False) # Doesn't return an empty list + + + def test_build_expectations(self): + + config_file = 'config_file' + + value_checks = {} + value_checks["property1"] = {"value1"} + value_checks["property2"] = {"value2"} + + empty_checks = ["property3", "property4"] + + read_checks = ["property5", "property6"] + + result = build_expectations(config_file, value_checks, empty_checks, read_checks) + + self.assertEquals(len(result[config_file]['value_checks']), len(value_checks)) + self.assertEquals(len(result[config_file]['empty_checks']), len(empty_checks)) + self.assertEquals(len(result[config_file]['read_checks']), len(read_checks)) + + # Testing that returns empty dict if is called without values + result = build_expectations(config_file, [], [], []) + + self.assertEquals(not result[config_file].items(), True) + + @patch('xml.etree.ElementTree.parse') + def test_get_params_from_filesystem(self, et_parser_mock): + + conf_dir = gettempdir() + config_file = [] + config_file.append("config.xml") + + prop1_name_mock = MagicMock() + prop1_name_mock.text.return_value = 'property1' + prop1_value_mock = MagicMock() + prop1_value_mock.text.return_value = 'true' + + prop2_name_mock = MagicMock() + prop2_name_mock.text.return_value = 'property2' + prop2_value_mock = MagicMock() + prop2_value_mock.text.return_value = 'false' + + prop3_name_mock = MagicMock() + prop3_name_mock.text.return_value = 'property3' + prop3_value_mock = MagicMock() + prop3_value_mock.text.return_value = 'true' + + props = [] + props.append([prop1_name_mock, prop1_value_mock]) + props.append([prop2_name_mock, prop2_value_mock]) + props.append([prop3_name_mock, prop3_value_mock]) + + element_tree_mock = MagicMock() + et_parser_mock.return_value = element_tree_mock + + get_root_mock = MagicMock() + element_tree_mock.getroot.return_value = get_root_mock + get_root_mock.getchildren.return_value = props + + result = get_params_from_filesystem(conf_dir, config_file) + + # Testing that the mock is called with the 
correct path + et_parser_mock.assert_called_with(conf_dir + os.sep + "config.xml") + + #Testing that the dictionary and the list from the result are not empty + self.assertEquals(not result, False) + self.assertEquals(not result[result.keys()[0]], False) + + #Testing that returns an empty dictionary if is called with no props + empty_props = [] + + get_root_mock.getchildren.return_value = empty_props + + result = get_params_from_filesystem(conf_dir, config_file) + + self.assertEquals(not result, False) + self.assertEquals(not result['config'].items(), True) + + #Testing that returns an empty dictionary if is called with empty config_files + empty_config_file = [] + + result = get_params_from_filesystem(conf_dir, empty_config_file) + + self.assertEquals(not result, True) + + #Test that params returns an exception + et_parser_mock.reset_mock() + et_parser_mock.side_effect = Exception("Invalid path") + + try: + get_params_from_filesystem(conf_dir, config_file) + except: + self.assertTrue(True) + + @patch('os.path.exists') + @patch('os.makedirs') + @patch('os.path.isfile') + @patch('json.load') + @patch('resource_management.libraries.functions.security_commons.new_cached_exec') + @patch('__builtin__.open') + def test_cached_executor(self, open_file_mock, new_cached_exec_mock, json_load_mock, + os_isfile_mock, os_makedirs_mock, os_path_exists_mock): + + # Test that function works when is called with correct parameters + temp_dir = gettempdir() + kinit_path = "kinit" + user = "user" + hostname ="hostnamne" + keytab_file ="/etc/security/keytabs/nn.service.keytab" + principal = "nn/[email protected]" + key = str(hash("%s|%s" % (principal, keytab_file))) + expiration_time = 30 + filename = key + "_tmp.txt" + file_path = temp_dir + os.sep + "kinit_executor_cache" + + os_path_exists_mock.return_value = True + os_isfile_mock.return_value = True + + output = {} + output[key] = {} + output[key] = {"last_successful_execution": str(datetime.now())} + + json_load_mock.return_value 
= output + + self.assertEquals(cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time), True) + os_path_exists_mock.assert_called_with(file_path) + os_isfile_mock.assert_called_with(file_path + os.sep + filename) + open_file_mock.assert_called_with(file_path + os.sep + filename, 'r') + + # Test that the new_cached_exec function is called if the time spend since the last call is greater than 30 minutes + last_successful_executation = datetime.now() + last_successful_executation = last_successful_executation - timedelta(minutes=31) + + output_error = {} + output_error[key] = {} + output_error[key] = {"last_successful_execution": str(last_successful_executation)} + + json_load_mock.reset_mock() + json_load_mock.return_value = output_error + + new_cached_exec_mock.return_value = output + + cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time) + + self.assertTrue(new_cached_exec_mock.called) + new_cached_exec_mock.assert_called_with(key, file_path + os.sep + filename, kinit_path, user, principal, keytab_file, hostname) + + # Test that the makedirs function is called with correct path when the directory doesn't exist + os_path_exists_mock.return_value = False + + cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time) + + os_makedirs_mock.assert_called_with(file_path) + + # Test that the json throws an exception + os_path_exists_mock.return_value = True + + json_load_mock.reset_mock() + json_load_mock.side_effect = Exception("Invalid file") + + try: + cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time) + except: + self.assertTrue(True) + + # Test that the new_cached_exec function is called if the output doesn't have data + json_load_mock.reset_mock() + json_load_mock.return_value = None + + new_cached_exec_mock.return_value = output + + cached_kinit_executor(kinit_path, user, keytab_file, 
principal, hostname, temp_dir, expiration_time) + + self.assertTrue(new_cached_exec_mock.called) + new_cached_exec_mock.assert_called_with(key, file_path + os.sep + filename, kinit_path, user, principal, keytab_file, hostname) http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-common/src/main/python/resource_management/libraries/functions/security_commons.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/security_commons.py b/ambari-common/src/main/python/resource_management/libraries/functions/security_commons.py new file mode 100644 index 0000000..d3cd1a2 --- /dev/null +++ b/ambari-common/src/main/python/resource_management/libraries/functions/security_commons.py @@ -0,0 +1,184 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +""" + +from datetime import datetime, timedelta +from resource_management import Execute +from tempfile import mkstemp +import os +import json + + +def validate_security_config_properties(params, configuration_rules): + """ + Generic security configuration validation based on a set of rules and operations + :param params: The structure where the config parameters are held + :param configuration_rules: A structure containing rules and expectations, + Three types of checks are currently supported by this method: + 1. value_checks - checks that a certain value must be set + 2. empty_checks - checks that the property values must not be empty + 3. read_checks - checks that the value represented by the property describes a readable file on the filesystem + :return: Issues found - should be empty if all is good + """ + + issues = {} + + for config_file, rule_sets in configuration_rules.iteritems(): + # Each configuration rule set may have 0 or more of the following rule sets: + # - value_checks + # - empty_checks + # - read_checks + try: + # Each rule set has at least a list of relevant property names to check in some way + # The rule set for the operation of 'value_checks' is expected to be a dictionary of + # property names to expected values + + actual_values = params[config_file] if config_file in params else {} + + # Process Value Checks + # The rules are expected to be a dictionary of property names to expected values + rules = rule_sets['value_checks'] if 'value_checks' in rule_sets else None + if rules: + for property_name, expected_value in rules.iteritems(): + actual_value = actual_values[property_name] if property_name in actual_values else '' + if actual_value != expected_value: + issues[config_file] = "Property " + property_name + ". 
Expected/Actual: " + \ + expected_value + "/" + actual_value + + # Process Empty Checks + # The rules are expected to be a list of property names that should not have empty values + rules = rule_sets['empty_checks'] if 'empty_checks' in rule_sets else None + if rules: + for property_name in rules: + actual_value = actual_values[property_name] if property_name in actual_values else '' + if not actual_value: + issues[config_file] = "Property " + property_name + " must exist and must not be empty!" + + # Process Read Checks + # The rules are expected to be a list of property names that resolve to files names and must + # exist and be readable + rules = rule_sets['read_checks'] if 'read_checks' in rule_sets else None + if rules: + for property_name in rules: + actual_value = actual_values[property_name] if property_name in actual_values else None + if not actual_value or not os.path.isfile(actual_value): + issues[ + config_file] = "Property " + property_name + " points to an inaccessible file or parameter does not exist!" 
+ except Exception as e: + issues[config_file] = "Exception occurred while validating the config file\nCauses: " + str(e) + return issues + + +def build_expectations(config_file, value_checks, empty_checks, read_checks): + """ + Helper method used to build the check expectations dict + :return: + """ + configs_expectations = {} + configs_expectations[config_file] = {} + if value_checks: + configs_expectations[config_file]['value_checks'] = value_checks + if empty_checks: + configs_expectations[config_file]['empty_checks'] = empty_checks + if read_checks: + configs_expectations[config_file]['read_checks'] = read_checks + return configs_expectations + + +def get_params_from_filesystem(conf_dir, config_files): + """ + Used to retrieve properties from xml config files and build a dict + :param conf_dir: directory where the configuration files sit + :param config_files: list of configuration file names + :return: + """ + result = {} + from xml.etree import ElementTree as ET + + for config_file in config_files: + configuration = ET.parse(conf_dir + os.sep + config_file) + props = configuration.getroot().getchildren() + config_file_id = config_file[:-4] if len(config_file) > 4 else config_file + result[config_file_id] = {} + for prop in props: + result[config_file_id].update({prop[0].text: prop[1].text}) + return result + + +def cached_kinit_executor(kinit_path, exec_user, keytab_file, principal, hostname, temp_dir, + expiration_time): + """ + Main cached kinit executor - Uses a temporary file on the FS to cache executions. 
Each command + will have its own file and only one entry (last successful execution) will be stored + :return: + """ + key = str(hash("%s|%s" % (principal, keytab_file))) + filename = key + "_tmp.txt" + file_path = temp_dir + os.sep + "kinit_executor_cache" + output = None + + # First execution scenario dir file existence check + if not os.path.exists(file_path): + os.makedirs(file_path) + + file_path += os.sep + filename + + # If the file does not exist create before read + if not os.path.isfile(file_path): + with open(file_path, 'w+') as new_file: + new_file.write("{}") + try: + with open(file_path, 'r') as cache_file: + output = json.load(cache_file) + except: + # In the extraordinary case the temporary file gets corrupted the cache should be reset to avoid error loop + with open(file_path, 'w+') as cache_file: + cache_file.write("{}") + + if (not output) or (key not in output) or ("last_successful_execution" not in output[key]): + return new_cached_exec(key, file_path, kinit_path, exec_user, keytab_file, principal, hostname) + else: + last_run_time = output[key]["last_successful_execution"] + now = datetime.now() + if (now - datetime.strptime(last_run_time, "%Y-%m-%d %H:%M:%S.%f") > timedelta( + minutes=expiration_time)): + return new_cached_exec(key, file_path, kinit_path, exec_user, keytab_file, principal, hostname) + else: + return True + + +def new_cached_exec(key, file_path, kinit_path, exec_user, keytab_file, principal, hostname): + """ + Entry point of an actual execution - triggered when timeout on the cache expired or on fresh execution + """ + now = datetime.now() + _, temp_kinit_cache_file = mkstemp() + command = "su -s /bin/bash - %s -c '%s -c %s -kt %s %s'" % \ + (exec_user, kinit_path, temp_kinit_cache_file, keytab_file, + principal.replace("_HOST", hostname)) + + try: + Execute(command) + + with open(file_path, 'w+') as cache_file: + result = {key: {"last_successful_execution": str(now)}} + json.dump(result, cache_file) + finally: + 
os.remove(temp_kinit_cache_file) + + return True http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py index 4afd4ce..8bce423 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/datanode.py @@ -19,7 +19,10 @@ limitations under the License. import datanode_upgrade from hdfs_datanode import datanode from resource_management import * -from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version +from resource_management.libraries.functions.version import compare_versions, \ + format_hdp_stack_version +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties from hdfs import hdfs @@ -83,6 +86,60 @@ class DataNode(Script): env.set_params(status_params) check_process_status(status_params.datanode_pid_file) + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.datanode.keytab.file', + 'dfs.datanode.kerberos.principal'] + props_read_check = ['dfs.datanode.keytab.file'] + hdfs_site_expectations = 
build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + ['core-site.xml', 'hdfs-site.xml']) + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'dfs.datanode.keytab.file' not in security_params['hdfs-site'] or + 'dfs.datanode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.datanode.keytab.file'], + security_params['hdfs-site']['dfs.datanode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir, + 30) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = "" + for cf in result_issues: + issues += "Configuration file " + cf + " did not pass the validation. 
Reason: " + \ + result_issues[cf] + self.put_structured_out({"securityIssuesFound": issues}) + self.put_structured_out({"securityState": "UNSECURED"}) + if __name__ == "__main__": DataNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py index 3c54e6b..cf93c2f 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_client.py @@ -18,6 +18,8 @@ limitations under the License. """ from resource_management import * +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties from hdfs import hdfs from utils import service @@ -54,6 +56,48 @@ class HdfsClient(Script): hdfs() pass + def security_status(self, env): + import status_params + env.set_params(status_params) + if status_params.security_enabled: + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations ={} + hdfs_expectations.update(core_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, ['core-site.xml']) + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed 
successfully + if status_params.hdfs_user_principal or status_params.hdfs_user_keytab: + try: + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + status_params.hdfs_user_keytab, + status_params.hdfs_user_principal, + status_params.hostname, + status_params.tmp_dir, + 30) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + self.put_structured_out({"securityIssuesFound": "hdfs principal and/or keytab file is not specified"}) + self.put_structured_out({"securityState": "UNSECURED"}) + else: + issues="" + for cf in result_issues: + issues+="Configuration file " + cf + " did not pass the validation. Reason: " + result_issues[cf] + self.put_structured_out({"securityIssuesFound": issues}) + self.put_structured_out({"securityState": "UNSECURED"}) + + else: + self.put_structured_out({"securityState": "UNSECURED"}) if __name__ == "__main__": HdfsClient().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py index f664bcd..60e91ce 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/journalnode.py @@ -18,8 +18,11 @@ limitations under the License. 
""" from resource_management import * -from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version +from resource_management.libraries.functions.version import compare_versions, \ + format_hdp_stack_version from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties from utils import service from hdfs import hdfs @@ -85,6 +88,60 @@ class JournalNode(Script): env.set_params(status_params) check_process_status(status_params.journalnode_pid_file) + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + + props_value_check = None + props_empty_check = ['dfs.journalnode.keytab.file', + 'dfs.journalnode.kerberos.principal'] + props_read_check = ['dfs.journalnode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(hdfs_site_expectations) + hdfs_expectations.update(core_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, ['core-site.xml']) + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'dfs.journalnode.kerberos.keytab.file' not in security_params['hdfs-site'] or + 'dfs.journalnode.kerberos.principal' not in 
security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.journalnode.kerberos.keytab.file'], + security_params['hdfs-site']['dfs.journalnode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir, + 30) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = "" + for cf in result_issues: + issues += "Configuration file " + cf + " did not pass the validation. Reason: " + \ + result_issues[cf] + self.put_structured_out({"securityIssuesFound": issues}) + self.put_structured_out({"securityState": "UNSECURED"}) + if __name__ == "__main__": JournalNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py index c8a460f..3eb9cc2 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/namenode.py @@ -24,7 +24,10 @@ import subprocess from datetime import datetime from resource_management import * -from resource_management.libraries.functions.version import compare_versions, format_hdp_stack_version +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, 
get_params_from_filesystem, validate_security_config_properties +from resource_management.libraries.functions.version import compare_versions, \ + format_hdp_stack_version from resource_management.libraries.functions.format import format from resource_management.libraries.functions.check_process_status import check_process_status @@ -94,14 +97,70 @@ class NameNode(Script): check_process_status(status_params.namenode_pid_file) pass + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.namenode.kerberos.internal.spnego.principal', + 'dfs.namenode.keytab.file', + 'dfs.namenode.kerberos.principal'] + props_read_check = ['dfs.namenode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + ['core-site.xml', 'hdfs-site.xml']) + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ( 'hdfs-site' not in security_params + or 'dfs.namenode.keytab.file' not in security_params['hdfs-site'] + or 'dfs.namenode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + 
cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.namenode.keytab.file'], + security_params['hdfs-site']['dfs.namenode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir, + 30) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = "" + for cf in result_issues: + issues += "Configuration file " + cf + " did not pass the validation. Reason: " + \ + result_issues[cf] + self.put_structured_out({"securityIssuesFound": issues}) + self.put_structured_out({"securityState": "UNSECURED"}) + + def decommission(self, env): import params env.set_params(params) namenode(action="decommission") pass - - + + def rebalancehdfs(self, env): import params env.set_params(params) @@ -109,27 +168,27 @@ class NameNode(Script): name_node_parameters = json.loads( params.name_node_params ) threshold = name_node_parameters['threshold'] _print("Starting balancer with threshold = %s\n" % threshold) - + def calculateCompletePercent(first, current): return 1.0 - current.bytesLeftToMove/first.bytesLeftToMove - - + + def startRebalancingProcess(threshold): rebalanceCommand = format('hdfs --config {hadoop_conf_dir} balancer -threshold {threshold}') return as_user(rebalanceCommand, params.hdfs_user, env={'PATH': params.hadoop_bin_dir}) - + command = startRebalancingProcess(threshold) - + basedir = os.path.join(env.config.basedir, 'scripts') if(threshold == 'DEBUG'): #FIXME TODO remove this on PROD basedir = os.path.join(env.config.basedir, 'scripts', 'balancer-emulator') command = ['python','hdfs-command.py'] - + _print("Executing command %s\n" % command) - + parser = hdfs_rebalance.HdfsParser() proc = subprocess.Popen( - command, + command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, @@ -141,19 +200,19 @@ class NameNode(Script): 
pl = parser.parseLine(line) if pl: res = pl.toJson() - res['completePercent'] = calculateCompletePercent(parser.initialLine, pl) - + res['completePercent'] = calculateCompletePercent(parser.initialLine, pl) + self.put_structured_out(res) - elif parser.state == 'PROCESS_FINISED' : + elif parser.state == 'PROCESS_FINISED' : _print('[balancer] %s %s' % (str(datetime.now()), 'Process is finished' )) self.put_structured_out({'completePercent' : 1}) break - + proc.stdout.close() proc.wait() if proc.returncode != None and proc.returncode != 0: raise Fail('Hdfs rebalance process exited with error. See the log output') - + def _print(line): sys.stdout.write(line) sys.stdout.flush() http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py index 7106422..9900a7e 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/snamenode.py @@ -18,6 +18,8 @@ limitations under the License. 
""" from resource_management import * +from resource_management.libraries.functions.security_commons import build_expectations, \ + cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties from hdfs_snamenode import snamenode from hdfs import hdfs @@ -63,6 +65,62 @@ class SNameNode(Script): check_process_status(status_params.snamenode_pid_file) + def security_status(self, env): + import status_params + + env.set_params(status_params) + props_value_check = {"hadoop.security.authentication": "kerberos", + "hadoop.security.authorization": "true"} + props_empty_check = ["hadoop.security.auth_to_local"] + props_read_check = None + core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check, + props_read_check) + props_value_check = None + props_empty_check = ['dfs.secondary.namenode.kerberos.internal.spnego.principal', + 'dfs.secondary.namenode.keytab.file', + 'dfs.secondary.namenode.kerberos.principal'] + props_read_check = ['dfs.secondary.namenode.keytab.file'] + hdfs_site_expectations = build_expectations('hdfs-site', props_value_check, props_empty_check, + props_read_check) + + hdfs_expectations = {} + hdfs_expectations.update(core_site_expectations) + hdfs_expectations.update(hdfs_site_expectations) + + security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, + ['core-site.xml', 'hdfs-site.xml']) + result_issues = validate_security_config_properties(security_params, hdfs_expectations) + if not result_issues: # If all validations passed successfully + try: + # Double check the dict before calling execute + if ('hdfs-site' not in security_params or + 'dfs.secondary.namenode.keytab.file' not in security_params['hdfs-site'] or + 'dfs.secondary.namenode.kerberos.principal' not in security_params['hdfs-site']): + self.put_structured_out({"securityState": "UNSECURED"}) + self.put_structured_out( + {"securityIssuesFound": "Keytab file or principal are not set property."}) + return + + 
cached_kinit_executor(status_params.kinit_path_local, + status_params.hdfs_user, + security_params['hdfs-site']['dfs.secondary.namenode.keytab.file'], + security_params['hdfs-site'][ + 'dfs.secondary.namenode.kerberos.principal'], + status_params.hostname, + status_params.tmp_dir, + 30) + self.put_structured_out({"securityState": "SECURED_KERBEROS"}) + except Exception as e: + self.put_structured_out({"securityState": "ERROR"}) + self.put_structured_out({"securityStateErrorInfo": str(e)}) + else: + issues = "" + for cf in result_issues: + issues += "Configuration file " + cf + " did not pass the validation. Reason: " + \ + result_issues[cf] + self.put_structured_out({"securityIssuesFound": issues}) + self.put_structured_out({"securityState": "UNSECURED"}) + if __name__ == "__main__": SNameNode().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/status_params.py index 0027a4c..c3e5832 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/status_params.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/status_params.py @@ -29,3 +29,12 @@ namenode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-namenode.pid") snamenode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-secondarynamenode.pid") journalnode_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-journalnode.pid") zkfc_pid_file = format("{hdp_pid_dir}/hadoop-{hdfs_user}-zkfc.pid") + +# Security related/required params +hostname = config['hostname'] +security_enabled = config['configurations']['cluster-env']['security_enabled'] +hdfs_user_principal = 
config['configurations']['hadoop-env']['hdfs_principal_name'] +hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] +hadoop_conf_dir = "/etc/hadoop/conf" +kinit_path_local = functions.get_kinit_path(["/usr/bin", "/usr/kerberos/bin", "/usr/sbin"]) +tmp_dir = Script.get_tmp_dir() http://git-wip-us.apache.org/repos/asf/ambari/blob/3f1d3dfa/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py index 4102b69..0e5d666 100644 --- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py +++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/zkfc_slave.py @@ -19,7 +19,8 @@ limitations under the License. 
from resource_management import *
 from resource_management.libraries.functions.check_process_status import check_process_status
-
+from resource_management.libraries.functions.security_commons import build_expectations, \
+  cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties
 import utils  # this is needed to avoid a circular dependency since utils.py calls this class
 from hdfs import hdfs

@@ -74,6 +75,61 @@ class ZkfcSlave(Script):

     check_process_status(status_params.zkfc_pid_file)

+  def security_status(self, env):
+    """
+    Report the Kerberos security state of this component through structured output.
+
+    Emits "securityState" as SECURED_KERBEROS, UNSECURED or ERROR, accompanied by
+    "securityIssuesFound" or "securityStateErrorInfo" when there is something to explain.
+
+    :param env: execution environment; status_params is registered on it via set_params
+    """
+    import status_params
+
+    env.set_params(status_params)
+    if status_params.security_enabled:
+      props_value_check = {"hadoop.security.authentication": "kerberos",
+                           "hadoop.security.authorization": "true"}
+      props_empty_check = ["hadoop.security.auth_to_local"]
+      props_read_check = None
+      core_site_expectations = build_expectations('core-site', props_value_check, props_empty_check,
+                                                  props_read_check)
+
+      hdfs_expectations = {}
+      hdfs_expectations.update(core_site_expectations)
+
+      security_params = get_params_from_filesystem(status_params.hadoop_conf_dir, ['core-site.xml'])
+      result_issues = validate_security_config_properties(security_params, hdfs_expectations)
+      if not result_issues:  # If all validations passed successfully
+        # Both the principal and the keytab are passed to cached_kinit_executor; if either
+        # one is missing, report it below rather than attempting the call.
+        if status_params.hdfs_user_principal and status_params.hdfs_user_keytab:
+          try:
+            cached_kinit_executor(status_params.kinit_path_local,
+                                  status_params.hdfs_user,
+                                  status_params.hdfs_user_keytab,
+                                  status_params.hdfs_user_principal,
+                                  status_params.hostname,
+                                  status_params.tmp_dir,
+                                  30)
+            self.put_structured_out({"securityState": "SECURED_KERBEROS"})
+          except Exception as e:
+            self.put_structured_out({"securityState": "ERROR"})
+            self.put_structured_out({"securityStateErrorInfo": str(e)})
+        else:
+          self.put_structured_out(
+            {"securityIssuesFound": "hdfs principal and/or keytab file is not specified"})
+          self.put_structured_out({"securityState": "UNSECURED"})
+      else:
+        issues = ""
+        for cf in result_issues:
+          issues += "Configuration file " + cf + " did not pass the validation. Reason: " + \
+                    result_issues[cf]
+        self.put_structured_out({"securityIssuesFound": issues})
+        self.put_structured_out({"securityState": "UNSECURED"})
+    else:
+      self.put_structured_out({"securityState": "UNSECURED"})
+

 if __name__ == "__main__":
   ZkfcSlave().execute()
