Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 42a3e69c5 -> c4c1b74bc


AMBARI-15158. Resource Manager fails to start: IOException: /ats/active does not exist. (stoader)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c4c1b74b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c4c1b74b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c4c1b74b

Branch: refs/heads/branch-2.2
Commit: c4c1b74bc75947e795a5ce3dd564bc0fa3f9243f
Parents: 42a3e69
Author: Toader, Sebastian <[email protected]>
Authored: Fri Feb 26 18:25:23 2016 +0100
Committer: Toader, Sebastian <[email protected]>
Committed: Fri Feb 26 18:25:23 2016 +0100

----------------------------------------------------------------------
 .../2.1.0.2.0/package/scripts/params_linux.py   | 10 +++-
 .../package/scripts/resourcemanager.py          | 63 +++++++++++++++++++-
 2 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c1b74b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
index 83f47a8..40fa915 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/params_linux.py
@@ -30,7 +30,6 @@ from resource_management.libraries.functions.version import format_hdp_stack_ver
 from resource_management.libraries.functions.default import default
 from resource_management.libraries import functions
 
-
 import status_params
 
 # a map of the Ambari role to the component name
@@ -259,6 +258,13 @@ hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_nam
 
 hdfs_site = config['configurations']['hdfs-site']
 default_fs = config['configurations']['core-site']['fs.defaultFS']
+is_webhdfs_enabled = hdfs_site['dfs.webhdfs.enabled']
+
+# Path to file that contains list of HDFS resources to be skipped during processing
+hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore"
+
+dfs_type = default("/commandParams/dfs_type", "")
+
 
 import functools
 #create partial functions with common arguments for every HdfsResource call
@@ -266,7 +272,7 @@ import functools
 HdfsResource = functools.partial(
   HdfsResource,
   user=hdfs_user,
-  hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
+  hdfs_resource_ignore_file = hdfs_resource_ignore_file,
   security_enabled = security_enabled,
   keytab = hdfs_user_keytab,
   kinit_path_local = kinit_path_local,

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c1b74b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
index ec7799e..d40abff 100644
--- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
+++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/scripts/resourcemanager.py
@@ -20,7 +20,6 @@ Ambari Agent
 """
 
 from resource_management.libraries.script.script import Script
-from resource_management.libraries.resources.hdfs_resource import HdfsResource
 from resource_management.libraries.functions import conf_select
 from resource_management.libraries.functions import hdp_select
 from resource_management.libraries.functions.check_process_status import check_process_status
@@ -29,13 +28,17 @@ from resource_management.libraries.functions.version import compare_versions, fo
 from resource_management.libraries.functions.security_commons import build_expectations, \
   cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \
   FILE_TYPE_XML
+from resource_management.libraries.functions.decorator import retry
 from resource_management.core.resources.system import File, Execute
 from resource_management.core.source import Template
 from resource_management.core.logger import Logger
+from resource_management.core.exceptions import Fail
+from resource_management.libraries.providers.hdfs_resource import WebHDFSUtil
+from resource_management.libraries.providers.hdfs_resource import HdfsResourceProvider
+from resource_management import is_empty
+from resource_management import shell
 
 
-
-from install_jars import install_tez_jars
 from yarn import yarn
 from service import service
 from ambari_commons import OSConst
@@ -113,6 +116,12 @@ class ResourcemanagerDefault(Resourcemanager):
     self.configure(env) # FOR SECURITY
     if params.has_ranger_admin and params.is_supported_yarn_ranger:
       setup_ranger_yarn() #Ranger Yarn Plugin related calls
+
+    # wait for active-dir and done-dir to be created by ATS if needed
+    if params.has_ats:
+      Logger.info("Verifying DFS directories where ATS stores time line data for active and completed applications.")
+      self.wait_for_dfs_directories_created(params.entity_groupfs_store_dir, params.entity_groupfs_active_dir)
+
     service('resourcemanager', action='start')
 
   def status(self, env):
@@ -217,5 +226,53 @@ class ResourcemanagerDefault(Resourcemanager):
     pass
 
 
+
+  def wait_for_dfs_directories_created(self, *dirs):
+    """Kinit if security is enabled, then wait (with retries) for each DFS path in dirs to exist."""
+    import params
+
+    ignored_dfs_dirs = HdfsResourceProvider.get_ignored_resources_list(params.hdfs_resource_ignore_file)
+
+    if params.security_enabled:
+      Execute(
+        format("{rm_kinit_cmd}")
+        , user=params.yarn_user
+      )
+
+    for dir_path in dirs:
+      self.wait_for_dfs_directory_created(dir_path, ignored_dfs_dirs)
+
+
+  @retry(times=8, sleep_time=20, backoff_factor=1, err_class=Fail)
+  def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
+    """Raise Fail unless dir_path exists in DFS (re-invoked by @retry on Fail); empty or ignored paths are skipped."""
+    import params
+
+    if not is_empty(dir_path):
+      dir_path = HdfsResourceProvider.parse_path(dir_path)
+
+      if dir_path in ignored_dfs_dirs:
+        Logger.info("Skipping DFS directory '" + dir_path + "' as it's marked to be ignored.")
+        return
+
+      Logger.info("Verifying if DFS directory '" + dir_path + "' exists.")
+
+      dir_exists = None
+
+      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
+        # check with webhdfs is much faster than executing hdfs dfs -test
+        util = WebHDFSUtil(params.hdfs_site, params.yarn_user, params.security_enabled)
+        list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
+        dir_exists = ('FileStatus' in list_status)
+      else:
+        # fall back to the slower 'hdfs dfs -test -d' check; dir_path is a placeholder, not concatenated into the template
+        dfs_ret_code = shell.call(format("hdfs --config {hadoop_conf_dir} dfs -test -d {dir_path}"), user=params.yarn_user)[0]
+        dir_exists = not dfs_ret_code #dfs -test -d returns 0 in case the dir exists
+
+      if not dir_exists:
+        raise Fail("DFS directory '" + dir_path + "' does not exist !")
+      else:
+        Logger.info("DFS directory '" + dir_path + "' exists.")
+
 if __name__ == "__main__":
   Resourcemanager().execute()

Reply via email to