Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 f07b29bae -> 95e273d14


AMBARI-15158. Resource Manager fails to start: IOException: /ats/active does 
not exist. (stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95e273d1
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95e273d1
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95e273d1

Branch: refs/heads/branch-2.2
Commit: 95e273d14e3245508b1464817d5801db9d658b9f
Parents: f07b29b
Author: Toader, Sebastian <[email protected]>
Authored: Fri Feb 26 16:45:50 2016 +0100
Committer: Toader, Sebastian <[email protected]>
Committed: Sat Feb 27 01:57:38 2016 +0100

----------------------------------------------------------------------
 .../libraries/providers/hdfs_resource.py        | 161 +++++++++----------
 .../2.1.0.2.0/package/scripts/params_linux.py   |   6 +-
 .../package/scripts/resourcemanager.py          |  63 +++++++-
 3 files changed, 145 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/95e273d1/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
----------------------------------------------------------------------
diff --git 
a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
 
b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
index de0a0ad..ed9a642 100644
--- 
a/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
+++ 
b/ambari-common/src/main/python/resource_management/libraries/providers/hdfs_resource.py
@@ -57,10 +57,10 @@ RESOURCE_TO_JSON_FIELDS = {
 class HdfsResourceJar:
   """
   This is slower than HdfsResourceWebHDFS implementation of HdfsResouce, but 
it works in any cases on any DFS types.
-  
+
   The idea is to put all the files/directories/copyFromLocals we have to 
create/delete into a json file.
   And then create in it with ONLY ONE expensive hadoop call to our custom jar 
fast-hdfs-resource.jar which grabs this json.
-  
+
   'create_and_execute' and 'delete_on_execute' does nothing but add 
files/directories to this json,
   while execute does all the expensive creating/deleting work executing the 
jar with the json as parameter.
   """
@@ -81,7 +81,7 @@ class HdfsResourceJar:
 
     # Add resource to create
     env.config['hdfs_files'].append(resource)
-    
+
   def action_execute(self, main_resource):
     env = Environment.get_instance()
 
@@ -91,7 +91,7 @@ class HdfsResourceJar:
     if not 'hdfs_files' in env.config or not env.config['hdfs_files']:
       Logger.info("No resources to create. 'create_on_execute' or 
'delete_on_execute' wasn't triggered before this 'execute' action.")
       return
-    
+
     hadoop_bin_dir = main_resource.resource.hadoop_bin_dir
     hadoop_conf_dir = main_resource.resource.hadoop_conf_dir
     user = main_resource.resource.user
@@ -139,17 +139,17 @@ class WebHDFSUtil:
 
     address = https_nn_address if self.is_https_enabled else http_nn_address
     protocol = "https" if self.is_https_enabled else "http"
-    
+
     self.address = format("{protocol}://{address}")
     self.run_user = run_user
     self.security_enabled = security_enabled
     self.logoutput = logoutput
-    
+
   @staticmethod
   def is_webhdfs_available(is_webhdfs_enabled, default_fs):
     # only hdfs seems to support webHDFS
     return (is_webhdfs_enabled and default_fs.startswith("hdfs"))
-    
+
   valid_status_codes = ["200", "201"]
   def run_command(self, target, operation, method='POST', 
assertable_result=True, file_to_put=None, ignore_status_codes=[], **kwargs):
     """
@@ -157,48 +157,48 @@ class WebHDFSUtil:
     depending on if query was successful or not, we can assert this for them
     """
     target = HdfsResourceProvider.parse_path(target)
-    
+
     url = 
format("{address}/webhdfs/v1{target}?op={operation}&user.name={run_user}", 
address=self.address, run_user=self.run_user)
     for k,v in kwargs.iteritems():
       url = format("{url}&{k}={v}")
-    
+
     if file_to_put and not os.path.exists(file_to_put):
       raise Fail(format("File {file_to_put} is not found."))
-    
+
     cmd = ["curl", "-sS","-L", "-w", "%{http_code}", "-X", method]
-    
+
     if file_to_put:
       cmd += ["-T", file_to_put]
     if self.security_enabled:
       cmd += ["--negotiate", "-u", ":"]
     if self.is_https_enabled:
       cmd += ["-k"]
-      
+
     cmd.append(url)
     _, out, err = get_user_call_output(cmd, user=self.run_user, 
logoutput=self.logoutput, quiet=False)
     status_code = out[-3:]
     out = out[:-3] # remove last line from output which is status code
-    
+
     try:
       result_dict = json.loads(out)
     except ValueError:
       result_dict = out
-          
+
     if status_code not in WebHDFSUtil.valid_status_codes+ignore_status_codes 
or assertable_result and result_dict and not result_dict['boolean']:
       formatted_output = json.dumps(result_dict, indent=2) if 
isinstance(result_dict, dict) else result_dict
       formatted_output = err + "\n" + formatted_output
       err_msg = "Execution of '%s' returned status_code=%s. %s" % 
(shell.string_cmd_from_args_list(cmd), status_code, formatted_output)
       raise Fail(err_msg)
-    
+
     return result_dict
-    
+
 class HdfsResourceWebHDFS:
   """
   This is the fastest implementation of HdfsResource using WebHDFS.
-  Since it's not available on non-hdfs FS and also can be disabled in scope of 
HDFS. 
+  Since it's not available on non-hdfs FS and also can be disabled in scope of 
HDFS.
   We should still have the other implementations for such a cases.
   """
-  
+
   """
   If we have more than this count of files to recursively chmod/chown
   webhdfs won't be used, but 'hadoop fs -chmod (or chown) -R ..' As it can 
really slow.
@@ -210,51 +210,51 @@ class HdfsResourceWebHDFS:
   contains a lot of files. LISTSTATUS of directory with 1000 files takes ~0.5 
seconds.
   """
   MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS = 250
-  
+
   def action_execute(self, main_resource):
     pass
-  
+
   def _assert_valid(self):
     source = self.main_resource.resource.source
     type = self.main_resource.resource.type
     target = self.main_resource.resource.target
-    
+
     if source:
       if not os.path.exists(source):
         raise Fail(format("Source {source} doesn't exist"))
       if type == "directory" and os.path.isfile(source):
         raise Fail(format("Source {source} is file but type is {type}"))
-      elif type == "file" and os.path.isdir(source): 
+      elif type == "file" and os.path.isdir(source):
         raise Fail(format("Source {source} is directory but type is {type}"))
-    
+
     self.target_status = self._get_file_status(target)
-    
+
     if self.target_status and self.target_status['type'].lower() != type:
       raise Fail(format("Trying to create file/directory but directory/file 
exists in the DFS on {target}"))
-    
+
   def action_delayed(self, action_name, main_resource):
     main_resource.assert_parameter_is_set('user')
-    
+
     if main_resource.resource.security_enabled:
       main_resource.kinit()
 
-    self.util = WebHDFSUtil(main_resource.resource.hdfs_site, 
main_resource.resource.user, 
+    self.util = WebHDFSUtil(main_resource.resource.hdfs_site, 
main_resource.resource.user,
                             main_resource.resource.security_enabled, 
main_resource.resource.logoutput)
     self.mode = oct(main_resource.resource.mode)[1:] if 
main_resource.resource.mode else main_resource.resource.mode
     self.mode_set = False
     self.main_resource = main_resource
     self._assert_valid()
-        
+
     if action_name == "create":
       self._create_resource()
       self._set_mode(self.target_status)
       self._set_owner(self.target_status)
     else:
       self._delete_resource()
-    
+
   def _create_resource(self):
     is_create = (self.main_resource.resource.source == None)
-    
+
     if is_create and self.main_resource.resource.type == "directory":
       self._create_directory(self.main_resource.resource.target)
     elif is_create and self.main_resource.resource.type == "file":
@@ -264,7 +264,7 @@ class HdfsResourceWebHDFS:
     elif not is_create and self.main_resource.resource.type == "directory":
       self._create_directory(self.main_resource.resource.target)
       self._copy_from_local_directory(self.main_resource.resource.target, 
self.main_resource.resource.source)
-    
+
   def _copy_from_local_directory(self, target, source):
     for next_path_part in os.listdir(source):
       new_source = os.path.join(source, next_path_part)
@@ -275,17 +275,17 @@ class HdfsResourceWebHDFS:
         self._copy_from_local_directory(new_target, new_source)
       else:
         self._create_file(new_target, new_source)
-  
+
   def _create_directory(self, target):
     if target == self.main_resource.resource.target and self.target_status:
       return
-    
+
     self.util.run_command(target, 'MKDIRS', method='PUT')
-    
+
   def _get_file_status(self, target):
     list_status = self.util.run_command(target, 'GETFILESTATUS', method='GET', 
ignore_status_codes=['404'], assertable_result=False)
     return list_status['FileStatus'] if 'FileStatus' in list_status else None
-    
+
   def _create_file(self, target, source=None, mode=""):
     """
     PUT file command in slow, however _get_file_status is pretty fast,
@@ -293,12 +293,12 @@ class HdfsResourceWebHDFS:
     """
     file_status = self._get_file_status(target) if 
target!=self.main_resource.resource.target else self.target_status
     mode = "" if not mode else mode
-    
+
     if file_status:
       if source:
         length = file_status['length']
         local_file_size = os.stat(source).st_size # TODO: os -> sudo
-        
+
         # TODO: re-implement this using checksums
         if local_file_size == length:
           Logger.info(format("DFS file {target} is identical to {source}, 
skipping the copying"))
@@ -309,16 +309,16 @@ class HdfsResourceWebHDFS:
       else:
         Logger.info(format("File {target} already exists in DFS, skipping the 
creation"))
         return
-    
+
     Logger.info(format("Creating new file {target} in DFS"))
     kwargs = {'permission': mode} if mode else {}
-      
+
     self.util.run_command(target, 'CREATE', method='PUT', overwrite=True, 
assertable_result=False, file_to_put=source, **kwargs)
-    
+
     if mode and file_status:
       file_status['permission'] = mode
-    
-     
+
+
   def _delete_resource(self):
     if not self.target_status:
           return
@@ -327,17 +327,17 @@ class HdfsResourceWebHDFS:
   def _set_owner(self, file_status=None):
     owner = "" if not self.main_resource.resource.owner else 
self.main_resource.resource.owner
     group = "" if not self.main_resource.resource.group else 
self.main_resource.resource.group
-    
+
     if (not owner or file_status and file_status['owner'] == owner) and (not 
group or file_status and file_status['group'] == group):
       return
-    
+
     self.util.run_command(self.main_resource.resource.target, 'SETOWNER', 
method='PUT', owner=owner, group=group, assertable_result=False)
-    
+
     results = []
-    
+
     if self.main_resource.resource.recursive_chown:
       content_summary = 
self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', 
method='GET', assertable_result=False)
-      
+
       if content_summary['ContentSummary']['fileCount'] <= 
HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and 
content_summary['ContentSummary']['directoryCount'] <= 
HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
         self._fill_directories_list(self.main_resource.resource.target, 
results)
       else: # avoid chmowning a lot of files and listing a lot dirs via 
webhdfs which can take a lot of time.
@@ -345,34 +345,34 @@ class HdfsResourceWebHDFS:
 
     if self.main_resource.resource.change_permissions_for_parents:
       self._fill_in_parent_directories(self.main_resource.resource.target, 
results)
-      
+
     for path in results:
       self.util.run_command(path, 'SETOWNER', method='PUT', owner=owner, 
group=group, assertable_result=False)
-  
+
   def _set_mode(self, file_status=None):
     if not self.mode or file_status and file_status['permission'] == self.mode:
       return
-    
+
     if not self.mode_set:
       self.util.run_command(self.main_resource.resource.target, 
'SETPERMISSION', method='PUT', permission=self.mode, assertable_result=False)
-    
+
     results = []
-    
+
     if self.main_resource.resource.recursive_chmod:
       content_summary = 
self.util.run_command(self.main_resource.resource.target, 'GETCONTENTSUMMARY', 
method='GET', assertable_result=False)
-      
+
       if content_summary['ContentSummary']['fileCount'] <= 
HdfsResourceWebHDFS.MAX_FILES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS and 
content_summary['ContentSummary']['directoryCount'] <= 
HdfsResourceWebHDFS.MAX_DIRECTORIES_FOR_RECURSIVE_ACTION_VIA_WEBHDFS:
         self._fill_directories_list(self.main_resource.resource.target, 
results)
       else: # avoid chmoding a lot of files and listing a lot dirs via webhdfs 
which can take a lot of time.
         shell.checked_call(["hadoop", "fs", "-chmod", "-R", self.mode, 
self.main_resource.resource.target], user=self.main_resource.resource.user)
-      
+
     if self.main_resource.resource.change_permissions_for_parents:
       self._fill_in_parent_directories(self.main_resource.resource.target, 
results)
-      
+
     for path in results:
       self.util.run_command(path, 'SETPERMISSION', method='PUT', 
permission=self.mode, assertable_result=False)
-    
-    
+
+
   def _fill_in_parent_directories(self, target, results):
     path_parts = HdfsResourceProvider.parse_path(target).split("/")[1:]# [1:] 
remove '' from parts
     path = "/"
@@ -380,26 +380,25 @@ class HdfsResourceWebHDFS:
     for path_part in path_parts:
       path += path_part + "/"
       results.append(path)
-      
+
   def _fill_directories_list(self, target, results):
     list_status = self.util.run_command(target, 'LISTSTATUS', method='GET', 
assertable_result=False)['FileStatuses']['FileStatus']
-    
+
     for file in list_status:
       if file['pathSuffix']:
         new_path = target + "/" + file['pathSuffix']
         results.append(new_path)
-        
+
         if file['type'] == 'DIRECTORY':
-          self._fill_directories_list(new_path, results)  
-    
+          self._fill_directories_list(new_path, results)
+
 class HdfsResourceProvider(Provider):
   def __init__(self, resource):
     super(HdfsResourceProvider,self).__init__(resource)
     self.assert_parameter_is_set('hdfs_site')
-    self.ignored_resources_list = self.get_ignored_resources_list()
+    self.ignored_resources_list = 
HdfsResourceProvider.get_ignored_resources_list(self.resource.hdfs_resource_ignore_file)
     self.webhdfs_enabled = self.resource.hdfs_site['dfs.webhdfs.enabled']
-    
-      
+          
   @staticmethod
   def parse_path(path):
     """
@@ -410,33 +409,34 @@ class HdfsResourceProvider(Provider):
     """
     math_with_protocol_and_nn_url = re.match("[a-zA-Z]+://[^/]+(/.+)", path)
     math_with_protocol = re.match("[a-zA-Z]+://(/.+)", path)
-    
+
     if math_with_protocol_and_nn_url:
       path = math_with_protocol_and_nn_url.group(1)
     elif math_with_protocol:
       path = math_with_protocol.group(1)
     else:
       path = path
-      
+
     return re.sub("[/]+", "/", path)
-  
-  def get_ignored_resources_list(self):
-    if not self.resource.hdfs_resource_ignore_file or not 
os.path.exists(self.resource.hdfs_resource_ignore_file):
+
+  @staticmethod
+  def get_ignored_resources_list(hdfs_resource_ignore_file):
+    if not hdfs_resource_ignore_file or not 
os.path.exists(hdfs_resource_ignore_file):
       return []
-    
-    with open(self.resource.hdfs_resource_ignore_file, "rb") as fp:
+
+    with open(hdfs_resource_ignore_file, "rb") as fp:
       content = fp.read()
-      
+
     hdfs_resources_to_ignore = []
     for hdfs_resource_to_ignore in content.split("\n"):
       
hdfs_resources_to_ignore.append(HdfsResourceProvider.parse_path(hdfs_resource_to_ignore))
-            
+
     return hdfs_resources_to_ignore
 
-    
+
   def action_delayed(self, action_name):
     self.assert_parameter_is_set('type')
-    
+
     if HdfsResourceProvider.parse_path(self.resource.target) in 
self.ignored_resources_list:
       Logger.info("Skipping '{0}' because it is in ignore file 
{1}.".format(self.resource, self.resource.hdfs_resource_ignore_file))
       return
@@ -457,19 +457,18 @@ class HdfsResourceProvider(Provider):
       return HdfsResourceWebHDFS()
     else:
       return HdfsResourceJar()
-  
+
   def assert_parameter_is_set(self, parameter_name):
     if not getattr(self.resource, parameter_name):
       raise Fail("Resource parameter '{0}' is not set.".format(parameter_name))
     return True
-  
+
   def kinit(self):
     keytab_file = self.resource.keytab
     kinit_path = self.resource.kinit_path_local
     principal_name = self.resource.principal_name
     user = self.resource.user
-    
+
     Execute(format("{kinit_path} -kt {keytab_file} {principal_name}"),
             user=user
-    )    
-
+    )

http://git-wip-us.apache.org/repos/asf/ambari/blob/95e273d1/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
 
b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
index 83f47a8..8d7427d 100644
--- 
a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
+++ 
b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
@@ -259,6 +259,10 @@ hdfs_principal_name = 
config['configurations']['hadoop-env']['hdfs_principal_nam
 
 hdfs_site = config['configurations']['hdfs-site']
 default_fs = config['configurations']['core-site']['fs.defaultFS']
+is_webhdfs_enabled = hdfs_site['dfs.webhdfs.enabled']
+
+# Path to file that contains list of HDFS resources to be skipped during 
processing
+hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore"
 
 import functools
 #create partial functions with common arguments for every HdfsResource call
@@ -266,7 +270,7 @@ import functools
 HdfsResource = functools.partial(
   HdfsResource,
   user=hdfs_user,
-  hdfs_resource_ignore_file = 
"/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+  hdfs_resource_ignore_file = hdfs_resource_ignore_file,
   security_enabled = security_enabled,
   keytab = hdfs_user_keytab,
   kinit_path_local = kinit_path_local,

http://git-wip-us.apache.org/repos/asf/ambari/blob/95e273d1/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
 
b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
index ec7799e..d40abff 100644
--- 
a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
+++ 
b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
@@ -20,7 +20,6 @@ Ambari Agent
 """
 
 from resource_management.libraries.script.script import Script
-from resource_management.libraries.resources.hdfs_resource import HdfsResource
 from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import hdp_select
 from resource_management.libraries.functions.check_process_status import 
check_process_status
@@ -29,13 +28,17 @@ from resource_management.libraries.functions.version import 
compare_versions, fo
 from resource_management.libraries.functions.security_commons import 
build_expectations, \
   cached_kinit_executor, get_params_from_filesystem, 
validate_security_config_properties, \
   FILE_TYPE_XML
+from resource_management.libraries.functions.decorator import retry
 from resource_management.core.resources.system import File, Execute
 from resource_management.core.source import Template
 from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.providers.hdfs_resource import WebHDFSUtil
+from resource_management.libraries.providers.hdfs_resource import 
HdfsResourceProvider
+from resource_management import is_empty
+from resource_management import shell
 
 
-
-from install_jars import install_tez_jars
 from yarn import yarn
 from service import service
 from ambari_commons import OSConst
@@ -113,6 +116,12 @@ class ResourcemanagerDefault(Resourcemanager):
     self.configure(env) # FOR SECURITY
     if params.has_ranger_admin and params.is_supported_yarn_ranger:
       setup_ranger_yarn() #Ranger Yarn Plugin related calls
+
+    # wait for active-dir and done-dir to be created by ATS if needed
+    if params.has_ats:
+      Logger.info("Verifying DFS directories where ATS stores time line data 
for active and completed applications.")
+      self.wait_for_dfs_directories_created(params.entity_groupfs_store_dir, 
params.entity_groupfs_active_dir)
+
     service('resourcemanager', action='start')
 
   def status(self, env):
@@ -217,5 +226,53 @@ class ResourcemanagerDefault(Resourcemanager):
     pass
 
 
+
+
+  def wait_for_dfs_directories_created(self, *dirs):
+    import params
+
+    ignored_dfs_dirs = 
HdfsResourceProvider.get_ignored_resources_list(params.hdfs_resource_ignore_file)
+
+    if params.security_enabled:
+      Execute(
+        format("{rm_kinit_cmd}")
+        , user=params.yarn_user
+      )
+
+    for dir_path in dirs:
+      self.wait_for_dfs_directory_created(dir_path, ignored_dfs_dirs)
+
+
+  @retry(times=8, sleep_time=20, backoff_factor=1, err_class=Fail)
+  def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
+    import params
+
+
+    if not is_empty(dir_path):
+      dir_path = HdfsResourceProvider.parse_path(dir_path)
+
+      if dir_path in ignored_dfs_dirs:
+        Logger.info("Skipping DFS directory '" + dir_path + "' as it's marked 
to be ignored.")
+        return
+
+      Logger.info("Verifying if DFS directory '" + dir_path + "' exists.")
+
+      dir_exists = None
+
+      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, 
params.default_fs):
+        # check with webhdfs is much faster than executing hdfs dfs -test
+        util = WebHDFSUtil(params.hdfs_site, params.yarn_user, 
params.security_enabled)
+        list_status = util.run_command(dir_path, 'GETFILESTATUS', 
method='GET', ignore_status_codes=['404'], assertable_result=False)
+        dir_exists = ('FileStatus' in list_status)
+      else:
+        # have to do time expensive hdfs dfs -d check.
+        dfs_ret_code = shell.call(format("hdfs --config {hadoop_conf_dir} dfs 
-test -d " + dir_path), user=params.yarn_user)[0]
+        dir_exists = not dfs_ret_code #dfs -test -d returns 0 in case the dir 
exists
+
+      if not dir_exists:
+        raise Fail("DFS directory '" + dir_path + "' does not exist !")
+      else:
+        Logger.info("DFS directory '" + dir_path + "' exists.")
+
 if __name__ == "__main__":
   Resourcemanager().execute()

Reply via email to