Repository: metron
Updated Branches:
  refs/heads/master a0e756bf9 -> 309d8097a


METRON-1054 Ambari Mpack Attempts to Kill Topologies That Are Not Running 
(nickwallen) closes apache/metron#660


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/309d8097
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/309d8097
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/309d8097

Branch: refs/heads/master
Commit: 309d8097a91cc40e46f187f672d651b00cdd90fa
Parents: a0e756b
Author: nickwallen <[email protected]>
Authored: Mon Jul 24 09:47:06 2017 -0400
Committer: nickallen <[email protected]>
Committed: Mon Jul 24 09:47:06 2017 -0400

----------------------------------------------------------------------
 .../package/scripts/enrichment_commands.py      | 35 ++++++++-----
 .../package/scripts/enrichment_master.py        |  4 +-
 .../package/scripts/indexing_commands.py        | 55 ++++++++++++--------
 .../CURRENT/package/scripts/indexing_master.py  |  4 +-
 .../CURRENT/package/scripts/parser_commands.py  | 34 +++++++++---
 .../CURRENT/package/scripts/parser_master.py    |  4 +-
 6 files changed, 90 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
index 8ca97ad..f9a75b1 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py
@@ -136,26 +136,37 @@ class EnrichmentCommands:
 
         self.set_kafka_acl_configured()
 
-    def start_enrichment_topology(self):
+    def start_enrichment_topology(self, env):
         Logger.info("Starting Metron enrichment topology: 
{0}".format(self.__enrichment_topology))
-        start_cmd_template = """{0}/bin/start_enrichment_topology.sh \
-                                    -s {1} \
-                                    -z {2}"""
-        Logger.info('Starting ' + self.__enrichment_topology)
-        Execute(start_cmd_template.format(self.__params.metron_home, 
self.__enrichment_topology, self.__params.zookeeper_quorum),
-                user=self.__params.metron_user)
+
+        if not self.is_topology_active(env):
+            start_cmd_template = """{0}/bin/start_enrichment_topology.sh \
+                                        -s {1} \
+                                        -z {2}"""
+            Logger.info('Starting ' + self.__enrichment_topology)
+            Execute(start_cmd_template.format(self.__params.metron_home,
+                                              self.__enrichment_topology,
+                                              self.__params.zookeeper_quorum),
+                    user=self.__params.metron_user)
+        else:
+            Logger.info('Enrichment topology already running')
 
         Logger.info('Finished starting enrichment topology')
 
-    def stop_enrichment_topology(self):
+    def stop_enrichment_topology(self, env):
         Logger.info('Stopping ' + self.__enrichment_topology)
-        stop_cmd = 'storm kill ' + self.__enrichment_topology
-        Execute(stop_cmd, user=self.__params.metron_user)
+
+        if self.is_topology_active(env):
+            stop_cmd = 'storm kill ' + self.__enrichment_topology
+            Execute(stop_cmd, user=self.__params.metron_user)
+        else:
+            Logger.info("Enrichment topology already stopped")
+
         Logger.info('Done stopping enrichment topologies')
 
     def restart_enrichment_topology(self, env):
         Logger.info('Restarting the enrichment topologies')
-        self.stop_enrichment_topology()
+        self.stop_enrichment_topology(env)
 
         # Wait for old topology to be cleaned up by Storm, before starting 
again.
         retries = 0
@@ -167,7 +178,7 @@ class EnrichmentCommands:
             retries += 1
 
         if not topology_active:
-            self.start_enrichment_topology()
+            self.start_enrichment_topology(env)
             Logger.info('Done restarting the enrichment topology')
         else:
             Logger.warning('Retries exhausted. Existing topology not cleaned 
up.  Aborting topology start.')

http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
index de3c3f8..f3453ea 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_master.py
@@ -71,7 +71,7 @@ class Enrichment(Script):
         if not commands.is_geo_configured():
             commands.init_geo()
 
-        commands.start_enrichment_topology()
+        commands.start_enrichment_topology(env)
 
     def stop(self, env, upgrade_type=None):
         from params import params
@@ -85,7 +85,7 @@ class Enrichment(Script):
                                   params.metron_principal_name,
                                   execute_user=params.metron_user)
 
-        commands.stop_enrichment_topology()
+        commands.stop_enrichment_topology(env)
 
     def status(self, env):
         from params import status_params

http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
index b671c9d..711d4fc 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
@@ -95,37 +95,48 @@ class IndexingCommands:
                                    )
         Logger.info('Done creating HDFS indexing directory')
 
