Repository: ambari Updated Branches: refs/heads/branch-2.2 2d4bf7197 -> 9b86d41e1
AMBARI-15762. Component install post processing can not be run in parallel (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9b86d41e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9b86d41e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9b86d41e Branch: refs/heads/branch-2.2 Commit: 9b86d41e197adc31ab29c1bc71a578a7d4b5da81 Parents: 2d4bf71 Author: Andrew Onishuk <[email protected]> Authored: Thu Apr 7 20:29:01 2016 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Thu Apr 7 20:29:01 2016 +0300 ---------------------------------------------------------------------- .../TestFileBasedProcessLock.py | 61 ++++++++++++++++ .../functions/file_based_process_lock.py | 73 ++++++++++++++++++++ .../2.0.6/hooks/after-INSTALL/scripts/params.py | 5 ++ .../scripts/shared_initialization.py | 8 ++- 4 files changed, 145 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py b/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py new file mode 100644 index 0000000..e4606cc --- /dev/null +++ b/ambari-agent/src/test/python/resource_management/TestFileBasedProcessLock.py @@ -0,0 +1,61 @@ +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import os +import tempfile +import time +import shutil +from unittest import TestCase +from multiprocessing import Process +from only_for_platform import not_for_platform, PLATFORM_WINDOWS +from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock + +class TestFileBasedProcessLock(TestCase): + + + @not_for_platform(PLATFORM_WINDOWS) + def test_file_based_lock(self): + """ + Test BlockingLock using mkdir atomicity. + """ + test_temp_dir = tempfile.mkdtemp(prefix="test_file_based_lock") + try: + indicator_dir = os.path.join(test_temp_dir, "indicator") + lock_file = os.path.join(test_temp_dir, "lock") + + # Raises an exception if mkdir operation fails. + # It indicates that more than one process acquired the lock. + def dummy_task(index): + with FileBasedProcessLock(lock_file): + os.mkdir(indicator_dir) + time.sleep(0.1) + os.rmdir(indicator_dir) + + process_list = [] + for i in range(0, 3): + p = Process(target=dummy_task, args=(i,)) + p.start() + process_list.append(p) + + for p in process_list: + p.join(2) + self.assertEquals(p.exitcode, 0) + + finally: + shutil.rmtree(test_temp_dir) + http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py b/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py new file mode 100644 index 0000000..f9c981d --- /dev/null +++ b/ambari-common/src/main/python/resource_management/libraries/functions/file_based_process_lock.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +Ambari Agent + +""" + +import fcntl + +from resource_management.core.logger import Logger + +class FileBasedProcessLock(object): + """A file descriptor based lock for interprocess locking. + The lock is automatically released when process dies. + + WARNING: Do not use this lock for synchronization between threads. + Multiple threads in a same process can simultaneously acquire this lock. + It should be used only for locking between processes. + """ + + def __init__(self, lock_file_path): + """ + :param lock_file_path: The path to the file used for locking + """ + self.lock_file_name = lock_file_path + self.lock_file = None + + def blocking_lock(self): + """ + Creates the lock file if it doesn't exist. + Waits to acquire an exclusive lock on the lock file descriptor. + """ + Logger.info("Trying to acquire a lock on {0}".format(self.lock_file_name)) + if self.lock_file is None or self.lock_file.closed: + self.lock_file = open(self.lock_file_name, 'a') + fcntl.lockf(self.lock_file, fcntl.LOCK_EX) + Logger.info("Acquired the lock on {0}".format(self.lock_file_name)) + + def unlock(self): + """ + Unlocks the lock file descriptor. + """ + Logger.info("Releasing the lock on {0}".format(self.lock_file_name)) + fcntl.lockf(self.lock_file, fcntl.LOCK_UN) + try: + if self.lock_file is not None: + self.lock_file.close() + self.lock_file = None + except IOError: + pass + + def __enter__(self): + self.blocking_lock() + return None + + def __exit__(self, exc_type, exc_val, exc_tb): + self.unlock() + return False \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py index 68fe9f9..603875d 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/params.py @@ -17,6 +17,8 @@ limitations under the License. """ +import os + from ambari_commons.constants import AMBARI_SUDO_BINARY from resource_management.libraries.script import Script from resource_management.libraries.functions import default @@ -26,6 +28,7 @@ from resource_management.libraries.functions import format_jvm_option from resource_management.libraries.functions.version import format_hdp_stack_version config = Script.get_config() +tmp_dir = Script.get_tmp_dir() dfs_type = default("/commandParams/dfs_type", "") @@ -89,3 +92,5 @@ has_namenode = not len(namenode_host) == 0 if has_namenode or dfs_type == 'HCFS': hadoop_conf_dir = conf_select.get_hadoop_conf_dir(force_latest_on_upgrade=True) + +link_configs_lock_file = os.path.join(tmp_dir, "link_configs_lock_file") http://git-wip-us.apache.org/repos/asf/ambari/blob/9b86d41e/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py index 8ee2f7a..1d61dfc 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/after-INSTALL/scripts/shared_initialization.py @@ -24,6 +24,7 @@ from resource_management.libraries.functions import conf_select from resource_management.libraries.functions import hdp_select from resource_management.libraries.functions.format import format from resource_management.libraries.functions.version import compare_versions +from resource_management.libraries.functions.file_based_process_lock import FileBasedProcessLock from resource_management.libraries.resources.xml_config import XmlConfig from resource_management.libraries.script import Script @@ -85,6 +86,7 @@ def link_configs(struct_out_file): """ Links configs, only on a fresh install of HDP-2.3 and higher """ + import params if not Script.is_hdp_stack_greater_or_equal("2.3"): Logger.info("Can only link configs for HDP-2.3 and higher.") @@ -96,5 +98,7 @@ def link_configs(struct_out_file): Logger.info("Could not load 'version' from {0}".format(struct_out_file)) return - for k, v in conf_select.PACKAGE_DIRS.iteritems(): - conf_select.convert_conf_directories_to_symlinks(k, json_version, v) + # On parallel command execution this should be executed by a single process at a time. + with FileBasedProcessLock(params.link_configs_lock_file): + for k, v in conf_select.PACKAGE_DIRS.iteritems(): + conf_select.convert_conf_directories_to_symlinks(k, json_version, v)
