Repository: incubator-eagle Updated Branches: refs/heads/master fb853dfd8 -> 285524efb
[EAGLE-807] Support configuration path argument from CLI * Support configuration path argument from CLI and fix config bug * Fix python script multi-threading problem Author: Hao Chen <h...@apache.org> Closes #697 from haoch/refactorScriptConfig. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/285524ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/285524ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/285524ef Branch: refs/heads/master Commit: 285524efb7e898c1ace70c70a62933d8370b1f63 Parents: fb853df Author: Hao Chen <h...@apache.org> Authored: Tue Nov 29 17:27:31 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Tue Nov 29 17:27:31 2016 +0800 ---------------------------------------------------------------------- .../hadoop_jmx_collector/metric_collector.py | 60 +++++++++++++------- 1 file changed, 40 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/285524ef/eagle-external/hadoop_jmx_collector/metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py index e165ea2..45a3877 100644 --- a/eagle-external/hadoop_jmx_collector/metric_collector.py +++ b/eagle-external/hadoop_jmx_collector/metric_collector.py @@ -51,13 +51,16 @@ class Helper: :param config_file: :return: """ - # read the self-defined filters - script_dir = os.path.dirname(__file__) - rel_path = "./" + config_file - abs_file_path = os.path.join(script_dir, rel_path) + abs_file_path = config_file + if not os.path.isfile(abs_file_path): - logging.error(abs_file_path + " doesn't exist, please rename config-sample.json to config.json") - exit(1) + script_dir = os.path.dirname(__file__) + rel_path = "./" + config_file + abs_file_path = os.path.join(script_dir, rel_path) + if not os.path.isfile(abs_file_path): + raise Exception(abs_file_path + " doesn't exist, please rename config-sample.json to config.json") + + logging.info("Using configuration file " + abs_file_path) f = open(abs_file_path, 'r') json_file = f.read() f.close() @@ -227,22 +230,20 @@ class KafkaMetricSender(MetricSender): self.kafka_client.close() class MetricCollector(threading.Thread): - def __init__(self): + def __init__(self,config = None): threading.Thread.__init__(self) - self.config = Helper.load_config() - self.sender = KafkaMetricSender(self.config) + self.config = None + self.sender = None self.fqdn = socket.getfqdn() - self.init(self.config) def init(self, config): + self.config = config + self.sender = KafkaMetricSender(self.config) + self.sender.open() pass def start(self): - try: - self.sender.open() - self.run() - finally: - self.sender.close() + super(MetricCollector, self).start() def collect(self, msg): if not msg.has_key("timestamp"): @@ -253,27 +254,45 @@ class MetricCollector(threading.Thread): msg["host"] = self.fqdn if not msg.has_key("site"): msg["site"] = self.config["env"]["site"] - self.sender.send(msg) + def close(self): + self.sender.close() + def run(self): raise Exception("`run` method should be overrode by sub-class before being called") - class Runner(object): @staticmethod - def run(*threads): + def run(*collectors): """ Execute concurrently :param threads: :return: """ - for thread in threads: + argv = sys.argv + if len(argv) == 1: + config = Helper.load_config() + elif len(argv) == 2: + config = Helper.load_config(argv[1]) + else: + raise Exception("Usage: "+argv[0]+" CONFIG_FILE_PATH, but given too many arguments: " + str(argv)) + + for collector in collectors: + try: + collector.init(config) + collector.start() + except Exception as e: + logging.exception(e) + + for collector in collectors: try: - thread.start() + collector.join() except Exception as e: logging.exception(e) + finally: + collector.close() class JmxMetricCollector(MetricCollector): selected_domain = None @@ -281,6 +300,7 @@ class JmxMetricCollector(MetricCollector): input_components = [] def init(self, config): + super(JmxMetricCollector, self).init(config) self.input_components = config["input"] for input in self.input_components: if not input.has_key("host"):