Repository: ambari
Updated Branches:
  refs/heads/trunk fbd6d520d -> ad8767063


AMBARI-20496. Proxy settings not honored by Ambari agents causing lot of 
traffic (dgrinenko via dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad876706
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad876706
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad876706

Branch: refs/heads/trunk
Commit: ad8767063ab188078d2c57dda3d6596624e02df2
Parents: fbd6d52
Author: Lisnichenko Dmitro <[email protected]>
Authored: Thu Mar 23 17:57:29 2017 +0200
Committer: Lisnichenko Dmitro <[email protected]>
Committed: Thu Mar 23 17:58:16 2017 +0200

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  4 ++++
 .../ambari_agent/AlertSchedulerHandler.py       | 21 ++++++++++++++++++--
 .../main/python/ambari_agent/AmbariConfig.py    | 12 +++++++++++
 .../src/main/python/ambari_agent/Controller.py  |  4 ++++
 .../ambari_agent/CustomServiceOrchestrator.py   |  7 ++++++-
 .../ambari_agent/apscheduler/threadpool.py      | 11 +++++++++-
 .../src/main/python/ambari_agent/main.py        |  6 ++++++
 .../ambari_agent/TestAlertSchedulerHandler.py   | 18 ++++++++++++++++-
 .../src/main/python/ambari_commons/network.py   | 14 +++++++++++++
 .../libraries/script/script.py                  |  7 +++++++
 10 files changed, 99 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini 
b/ambari-agent/conf/unix/ambari-agent.ini
index 56fa605..441a01d 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -51,6 +51,10 @@ credential_lib_dir=/var/lib/ambari-agent/cred/lib
 credential_conf_dir=/var/lib/ambari-agent/cred/conf
 credential_shell_cmd=org.apache.hadoop.security.alias.CredentialShell
 
+[network]
+; this option apply only for Agent communication
+use_system_proxy_settings=true
+
 [services]
 pidLookupPath=/var/run/
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py 
b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
index 026a5ed..6c1d29c 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py
@@ -72,11 +72,15 @@ class AlertSchedulerHandler():
       except:
         logger.critical("[AlertScheduler] Could not create the cache directory 
{0}".format(cachedir))
 
+    apscheduler_standalone = False
+
     self.APS_CONFIG = {
       'apscheduler.threadpool.core_threads': 3,
       'apscheduler.coalesce': True,
-      'apscheduler.standalone': False,
-      'apscheduler.misfire_grace_time': alert_grace_period
+      'apscheduler.standalone': apscheduler_standalone,
+      'apscheduler.misfire_grace_time': alert_grace_period,
+      'apscheduler.threadpool.context_injector': self._job_context_injector if 
not apscheduler_standalone else None,
+      'apscheduler.threadpool.agent_config': config
     }
 
     self._collector = AlertCollector()
@@ -88,6 +92,19 @@ class AlertSchedulerHandler():
     # register python exit handler
     ExitHelper().register(self.exit_handler)
 
+  def _job_context_injector(self, config):
+    """
+    apscheduler hack to inject monkey-patching, context and configuration to 
all jobs inside scheduler in case if scheduler running
+    in embedded mode
+
+    Please note, this function called in job context thus all injects should 
be time-running optimized
+
+    :type config AmbariConfig.AmbariConfig
+    """
+    if not config.use_system_proxy_setting():
+      from ambari_commons.network import reconfigure_urllib2_opener
+      reconfigure_urllib2_opener(ignore_system_proxy=True)
+
 
   def exit_handler(self):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py 
b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 1965dc2..cf48189 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -52,6 +52,10 @@ system_resource_overrides={ps}etc{ps}resource_overrides
 [python]
 custom_actions_dir = 
{ps}var{ps}lib{ps}ambari-agent{ps}resources{ps}custom_actions
 
+
+[network]
+use_system_proxy_settings=true
+
 [security]
 keysdir={ps}tmp{ps}ambari-agent
 server_crt=ca.crt
@@ -304,6 +308,14 @@ class AmbariConfig:
     self.set('agent', self.ULIMIT_OPEN_FILES_KEY, value)
 
 
+  def use_system_proxy_setting(self):
+    """
+    Return `True` if Agent need to honor system proxy setting and `False` if 
not
+
+    :rtype bool
+    """
+    return "true" == self.get("network", "use_system_proxy_settings", 
"true").lower()
+
   def get_multiprocess_status_commands_executor_enabled(self):
     return bool(int(self.get('agent', 
'multiprocess_status_commands_executor_enabled', 1)))
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py 
b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 29a11aa..78b5c0c 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -49,6 +49,7 @@ from ambari_agent.RecoveryManager import  RecoveryManager
 from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, 
bind_signal_handlers
 from ambari_agent.ExitHelper import ExitHelper
 from ambari_agent.StatusCommandsExecutor import 
MultiProcessStatusCommandsExecutor, SingleProcessStatusCommandsExecutor
+from ambari_commons.network import reconfigure_urllib2_opener
 from resource_management.libraries.functions.version import compare_versions
 from ambari_commons.os_utils import get_used_ram
 
@@ -126,6 +127,9 @@ class Controller(threading.Thread):
 
     self.move_data_dir_mount_file()
 
+    if not config.use_system_proxy_setting():
+      reconfigure_urllib2_opener(ignore_system_proxy=True)
+
     self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir,
       stacks_cache_dir, common_services_cache_dir, extensions_cache_dir,
       host_scripts_cache_dir, self.cluster_configuration, config,

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py 
b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 7539710..a67e16e 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -533,7 +533,12 @@ class CustomServiceOrchestrator():
     command['public_hostname'] = public_fqdn
     # Add cache dir to make it visible for commands
     command["hostLevelParams"]["agentCacheDir"] = self.config.get('agent', 
'cache_dir')
-    command["agentConfigParams"] = {"agent": {"parallel_execution": 
self.config.get_parallel_exec_option()}}
+    command["agentConfigParams"] = {
+      "agent": {
+        "parallel_execution": self.config.get_parallel_exec_option(),
+        "use_system_proxy_settings": self.config.use_system_proxy_setting()
+      }
+    }
     # Now, dump the json file
     command_type = command['commandType']
     from ActionQueue import ActionQueue  # To avoid cyclic dependency

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py 
b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
index cb19888..d6624cc 100644
--- a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
+++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py
@@ -31,13 +31,16 @@ ExitHelper().register(_shutdown_all)
 
 
 class ThreadPool(object):
-    def __init__(self, core_threads=0, max_threads=20, keepalive=1):
+    def __init__(self, core_threads=0, max_threads=20, keepalive=1, 
context_injector=None, agent_config=None):
         """
         :param core_threads: maximum number of persistent threads in the pool
         :param max_threads: maximum number of total threads in the pool
         :param thread_class: callable that creates a Thread object
         :param keepalive: seconds to keep non-core worker threads waiting
             for new tasks
+
+        :type context_injector func
+        :type agent_config AmbariConfig.AmbariConfig
         """
         self.core_threads = core_threads
         self.max_threads = max(max_threads, core_threads, 1)
@@ -47,6 +50,9 @@ class ThreadPool(object):
         self._threads = set()
         self._shutdown = False
 
+        self._job_context_injector = context_injector
+        self._agent_config = agent_config
+
         _threadpools.add(ref(self))
         logger.info('Started thread pool with %d core threads and %s maximum '
                     'threads', core_threads, max_threads or 'unlimited')
@@ -73,6 +79,9 @@ class ThreadPool(object):
             block = self.keepalive > 0
             timeout = self.keepalive
 
+        if self._job_context_injector is not None:
+            self._job_context_injector(self._agent_config)
+
         while True:
             try:
                 func, args, kwargs = self._queue.get(block, timeout)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py 
b/ambari-agent/src/main/python/ambari_agent/main.py
index 236de25..923c570 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -102,6 +102,7 @@ from ambari_agent.ExitHelper import ExitHelper
 import socket
 from ambari_commons import OSConst, OSCheck
 from ambari_commons.shell import shellRunner
+from ambari_commons.network import reconfigure_urllib2_opener
 from ambari_commons import shell
 import HeartbeatHandlers
 from HeartbeatHandlers import bind_signal_handlers
@@ -118,6 +119,7 @@ agentPid = os.getpid()
 home_dir = ""
 
 config = AmbariConfig.AmbariConfig()
+
 # TODO AMBARI-18733, remove this global variable and calculate it based on 
home_dir once it is set.
 configFile = config.getConfigFile()
 two_way_ssl_property = config.TWO_WAY_SSL_PROPERTY
@@ -428,6 +430,10 @@ def main(heartbeat_stop_callback=None):
 
   update_open_files_ulimit(config)
 
+  if not config.use_system_proxy_setting():
+    logger.info('Agent is configured to ignore system proxy settings')
+    reconfigure_urllib2_opener(ignore_system_proxy=True)
+
   if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
     daemonize()
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py 
b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
index e396a3e..d1d27ef 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAlertSchedulerHandler.py
@@ -29,7 +29,7 @@ from ambari_agent.alerts.web_alert import WebAlert
 
 from AmbariConfig import AmbariConfig
 
-from mock.mock import Mock, MagicMock
+from mock.mock import Mock, MagicMock, patch
 from unittest import TestCase
 
 TEST_PATH = os.path.join('ambari_agent', 'dummy_files')
@@ -46,6 +46,22 @@ class TestAlertSchedulerHandler(TestCase):
 
     self.assertEquals(len(definitions), 1)
 
+  @patch("ambari_commons.network.reconfigure_urllib2_opener")
+  def test_job_context_injector(self, reconfigure_urllib2_opener_mock):
+    self.config.use_system_proxy_setting = lambda: False
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, 
TEST_PATH, TEST_PATH, None, self.config, None)
+    scheduler._job_context_injector(self.config)
+
+    self.assertTrue(reconfigure_urllib2_opener_mock.called)
+
+    reconfigure_urllib2_opener_mock.reset_mock()
+
+    self.config.use_system_proxy_setting = lambda: True
+    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, 
TEST_PATH, TEST_PATH, None, self.config, None)
+    scheduler._job_context_injector(self.config)
+    self.assertFalse(reconfigure_urllib2_opener_mock.called)
+
+
   def test_json_to_callable_metric(self):
     scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, 
