Repository: ambari Updated Branches: refs/heads/trunk fbd6d520d -> ad8767063
AMBARI-20496. Proxy settings not honored by Ambari agents causing lot of traffic (dgrinenko via dlysnichenko) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad876706 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad876706 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad876706 Branch: refs/heads/trunk Commit: ad8767063ab188078d2c57dda3d6596624e02df2 Parents: fbd6d52 Author: Lisnichenko Dmitro <[email protected]> Authored: Thu Mar 23 17:57:29 2017 +0200 Committer: Lisnichenko Dmitro <[email protected]> Committed: Thu Mar 23 17:58:16 2017 +0200 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 4 ++++ .../ambari_agent/AlertSchedulerHandler.py | 21 ++++++++++++++++++-- .../main/python/ambari_agent/AmbariConfig.py | 12 +++++++++++ .../src/main/python/ambari_agent/Controller.py | 4 ++++ .../ambari_agent/CustomServiceOrchestrator.py | 7 ++++++- .../ambari_agent/apscheduler/threadpool.py | 11 +++++++++- .../src/main/python/ambari_agent/main.py | 6 ++++++ .../ambari_agent/TestAlertSchedulerHandler.py | 18 ++++++++++++++++- .../src/main/python/ambari_commons/network.py | 14 +++++++++++++ .../libraries/script/script.py | 7 +++++++ 10 files changed, 99 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 56fa605..441a01d 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -51,6 +51,10 @@ credential_lib_dir=/var/lib/ambari-agent/cred/lib credential_conf_dir=/var/lib/ambari-agent/cred/conf credential_shell_cmd=org.apache.hadoop.security.alias.CredentialShell +[network] +; this option apply only for Agent communication +use_system_proxy_settings=true + [services] pidLookupPath=/var/run/ http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index 026a5ed..6c1d29c 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -72,11 +72,15 @@ class AlertSchedulerHandler(): except: logger.critical("[AlertScheduler] Could not create the cache directory {0}".format(cachedir)) + apscheduler_standalone = False + self.APS_CONFIG = { 'apscheduler.threadpool.core_threads': 3, 'apscheduler.coalesce': True, - 'apscheduler.standalone': False, - 'apscheduler.misfire_grace_time': alert_grace_period + 'apscheduler.standalone': apscheduler_standalone, + 'apscheduler.misfire_grace_time': alert_grace_period, + 'apscheduler.threadpool.context_injector': self._job_context_injector if not apscheduler_standalone else None, + 'apscheduler.threadpool.agent_config': config } self._collector = AlertCollector() @@ -88,6 +92,19 @@ class AlertSchedulerHandler(): # register python exit handler ExitHelper().register(self.exit_handler) + def _job_context_injector(self, config): + """ + apscheduler hack to inject monkey-patching, context and configuration to all jobs inside scheduler in case if scheduler running + in embedded mode + + Please note, this function called in job context thus all injects should be time-running optimized + + :type config AmbariConfig.AmbariConfig + """ + if not config.use_system_proxy_setting(): + from ambari_commons.network import reconfigure_urllib2_opener + reconfigure_urllib2_opener(ignore_system_proxy=True) + def exit_handler(self): """ http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 1965dc2..cf48189 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -52,6 +52,10 @@ system_resource_overrides={ps}etc{ps}resource_overrides [python] custom_actions_dir = {ps}var{ps}lib{ps}ambari-agent{ps}resources{ps}custom_actions + +[network] +use_system_proxy_settings=true + [security] keysdir={ps}tmp{ps}ambari-agent server_crt=ca.crt @@ -304,6 +308,14 @@ class AmbariConfig: self.set('agent', self.ULIMIT_OPEN_FILES_KEY, value) + def use_system_proxy_setting(self): + """ + Return `True` if Agent need to honor system proxy setting and `False` if not + + :rtype bool + """ + return "true" == self.get("network", "use_system_proxy_settings", "true").lower() + def get_multiprocess_status_commands_executor_enabled(self): return bool(int(self.get('agent', 'multiprocess_status_commands_executor_enabled', 1))) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 29a11aa..78b5c0c 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -49,6 +49,7 @@ from ambari_agent.RecoveryManager import RecoveryManager from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers from ambari_agent.ExitHelper import ExitHelper from ambari_agent.StatusCommandsExecutor import MultiProcessStatusCommandsExecutor, SingleProcessStatusCommandsExecutor +from ambari_commons.network import reconfigure_urllib2_opener from resource_management.libraries.functions.version import compare_versions from ambari_commons.os_utils import get_used_ram @@ -126,6 +127,9 @@ class Controller(threading.Thread): self.move_data_dir_mount_file() + if not config.use_system_proxy_setting(): + reconfigure_urllib2_opener(ignore_system_proxy=True) + self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir, stacks_cache_dir, common_services_cache_dir, extensions_cache_dir, host_scripts_cache_dir, self.cluster_configuration, config, http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 7539710..a67e16e 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -533,7 +533,12 @@ class CustomServiceOrchestrator(): command['public_hostname'] = public_fqdn # Add cache dir to make it visible for commands command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 'cache_dir') - command["agentConfigParams"] = {"agent": {"parallel_execution": self.config.get_parallel_exec_option()}} + command["agentConfigParams"] = { + "agent": { + "parallel_execution": self.config.get_parallel_exec_option(), + "use_system_proxy_settings": self.config.use_system_proxy_setting() + } + } # Now, dump the json file command_type = command['commandType'] from ActionQueue import ActionQueue # To avoid cyclic dependency http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py index cb19888..d6624cc 100644 --- a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py @@ -31,13 +31,16 @@ ExitHelper().register(_shutdown_all) class ThreadPool(object): - def __init__(self, core_threads=0, max_threads=20, keepalive=1): + def __init__(self, core_threads=0, max_threads=20, keepalive=1, context_injector=None, agent_config=None): """ :param core_threads: maximum number of persistent threads in the pool :param max_threads: maximum number of total threads in the pool :param thread_class: callable that creates a Thread object :param keepalive: seconds to keep non-core worker threads waiting for new tasks + + :type context_injector func + :type agent_config AmbariConfig.AmbariConfig """ self.core_threads = core_threads self.max_threads = max(max_threads, core_threads, 1) @@ -47,6 +50,9 @@ class ThreadPool(object): self._threads = set() self._shutdown = False + self._job_context_injector = context_injector + self._agent_config = agent_config + _threadpools.add(ref(self)) logger.info('Started thread pool with %d core threads and %s maximum ' 'threads', core_threads, max_threads or 'unlimited') @@ -73,6 +79,9 @@ class ThreadPool(object): block = self.keepalive > 0 timeout = self.keepalive + if self._job_context_injector is not None: + self._job_context_injector(self._agent_config) + while True: try: func, args, kwargs = self._queue.get(block, timeout) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 236de25..923c570 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -102,6 +102,7 @@ from ambari_agent.ExitHelper import ExitHelper import socket from ambari_commons import OSConst, OSCheck from ambari_commons.shell import shellRunner +from ambari_commons.network import reconfigure_urllib2_opener from ambari_commons import shell import HeartbeatHandlers from HeartbeatHandlers import bind_signal_handlers @@ -118,6 +119,7 @@ agentPid = os.getpid() home_dir = "" config = AmbariConfig.AmbariConfig() + # TODO AMBARI-18733, remove this global variable and calculate it based on home_dir once it is set. configFile = config.getConfigFile() two_way_ssl_property = config.TWO_WAY_SSL_PROPERTY @@ -428,6 +430,10 @@ def main(heartbeat_stop_callback=None): update_open_files_ulimit(config) + if not config.use_system_proxy_setting(): + logger.info('Agent is configured to ignore system proxy settings') + reconfigure_urllib2_opener(ignore_system_proxy=True) + if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY: daemonize() http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py index e396a3e..d1d27ef 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py @@ -29,7 +29,7 @@ from ambari_agent.alerts.web_alert import WebAlert from AmbariConfig import AmbariConfig -from mock.mock import Mock, MagicMock +from mock.mock import Mock, MagicMock, patch from unittest import TestCase TEST_PATH = os.path.join('ambari_agent', 'dummy_files') @@ -46,6 +46,22 @@ class TestAlertSchedulerHandler(TestCase): self.assertEquals(len(definitions), 1) + @patch("ambari_commons.network.reconfigure_urllib2_opener") + def test_job_context_injector(self, reconfigure_urllib2_opener_mock): + self.config.use_system_proxy_setting = lambda: False + scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None) + scheduler._job_context_injector(self.config) + + self.assertTrue(reconfigure_urllib2_opener_mock.called) + + reconfigure_urllib2_opener_mock.reset_mock() + + self.config.use_system_proxy_setting = lambda: True + scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None) + scheduler._job_context_injector(self.config) + self.assertFalse(reconfigure_urllib2_opener_mock.called) + + def test_json_to_callable_metric(self): scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, None, self.config, None) json_definition = { http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-common/src/main/python/ambari_commons/network.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_commons/network.py b/ambari-common/src/main/python/ambari_commons/network.py index 9bc16ed..6ab92b2 100644 --- a/ambari-common/src/main/python/ambari_commons/network.py +++ b/ambari-common/src/main/python/ambari_commons/network.py @@ -21,6 +21,7 @@ limitations under the License. import httplib import ssl import socket +import urllib2 from ambari_commons.logging_utils import print_warning_msg from resource_management.core.exceptions import Fail @@ -64,3 +65,16 @@ def check_ssl_certificate_and_return_ssl_version(host, port, ca_certs): raise Fail("Failed to verify the SSL certificate for https://{0}:{1} with CA certificate in {2}. Error : {3}" .format(host, port, ca_certs, str(ssl_error))) return ssl_version + + +def reconfigure_urllib2_opener(ignore_system_proxy=False): + """ + Reconfigure urllib opener + + :type ignore_system_proxy bool + """ + + if ignore_system_proxy: + proxy_handler = urllib2.ProxyHandler({}) + opener = urllib2.build_opener(proxy_handler) + urllib2.install_opener(opener) http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-common/src/main/python/resource_management/libraries/script/script.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/resource_management/libraries/script/script.py b/ambari-common/src/main/python/resource_management/libraries/script/script.py index 6eec3cc..fad14fd 100644 --- a/ambari-common/src/main/python/resource_management/libraries/script/script.py +++ b/ambari-common/src/main/python/resource_management/libraries/script/script.py @@ -34,6 +34,7 @@ from ambari_commons import OSCheck, OSConst from ambari_commons.constants import UPGRADE_TYPE_NON_ROLLING from ambari_commons.constants import UPGRADE_TYPE_ROLLING from ambari_commons.constants import UPGRADE_TYPE_HOST_ORDERED +from ambari_commons.network import reconfigure_urllib2_opener from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl from resource_management.libraries.resources import XmlConfig from resource_management.libraries.resources import PropertiesFile @@ -907,6 +908,12 @@ class Script(object): @staticmethod def get_instance(): if Script.instance is None: + + from resource_management.libraries.functions.default import default + use_proxy = default("/agentConfigParams/agent/use_system_proxy_settings", True) + if not use_proxy: + reconfigure_urllib2_opener(ignore_system_proxy=True) + Script.instance = Script() return Script.instance
