AMBARI-12301. Ambari Metrics Monitor process CPU usage goes up if Collector is down for some time. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/96d056e2 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/96d056e2 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/96d056e2 Branch: refs/heads/trunk Commit: 96d056e26de7133426cc1f80f7edf40f84e60e27 Parents: 7763a47 Author: Siddharth Wagle <[email protected]> Authored: Mon Jul 6 16:50:58 2015 -0700 Committer: Siddharth Wagle <[email protected]> Committed: Mon Jul 6 16:50:58 2015 -0700 ---------------------------------------------------------------------- .../main/python/core/application_metric_map.py | 10 ++++-- .../src/main/python/core/emitter.py | 37 +++++++++++--------- .../python/core/TestApplicationMetricMap.py | 11 ++++-- .../src/test/python/core/TestEmitter.py | 3 +- 4 files changed, 38 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/96d056e2/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py index 1be6fa2..0052808 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py @@ -66,7 +66,7 @@ class ApplicationMetricMap: del self.app_metric_map[ app_id ] pass - def flatten(self, application_id = None): + def flatten(self, application_id = None, clear_once_flattened = False): """ Return flatten dict to caller in json format. 
Json format: @@ -100,7 +100,13 @@ class ApplicationMetricMap: timeline_metrics[ "metrics" ].append( timeline_metric ) pass pass - return json.dumps(timeline_metrics) if len(timeline_metrics[ "metrics" ]) > 0 else None + json_data = json.dumps(timeline_metrics) if len(timeline_metrics[ "metrics" ]) > 0 else None + + if clear_once_flattened: + self.app_metric_map.clear() + pass + + return json_data pass def get_start_time(self, app_id, metric_id): http://git-wip-us.apache.org/repos/asf/ambari/blob/96d056e2/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py index c3fd543..6f66093 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py @@ -46,35 +46,38 @@ class Emitter(threading.Thread): while True: try: self.submit_metrics() - #Wait for the service stop event instead of sleeping blindly - if 0 == self._stop_handler.wait(self.send_interval): - logger.info('Shutting down Emitter thread') - return except Exception, e: logger.warn('Unable to emit events. %s' % str(e)) - #Wait for the service stop event instead of sleeping blindly - if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL): - logger.info('Shutting down Emitter thread - abort retry') - return - logger.info('Retrying emit after %s seconds.' 
% self.RETRY_SLEEP_INTERVAL) + pass + #Wait for the service stop event instead of sleeping blindly + if 0 == self._stop_handler.wait(self.send_interval): + logger.info('Shutting down Emitter thread') + return pass def submit_metrics(self): retry_count = 0 + # This call will acquire lock on the map and clear contents before returning + # After configured number of retries the data will not be sent to the + # collector + json_data = self.application_metric_map.flatten(None, True) + if json_data is None: + logger.info("Nothing to emit, resume waiting.") + return + pass + + response = None while retry_count < self.MAX_RETRY_COUNT: - json_data = self.application_metric_map.flatten() - if json_data is None: - logger.info("Nothing to emit, resume waiting.") - break + try: + response = self.push_metrics(json_data) + except Exception, e: + logger.warn('Error sending metrics to server. %s' % str(e)) pass - response = self.push_metrics(json_data) if response and response.getcode() == 200: retry_count = self.MAX_RETRY_COUNT - self.application_metric_map.clear() else: - logger.warn("Error sending metrics to server. 
Retrying after {0} " - "...".format(self.RETRY_SLEEP_INTERVAL)) + logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL)) retry_count += 1 #Wait for the service stop event instead of sleeping blindly if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL): http://git-wip-us.apache.org/repos/asf/ambari/blob/96d056e2/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py index e653c48..a956a78 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py @@ -62,6 +62,13 @@ class TestApplicationMetricMap(TestCase): def testEmptyMapReturnNone(self): - application_metric_map = ApplicationMetricMap("host","10.10.10.10") + application_metric_map = ApplicationMetricMap("host", "10.10.10.10") self.assertTrue(application_metric_map.flatten() == None) - + + def testFlattenAndClear(self): + application_metric_map = ApplicationMetricMap("host", "10.10.10.10") + application_metric_map.put_metric("A1", { "a" : "b" }, int(round(1415390657.3806491 * 1000))) + json_data = json.loads(application_metric_map.flatten('A1', True)) + self.assertEqual(len(json_data['metrics']), 1) + self.assertTrue(json_data['metrics'][0]['metricname'] == 'a') + self.assertFalse(application_metric_map.app_metric_map) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/96d056e2/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py index 56e6475..a9357fb 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py @@ -80,7 +80,6 @@ class TestEmitter(TestCase): self.assertEqual(url_open_mock.call_count, 3) self.assertUrlData(url_open_mock) - def assertUrlData(self, url_open_mock): self.assertEqual(len(url_open_mock.call_args), 2) data = url_open_mock.call_args[0][0].data @@ -90,4 +89,4 @@ class TestEmitter(TestCase): self.assertEqual(len(metrics['metrics']), 1) self.assertEqual(metrics['metrics'][0]['metricname'],'metric1') self.assertEqual(metrics['metrics'][0]['starttime'],1) - pass \ No newline at end of file + pass
