Repository: ambari Updated Branches: refs/heads/branch-2.2 f07b29bae -> 95e273d14
AMBARI-15158. Resource Manager fails to start: IOException: /ats/active does not exist. (stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95e273d1 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95e273d1 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95e273d1 Branch: refs/heads/branch-2.2 Commit: 95e273d14e3245508b1464817d5801db9d658b9f Parents: f07b29b Author: Toader, Sebastian <[email protected]> Authored: Fri Feb 26 16:45:50 2016 +0100 Committer: Toader, Sebastian <[email protected]> Committed: Sat Feb 27 01:57:38 2016 +0100 ---------------------------------------------------------------------- .../libraries/providers/hdfs_resource.py | 161 +++++++++---------- .../2.1.0.2.0/package/scripts/params_linux.py | 6 +- .../package/scripts/resourcemanager.py | 63 +++++++- 3 files changed, 145 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/95e273d1/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py index de0a0ad..ed9a642 100644 --- a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py +++ b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py @@ -57,10 +57,10 @@ RESOURCE_TO_JSON_FIELDS = { class HdfsResourceJar: """ This is slower than HdfsResourceWebHDFS implementation of HdfsResouce, but it works in any cases on any DFS types. - + The idea is to put all the files/directories/copyFromLocals we have to create/delete into a json file. And then create in it with ONLY ONE expensive hadoop call to our custom jar fast-hdfs-resource.jar which grabs this json. - + 'create_and_execute' and 'delete_on_execute' does nothing but add files/directories to this json, while execute does all the expensive creating/deleting work executing the jar with the json as parameter. """ @@ -81,7 +81,7 @@ class HdfsResourceJar: # Add resource to create env.config['hdfs_files'].append(resource) - + def action_execute(self, main_resource): env = Environment.get_instance() @@ -91,7 +91,7 @@ class HdfsResourceJar: if not 'hdfs_files' in env.config or not env.config['hdfs_files']: Logger.info("No resources to create. 'create_on_execute' or 'delete_on_execute' wasn't triggered before this 'execute' action.") return - + hadoop_bin_dir = main_resource.resource.hadoop_bin_dir hadoop_conf_dir = main_resource.resource.hadoop_conf_dir user = main_resource.resource.user @@ -139,17 +139,17 @@ class WebHDFSUtil: address = https_nn_address if self.is_https_enabled else http_nn_address protocol = "https" if self.is_https_enabled else "http" - + self.address = format("{protocol}://{address}") self.run_user = run_user self.security_enabled = security_enabled self.logoutput = logoutput - + @staticmethod def is_webhdfs_available(is_webhdfs_enabled, default_fs): # only hdfs seems to support webHDFS return (is_webhdfs_enabled and default_fs.startswith("hdfs")) - + valid_status_codes = ["200", "201"] def run_command(self, target, operation, method='POST', assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs): """ @@ -157,48 +157,48 @@ class WebHDFSUtil: depending on if query was successful or not, we can assert this for them """ target = HdfsResourceProvider.parse_path(target) - + url = format("{address}/webhdfs/v1{target}?op={operation}&user.name={run_user}", address=self.address, run_user=self.run_user) for k,v in kwargs.iteritems(): url = format("{url}&{k}={v}") - + if file_to_put and not os.path.exists(file_to_put): raise Fail(format("File {file_to_put} is not found.")) - + cmd = ["curl", "-sS","-L", "-w", "%{http_code}", "-X", method] - + if file_to_put: cmd += ["-T", file_to_put] if self.security_enabled: cmd += ["--negotiate", "-u", ":"] if self.is_https_enabled: cmd += ["-k"] - + cmd.append(url) _, out, err = get_user_call_output(cmd, user=self.run_user, logoutput=self.logoutput, quiet=False) status_code = out[-3:] out = out[:-3] # remove last line from output which is status code - + try: result_dict = json.loads(out) except ValueError: result_dict = out - + if status_code not in WebHDFSUtil.valid_status_codes+ignore_status_codes or assertable_result and result_dict and not result_dict['boolean']: formatted_output = json.dumps(result_dict, indent=2) if isinstance(result_dict, dict) else result_dict formatted_output = err + "\n" + formatted_output err_msg = "Execution of '%s' returned status_code=%s. %s" % (shell.string_cmd_from_args_list(cmd), status_code, formatted_output) raise Fail(err_msg) - + return result_dict - + class HdfsResourceWebHDFS: """ This is the fastest implementation of HdfsResource using WebHDFS. - Since it's not available on non-hdfs FS and also can be disabled in scope of HDFS. + Since it's not available on non-hdfs FS and also can be disabled in scope of HDFS. We should still have the other implementations for such a cases. """ - + """ If we have more than this count of files to recursively chmod/chown webhdfs won't be used, but 'hadoop fs -chmod (or chown) -R ..' As it can really slow. @@ -210,51 +210,51 @@ class HdfsResourceWebHDFS: contains a lot of files. LISTSTATUS of directory with 1000 files takes ~0.5 seconds. """ MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS = 250 - + def action_execute(self, main_resource): pass - + def _assert_valid(self): source = self.main_resource.resource.source type = self.main_resource.resource.type target = self.main_resource.resource.target - + if source: if not os.path.exists(source): raise Fail(format("Source {source} doesn't exist")) if type == "directory" and os.path.isfile(source): raise Fail(format("Source {source} is file but type is {type}")) - elif type == "file" and os.path.isdir(source): + elif type == "file" and os.path.isdir(source): raise Fail(format("Source {source} is directory but type is {type}")) - + self.target_status = self._get_file_status(target) - + if self.target_status and self.target_status['type'].lower() != type: raise Fail(format("Trying to create file/directory but directory/file exists in the DFS on {target}")) - + def action_delayed(self, action_name, main_resource): main_resource.assert_parameter_is_set('user') - + if main_resource.resource.security_enabled: main_resource.kinit() - self.util = WebHDFSUtil(main_resource.resource.hdfs_site, main_resource.resource.user, + self.util = WebHDFSUtil(main_resource.resource.hdfs_site, main_resource.resource.user, main_resource.resource.security_enabled, main_resource.resource.logoutput) self.mode = oct(main_resource.resource.mode)[1:] if main_resource.resource.mode else main_resource.resource.mode self.mode_set = False self.main_resource = main_resource self._assert_valid() - + if action_name == "create": self._create_resource() self._set_mode(self.target_status) self._set_owner(self.target_status) else: self._delete_resource() - + def _create_resource(self): is_create = (self.main_resource.resource.source == None) - + if is_create and self.main_resource.resource.type == "directory": self._create_directory(self.main_resource.resource.target) elif is_create and self.main_resource.resource.type == "file": @@ -264,7 +264,7 @@ class HdfsResourceWebHDFS: elif not is_create and self.main_resource.resource.type == "directory": self._create_directory(self.main_resource.resource.target) self._copy_from_local_directory(self.main_resource.resource.target, self.main_resource.resource.source) - + def _copy_from_local_directory(self, target, source): for next_path_part in os.listdir(source): new_source = os.path.join(source, next_path_part) @@ -275,17 +275,17 @@ class HdfsResourceWebHDFS: self._copy_from_local_directory(new_target, new_source) else: self._create_file(new_target, new_source) - + def _create_directory(self, target): if target == self.main_resource.resource.target and self.target_status: return - + self.util.run_command(target, 'MKDIRS', method='PUT') - + def _get_file_status(self, target): list_status = self.util.run_command(target, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False) return list_status['FileStatus'] if 'FileStatus' in list_status else None - + def _create_file(self, target, source=None, mode=""): """ PUT file command in slow, however _get_file_status is pretty fast, @@ -293,12 +293,12 @@ class HdfsResourceWebHDFS: """ file_status = self._get_file_status(target) if target!=self.main_resource.resource.target else self.target_status mode = "" if not mode else mode - + if file_status: if source: length = file_status['length'] local_file_size = os.stat(source).st_size # TODO: os -> sudo - + # TODO: re-implement this using checksums if local_file_size == length: Logger.info(format("DFS file {target} is identical to {source}, skipping the copying")) @@ -309,16 +309,16 @@ class HdfsResourceWebHDFS: else: Logger.info(format("File {target} already exists in DFS, skipping the creation")) return - + Logger.info(format("Creating new file {target} in DFS")) kwargs = {'permission': mode} if mode else {} - + self.util.run_command(target, 'CREATE', method='PUT', overwrite=True, assertable_result=False, file_to_put=source, **kwargs) - + if mode and file_status: file_status['permission'] = mode - - + + def _delete_resource(self): if not self.target_status: return @@ -327,17 +327,17 @@ class HdfsResourceWebHDFS: def _set_owner(self, file_status=None): owner = "" if not self.main_resource.resource.owner else self.main_resource.resource.owner group = "" if not self.main_resource.resource.group else self.main_resource.resource.group - + if (not owner or file_status and file_status['owner'] == owner) and (not group or file_status and file_status['group'] == group): return - + self.util.run_command(self.main_resource.resource.target, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False) - + results = [] - + if self.main_resource.resource.recursive_chown: content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False) - + if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS: self._fill_directories_list(self.main_resource.resource.target, results) else: # avoid chmowning a lot of files and listing a lot dirs via webhdfs which can take a lot of time. @@ -345,34 +345,34 @@ class HdfsResourceWebHDFS: if self.main_resource.resource.change_permissions_for_parents: self._fill_in_parent_directories(self.main_resource.resource.target, results) - + for path in results: self.util.run_command(path, 'SETOWNER', method='PUT', owner=owner, group=group, assertable_result=False) - + def _set_mode(self, file_status=None): if not self.mode or file_status and file_status['permission'] == self.mode: return - + if not self.mode_set: self.util.run_command(self.main_resource.resource.target, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False) - + results = [] - + if self.main_resource.resource.recursive_chmod: content_summary = self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', method='GET', assertable_result=False) - + if content_summary['ContentSummary']['fileCount'] <= HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and content_summary['ContentSummary']['directoryCount'] <= HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS: self._fill_directories_list(self.main_resource.resource.target, results) else: # avoid chmoding a lot of files and listing a lot dirs via webhdfs which can take a lot of time. shell.checked_call(["hadoop", "fs", "-chmod", "-R", self.mode, self.main_resource.resource.target], user=self.main_resource.resource.user) - + if self.main_resource.resource.change_permissions_for_parents: self._fill_in_parent_directories(self.main_resource.resource.target, results) - + for path in results: self.util.run_command(path, 'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False) - - + + def _fill_in_parent_directories(self, target, results): path_parts = HdfsResourceProvider.parse_path(target).split("/")[1:]# [1:] remove '' from parts path = "/" @@ -380,26 +380,25 @@ class HdfsResourceWebHDFS: for path_part in path_parts: path += path_part + "/" results.append(path) - + def _fill_directories_list(self, target, results): list_status = self.util.run_command(target, 'LISTSTATUS', method='GET', assertable_result=False)['FileStatuses']['FileStatus'] - + for file in list_status: if file['pathSuffix']: new_path = target + "/" + file['pathSuffix'] results.append(new_path) - + if file['type'] == 'DIRECTORY': - self._fill_directories_list(new_path, results) - + self._fill_directories_list(new_path, results) + class HdfsResourceProvider(Provider): def __init__(self, resource): super(HdfsResourceProvider,self).__init__(resource) self.assert_parameter_is_set('hdfs_site') - self.ignored_resources_list = self.get_ignored_resources_list() + self.ignored_resources_list = HdfsResourceProvider.get_ignored_resources_list(self.resource.hdfs_resource_ignore_file) self.webhdfs_enabled = self.resource.hdfs_site['dfs.webhdfs.enabled'] - - + @staticmethod def parse_path(path): """ @@ -410,33 +409,34 @@ class HdfsResourceProvider(Provider): """ math_with_protocol_and_nn_url = re.match("[a-zA-Z]+://[^/]+(/.+)", path) math_with_protocol = re.match("[a-zA-Z]+://(/.+)", path) - + if math_with_protocol_and_nn_url: path = math_with_protocol_and_nn_url.group(1) elif math_with_protocol: path = math_with_protocol.group(1) else: path = path - + return re.sub("[/]+", "/", path) - - def get_ignored_resources_list(self): - if not self.resource.hdfs_resource_ignore_file or not os.path.exists(self.resource.hdfs_resource_ignore_file): + + @staticmethod + def get_ignored_resources_list(hdfs_resource_ignore_file): + if not hdfs_resource_ignore_file or not os.path.exists(hdfs_resource_ignore_file): return [] - - with open(self.resource.hdfs_resource_ignore_file, "rb") as fp: + + with open(hdfs_resource_ignore_file, "rb") as fp: content = fp.read() - + hdfs_resources_to_ignore = [] for hdfs_resource_to_ignore in content.split("\n"): hdfs_resources_to_ignore.append(HdfsResourceProvider.parse_path(hdfs_resource_to_ignore)) - + return hdfs_resources_to_ignore - + def action_delayed(self, action_name): self.assert_parameter_is_set('type') - + if HdfsResourceProvider.parse_path(self.resource.target) in self.ignored_resources_list: Logger.info("Skipping '{0}' because it is in ignore file {1}.".format(self.resource, self.resource.hdfs_resource_ignore_file)) return @@ -457,19 +457,18 @@ class HdfsResourceProvider(Provider): return HdfsResourceWebHDFS() else: return HdfsResourceJar() - + def assert_parameter_is_set(self, parameter_name): if not getattr(self.resource, parameter_name): raise Fail("Resource parameter '{0}' is not set.".format(parameter_name)) return True - + def kinit(self): keytab_file = self.resource.keytab kinit_path = self.resource.kinit_path_local principal_name = self.resource.principal_name user = self.resource.user - + Execute(format("{kinit_path} -kt {keytab_file} {principal_name}"), user=user - ) - + ) http://git-wip-us.apache.org/repos/asf/ambari/blob/95e273d1/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py index 83f47a8..8d7427d 100644 --- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py +++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py @@ -259,6 +259,10 @@ hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_nam hdfs_site = config['configurations']['hdfs-site'] default_fs = config['configurations']['core-site']['fs.defaultFS'] +is_webhdfs_enabled = hdfs_site['dfs.webhdfs.enabled'] + +# Path to file that contains list of HDFS resources to be skipped during processing +hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore" import functools #create partial functions with common arguments for every HdfsResource call @@ -266,7 +270,7 @@ import functools HdfsResource = functools.partial( HdfsResource, user=hdfs_user, - hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore", + hdfs_resource_ignore_file = hdfs_resource_ignore_file, security_enabled = security_enabled, keytab = hdfs_user_keytab, kinit_path_local = kinit_path_local, http://git-wip-us.apache.org/repos/asf/ambari/blob/95e273d1/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py index ec7799e..d40abff 100644 --- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py +++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py @@ -20,7 +20,6 @@ Ambari Agent """ from resource_management.libraries.script.script import Script -from resource_management.libraries.resources.hdfs_resource import HdfsResource from resource_management.libraries.functions import conf_select from resource_management.libraries.functions import hdp_select from resource_management.libraries.functions.check_process_status import check_process_status @@ -29,13 +28,17 @@ from resource_management.libraries.functions.version import compare_versions, fo from resource_management.libraries.functions.security_commons import build_expectations, \ cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ FILE_TYPE_XML +from resource_management.libraries.functions.decorator import retry from resource_management.core.resources.system import File, Execute from resource_management.core.source import Template from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.libraries.providers.hdfs_resource import WebHDFSUtil +from resource_management.libraries.providers.hdfs_resource import HdfsResourceProvider +from resource_management import is_empty +from resource_management import shell - -from install_jars import install_tez_jars from yarn import yarn from service import service from ambari_commons import OSConst @@ -113,6 +116,12 @@ class ResourcemanagerDefault(Resourcemanager): self.configure(env) # FOR SECURITY if params.has_ranger_admin and params.is_supported_yarn_ranger: setup_ranger_yarn() #Ranger Yarn Plugin related calls + + # wait for active-dir and done-dir to be created by ATS if needed + if params.has_ats: + Logger.info("Verifying DFS directories where ATS stores time line data for active and completed applications.") + self.wait_for_dfs_directories_created(params.entity_groupfs_store_dir, params.entity_groupfs_active_dir) + service('resourcemanager', action='start') def status(self, env): @@ -217,5 +226,53 @@ class ResourcemanagerDefault(Resourcemanager): pass + + + def wait_for_dfs_directories_created(self, *dirs): + import params + + ignored_dfs_dirs = HdfsResourceProvider.get_ignored_resources_list(params.hdfs_resource_ignore_file) + + if params.security_enabled: + Execute( + format("{rm_kinit_cmd}") + , user=params.yarn_user + ) + + for dir_path in dirs: + self.wait_for_dfs_directory_created(dir_path, ignored_dfs_dirs) + + + @retry(times=8, sleep_time=20, backoff_factor=1, err_class=Fail) + def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs): + import params + + + if not is_empty(dir_path): + dir_path = HdfsResourceProvider.parse_path(dir_path) + + if dir_path in ignored_dfs_dirs: + Logger.info("Skipping DFS directory '" + dir_path + "' as it's marked to be ignored.") + return + + Logger.info("Verifying if DFS directory '" + dir_path + "' exists.") + + dir_exists = None + + if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs): + # check with webhdfs is much faster than executing hdfs dfs -test + util = WebHDFSUtil(params.hdfs_site, params.yarn_user, params.security_enabled) + list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False) + dir_exists = ('FileStatus' in list_status) + else: + # have to do time expensive hdfs dfs -d check. + dfs_ret_code = shell.call(format("hdfs --config {hadoop_conf_dir} dfs -test -d " + dir_path), user=params.yarn_user)[0] + dir_exists = not dfs_ret_code #dfs -test -d returns 0 in case the dir exists + + if not dir_exists: + raise Fail("DFS directory '" + dir_path + "' does not exist !") + else: + Logger.info("DFS directory '" + dir_path + "' exists.") + if __name__ == "__main__": Resourcemanager().execute()
