Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 33c347b07 -> 9332b381b
  refs/heads/branch-2.5 be3a65d37 -> d3a3dd5c2
  refs/heads/trunk ee2a12527 -> e68cc10dd


AMBARI-18629. HDFS goes down after installing cluster (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e68cc10d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e68cc10d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e68cc10d

Branch: refs/heads/trunk
Commit: e68cc10dd9e0c0c012aa684a69f743fee41310d5
Parents: ee2a125
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Tue Oct 18 18:35:15 2016 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Tue Oct 18 18:35:15 2016 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 26 +++++-------
 .../main/python/ambari_commons/thread_utils.py  | 43 ++++++++++++++++++++
 2 files changed, 53 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/e68cc10d/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py 
b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c03ee4f..5962d94 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -36,6 +36,7 @@ from CommandStatusDict import CommandStatusDict
 from CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.BackgroundCommandExecutionHandle import 
BackgroundCommandExecutionHandle
 from ambari_commons.str_utils import split_on_chunks
+from ambari_commons.thread_utils import terminate_thread
 
 
 logger = logging.getLogger()
@@ -83,7 +84,6 @@ class ActionQueue(threading.Thread):
     self.controller = controller
     self.configTags = {}
     self._stop = threading.Event()
-    self.hangingStatusCommands = {}
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config, 
controller)
     self.parallel_execution = config.get_parallel_exec_option()
@@ -230,22 +230,16 @@ class ActionQueue(threading.Thread):
       elif commandType == self.STATUS_COMMAND:
         component_name = command['componentName']
 
-        if component_name in self.hangingStatusCommands and not 
self.hangingStatusCommands[component_name].isAlive():
-          del self.hangingStatusCommands[component_name]
+        thread = threading.Thread(target = self.execute_status_command, args = 
(command,))
+        thread.daemon = True # hanging status commands should not be prevent 
ambari-agent from stopping
+        thread.start()
+        thread.join(timeout=self.status_command_timeout)
 
-        if not component_name in self.hangingStatusCommands:
-          thread = threading.Thread(target = self.execute_status_command, args 
= (command,))
-          thread.daemon = True # hanging status commands should not be prevent 
ambari-agent from stopping
-          thread.start()
-          thread.join(timeout=self.status_command_timeout)
-
-          if thread.isAlive():
-            # Force context to reset to normal. By context we mean sys.path, 
imports, logger setting, etc. They are set by specific status command, and are 
not relevant to ambari-agent.
-            PythonReflectiveExecutor.last_context.revert()
-            logger.warn("Command {0} for {1} is running for more than {2} 
seconds. Skipping it for current pack of status commands.".format(commandType, 
component_name, self.status_command_timeout))
-            self.hangingStatusCommands[component_name] = thread
-        else:
-          logger.info("Not running {0} for {1}, because previous one is still 
running.".format(commandType, component_name))
+        if thread.isAlive():
+          terminate_thread(thread)
+          # Force context to reset to normal. By context we mean sys.path, 
imports, logger setting, etc. They are set by specific status command, and are 
not relevant to ambari-agent.
+          PythonReflectiveExecutor.last_context.revert()
+          logger.warn("Command {0} for {1} was running for more than {2} 
seconds. Terminated due to timeout.".format(commandType, component_name, 
self.status_command_timeout))
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception:

http://git-wip-us.apache.org/repos/asf/ambari/blob/e68cc10d/ambari-common/src/main/python/ambari_commons/thread_utils.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/thread_utils.py 
b/ambari-common/src/main/python/ambari_commons/thread_utils.py
new file mode 100644
index 0000000..952022c
--- /dev/null
+++ b/ambari-common/src/main/python/ambari_commons/thread_utils.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+def terminate_thread(thread):
+  """Terminates a python thread abruptly from another thread.
+  
+  This is consider a bad pattern to do this. 
+  If possible, please consider handling stopping of the thread from inside of 
it
+  or creating thread as a separate process (multiprocessing module).
+
+  :param thread: a threading.Thread instance
+  """
+  import ctypes
+  if not thread.isAlive():
+      return
+
+  exc = ctypes.py_object(SystemExit)
+  res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
+      ctypes.c_long(thread.ident), exc)
+  if res == 0:
+      raise ValueError("nonexistent thread id")
+  elif res > 1:
+      # """if it returns a number greater than one, you're in trouble,
+      # and you should call it again with exc=NULL to revert the effect"""
+      ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
+      raise SystemError("PyThreadState_SetAsyncExc failed")
\ No newline at end of file

Reply via email to