This is an automated email from the ASF dual-hosted git repository.
avijayan pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 0e77456 Branch 2.7 cherry picked commits (#2396)
0e77456 is described below
commit 0e7745639ed3f5fe56bd1e5578590b6e5780642c
Author: avijayanhwx <[email protected]>
AuthorDate: Thu Sep 27 22:12:07 2018 -0700
Branch 2.7 cherry picked commits (#2396)
* [AMBARI-24653] YARN Timeline server v2 related system tests fail. (#2367)
* [AMBARI-24653] YARN Timeline server v2 related system tests fail.
* [AMBARI-24653] YARN Timeline server v2 related system tests fail.
* AMBARI-24661. While registering, agent can miss updates from server (aonishuk)
* [AMBARI-24679] Fix race condition in agent during registration and
topology updates. (#2368)
---
.../main/python/ambari_agent/HeartbeatThread.py | 23 ++++++++-----
.../main/python/ambari_agent/listeners/__init__.py | 40 +++++++++++++++++++++-
.../src/main/resources/scripts/Ambaripreupload.py | 1 +
3 files changed, 55 insertions(+), 9 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 2d4e06b..ded5edd 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -134,15 +134,22 @@ class HeartbeatThread(threading.Thread):
self.handle_registration_response(response)
for endpoint, cache, listener, subscribe_to in
self.post_registration_requests:
- # should not hang forever on these requests
- response = self.blocking_request({'hash': cache.hash}, endpoint,
log_handler=listener.get_log_message)
try:
- listener.on_event({}, response)
- except:
- logger.exception("Exception while handing response to request at {0}.
{1}".format(endpoint, response))
- raise
-
- self.subscribe_to_topics([subscribe_to])
+ listener.enabled = False
+ self.subscribe_to_topics([subscribe_to])
+ response = self.blocking_request({'hash': cache.hash}, endpoint,
log_handler=listener.get_log_message)
+
+ try:
+ listener.on_event({}, response)
+ except:
+ logger.exception("Exception while handing response to request at
{0}. {1}".format(endpoint, response))
+ raise
+ finally:
+ with listener.event_queue_lock:
+ logger.info("Enabling events for listener {0}".format(listener))
+ listener.enabled = True
+ # Process queued messages if any
+ listener.dequeue_unprocessed_events()
self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 7e66197..b50bdaa 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -25,15 +25,40 @@ import copy
from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
from ambari_agent import Constants
from ambari_agent.Utils import Utils
+from Queue import Queue
+import threading
logger = logging.getLogger(__name__)
class EventListener(ambari_stomp.ConnectionListener):
+
+ unprocessed_messages_queue = Queue(100)
+
"""
Base abstract class for event listeners on specific topics.
"""
def __init__(self, initializer_module):
self.initializer_module = initializer_module
+ self.enabled = True
+ self.event_queue_lock = threading.RLock()
+
+ def dequeue_unprocessed_events(self):
+ while not self.unprocessed_messages_queue.empty():
+ payload = self.unprocessed_messages_queue.get_nowait()
+ if payload:
+ logger.info("Processing event from unprocessed queue {0}
{1}".format(payload[0], payload[1]))
+ destination = payload[0]
+ headers = payload[1]
+ message_json = payload[2]
+ message = payload[3]
+ try:
+ self.on_event(headers, message_json)
+ except Exception as ex:
+ logger.exception("Exception while handing event from {0} {1}
{2}".format(destination, headers, message))
+ self.report_status_to_sender(headers, message, ex)
+ else:
+ self.report_status_to_sender(headers, message)
+
def on_message(self, headers, message):
"""
@@ -55,10 +80,23 @@ class EventListener(ambari_stomp.ConnectionListener):
return
logger.info("Event from server at {0}{1}".format(destination,
self.get_log_message(headers, copy.deepcopy(message_json))))
+
+ if not self.enabled:
+ with self.event_queue_lock:
+ if not self.enabled:
+ logger.info("Queuing event as unprocessed {0} since event "
+ "listener is disabled".format(destination))
+ try:
+ self.unprocessed_messages_queue.put_nowait((destination,
headers, message_json, message))
+ except Exception as ex:
+ logger.warning("Cannot queue any more unprocessed events since "
+ "queue is full! {0} {1}".format(destination,
message))
+ return
+
try:
self.on_event(headers, message_json)
except Exception as ex:
- logger.exception("Exception while handing event from {0}
{1}".format(destination, headers, message))
+ logger.exception("Exception while handing event from {0} {1}
{2}".format(destination, headers, message))
self.report_status_to_sender(headers, message, ex)
else:
self.report_status_to_sender(headers, message)
diff --git a/ambari-server/src/main/resources/scripts/Ambaripreupload.py b/ambari-server/src/main/resources/scripts/Ambaripreupload.py
index dca9bb5..ab98c64 100644
--- a/ambari-server/src/main/resources/scripts/Ambaripreupload.py
+++ b/ambari-server/src/main/resources/scripts/Ambaripreupload.py
@@ -264,6 +264,7 @@ with Environment() as env:
params.HdfsResource(format('{hdfs_path_prefix}/mapred'), owner='mapred',
type='directory', action=['create_on_execute'])
params.HdfsResource(format('{hdfs_path_prefix}/mapred/system'),
owner='hdfs', type='directory', action=['create_on_execute'])
params.HdfsResource(format('{hdfs_path_prefix}/mr-history/done'),
change_permissions_for_parents=True, owner='mapred', group='hadoop',
type='directory', action=['create_on_execute'], mode=0777)
+ params.HdfsResource(format('{hdfs_path_prefix}/user/yarn-ats'),
owner='yarn-ats', type='directory', action=['create_on_execute'], mode=0755)
params.HdfsResource(format('{hdfs_path_prefix}/atshistory/done'),
owner='yarn', group='hadoop', type='directory', action=['create_on_execute'],
mode=0700)
params.HdfsResource(format('{hdfs_path_prefix}/atshistory/active'),
owner='yarn', group='hadoop', type='directory', action=['create_on_execute'],
mode=01777)
params.HdfsResource(format('{hdfs_path_prefix}/ams/hbase'), owner='ams',
type='directory', action=['create_on_execute'], mode=0775)