TEST_PATH, TEST_PATH, None, self.config, None)
     json_definition = {

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-common/src/main/python/ambari_commons/network.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/network.py 
b/ambari-common/src/main/python/ambari_commons/network.py
index 9bc16ed..6ab92b2 100644
--- a/ambari-common/src/main/python/ambari_commons/network.py
+++ b/ambari-common/src/main/python/ambari_commons/network.py
@@ -21,6 +21,7 @@ limitations under the License.
 import httplib
 import ssl
 import socket
+import urllib2
 
 from ambari_commons.logging_utils import print_warning_msg
 from resource_management.core.exceptions import Fail
@@ -64,3 +65,16 @@ def check_ssl_certificate_and_return_ssl_version(host, port, 
ca_certs):
       raise Fail("Failed to verify the SSL certificate for https://{0}:{1} 
with CA certificate in {2}. Error : {3}"
                .format(host, port, ca_certs, str(ssl_error)))
   return ssl_version
+
+
+def reconfigure_urllib2_opener(ignore_system_proxy=False):
+  """
+  Reconfigure urllib opener
+
+  :type ignore_system_proxy bool
+  """
+
+  if ignore_system_proxy:
+    proxy_handler = urllib2.ProxyHandler({})
+    opener = urllib2.build_opener(proxy_handler)
+    urllib2.install_opener(opener)

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad876706/ambari-common/src/main/python/resource_management/libraries/script/script.py
----------------------------------------------------------------------
diff --git 
a/ambari-common/src/main/python/resource_management/libraries/script/script.py 
b/ambari-common/src/main/python/resource_management/libraries/script/script.py
index 6eec3cc..fad14fd 100644
--- 
a/ambari-common/src/main/python/resource_management/libraries/script/script.py
+++ 
b/ambari-common/src/main/python/resource_management/libraries/script/script.py
@@ -34,6 +34,7 @@ from ambari_commons import OSCheck, OSConst
 from ambari_commons.constants import UPGRADE_TYPE_NON_ROLLING
 from ambari_commons.constants import UPGRADE_TYPE_ROLLING
 from ambari_commons.constants import UPGRADE_TYPE_HOST_ORDERED
+from ambari_commons.network import reconfigure_urllib2_opener
 from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
 from resource_management.libraries.resources import XmlConfig
 from resource_management.libraries.resources import PropertiesFile
@@ -907,6 +908,12 @@ class Script(object):
   @staticmethod
   def get_instance():
     if Script.instance is None:
+
+      from resource_management.libraries.functions.default import default
+      use_proxy = 
default("/agentConfigParams/agent/use_system_proxy_settings", True)
+      if not use_proxy:
+        reconfigure_urllib2_opener(ignore_system_proxy=True)
+
       Script.instance = Script()
     return Script.instance
 

Reply via email to