Repository: metron Updated Branches: refs/heads/master a0e756bf9 -> 309d8097a
METRON-1054 Ambari Mpack Attempts to Kill Topologies That Are Not Running (nickwallen) closes apache/metron#660 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/309d8097 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/309d8097 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/309d8097 Branch: refs/heads/master Commit: 309d8097a91cc40e46f187f672d651b00cdd90fa Parents: a0e756b Author: nickwallen <[email protected]> Authored: Mon Jul 24 09:47:06 2017 -0400 Committer: nickallen <[email protected]> Committed: Mon Jul 24 09:47:06 2017 -0400 ---------------------------------------------------------------------- .../package/scripts/enrichment_commands.py | 35 ++++++++----- .../package/scripts/enrichment_master.py | 4 +- .../package/scripts/indexing_commands.py | 55 ++++++++++++-------- .../CURRENT/package/scripts/indexing_master.py | 4 +- .../CURRENT/package/scripts/parser_commands.py | 34 +++++++++--- .../CURRENT/package/scripts/parser_master.py | 4 +- 6 files changed, 90 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py index 8ca97ad..f9a75b1 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py @@ -136,26 +136,37 @@ class EnrichmentCommands: self.set_kafka_acl_configured() - def start_enrichment_topology(self): + def start_enrichment_topology(self, env): Logger.info("Starting Metron enrichment topology: {0}".format(self.__enrichment_topology)) - start_cmd_template = """{0}/bin/start_enrichment_topology.sh \ - -s {1} \ - -z {2}""" - Logger.info('Starting ' + self.__enrichment_topology) - Execute(start_cmd_template.format(self.__params.metron_home, self.__enrichment_topology, self.__params.zookeeper_quorum), - user=self.__params.metron_user) + + if not self.is_topology_active(env): + start_cmd_template = """{0}/bin/start_enrichment_topology.sh \ + -s {1} \ + -z {2}""" + Logger.info('Starting ' + self.__enrichment_topology) + Execute(start_cmd_template.format(self.__params.metron_home, + self.__enrichment_topology, + self.__params.zookeeper_quorum), + user=self.__params.metron_user) + else: + Logger.info('Enrichment topology already running') Logger.info('Finished starting enrichment topology') - def stop_enrichment_topology(self): + def stop_enrichment_topology(self, env): Logger.info('Stopping ' + self.__enrichment_topology) - stop_cmd = 'storm kill ' + self.__enrichment_topology - Execute(stop_cmd, user=self.__params.metron_user) + + if self.is_topology_active(env): + stop_cmd = 'storm kill ' + self.__enrichment_topology + Execute(stop_cmd, user=self.__params.metron_user) + else: + Logger.info("Enrichment topology already stopped") + Logger.info('Done stopping enrichment topologies') def restart_enrichment_topology(self, env): Logger.info('Restarting the enrichment topologies') - self.stop_enrichment_topology() + self.stop_enrichment_topology(env) # Wait for old topology to be cleaned up by Storm, before starting again. retries = 0 @@ -167,7 +178,7 @@ class EnrichmentCommands: retries += 1 if not topology_active: - self.start_enrichment_topology() + self.start_enrichment_topology(env) Logger.info('Done restarting the enrichment topology') else: Logger.warning('Retries exhausted. Existing topology not cleaned up. Aborting topology start.') http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py index de3c3f8..f3453ea 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py @@ -71,7 +71,7 @@ class Enrichment(Script): if not commands.is_geo_configured(): commands.init_geo() - commands.start_enrichment_topology() + commands.start_enrichment_topology(env) def stop(self, env, upgrade_type=None): from params import params @@ -85,7 +85,7 @@ class Enrichment(Script): params.metron_principal_name, execute_user=params.metron_user) - commands.stop_enrichment_topology() + commands.stop_enrichment_topology(env) def status(self, env): from params import status_params http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py index b671c9d..711d4fc 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py @@ -95,37 +95,48 @@ class IndexingCommands: ) Logger.info('Done creating HDFS indexing directory') - def start_indexing_topology(self): - Logger.info("Starting Metron indexing topology: {0}".format(self.__indexing_topology)) - start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh \ + def start_indexing_topology(self, env): + Logger.info('Starting ' + self.__indexing_topology) + + if not self.is_topology_active(env): + if self.__params.security_enabled: + metron_security.kinit(self.__params.kinit_path_local, + self.__params.metron_keytab_path, + self.__params.metron_principal_name, + execute_user=self.__params.metron_user) + start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh \ -s {1} \ -z {2}""" - Logger.info('Starting ' + self.__indexing_topology) - if self.__params.security_enabled: - metron_security.kinit(self.__params.kinit_path_local, - self.__params.metron_keytab_path, - self.__params.metron_principal_name, - execute_user=self.__params.metron_user) - Execute(start_cmd_template.format(self.__params.metron_home, self.__indexing_topology, self.__params.zookeeper_quorum), - user=self.__params.metron_user) + Execute(start_cmd_template.format(self.__params.metron_home, + self.__indexing_topology, + self.__params.zookeeper_quorum), + user=self.__params.metron_user) + + else: + Logger.info('Indexing topology already running') Logger.info('Finished starting indexing topology') - def stop_indexing_topology(self): + def stop_indexing_topology(self, env): Logger.info('Stopping ' + self.__indexing_topology) - stop_cmd = 'storm kill ' + self.__indexing_topology - if self.__params.security_enabled: - metron_security.kinit(self.__params.kinit_path_local, - self.__params.metron_keytab_path, - self.__params.metron_principal_name, - execute_user=self.__params.metron_user) - Execute(stop_cmd, - user=self.__params.metron_user) + + if self.is_topology_active(env): + if self.__params.security_enabled: + metron_security.kinit(self.__params.kinit_path_local, + self.__params.metron_keytab_path, + self.__params.metron_principal_name, + execute_user=self.__params.metron_user) + stop_cmd = 'storm kill ' + self.__indexing_topology + Execute(stop_cmd, user=self.__params.metron_user) + + else: + Logger.info("Indexing topology already stopped") + Logger.info('Done stopping indexing topologies') def restart_indexing_topology(self, env): Logger.info('Restarting the indexing topologies') - self.stop_indexing_topology() + self.stop_indexing_topology(env) # Wait for old topology to be cleaned up by Storm, before starting again. retries = 0 @@ -138,7 +149,7 @@ class IndexingCommands: if not topology_active: Logger.info('Waiting for storm kill to complete') time.sleep(30) - self.start_indexing_topology() + self.start_indexing_topology(env) Logger.info('Done restarting the indexing topologies') else: Logger.warning('Retries exhausted. Existing topology not cleaned up. Aborting topology start.') http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py index 76b6b60..7e111cf 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py @@ -73,13 +73,13 @@ class Indexing(Script): env.set_params(params) self.configure(env) commands = IndexingCommands(params) - commands.start_indexing_topology() + commands.start_indexing_topology(env) def stop(self, env, upgrade_type=None): from params import params env.set_params(params) commands = IndexingCommands(params) - commands.stop_indexing_topology() + commands.stop_indexing_topology(env) def status(self, env): from params import status_params http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py index a6e1f1f..a487298 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py @@ -103,7 +103,7 @@ class ParserCommands: topics, [parser + '_parser' for parser in self.get_parser_list()]) - def start_parser_topologies(self): + def start_parser_topologies(self, env): Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list())) start_cmd_template = """{0}/bin/start_parser_topology.sh \ -k {1} \ @@ -117,7 +117,11 @@ class ParserCommands: self.__params.metron_keytab_path, self.__params.metron_principal_name, execute_user=self.__params.metron_user) - for parser in self.get_parser_list(): + + stopped_parsers = set(self.get_parser_list()) - self.get_running_topology_names(env) + Logger.info('Parsers that need started: ' + str(stopped_parsers)) + + for parser in stopped_parsers: Logger.info('Starting ' + parser) Execute(start_cmd_template.format(self.__params.metron_home, self.__params.kafka_brokers, @@ -128,9 +132,13 @@ class ParserCommands: Logger.info('Finished starting parser topologies') - def stop_parser_topologies(self): + def stop_parser_topologies(self, env): Logger.info('Stopping parsers') - for parser in self.get_parser_list(): + + running_parsers = set(self.get_parser_list()) & self.get_running_topology_names(env) + Logger.info('Parsers that need stopped: ' + str(running_parsers)) + + for parser in running_parsers: Logger.info('Stopping ' + parser) stop_cmd = 'storm kill ' + parser if self.__params.security_enabled: @@ -143,14 +151,16 @@ class ParserCommands: def restart_parser_topologies(self, env): Logger.info('Restarting the parser topologies') - self.stop_parser_topologies() + self.stop_parser_topologies(env) + attempt_count = 0 while self.topologies_running(env): if attempt_count > 2: raise Exception("Unable to kill topologies") attempt_count += 1 time.sleep(10) - self.start_parser_topologies() + self.start_parser_topologies(env) + Logger.info('Done restarting the parser topologies') def topologies_exist(self): @@ -166,6 +176,18 @@ class ParserCommands: return True return False + def get_running_topology_names(self, env): + """ + Returns the names of all 'running' topologies. A running topology + is one that is either active or rebalancing. + :param env: Environment + :return: Set containing the names of all running topologies. + """ + env.set_params(self.__params) + topology_status = metron_service.get_running_topologies(self.__params) + topology_names = ([name for name in topology_status if topology_status[name] in ['ACTIVE', 'REBALANCING']]) + return set(topology_names) + def topologies_running(self, env): env.set_params(self.__params) all_running = True http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py index 6f43de4..9d3ebde 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py @@ -58,13 +58,13 @@ class ParserMaster(Script): env.set_params(params) self.configure(env) commands = ParserCommands(params) - commands.start_parser_topologies() + commands.start_parser_topologies(env) def stop(self, env, upgrade_type=None): from params import params env.set_params(params) commands = ParserCommands(params) - commands.stop_parser_topologies() + commands.stop_parser_topologies(env) def status(self, env): from params import status_params
