This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0e77456  Branch 2.7 cherry picked commits (#2396)
0e77456 is described below

commit 0e7745639ed3f5fe56bd1e5578590b6e5780642c
Author: avijayanhwx <[email protected]>
AuthorDate: Thu Sep 27 22:12:07 2018 -0700

    Branch 2.7 cherry picked commits (#2396)
    
    * [AMBARI-24653] YARN Timeline server v2 related system tests fail. (#2367)
    
    * [AMBARI-24653] YARN Timeline server v2 related system tests fail.
    
    * [AMBARI-24653] YARN Timeline server v2 related system tests fail.
    
    * AMBARI-24661. While registering agent can miss updates from server (aonishuk)
    
    * [AMBARI-24679] Fix race condition in agent during registration and topology updates. (#2368)
---
 .../main/python/ambari_agent/HeartbeatThread.py    | 23 ++++++++-----
 .../main/python/ambari_agent/listeners/__init__.py | 40 +++++++++++++++++++++-
 .../src/main/resources/scripts/Ambaripreupload.py  |  1 +
 3 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 2d4e06b..ded5edd 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -134,15 +134,22 @@ class HeartbeatThread(threading.Thread):
     self.handle_registration_response(response)
 
     for endpoint, cache, listener, subscribe_to in 
self.post_registration_requests:
-      # should not hang forever on these requests
-      response = self.blocking_request({'hash': cache.hash}, endpoint, 
log_handler=listener.get_log_message)
       try:
-        listener.on_event({}, response)
-      except:
-        logger.exception("Exception while handing response to request at {0}. 
{1}".format(endpoint, response))
-        raise
-
-      self.subscribe_to_topics([subscribe_to])
+        listener.enabled = False
+        self.subscribe_to_topics([subscribe_to])
+        response = self.blocking_request({'hash': cache.hash}, endpoint, 
log_handler=listener.get_log_message)
+
+        try:
+          listener.on_event({}, response)
+        except:
+          logger.exception("Exception while handing response to request at 
{0}. {1}".format(endpoint, response))
+          raise
+      finally:
+        with listener.event_queue_lock:
+          logger.info("Enabling events for listener {0}".format(listener))
+          listener.enabled = True
+          # Process queued messages if any
+          listener.dequeue_unprocessed_events()
 
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
 
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py 
b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 7e66197..b50bdaa 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -25,15 +25,40 @@ import copy
 from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
 from ambari_agent import Constants
 from ambari_agent.Utils import Utils
+from Queue import Queue
+import threading
 
 logger = logging.getLogger(__name__)
 
 class EventListener(ambari_stomp.ConnectionListener):
+
+  unprocessed_messages_queue = Queue(100)
+
   """
   Base abstract class for event listeners on specific topics.
   """
   def __init__(self, initializer_module):
     self.initializer_module = initializer_module
+    self.enabled = True
+    self.event_queue_lock = threading.RLock()
+
+  def dequeue_unprocessed_events(self):
+    while not self.unprocessed_messages_queue.empty():
+      payload = self.unprocessed_messages_queue.get_nowait()
+      if payload:
+        logger.info("Processing event from unprocessed queue {0} 
{1}".format(payload[0], payload[1]))
+        destination = payload[0]
+        headers = payload[1]
+        message_json = payload[2]
+        message = payload[3]
+        try:
+          self.on_event(headers, message_json)
+        except Exception as ex:
+          logger.exception("Exception while handing event from {0} {1} 
{2}".format(destination, headers, message))
+          self.report_status_to_sender(headers, message, ex)
+        else:
+          self.report_status_to_sender(headers, message)
+
 
   def on_message(self, headers, message):
     """
@@ -55,10 +80,23 @@ class EventListener(ambari_stomp.ConnectionListener):
         return
 
       logger.info("Event from server at {0}{1}".format(destination, 
self.get_log_message(headers, copy.deepcopy(message_json))))
+
+      if not self.enabled:
+        with self.event_queue_lock:
+          if not self.enabled:
+            logger.info("Queuing event as unprocessed {0} since event "
+                        "listener is disabled".format(destination))
+            try:
+              self.unprocessed_messages_queue.put_nowait((destination, 
headers, message_json, message))
+            except Exception as ex:
+              logger.warning("Cannot queue any more unprocessed events since "
+                           "queue is full! {0} {1}".format(destination, 
message))
+            return
+
       try:
         self.on_event(headers, message_json)
       except Exception as ex:
-        logger.exception("Exception while handing event from {0} 
{1}".format(destination, headers, message))
+        logger.exception("Exception while handing event from {0} {1} 
{2}".format(destination, headers, message))
         self.report_status_to_sender(headers, message, ex)
       else:
         self.report_status_to_sender(headers, message)
diff --git a/ambari-server/src/main/resources/scripts/Ambaripreupload.py 
b/ambari-server/src/main/resources/scripts/Ambaripreupload.py
index dca9bb5..ab98c64 100644
--- a/ambari-server/src/main/resources/scripts/Ambaripreupload.py
+++ b/ambari-server/src/main/resources/scripts/Ambaripreupload.py
@@ -264,6 +264,7 @@ with Environment() as env:
     params.HdfsResource(format('{hdfs_path_prefix}/mapred'), owner='mapred', 
type='directory', action=['create_on_execute'])
     params.HdfsResource(format('{hdfs_path_prefix}/mapred/system'), 
owner='hdfs', type='directory', action=['create_on_execute'])
     params.HdfsResource(format('{hdfs_path_prefix}/mr-history/done'), 
change_permissions_for_parents=True, owner='mapred', group='hadoop', 
type='directory', action=['create_on_execute'], mode=0777)
+    params.HdfsResource(format('{hdfs_path_prefix}/user/yarn-ats'), 
owner='yarn-ats', type='directory', action=['create_on_execute'], mode=0755)
     params.HdfsResource(format('{hdfs_path_prefix}/atshistory/done'), 
owner='yarn', group='hadoop', type='directory', action=['create_on_execute'], 
mode=0700)
     params.HdfsResource(format('{hdfs_path_prefix}/atshistory/active'), 
owner='yarn', group='hadoop', type='directory', action=['create_on_execute'], 
mode=01777)
     params.HdfsResource(format('{hdfs_path_prefix}/ams/hbase'), owner='ams', 
type='directory', action=['create_on_execute'], mode=0775)

Reply via email to