Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 736e3ec2f -> ad9fce8a8


AMBARI-15558. ambari-agent upstart script broken in RHEL6 (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ad9fce8a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ad9fce8a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ad9fce8a

Branch: refs/heads/branch-2.2
Commit: ad9fce8a872f79912176c0221d156d456a968d88
Parents: 736e3ec
Author: Andrew Onishuk <[email protected]>
Authored: Thu Mar 24 17:15:19 2016 +0200
Committer: Andrew Onishuk <[email protected]>
Committed: Thu Mar 24 17:15:19 2016 +0200

----------------------------------------------------------------------
 ambari-agent/etc/init/ambari-agent.conf         |  1 -
 .../src/main/python/ambari_agent/ExitHelper.py  |  1 +
 .../python/ambari_agent/HeartbeatHandlers.py    | 27 ++++--------
 .../src/main/python/ambari_agent/main.py        | 43 ++++++++++++--------
 .../test/python/ambari_agent/TestController.py  |  5 ---
 .../src/test/python/ambari_agent/TestMain.py    | 32 ++++++++++-----
 6 files changed, 55 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/etc/init/ambari-agent.conf
----------------------------------------------------------------------
diff --git a/ambari-agent/etc/init/ambari-agent.conf 
b/ambari-agent/etc/init/ambari-agent.conf
index 75c1b06..b3f2987 100644
--- a/ambari-agent/etc/init/ambari-agent.conf
+++ b/ambari-agent/etc/init/ambari-agent.conf
@@ -17,7 +17,6 @@ description     "ambari agent"
 
 stop on runlevel [06]
 
-kill signal SIGKILL
 respawn
 
 script

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py 
b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
index 06dfadb..e51646f 100644
--- a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
+++ b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
@@ -64,6 +64,7 @@ class ExitHelper(object):
 
   def exit(self, code):
     self.execute_cleanup()
+    logger.info("Cleanup finished, exiting with code:" + str(code))
     os._exit(code)
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
index f9d0e14..4347595 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py
@@ -94,18 +94,9 @@ def debug(sig, frame):
 
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class HeartbeatStopHandlersLinux(HeartbeatStopHandlers):
-  def __init__(self, stopEvent=None):
-    # Event is used for synchronizing heartbeat iterations (to make possible
-    # manual wait() interruption between heartbeats )
+  def __init__(self):
     self.heartbeat_wait_event = threading.Event()
-
-    # Event is used to stop the Agent process
-    if stopEvent is None:
-      # Allow standalone testing
-      self.stop_event = threading.Event()
-    else:
-      # Allow one unique event per process
-      self.stop_event = stopEvent
+    self._stop = False
 
   def set_heartbeat(self):
     self.heartbeat_wait_event.set()
@@ -114,19 +105,15 @@ class HeartbeatStopHandlersLinux(HeartbeatStopHandlers):
     self.heartbeat_wait_event.clear()
 
   def set_stop(self):
-    self.stop_event.set()
+    self._stop = True
 
   def wait(self, timeout1, timeout2=0):
-    if self.heartbeat_wait_event.wait(timeout=timeout1):
-      # Event signaled, exit
-      return 1
-    # Stop loop when stop event received
-    # Otherwise sleep a bit more to allow STATUS_COMMAND results to be 
collected
-    # and sent in one heartbeat. Also avoid server overload with heartbeats
-    if self.stop_event.wait(timeout=timeout2):
+    if self._stop:
       logger.info("Stop event received")
       return 0
-    # Timeout
+
+    if self.heartbeat_wait_event.wait(timeout=timeout1):
+      return 1
     return -1
 
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py 
b/ambari-agent/src/main/python/ambari_agent/main.py
index 22e2e4a..b146ba8 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -59,6 +59,9 @@ IS_LINUX = platform.system() == "Linux"
 SYSLOG_FORMAT_STRING = ' ambari_agent - %(filename)s - [%(process)d] - 
%(name)s - %(levelname)s - %(message)s'
 SYSLOG_FORMATTER = logging.Formatter(SYSLOG_FORMAT_STRING)
 
+GRACEFUL_STOP_TRIES = 10
+GRACEFUL_STOP_TRIES_SLEEP = 3
+
 
 def setup_logging(logger, filename, logging_level):
   formatter = logging.Formatter(formatstr)
@@ -164,22 +167,26 @@ def daemonize():
   pid = str(os.getpid())
   file(ProcessHelper.pidfile, 'w').write(pid)
 
-
 def stop_agent():
 # stop existing Ambari agent
   pid = -1
   runner = shellRunner()
   try:
-    f = open(ProcessHelper.pidfile, 'r')
-    pid = f.read()
+    with open(ProcessHelper.pidfile, 'r') as f:
+      pid = f.read()
     pid = int(pid)
-    f.close()
+    
     runner.run([AMBARI_SUDO_BINARY, 'kill', '-15', str(pid)])
-    time.sleep(5)
-    if os.path.exists(ProcessHelper.pidfile):
-      raise Exception("PID file still exists.")
-    sys.exit(0)
+    for i in range(GRACEFUL_STOP_TRIES):
+      result = runner.run([AMBARI_SUDO_BINARY, 'kill', '-0', str(pid)])
+      if result['exitCode'] != 0:
+        logger.info("Agent died gracefully, exiting.")
+        sys.exit(0)
+      time.sleep(GRACEFUL_STOP_TRIES_SLEEP)
+    logger.info("Agent not going to die gracefully, going to execute kill -9")
+    raise Exception("Agent is running")
   except Exception, err:
+    #raise
     if pid == -1:
       print ("Agent process is not running")
     else:
@@ -236,7 +243,6 @@ def main(heartbeat_stop_callback=None):
 
   default_cfg = {'agent': {'prefix': '/home/ambari'}}
   config.load(default_cfg)
-  bind_signal_handlers(agentPid)
 
   if (len(sys.argv) > 1) and sys.argv[1] == 'stop':
     stop_agent()
@@ -292,11 +298,12 @@ def main(heartbeat_stop_callback=None):
     # Launch Controller communication
     controller = Controller(config, heartbeat_stop_callback)
     controller.start()
-    controller.join()
-  if not OSCheck.get_os_family() == OSConst.WINSRV_FAMILY:
-    ExitHelper.execute_cleanup()
-    stop_agent()
-  logger.info("finished")
+    
+    # controller.join() is not appropriate here as it blocks the signal 
handlers for stop etc.
+    while controller.is_alive():
+      time.sleep(0.1)
+
+  ExitHelper().exit(0)
 
 if __name__ == "__main__":
   is_logger_setup = False
@@ -304,7 +311,9 @@ if __name__ == "__main__":
     heartbeat_stop_callback = bind_signal_handlers(agentPid)
   
     main(heartbeat_stop_callback)
-  except:
+  except SystemExit as e:
+    raise e
+  except BaseException as e:
     if is_logger_setup:
-      logger.exception("Fatal exception occurred:")
-    raise
+      logger.exception("Exiting with exception:" + e)
+  raise

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py 
b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 00725c4..9b3ce47 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -545,8 +545,6 @@ class TestController(unittest.TestCase):
     response["restartAgent"] = "false"
     self.controller.heartbeatWithServer()
 
-    event_mock.assert_any_call(timeout=
-      self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
 
     # Check that server continues to heartbeat after connection errors
     self.controller.responseId = 1
@@ -566,9 +564,6 @@ class TestController(unittest.TestCase):
     self.controller.heartbeatWithServer()
     self.assertTrue(sendRequest.call_count > 5)
 
-    event_mock.assert_called_with(timeout=
-      self.controller.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
-
     sys.stdout = sys.__stdout__
     self.controller.sendRequest = Controller.Controller.sendRequest
     self.controller.sendRequest = Controller.Controller.addToQueue

http://git-wip-us.apache.org/repos/asf/ambari/blob/ad9fce8a/ambari-agent/src/test/python/ambari_agent/TestMain.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py 
b/ambari-agent/src/test/python/ambari_agent/TestMain.py
index 873f0ba..63f8024 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestMain.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py
@@ -29,7 +29,7 @@ import ConfigParser
 
 from ambari_commons import OSCheck
 from only_for_platform import get_platform, not_for_platform, 
only_for_platform, PLATFORM_WINDOWS, PLATFORM_LINUX
-from mock.mock import MagicMock, patch, ANY, Mock
+from mock.mock import MagicMock, patch, ANY, Mock, call
 
 if get_platform() != PLATFORM_WINDOWS:
   os_distro_value = ('Suse','11','Final')
@@ -207,13 +207,13 @@ class TestMain(unittest.TestCase):
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = 
os_distro_value))
   @patch("time.sleep")
   @patch.object(shellRunnerLinux,"run")
-  @patch("sys.exit")
   @patch("os.path.exists")
-  def test_daemonize_and_stop(self, exists_mock, sys_exit_mock, kill_mock, 
sleep_mock):
+  def test_daemonize_and_stop(self, exists_mock, kill_mock, sleep_mock):
     oldpid = ProcessHelper.pidfile
     pid = str(os.getpid())
     _, tmpoutfile = tempfile.mkstemp()
     ProcessHelper.pidfile = tmpoutfile
+    kill_mock.return_value = {'exitCode': 1}
 
     # Test daemonization
     main.daemonize()
@@ -223,21 +223,30 @@ class TestMain(unittest.TestCase):
     # Reuse pid file when testing agent stop
     # Testing normal exit
     exists_mock.return_value = False
-    main.stop_agent()
-    kill_mock.assert_called_with(['ambari-sudo.sh', 'kill', '-15', pid])
-    sys_exit_mock.assert_called_with(0)
+    try:
+      main.stop_agent()
+      raise Exception("main.stop_agent() should raise sys.exit(0).")
+    except SystemExit as e:
+      self.assertEquals(0, e.code);
+      
+    kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-15', pid])
+    
 
     # Restore
     kill_mock.reset_mock()
-    sys_exit_mock.reset_mock()
     kill_mock.return_value = {'exitCode': 0, 'output': 'out', 'error': 'err'}
 
     # Testing exit when failed to remove pid file
     exists_mock.return_value = True
-    main.stop_agent()
+    
+    try:
+      main.stop_agent()
+      raise Exception("main.stop_agent() should raise sys.exit(0).")
+    except SystemExit as e:
+      self.assertEquals(1, e.code);
+
     kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-15', pid])
     kill_mock.assert_any_call(['ambari-sudo.sh', 'kill', '-9', pid])
-    sys_exit_mock.assert_called_with(1)
 
     # Restore
     ProcessHelper.pidfile = oldpid
@@ -299,8 +308,8 @@ class TestMain(unittest.TestCase):
   @patch.object(main, "update_log_level")
   @patch.object(NetUtil.NetUtil, "try_to_connect")
   @patch.object(Controller, "__init__")
+  @patch.object(Controller, "is_alive")
   @patch.object(Controller, "start")
-  @patch.object(Controller, "join")
   @patch("optparse.OptionParser.parse_args")
   @patch.object(DataCleaner,"start")
   @patch.object(DataCleaner,"__init__")
@@ -308,13 +317,14 @@ class TestMain(unittest.TestCase):
   @patch.object(PingPortListener,"__init__")
   @patch.object(ExitHelper,"execute_cleanup")
   def test_main(self, cleanup_mock, ping_port_init_mock, ping_port_start_mock, 
data_clean_init_mock,data_clean_start_mock,
-                parse_args_mock, join_mock, start_mock, Controller_init_mock, 
try_to_connect_mock,
+                parse_args_mock, start_mock, Controller_is_alive_mock, 
Controller_init_mock, try_to_connect_mock,
                 update_log_level_mock, daemonize_mock, 
perform_prestart_checks_mock,
                 ambari_config_mock,
                 stop_mock, bind_signal_handlers_mock,
                 setup_logging_mock, socket_mock):
     data_clean_init_mock.return_value = None
     Controller_init_mock.return_value = None
+    Controller_is_alive_mock.return_value = False
     ping_port_init_mock.return_value = None
     options = MagicMock()
     parse_args_mock.return_value = (options, MagicMock)

Reply via email to