AMBARI-20620. Add hashing support into agent for caches; add topology and
metadata cache (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/df752f86
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/df752f86
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/df752f86

Branch: refs/heads/branch-3.0-perf
Commit: df752f86e5e887e67016e81125702192a97f46a7
Parents: 3bcf2ad
Author: Andrew Onishuk <[email protected]>
Authored: Wed Mar 29 14:56:00 2017 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Wed Mar 29 14:56:00 2017 +0300

----------------------------------------------------------------------
 .../main/python/ambari_agent/ClusterCache.py    | 57 ++++++++++++--------
 .../ambari_agent/ClusterConfigurationCache.py   |  4 +-
 .../python/ambari_agent/ClusterMetadataCache.py | 42 +++++++++++++++
 .../python/ambari_agent/ClusterTopologyCache.py | 42 +++++++++++++++
 .../src/main/python/ambari_agent/Controller.py  | 12 +++++
 .../src/main/python/ambari_agent/Utils.py       | 52 ++++++++++++++++++
 6 files changed, 186 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py 
b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index d7ea08c..1986733 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -18,14 +18,16 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 """
 
+import hashlib
 import logging
 import ambari_simplejson as json
 import os
 import threading
+from ambari_agent.Utils import Utils
 
 logger = logging.getLogger(__name__)
 
-class ClusterCache(object):
+class ClusterCache(dict):
   """
   Maintains an in-memory cache and disk cache (for debugging purposes) for
   every cluster. This is useful for having quick access to any of the 
properties.
@@ -37,10 +39,8 @@ class ClusterCache(object):
     :param cluster_cache_dir:
     :return:
     """
-    self.cluster_cache_dir = cluster_cache_dir
 
-    # keys are cluster names, values are caches for the clusters
-    self._cache_dict = {}
+    self.cluster_cache_dir = cluster_cache_dir
 
     self.__file_lock = threading.RLock()
     self._cache_lock = threading.RLock()
@@ -51,9 +51,12 @@ class ClusterCache(object):
       os.makedirs(cluster_cache_dir)
 
     # if the file exists, then load it
+    cache_dict = {}
     if os.path.isfile(self.__current_cache_json_file):
       with open(self.__current_cache_json_file, 'r') as fp:
-        self._cache_dict = json.load(fp)
+        cache_dict = json.load(fp)
+
+    super(ClusterCache, self).__init__(cache_dict)
 
   def update_cache(self, cluster_name, cache):
     """
@@ -65,22 +68,34 @@ class ClusterCache(object):
     """
     logger.info("Updating cache {0} for cluster 
{1}".format(self.__class__.__name__, cluster_name))
 
-    self._cache_lock.acquire()
-    try:
-      self._cache_dict[cluster_name] = cache
-    finally:
-      self._cache_lock.release()
+    # The cache should contain exactly the data received from server.
+    # Modifications on agent-side will lead to unnecessary cache sync every 
agent registration. Which is a big concern on perf clusters!
+    # Also mutability can lead to multithreading issues.
+    immutable_cache = Utils.make_immutable(cache)
+    with self._cache_lock:
+      self[cluster_name] = immutable_cache
 
 
-    self.__file_lock.acquire()
-    try:
+    with self.__file_lock:
       with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | 
os.O_CREAT, 0o600), "w") as f:
-        json.dump(self._cache_dict, f, indent=2)
-    finally:
-      self.__file_lock.release()
-
-  def get_cache(self):
-    self._cache_lock.acquire()
-    cache_copy = self._cache_dict[:]
-    self._cache_lock.release()
-    return cache_copy
+        json.dump(self, f, indent=2)
+
+  def get_md5_hashsum(self, cluster_name):
+    """
+    Thread-safe method for computing the md5 hash of the json
+    representation of this cache.
+    :param cluster_name:
+    :return: hex digest string of the cache md5 hash
+    """
+    with self._cache_lock:
+      # have to make sure the server generates json in exactly the same way,
+      # so the hashes are equal
+      json_repr = json.dumps(self, sort_keys=True)
+
+    md5_calculator = hashlib.md5()
+    md5_calculator.update(json_repr)
+    result = md5_calculator.hexdigest()
+
+    logger.info("Cache value for {0} is {1}".format(self.__class__.__name__, 
result))
+
+    return result
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py 
b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
index 2c35018..61fdf94 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
@@ -33,7 +33,7 @@ class ClusterConfigurationCache(ClusterCache):
   def __init__(self, cluster_cache_dir):
     """
     Initializes the configuration cache.
-    :param cluster_cache_dir:
+    :param cluster_cache_dir: directory where the cache json files are saved
     :return:
     """
     super(ClusterConfigurationCache, self).__init__(cluster_cache_dir)
@@ -51,7 +51,7 @@ class ClusterConfigurationCache(ClusterCache):
     """
     self._cache_lock.acquire()
     try:
-      dictionary = self._cache_dict[cluster_name]
+      dictionary = self[cluster_name]
       for layer_key in key.split('/'):
         dictionary = dictionary[layer_key]
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py 
b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
new file mode 100644
index 0000000..4859b3f
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from ambari_agent.ClusterCache import ClusterCache
+import logging
+
+logger = logging.getLogger(__name__)
+
+class ClusterMetadataCache(ClusterCache):
+  """
+  Maintains an in-memory cache and disk cache of the metadata sent from the
+  server for every cluster. This is useful for having quick access to any of
+  the metadata properties.
+  """
+
+  def __init__(self, cluster_cache_dir):
+    """
+    Initializes the metadata cache.
+    :param cluster_cache_dir:
+    :return:
+    """
+    super(ClusterMetadataCache, self).__init__(cluster_cache_dir)
+
+  def get_file_name(self):
+    return 'metadata.json'

http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py 
b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
new file mode 100644
index 0000000..19313c5
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from ambari_agent.ClusterCache import ClusterCache
+import logging
+
+logger = logging.getLogger(__name__)
+
+class ClusterTopologyCache(ClusterCache):
+  """
+  Maintains an in-memory cache and disk cache of the topology for
+  every cluster. This is useful for having quick access to any of the
+  topology properties.
+  """
+
+  def __init__(self, cluster_cache_dir):
+    """
+    Initializes the topology cache.
+    :param cluster_cache_dir:
+    :return:
+    """
+    super(ClusterTopologyCache, self).__init__(cluster_cache_dir)
+
+  def get_file_name(self):
+    return 'topology.json'

http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py 
b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 49bb3d5..4d81947 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -45,6 +45,7 @@ from ambari_agent.NetUtil import NetUtil
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.ClusterConfigurationCache import  ClusterConfigurationCache
+from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
 from ambari_agent.RecoveryManager import  RecoveryManager
 from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, 
bind_signal_handlers
 from ambari_agent.ExitHelper import ExitHelper
@@ -127,6 +128,7 @@ class Controller(threading.Thread):
     self.recovery_manager = RecoveryManager(recovery_cache_dir)
 
     self.cluster_configuration_cache = 
ClusterConfigurationCache(cluster_cache_dir)
+    self.cluster_topology_cache = ClusterTopologyCache(cluster_cache_dir)
 
     self.move_data_dir_mount_file()
 
@@ -242,7 +244,14 @@ class Controller(threading.Thread):
         if 'clusterName' in command and 'configurations' in command:
           cluster_name = command['clusterName']
           configurations = command['configurations']
+          topology = command['clusterHostInfo']
           self.cluster_configuration_cache.update_cache(cluster_name, 
configurations)
+          self.cluster_topology_cache.update_cache(cluster_name, topology)
+
+          # TODO: use this once server part is ready.
+          self.cluster_topology_cache.get_md5_hashsum(cluster_name)
+          self.cluster_configuration_cache.get_md5_hashsum(cluster_name)
+
 
     if self.ALERT_DEFINITION_COMMANDS in heartbeat_keys:
       alert_definition_commands = heartbeat[self.ALERT_DEFINITION_COMMANDS]
@@ -252,6 +261,9 @@ class Controller(threading.Thread):
           configurations = command['configurations']
           self.cluster_configuration_cache.update_cache(cluster_name, 
configurations)
 
+          # TODO: use this once server part is ready.
+          self.cluster_configuration_cache.get_md5_hashsum(cluster_name)
+
   def cancelCommandInQueue(self, commands):
     """ Remove from the queue commands, kill the process if it's in progress 
"""
     if commands:

http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py 
b/ambari-agent/src/main/python/ambari_agent/Utils.py
new file mode 100644
index 0000000..e7b03f9
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+class Utils(object):
+  @staticmethod
+  def make_immutable(value):
+    if isinstance(value, dict):
+      return ImmutableDictionary(value)
+    if isinstance(value, list):
+      return tuple(value)
+    return value
+
+class ImmutableDictionary(dict):
+  def __init__(self, dictionary):
+    """
+    Recursively turns a dict into an ImmutableDictionary.
+    """
+    for k, v in dictionary.iteritems():
+        dictionary[k] = Utils.make_immutable(v)
+
+    super(ImmutableDictionary, self).__init__(dictionary)
+
+def raise_immutable_error(*args, **kwargs):
+  """
+  PLEASE MAKE SURE YOU NEVER UPDATE CACHE on the agent side. The cache should
+  contain exactly the data received from the server.
+  Modifications on the agent side will lead to an unnecessary cache sync on
+  every agent registration, which is a big concern on perf clusters!
+  Also mutability can lead to multithreading issues.
+  """
+  raise TypeError("The dictionary is immutable cannot change it")
+
+ImmutableDictionary.__setitem__ = raise_immutable_error
+ImmutableDictionary.__delitem__ = raise_immutable_error
+ImmutableDictionary.clear = raise_immutable_error
+ImmutableDictionary.pop = raise_immutable_error
+ImmutableDictionary.update = raise_immutable_error
\ No newline at end of file

Reply via email to