Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 8564be100 -> ac32c2885


Revert "AMBARI-18629. HDFS goes down after installing cluster (aonishuk) and AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)"


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ac32c288
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ac32c288
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ac32c288

Branch: refs/heads/branch-2.5
Commit: ac32c2885865bdb13fcd28b1e6e2283df61a6ec7
Parents: 8564be1
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Wed Oct 19 01:49:30 2016 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Wed Oct 19 01:49:30 2016 +0300

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  1 -
 .../src/main/python/ambari_agent/ActionQueue.py | 16 +-------
 .../ambari_agent/PythonReflectiveExecutor.py    | 25 +++---------
 .../test/python/ambari_agent/TestActionQueue.py |  3 +-
 .../main/python/ambari_commons/thread_utils.py  | 43 --------------------
 5 files changed, 8 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ac32c288/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 1c39c24..914e09a 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -32,7 +32,6 @@ tolerate_download_failures=true
 run_as_user=root
 parallel_execution=0
 alert_grace_period=5
-status_command_timeout=2
 alert_kinit_timeout=14400000
 system_resource_overrides=/etc/resource_overrides
 ; memory_threshold_soft_mb=400

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac32c288/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5962d94..f104939 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -29,14 +29,12 @@ import time
 import signal
 
 from AgentException import AgentException
-from PythonReflectiveExecutor import PythonReflectiveExecutor
 from LiveStatus import LiveStatus
 from ActualConfigHandler import ActualConfigHandler
 from CommandStatusDict import CommandStatusDict
 from CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
 from ambari_commons.str_utils import split_on_chunks
-from ambari_commons.thread_utils import terminate_thread
 
 
 logger = logging.getLogger()
@@ -87,7 +85,6 @@ class ActionQueue(threading.Thread):
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
     self.parallel_execution = config.get_parallel_exec_option()
-    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 2))
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
 
@@ -228,18 +225,7 @@ class ActionQueue(threading.Thread):
           if self.controller.recovery_manager.enabled():
             self.controller.recovery_manager.stop_execution_command()
       elif commandType == self.STATUS_COMMAND:
-        component_name = command['componentName']
-
-        thread = threading.Thread(target = self.execute_status_command, args = (command,))
-        thread.daemon = True # hanging status commands should not be prevent ambari-agent from stopping
-        thread.start()
-        thread.join(timeout=self.status_command_timeout)
-
-        if thread.isAlive():
-          terminate_thread(thread)
-          # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
-          PythonReflectiveExecutor.last_context.revert()
-          logger.warn("Command {0} for {1} was running for more than {2} seconds. Terminated due to timeout.".format(commandType, component_name, self.status_command_timeout))
+        self.execute_status_command(command)
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception:

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac32c288/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index b476671..655b2fc 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -53,9 +53,7 @@ class PythonReflectiveExecutor(PythonExecutor):
     returncode = 1
 
     try:
-      current_context = PythonContext(script_dir, pythonCommand)
-      PythonReflectiveExecutor.last_context = current_context
-      with current_context:
+      with PythonContext(script_dir, pythonCommand):
         imp.load_source('__main__', script)
     except SystemExit as e:
       returncode = e.code
@@ -64,10 +62,7 @@ class PythonReflectiveExecutor(PythonExecutor):
     except (ClientComponentHasNoStatus, ComponentIsNotRunning):
       logger.debug("Reflective command failed with exception:", exc_info=1)
     except Exception:
-      if current_context.is_forced_revert:
-        logger.info("Hanging status command finished its execution")
-      else:
-        logger.info("Reflective command failed with exception:", exc_info=1)
+      logger.info("Reflective command failed with exception:", exc_info=1)
     else: 
       returncode = 0
       
@@ -81,8 +76,6 @@ class PythonContext:
   def __init__(self, script_dir, pythonCommand):
     self.script_dir = script_dir
     self.pythonCommand = pythonCommand
-    self.is_reverted = False
-    self.is_forced_revert = False
     
   def __enter__(self):
     self.old_sys_path = copy.copy(sys.path)
@@ -95,18 +88,12 @@ class PythonContext:
     sys.argv = self.pythonCommand[1:]
 
   def __exit__(self, exc_type, exc_val, exc_tb):
-    self.revert(is_forced_revert=False)
+    sys.path = self.old_sys_path
+    sys.argv = self.old_agv
+    logging.disable(self.old_logging_disable)
+    self.revert_sys_modules(self.old_sys_modules)
     return False
   
-  def revert(self, is_forced_revert=True):
-    if not self.is_reverted:
-      self.is_forced_revert = is_forced_revert
-      self.is_reverted = True
-      sys.path = self.old_sys_path
-      sys.argv = self.old_agv
-      logging.disable(self.old_logging_disable)
-      self.revert_sys_modules(self.old_sys_modules)
-
   def revert_sys_modules(self, value):
     sys.modules.update(value)
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac32c288/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 32773b8..7d04d42 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -225,7 +225,6 @@ class TestActionQueue(TestCase):
   retryable_command = {
     'commandType': 'EXECUTION_COMMAND',
     'role': 'NAMENODE',
-    'componentName': 'NAMENODE',
     'roleCommand': 'INSTALL',
     'commandId': '1-1',
     'taskId': 19,
@@ -323,7 +322,6 @@ class TestActionQueue(TestCase):
     }
     status_command = {
       'commandType' : ActionQueue.STATUS_COMMAND,
-      'componentName': 'NAMENODE'
     }
     wrong_command = {
       'commandType' : "SOME_WRONG_COMMAND",
@@ -1128,6 +1126,7 @@ class TestActionQueue(TestCase):
     self.assertTrue(runCommand_mock.called)
     self.assertEqual(2, runCommand_mock.call_count)
     self.assertEqual(1, sleep_mock.call_count)
+    sleep_mock.assert_has_calls([call(1)], False)
     runCommand_mock.assert_has_calls([
      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
            os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),

http://git-wip-us.apache.org/repos/asf/ambari/blob/ac32c288/ambari-common/src/main/python/ambari_commons/thread_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py b/ambari-common/src/main/python/ambari_commons/thread_utils.py
deleted file mode 100644
index 952022c..0000000
--- a/ambari-common/src/main/python/ambari_commons/thread_utils.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/usr/bin/env python
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-
-def terminate_thread(thread):
-  """Terminates a python thread abruptly from another thread.
-  
-  This is consider a bad pattern to do this. 
-  If possible, please consider handling stopping of the thread from inside of it
-  or creating thread as a separate process (multiprocessing module).
-
-  :param thread: a threading.Thread instance
-  """
-  import ctypes
-  if not thread.isAlive():
-      return
-
-  exc = ctypes.py_object(SystemExit)
-  res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
-      ctypes.c_long(thread.ident), exc)
-  if res == 0:
-      raise ValueError("nonexistent thread id")
-  elif res > 1:
-      # """if it returns a number greater than one, you're in trouble,
-      # and you should call it again with exc=NULL to revert the effect"""
-      ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
-      raise SystemError("PyThreadState_SetAsyncExc failed")
\ No newline at end of file

Reply via email to