Repository: metron Updated Branches: refs/heads/master ff1f9cf52 -> ebdaf5f90
METRON-1798 Add mpack support for parser aggregation (anandsubbu) closes apache/metron#1215 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ebdaf5f9 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ebdaf5f9 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ebdaf5f9 Branch: refs/heads/master Commit: ebdaf5f905829d8ff59814bb3555666217f85abb Parents: ff1f9cf Author: anandsubbu <[email protected]> Authored: Wed Oct 3 18:57:48 2018 +0530 Committer: anandsubbu <[email protected]> Committed: Wed Oct 3 18:57:48 2018 +0530 ---------------------------------------------------------------------- .../configuration/metron-parsers-env.xml | 2 +- .../CURRENT/package/scripts/parser_commands.py | 49 ++++++++++++++++++-- .../metron-parsers/ParserChaining.md | 18 +++++++ 3 files changed, 63 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml index a9a498b..03a2594 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-parsers-env.xml @@ -21,7 +21,7 @@ <property> <name>parsers</name> <value>bro,snort,yaf</value> - <description>Metron parsers to deploy</description> + <description>Metron parsers to deploy. You can also specify an aggregated parser list by grouping them with double quotes. For example: "parserA,parserB",parserC,parserD</description> <display-name>Metron Parsers</display-name> </property> <property> http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py index 274306a..18780d9 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py @@ -20,6 +20,7 @@ limitations under the License. import os import re +import shlex import subprocess import time @@ -49,7 +50,45 @@ class ParserCommands: # get list of parsers def __get_parsers(self, params): - return params.parsers.replace(' ', '').split(',') + """ + Combines the list of parser topics and sends a unique list to be used for + Kafka topic creation and the like. + :param params: + :return: List containing the names of unique parsers + """ + parserBatches = list(self.__get_aggr_parsers(params)) + parsers = ','.join(s.translate(None, '"') for s in parserBatches) + # Get only the unique list of parser names + parsers = list(set(parsers.split(','))) + return parsers + + def __get_aggr_parsers(self, params): + """ + Fetches the list of aggregated (and regular) parsers and returns a list. + If the input list of parsers were "bro,snort,yaf", "bro,snort" and yaf, for example, + then this method will return ["bro,snort,yaf", "bro,snort", "yaf"] + :param params: + :return: List containing the names of parsers + """ + parserList = [] + parsers = shlex.shlex(params.parsers) + for name in parsers: + parserList.append(name.strip(',')) + return [s.translate(None, "'[]") for s in filter(None, parserList)] + + def get_parser_aggr_topology_names(self, params): + """ + Returns the names of regular and aggregated topologies as they would run in storm + An aggregated topology has the naming convention of 'parserA__parserB'. + For example, a list of parsers like ["bro,snort", yaf] will be returned as ["bro__snort", "yaf"] + :param params: + :return: List containing the names of parser topologies + """ + topologyName = [] + for parser in self.__get_aggr_parsers(params): + parser = parser.replace(",", "__").strip('"') + topologyName.append(parser) + return topologyName def __get_topics(self): # All errors go to indexing topics, so create it here if it's not already @@ -104,7 +143,7 @@ class ParserCommands: metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) def start_parser_topologies(self, env): - Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list())) + Logger.info("Starting Metron parser topologies: {0}".format(self.__get_aggr_parsers(self.__params))) start_cmd_template = """{0}/bin/start_parser_topology.sh \ -k {1} \ -z {2} \ @@ -118,7 +157,7 @@ class ParserCommands: self.__params.metron_principal_name, execute_user=self.__params.metron_user) - stopped_parsers = set(self.get_parser_list()) - self.get_running_topology_names(env) + stopped_parsers = set(self.__get_aggr_parsers(self.__params)) - self.get_running_topology_names(env) Logger.info('Parsers that need started: ' + str(stopped_parsers)) for parser in stopped_parsers: @@ -135,7 +174,7 @@ class ParserCommands: def stop_parser_topologies(self, env): Logger.info('Stopping parsers') - running_parsers = set(self.get_parser_list()) & self.get_running_topology_names(env) + running_parsers = set(self.get_parser_aggr_topology_names(self.__params)) & self.get_running_topology_names(env) Logger.info('Parsers that need stopped: ' + str(running_parsers)) for parser in running_parsers: @@ -192,7 +231,7 @@ class ParserCommands: env.set_params(self.__params) all_running = True topologies = metron_service.get_running_topologies(self.__params) - for parser in self.get_parser_list(): + for parser in self.get_parser_aggr_topology_names(self.__params): parser_found = False is_running = False if parser in topologies: http://git-wip-us.apache.org/repos/asf/metron/blob/ebdaf5f9/metron-platform/metron-parsers/ParserChaining.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-parsers/ParserChaining.md b/metron-platform/metron-parsers/ParserChaining.md index a28f11d..4cab74c 100644 --- a/metron-platform/metron-parsers/ParserChaining.md +++ b/metron-platform/metron-parsers/ParserChaining.md @@ -49,6 +49,24 @@ data emitted from any parser, we will need to adjust the downstream parsers to extract the enveloped data from the JSON blob and treat it as the data to parse. +# Aggregated Parsers with Parser Chaining +Chained parsers can be run as aggregated parsers. These parsers continue to use the sensor specific Kafka topics, and do not do internal routing to the appropriate sensor. + +Say, there were three sensors (`bro`, `snort` and `yaf`). Instead of creating a topology per sensor, all 3 can be run in a single aggregated parser. It is also possible to aggregate a subset of these parsers (e.g. run `bro` as it's own topology, and aggregate the other 2). + +The step to start an aggregated parsers then becomes +``` +$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s bro,snort,yaf +``` + +which will result in a single storm topology named `bro__snort__yaf` to run. + +Aggregated parsers can be specified using the Ambari Metron config as well under Services -> Metron -> Configs -> 'Parsers' tab -> 'Metron Parsers' field. The grouping is configured by enclosing the desired parsers in double quotes. + +Some examples of specifying aggregated parsers are as follows: +* "bro,snort,yaf" --> Will start a single topology named `bro__snort__yaf` +* "ciscopixA,ciscopixB",yaf,"squid,ciscopixC" --> Will start three topologies viz. `ciscopixA__ciscopixB`, `yaf` and `squid__ciscopixC` + # Architecting a Parser Chaining Solution in Metron Currently the approach to fulfill this requirement involves a couple
