Repository: incubator-eagle
Updated Branches:
  refs/heads/master fb853dfd8 -> 285524efb


[EAGLE-807] Support configuration path argument from CLI

* Support configuration path argument from CLI and fix config bug
* Fix python script multi-threading problem

Author: Hao Chen <h...@apache.org>

Closes #697 from haoch/refactorScriptConfig.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/285524ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/285524ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/285524ef

Branch: refs/heads/master
Commit: 285524efb7e898c1ace70c70a62933d8370b1f63
Parents: fb853df
Author: Hao Chen <h...@apache.org>
Authored: Tue Nov 29 17:27:31 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Tue Nov 29 17:27:31 2016 +0800

----------------------------------------------------------------------
 .../hadoop_jmx_collector/metric_collector.py    | 60 +++++++++++++-------
 1 file changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/285524ef/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index e165ea2..45a3877 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -51,13 +51,16 @@ class Helper:
         :param config_file:
         :return:
         """
-        # read the self-defined filters
-        script_dir = os.path.dirname(__file__)
-        rel_path = "./" + config_file
-        abs_file_path = os.path.join(script_dir, rel_path)
+        abs_file_path = config_file
+
         if not os.path.isfile(abs_file_path):
-            logging.error(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
-            exit(1)
+            script_dir = os.path.dirname(__file__)
+            rel_path = "./" + config_file
+            abs_file_path = os.path.join(script_dir, rel_path)
+            if not os.path.isfile(abs_file_path):
+                raise Exception(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
+
+        logging.info("Using configuration file " + abs_file_path)
         f = open(abs_file_path, 'r')
         json_file = f.read()
         f.close()
@@ -227,22 +230,20 @@ class KafkaMetricSender(MetricSender):
             self.kafka_client.close()
 
 class MetricCollector(threading.Thread):
-    def __init__(self):
+    def __init__(self,config = None):
         threading.Thread.__init__(self)
-        self.config = Helper.load_config()
-        self.sender = KafkaMetricSender(self.config)
+        self.config = None
+        self.sender = None
         self.fqdn = socket.getfqdn()
-        self.init(self.config)
 
     def init(self, config):
+        self.config = config
+        self.sender = KafkaMetricSender(self.config)
+        self.sender.open()
         pass
 
     def start(self):
-        try:
-            self.sender.open()
-            self.run()
-        finally:
-            self.sender.close()
+        super(MetricCollector, self).start()
 
     def collect(self, msg):
         if not msg.has_key("timestamp"):
@@ -253,27 +254,45 @@ class MetricCollector(threading.Thread):
             msg["host"] = self.fqdn
         if not msg.has_key("site"):
             msg["site"] = self.config["env"]["site"]
-
         self.sender.send(msg)
 
+    def close(self):
+        self.sender.close()
+
     def run(self):
         raise Exception("`run` method should be overrode by sub-class before being called")
 
-
 class Runner(object):
     @staticmethod
-    def run(*threads):
+    def run(*collectors):
         """
         Execute concurrently
 
         :param threads:
         :return:
         """
-        for thread in threads:
+        argv = sys.argv
+        if len(argv) == 1:
+            config = Helper.load_config()
+        elif len(argv) == 2:
+            config = Helper.load_config(argv[1])
+        else:
+            raise Exception("Usage: "+argv[0]+" CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+
+        for collector in collectors:
+            try:
+                collector.init(config)
+                collector.start()
+            except Exception as e:
+                logging.exception(e)
+
+        for collector in collectors:
             try:
-                thread.start()
+                collector.join()
             except Exception as e:
                 logging.exception(e)
+            finally:
+                collector.close()
 
 class JmxMetricCollector(MetricCollector):
     selected_domain = None
@@ -281,6 +300,7 @@ class JmxMetricCollector(MetricCollector):
     input_components = []
 
     def init(self, config):
+        super(JmxMetricCollector, self).init(config)
         self.input_components = config["input"]
         for input in self.input_components:
             if not input.has_key("host"):

Reply via email to