-    def start_indexing_topology(self):
-        Logger.info("Starting Metron indexing topology: 
{0}".format(self.__indexing_topology))
-        start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh \
+    def start_indexing_topology(self, env):
+        Logger.info('Starting ' + self.__indexing_topology)
+
+        if not self.is_topology_active(env):
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
+            start_cmd_template = """{0}/bin/start_elasticsearch_topology.sh \
                                     -s {1} \
                                     -z {2}"""
-        Logger.info('Starting ' + self.__indexing_topology)
-        if self.__params.security_enabled:
-            metron_security.kinit(self.__params.kinit_path_local,
-                                  self.__params.metron_keytab_path,
-                                  self.__params.metron_principal_name,
-                                  execute_user=self.__params.metron_user)
-        Execute(start_cmd_template.format(self.__params.metron_home, 
self.__indexing_topology, self.__params.zookeeper_quorum),
-                user=self.__params.metron_user)
+            Execute(start_cmd_template.format(self.__params.metron_home,
+                                              self.__indexing_topology,
+                                              self.__params.zookeeper_quorum),
+                    user=self.__params.metron_user)
+
+        else:
+            Logger.info('Indexing topology already running')
 
         Logger.info('Finished starting indexing topology')
 
-    def stop_indexing_topology(self):
+    def stop_indexing_topology(self, env):
         Logger.info('Stopping ' + self.__indexing_topology)
-        stop_cmd = 'storm kill ' + self.__indexing_topology
-        if self.__params.security_enabled:
-            metron_security.kinit(self.__params.kinit_path_local,
-                                  self.__params.metron_keytab_path,
-                                  self.__params.metron_principal_name,
-                                  execute_user=self.__params.metron_user)
-        Execute(stop_cmd,
-                user=self.__params.metron_user)
+
+        if self.is_topology_active(env):
+            if self.__params.security_enabled:
+                metron_security.kinit(self.__params.kinit_path_local,
+                                      self.__params.metron_keytab_path,
+                                      self.__params.metron_principal_name,
+                                      execute_user=self.__params.metron_user)
+            stop_cmd = 'storm kill ' + self.__indexing_topology
+            Execute(stop_cmd, user=self.__params.metron_user)
+
+        else:
+            Logger.info("Indexing topology already stopped")
+
         Logger.info('Done stopping indexing topologies')
 
     def restart_indexing_topology(self, env):
         Logger.info('Restarting the indexing topologies')
-        self.stop_indexing_topology()
+        self.stop_indexing_topology(env)
 
         # Wait for old topology to be cleaned up by Storm, before starting 
again.
         retries = 0
@@ -138,7 +149,7 @@ class IndexingCommands:
         if not topology_active:
             Logger.info('Waiting for storm kill to complete')
             time.sleep(30)
-            self.start_indexing_topology()
+            self.start_indexing_topology(env)
             Logger.info('Done restarting the indexing topologies')
         else:
             Logger.warning('Retries exhausted. Existing topology not cleaned 
up.  Aborting topology start.')

http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
index 76b6b60..7e111cf 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py
@@ -73,13 +73,13 @@ class Indexing(Script):
         env.set_params(params)
         self.configure(env)
         commands = IndexingCommands(params)
-        commands.start_indexing_topology()
+        commands.start_indexing_topology(env)
 
     def stop(self, env, upgrade_type=None):
         from params import params
         env.set_params(params)
         commands = IndexingCommands(params)
-        commands.stop_indexing_topology()
+        commands.stop_indexing_topology(env)
 
     def status(self, env):
         from params import status_params

http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
index a6e1f1f..a487298 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py
@@ -103,7 +103,7 @@ class ParserCommands:
                                        topics,
                                        [parser + '_parser' for parser in 
self.get_parser_list()])
 
-    def start_parser_topologies(self):
+    def start_parser_topologies(self, env):
         Logger.info("Starting Metron parser topologies: 
{0}".format(self.get_parser_list()))
         start_cmd_template = """{0}/bin/start_parser_topology.sh \
                                     -k {1} \
@@ -117,7 +117,11 @@ class ParserCommands:
                                   self.__params.metron_keytab_path,
                                   self.__params.metron_principal_name,
                                   execute_user=self.__params.metron_user)
-        for parser in self.get_parser_list():
+
+        stopped_parsers = set(self.get_parser_list()) - 
self.get_running_topology_names(env)
+        Logger.info('Parsers that need started: ' + str(stopped_parsers))
+
+        for parser in stopped_parsers:
             Logger.info('Starting ' + parser)
             Execute(start_cmd_template.format(self.__params.metron_home,
                                               self.__params.kafka_brokers,
@@ -128,9 +132,13 @@ class ParserCommands:
 
         Logger.info('Finished starting parser topologies')
 
-    def stop_parser_topologies(self):
+    def stop_parser_topologies(self, env):
         Logger.info('Stopping parsers')
-        for parser in self.get_parser_list():
+
+        running_parsers = set(self.get_parser_list()) & 
self.get_running_topology_names(env)
+        Logger.info('Parsers that need stopped: ' + str(running_parsers))
+
+        for parser in running_parsers:
             Logger.info('Stopping ' + parser)
             stop_cmd = 'storm kill ' + parser
             if self.__params.security_enabled:
@@ -143,14 +151,16 @@ class ParserCommands:
 
     def restart_parser_topologies(self, env):
         Logger.info('Restarting the parser topologies')
-        self.stop_parser_topologies()
+        self.stop_parser_topologies(env)
+
         attempt_count = 0
         while self.topologies_running(env):
             if attempt_count > 2:
                 raise Exception("Unable to kill topologies")
             attempt_count += 1
             time.sleep(10)
-        self.start_parser_topologies()
+        self.start_parser_topologies(env)
+
         Logger.info('Done restarting the parser topologies')
 
     def topologies_exist(self):
@@ -166,6 +176,18 @@ class ParserCommands:
                         return True
         return False
 
+    def get_running_topology_names(self, env):
+        """
+        Returns the names of all 'running' topologies.  A running topology
+        is one that is either active or rebalancing.
+        :param env: Environment
+        :return: Set containing the names of all running topologies.
+        """
+        env.set_params(self.__params)
+        topology_status = metron_service.get_running_topologies(self.__params)
+        topology_names = ([name for name in topology_status if 
topology_status[name] in ['ACTIVE', 'REBALANCING']])
+        return set(topology_names)
+
     def topologies_running(self, env):
         env.set_params(self.__params)
         all_running = True

http://git-wip-us.apache.org/repos/asf/metron/blob/309d8097/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
index 6f43de4..9d3ebde 100755
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_master.py
@@ -58,13 +58,13 @@ class ParserMaster(Script):
         env.set_params(params)
         self.configure(env)
         commands = ParserCommands(params)
-        commands.start_parser_topologies()
+        commands.start_parser_topologies(env)
 
     def stop(self, env, upgrade_type=None):
         from params import params
         env.set_params(params)
         commands = ParserCommands(params)
-        commands.stop_parser_topologies()
+        commands.stop_parser_topologies(env)
 
     def status(self, env):
         from params import status_params

Reply via email to