Repository: metron Updated Branches: refs/heads/master 4c908b95b -> fef8833c1
METRON-1249 Improve Metron MPack Service Checks (nickwallen) closes apache/metron#799 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fef8833c Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fef8833c Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fef8833c Branch: refs/heads/master Commit: fef8833c153fabef597084f4aace8303d9f7116e Parents: 4c908b9 Author: nickwallen <[email protected]> Authored: Mon Oct 16 17:53:20 2017 -0400 Committer: nickallen <[email protected]> Committed: Mon Oct 16 17:53:20 2017 -0400 ---------------------------------------------------------------------- .../package/scripts/enrichment_commands.py | 62 +++- .../package/scripts/indexing_commands.py | 41 ++- .../package/scripts/management_ui_commands.py | 26 ++ .../package/scripts/management_ui_master.py | 12 +- .../CURRENT/package/scripts/metron_service.py | 281 ++++++++++++++++--- .../CURRENT/package/scripts/parser_commands.py | 66 +++-- .../package/scripts/profiler_commands.py | 44 ++- .../CURRENT/package/scripts/rest_commands.py | 29 +- .../CURRENT/package/scripts/service_check.py | 49 +++- 9 files changed, 520 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py index 794b6a5..90a690e 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/enrichment_commands.py @@ -18,6 +18,7 @@ limitations under the License. import os import time from datetime import datetime +from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger from resource_management.core.resources.system import Execute, File @@ -47,6 +48,12 @@ class EnrichmentCommands: self.__hbase_acl_configured = os.path.isfile(self.__params.enrichment_hbase_acl_configured_flag_file) self.__geo_configured = os.path.isfile(self.__params.enrichment_geo_configured_flag_file) + def __get_topics(self): + return [self.__enrichment_topic, self.__params.enrichment_error_topic] + + def __get_kafka_acl_groups(self): + return [self.__enrichment_topic] + def is_kafka_configured(self): return self.__kafka_configured @@ -105,15 +112,15 @@ class EnrichmentCommands: def init_kafka_topics(self): Logger.info('Creating Kafka topics for enrichment') # All errors go to indexing topics, so create it here if it's not already - metron_service.init_kafka_topics(self.__params, [self.__enrichment_topic, self.__params.enrichment_error_topic]) + metron_service.init_kafka_topics(self.__params, self.__get_topics()) self.set_kafka_configured() def init_kafka_acls(self): Logger.info('Creating Kafka ACls for enrichment') + metron_service.init_kafka_acls(self.__params, self.__get_topics()) + # Enrichment topic names matches group - metron_service.init_kafka_acls(self.__params, - [self.__enrichment_topic, self.__params.enrichment_error_topic], - [self.__enrichment_topic]) + metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) self.set_kafka_acl_configured() @@ -182,6 +189,7 @@ class EnrichmentCommands: self.__params.hbase_keytab_path, self.__params.hbase_principal_name, execute_user=self.__params.hbase_user) + cmd = "echo \"create '{0}','{1}'\" | hbase shell -n" add_enrichment_cmd = cmd.format(self.__params.enrichment_hbase_table, self.__params.enrichment_hbase_cf) Execute(add_enrichment_cmd, @@ -211,6 +219,7 @@ class EnrichmentCommands: self.__params.hbase_keytab_path, self.__params.hbase_principal_name, execute_user=self.__params.hbase_user) + cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n" add_enrichment_acl_cmd = cmd.format(self.__params.metron_user, self.__params.enrichment_hbase_table) Execute(add_enrichment_acl_cmd, @@ -232,3 +241,48 @@ class EnrichmentCommands: Logger.info("Done setting HBase ACLs") self.set_hbase_acl_configured() + + def service_check(self, env): + """ + Performs a service check for Enrichment. + :param env: Environment + """ + Logger.info("Checking for Geo database") + metron_service.check_hdfs_file_exists(self.__params, self.__params.geoip_hdfs_dir + "/GeoLite2-City.mmdb.gz") + + Logger.info('Checking Kafka topics for Enrichment') + metron_service.check_kafka_topics(self.__params, self.__get_topics()) + + Logger.info("Checking HBase for Enrichment") + metron_service.check_hbase_table( + self.__params, + self.__params.enrichment_hbase_table) + metron_service.check_hbase_column_family( + self.__params, + self.__params.enrichment_hbase_table, + self.__params.enrichment_hbase_cf) + + Logger.info("Checking HBase for Threat Intel") + metron_service.check_hbase_table( + self.__params, + self.__params.threatintel_hbase_table) + metron_service.check_hbase_column_family( + self.__params, + self.__params.threatintel_hbase_table, + self.__params.threatintel_hbase_cf) + + if self.__params.security_enabled: + + Logger.info('Checking Kafka ACLs for Enrichment') + metron_service.check_kafka_acls(self.__params, self.__get_topics()) + metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) + + Logger.info("Checking HBase ACLs for Enrichment") + metron_service.check_hbase_acls(self.__params, self.__params.enrichment_hbase_table) + metron_service.check_hbase_acls(self.__params, self.__params.threatintel_hbase_table) + + Logger.info("Checking for Enrichment topology") + if not self.is_topology_active(env): + raise Fail("Enrichment topology not running") + + Logger.info("Enrichment service check completed successfully") http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py index 50457d0..17374eb 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py @@ -19,6 +19,7 @@ import os import time from datetime import datetime +from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger from resource_management.core.resources.system import Execute, File @@ -49,6 +50,13 @@ class IndexingCommands: self.__hbase_acl_configured = os.path.isfile(self.__params.indexing_hbase_acl_configured_flag_file) self.__hdfs_perm_configured = os.path.isfile(self.__params.indexing_hdfs_perm_configured_flag_file) + def __get_topics(self): + return [self.__indexing_topic] + + def __get_kafka_acl_groups(self): + # Indexed topic names matches the group + return [self.__indexing_topic] + def is_configured(self): return self.__configured @@ -121,12 +129,12 @@ class IndexingCommands: def init_kafka_topics(self): Logger.info('Creating Kafka topics for indexing') - metron_service.init_kafka_topics(self.__params, [self.__indexing_topic]) + metron_service.init_kafka_topics(self.__params, self.__get_topics()) def init_kafka_acls(self): Logger.info('Creating Kafka ACLs for indexing') - # Indexed topic names matches the group - metron_service.init_kafka_acls(self.__params, [self.__indexing_topic], [self.__indexing_topic]) + metron_service.init_kafka_acls(self.__params, self.__get_topics()) + metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) def init_hdfs_dir(self): Logger.info('Setting up HDFS indexing directory') @@ -213,3 +221,30 @@ class IndexingCommands: is_running = topologies[self.__indexing_topology] in ['ACTIVE', 'REBALANCING'] active &= is_running return active + + def service_check(self, env): + """ + Performs a service check for Indexing. + :param env: Environment + """ + Logger.info('Checking Kafka topics for Indexing') + metron_service.check_kafka_topics(self.__params, self.__get_topics()) + + Logger.info("Checking HBase for Indexing") + metron_service.check_hbase_table(self.__params, self.__params.update_hbase_table) + metron_service.check_hbase_column_family(self.__params, self.__params.update_hbase_table, self.__params.update_hbase_cf) + + if self.__params.security_enabled: + + Logger.info('Checking Kafka ACLs for Indexing') + metron_service.check_kafka_acls(self.__params, self.__get_topics()) + metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) + + Logger.info("Checking HBase ACLs for Indexing") + metron_service.check_hbase_acls(self.__params, self.__params.update_hbase_table) + + Logger.info("Checking for Indexing topology") + if not self.is_topology_active(env): + raise Fail("Indexing topology not running") + + Logger.info("Indexing service check completed successfully") http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py index de67f64..7427046 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_commands.py @@ -20,6 +20,9 @@ limitations under the License. from resource_management.core.logger import Logger from resource_management.core.resources.system import Execute, File +from resource_management.core.exceptions import ComponentIsNotRunning +from resource_management.core.exceptions import ExecutionFailed +from resource_management.libraries.functions.get_user_call_output import get_user_call_output # Wrap major operations and functionality in this class class ManagementUICommands: @@ -44,3 +47,26 @@ class ManagementUICommands: Logger.info('Restarting the Management UI') Execute('service metron-management-ui restart') Logger.info('Done restarting the Management UI') + + def check_status(self, env): + Logger.info('Status check the Management UI') + cmd = "curl --max-time 3 {0}:{1}" + try: + Execute( + cmd.format(self.__params.hostname, self.__params.metron_management_ui_port), + tries=3, + try_sleep=5, + logoutput=False, + user=self.__params.metron_user) + except: + raise ComponentIsNotRunning() + + def service_check(self, env): + """ + Performs a service check for the Management UI + :param env: Environment + """ + from params import status_params + env.set_params(status_params) + self.check_status(env) + Logger.info("Management UI service check completed successfully") http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py index 54e91aa..15bcd94 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/management_ui_master.py @@ -17,16 +17,12 @@ limitations under the License. """ -from resource_management.core.exceptions import ComponentIsNotRunning -from resource_management.core.exceptions import ExecutionFailed from resource_management.core.resources.system import Directory from resource_management.core.resources.system import File from resource_management.core.source import Template from resource_management.libraries.functions.format import format -from resource_management.libraries.functions.get_user_call_output import get_user_call_output from resource_management.libraries.script import Script from resource_management.core.resources.system import Execute - from resource_management.core.logger import Logger from management_ui_commands import ManagementUICommands @@ -40,7 +36,6 @@ class ManagementUIMaster(Script): self.install_packages(env) def configure(self, env, upgrade_type=None, config_dir=None): - print 'configure managment_ui' from params import params env.set_params(params) File(format("/etc/default/metron"), @@ -77,11 +72,8 @@ class ManagementUIMaster(Script): def status(self, env): from params import status_params env.set_params(status_params) - cmd = format('curl --max-time 3 {hostname}:{metron_management_ui_port}') - try: - get_user_call_output(cmd, user=status_params.metron_user) - except ExecutionFailed: - raise ComponentIsNotRunning() + commands = ManagementUICommands(status_params) + commands.check_status(env) def restart(self, env): from params import params http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py index 84dc805..2ae0b08 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py @@ -20,12 +20,12 @@ import subprocess from datetime import datetime from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail from resource_management.core.resources.system import Directory, File from resource_management.core.resources.system import Execute from resource_management.core.source import InlineTemplate from resource_management.libraries.functions import format as ambari_format -from resource_management.libraries.functions.get_user_call_output import \ - get_user_call_output +from resource_management.libraries.functions.get_user_call_output import get_user_call_output from metron_security import kinit @@ -134,7 +134,6 @@ def refresh_configs(params): def get_running_topologies(params): Logger.info('Getting Running Storm Topologies from Storm REST Server') - Logger.info('Security enabled? ' + str(params.security_enabled)) # Want to sudo to the metron user and kinit as them so we aren't polluting root with Metron's Kerberos tickets. @@ -201,36 +200,246 @@ def init_kafka_topics(params, topics): user=params.kafka_user, tries=3, try_sleep=5, logoutput=True) Logger.info("Done creating Kafka topics") -def init_kafka_acls(params, topics, groups): - Logger.info('Creating Kafka ACLs') - - acl_template = """{0}/kafka-acls.sh \ - --authorizer kafka.security.auth.SimpleAclAuthorizer \ - --authorizer-properties zookeeper.connect={1} \ - --add \ - --allow-principal User:{2} \ - --topic {3}""" - - for topic in topics: - Logger.info("Creating ACL for topic '{0}'".format(topic)) - Execute(acl_template.format(params.kafka_bin_dir, - params.zookeeper_quorum, - params.metron_user, - topic), - user=params.kafka_user, tries=3, try_sleep=5, logoutput=True) - - acl_template = """{0}/kafka-acls.sh \ - --authorizer kafka.security.auth.SimpleAclAuthorizer \ - --authorizer-properties zookeeper.connect={1} \ - --add \ - --allow-principal User:{2} \ - --group {3}""" - - for group in groups: - Logger.info("Creating ACL for group '{0}'".format(group)) - Execute(acl_template.format(params.kafka_bin_dir, - params.zookeeper_quorum, - params.metron_user, - group), - user=params.kafka_user, tries=3, try_sleep=5, logoutput=True) - Logger.info("Done creating Kafka ACLs") +def init_kafka_acls(params, topics): + Logger.info('Creating Kafka topic ACLs') + acl_template = """{0}/kafka-acls.sh \ + --authorizer kafka.security.auth.SimpleAclAuthorizer \ + --authorizer-properties zookeeper.connect={1} \ + --add \ + --allow-principal User:{2} \ + --topic {3}""" + + for topic in topics: + Logger.info("Creating ACL for topic '{0}'".format(topic)) + Execute(acl_template.format(params.kafka_bin_dir, + params.zookeeper_quorum, + params.metron_user, + topic), + user=params.kafka_user, tries=3, try_sleep=5, logoutput=True) + +def init_kafka_acl_groups(params, groups): + Logger.info('Creating Kafka group ACLs') + acl_template = """{0}/kafka-acls.sh \ + --authorizer kafka.security.auth.SimpleAclAuthorizer \ + --authorizer-properties zookeeper.connect={1} \ + --add \ + --allow-principal User:{2} \ + --group {3}""" + + for group in groups: + Logger.info("Creating ACL for group '{0}'".format(group)) + Execute(acl_template.format(params.kafka_bin_dir, + params.zookeeper_quorum, + params.metron_user, + group), + user=params.kafka_user, tries=3, try_sleep=5, logoutput=True) + +def execute(cmd, user, err_msg=None, tries=3, try_sleep=5, logoutput=True, path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin'): + """ + Executes a command and raises an appropriate error message if the command + fails. + :param cmd: The command to execute. + :param user: The user to execute the command as. + :param err_msg: The error message to display if the command fails. + :param tries: The number of attempts to execute the command. + :param try_sleep: The time between attempts. + :param logoutput: If true, log the command output. + :param path: The path use when running the command. + :return: + """ + try: + Execute(cmd, tries=tries, try_sleep=try_sleep, logoutput=logoutput, user=user, path=path) + except: + if err_msg is None: + err_msg = "Execution failed: cmd={0}, user={1}".format(cmd, user) + raise Fail(err_msg) + +def check_kafka_topics(params, topics): + """ + Validates that the Kafka topics exist. An exception is raised if any of the + topics do not exist. + :param params: + :param topics: A list of topic names. + """ + + # if needed kinit as 'metron' + if params.security_enabled: + kinit(params.kinit_path_local, + params.metron_keytab_path, + params.metron_principal_name, + execute_user=params.metron_user) + + template = """{0}/kafka-topics.sh \ + --zookeeper {1} \ + --list | \ + awk 'BEGIN {{cnt=0;}} /{2}/ {{cnt++}} END {{if (cnt > 0) {{exit 0}} else {{exit 1}}}}'""" + + for topic in topics: + Logger.info("Checking existence of Kafka topic '{0}'".format(topic)) + cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic) + err_msg = "Missing Kafka topic; topic={0}".format(topic) + execute(cmd, user=params.kafka_user, err_msg=err_msg) + +def check_hbase_table(params, table): + """ + Validates that an HBase table exists. An exception is raised if the table + does not exist. + :param params: + :param table: The name of the HBase table. + """ + Logger.info("Checking HBase table '{0}'".format(table)) + + # if needed kinit as 'hbase' + if params.security_enabled: + kinit(params.kinit_path_local, + params.hbase_keytab_path, + params.hbase_principal_name, + execute_user=params.hbase_user) + + template = "echo \"exists '{0}'\" | hbase shell -n | grep 'Table {1} does exist'" + cmd = template.format(table, table) + err_msg = "Missing HBase table; table={0}".format(table) + execute(cmd, user=params.hbase_user, err_msg=err_msg) + +def check_hbase_column_family(params, table, column_family): + """ + Validates that an HBase column family exists. An exception is raised if the + column family does not exist. + :param params: + :param table: The name of the HBase table. + :param column_family: The name of the HBase column family. + """ + Logger.info("Checking column family '{0}:{1}'".format(table, column_family)) + + # if needed kinit as 'hbase' + if params.security_enabled: + kinit(params.kinit_path_local, + params.hbase_keytab_path, + params.hbase_principal_name, + execute_user=params.hbase_user) + + template = "echo \"desc '{0}'\" | hbase shell -n | grep \"NAME => '{1}'\"" + cmd = template.format(table, column_family) + err_msg = "Missing HBase column family; table={0}, cf={1}".format(table, column_family) + execute(cmd, user=params.hbase_user, err_msg=err_msg) + +def check_hbase_acls(params, table, user=None, permissions="READ,WRITE"): + """ + Validates that HBase table permissions exist for a user. An exception is + raised if the permissions do not exist. + :param params: + :param table: The name of the HBase table. + :param user: The name of the user. + :param permissions: The permissions that should exist. + """ + if user is None: + user = params.metron_user + Logger.info("Checking HBase ACLs; table={0}, user={1}, permissions={2}".format(table, user, permissions)) + + # if needed kinit as 'hbase' + if params.security_enabled: + kinit(params.kinit_path_local, + params.hbase_keytab_path, + params.hbase_principal_name, + execute_user=params.hbase_user) + + + + template = """echo "user_permission '{0}'" | \ + hbase shell -n | \ + grep " {1} " | \ + grep "actions={2}" + """ + cmd = template.format(table, user, permissions) + err_msg = "Missing HBase access; table={0}, user={1}, permissions={2}".format(table, user, permissions) + execute(cmd, user=params.hbase_user, err_msg=err_msg) + +def check_hdfs_dir_exists(params, path, user=None): + """ + Validate that a directory exists in HDFS. + :param params: + :param path: The directory path in HDFS. + :param user: The user to execute the check under. + """ + if user is None: + user = params.metron_user + Logger.info("Checking HDFS; directory={0} user={1}".format(path, user)) + + # if needed kinit as 'metron' + if params.security_enabled: + kinit(params.kinit_path_local, + params.metron_keytab_path, + params.metron_principal_name, + execute_user=params.metron_user) + + template = "{0}/hdfs dfs -test -d {1}" + cmd = template.format(params.hadoop_bin_dir, path) + err_msg = "Missing directory in HDFS: directory={0} user={1}".format(path, user) + execute(cmd, user=params.metron_user, err_msg=err_msg) + +def check_hdfs_file_exists(params, path, user=None): + """ + Validate that a file exists in HDFS. + :param params: + :param path: The file path in HDFS. + :param user: The user to execute the check under. + """ + if user is None: + user = params.metron_user + Logger.info("Checking HDFS; file={0}, user={1}".format(path, user)) + + # if needed kinit as 'metron' + if params.security_enabled: + kinit(params.kinit_path_local, + params.metron_keytab_path, + params.metron_principal_name, + execute_user=params.metron_user) + + template = "{0}/hdfs dfs -test -f {1}" + cmd = template.format(params.hadoop_bin_dir, path) + err_msg = "Missing file in HDFS; file={0}".format(path) + execute(cmd, user=user, err_msg=err_msg) + +def check_kafka_acls(params, topics, user=None): + """ + Validate that permissions have been granted for a list of Kakfa topics. + :param params: + :param topics: A list of topic names. + :param user: The user whose access is checked. + """ + if user is None: + user = params.metron_user + + template = """{0}/kafka-acls.sh \ + --authorizer kafka.security.auth.SimpleAclAuthorizer \ + --authorizer-properties zookeeper.connect={1} \ + --topic {2} \ + --list | grep 'User:{3}'""" + + for topic in topics: + Logger.info("Checking ACL; topic={0}, user={1}'".format(topic, user)) + cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, topic, user) + err_msg = "Missing Kafka access; topic={0}, user={1}".format(topic, user) + execute(cmd, user=params.kafka_user, err_msg=err_msg) + +def check_kafka_acl_groups(params, groups, user=None): + """ + Validate that Kafka group permissions have been granted. + :param params: + :param groups: A list of group name. + :param user: The user whose access is checked. + """ + if user is None: + user = params.metron_user + + template = """{0}/kafka-acls.sh \ + --authorizer kafka.security.auth.SimpleAclAuthorizer \ + --authorizer-properties zookeeper.connect={1} \ + --group {2} \ + --list | grep 'User:{3}'""" + + for group in groups: + Logger.info("Checking group ACL for topic '{0}'".format(group)) + cmd = template.format(params.kafka_bin_dir, params.zookeeper_quorum, group, user) + err_msg = "Missing Kafka group access; group={0}, user={1}".format(group, user) + execute(cmd, user=params.kafka_user, err_msg=err_msg) http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py index 9483498..274306a 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/parser_commands.py @@ -24,6 +24,7 @@ import subprocess import time from datetime import datetime +from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger from resource_management.core.resources.system import Execute, File @@ -50,6 +51,17 @@ class ParserCommands: def __get_parsers(self, params): return params.parsers.replace(' ', '').split(',') + def __get_topics(self): + # All errors go to indexing topics, so create it here if it's not already + # Getting topics this way is a bit awkward, but I don't want to append to actual list, so copy it + topics = list(self.get_parser_list()) + topics.append(self.__params.enrichment_error_topic) + return topics + + def __get_kafka_acl_groups(self): + # Parser group is the parser name + '_parser' + return [parser + '_parser' for parser in self.get_parser_list()] + def is_configured(self): return self.__configured @@ -63,9 +75,13 @@ class ParserCommands: metron_service.set_configured(self.__params.metron_user, self.__params.parsers_acl_configured_flag_file, "Setting Parsers ACL configured to true") def init_parsers(self): - Logger.info( - "Copying grok patterns from local directory '{0}' to HDFS '{1}'".format(self.__params.local_grok_patterns_dir, - self.__params.hdfs_grok_patterns_dir)) + self.init_grok_patterns() + Logger.info("Done initializing parser configuration") + + def init_grok_patterns(self): + Logger.info("Copying grok patterns from local directory '{0}' to HDFS '{1}'" + .format(self.__params.local_grok_patterns_dir, + self.__params.hdfs_grok_patterns_dir)) self.__params.HdfsResource(self.__params.hdfs_grok_patterns_dir, type="directory", @@ -75,29 +91,17 @@ class ParserCommands: source=self.__params.local_grok_patterns_dir, recursive_chown = True) - Logger.info("Done initializing parser configuration") - def get_parser_list(self): return self.__parser_list def init_kafka_topics(self): - Logger.info('Creating Kafka topics for parsers') - # All errors go to indexing topics, so create it here if it's not already - # Getting topics this way is a bit awkward, but I don't want to append to actual list, so copy it - topics = list(self.get_parser_list()) - topics.append(self.__params.enrichment_error_topic) - metron_service.init_kafka_topics(self.__params, topics) + Logger.info('Creating Kafka topics for Parsers') + metron_service.init_kafka_topics(self.__params, self.__get_topics()) def init_kafka_acls(self): - Logger.info('Creating Kafka ACLs for parsers') - - # Getting topics this way is a bit awkward, but I don't want to modify the actual list, so copy it - topics = list(self.get_parser_list()) - topics.append(self.__params.enrichment_error_topic) - # Parser group is the parser name + '_parser' - metron_service.init_kafka_acls(self.__params, - topics, - [parser + '_parser' for parser in self.get_parser_list()]) + Logger.info('Creating Kafka ACLs for Parsers') + metron_service.init_kafka_acls(self.__params, self.__get_topics()) + metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) def start_parser_topologies(self, env): Logger.info("Starting Metron parser topologies: {0}".format(self.get_parser_list())) @@ -212,3 +216,25 @@ class ParserCommands: def __is_running(self, status): return status in ['ACTIVE', 'REBALANCING'] + + def service_check(self, env): + """ + Performs a service check for the Parsers. + :param env: Environment + """ + Logger.info("Checking for grok patterns in HDFS for Parsers") + metron_service.check_hdfs_dir_exists(self.__params, self.__params.hdfs_grok_patterns_dir) + + Logger.info('Checking Kafka topics for Parsers') + metron_service.check_kafka_topics(self.__params, self.__get_topics()) + + if self.__params.security_enabled: + Logger.info('Checking Kafka ACLs for Parsers') + metron_service.check_kafka_acls(self.__params, self.__get_topics()) + metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) + + Logger.info("Checking for Parser topologies") + if not self.topologies_running(env): + raise Fail("Parser topologies not running") + + Logger.info("Parser service check completed successfully") http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py index 21c1225..41cab06 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/profiler_commands.py @@ -19,6 +19,7 @@ import os import time from datetime import datetime +from resource_management.core.exceptions import Fail from resource_management.core.logger import Logger from resource_management.core.resources.system import Execute, File @@ -47,6 +48,12 @@ class ProfilerCommands: self.__hbase_configured = os.path.isfile(self.__params.profiler_hbase_configured_flag_file) self.__hbase_acl_configured = os.path.isfile(self.__params.profiler_hbase_acl_configured_flag_file) + def __get_topics(self): + return [self.__profiler_topic] + + def __get_kafka_acl_groups(self): + return ['profiler'] + def is_configured(self): return self.__configured @@ -72,7 +79,7 @@ class ProfilerCommands: metron_service.set_configured(self.__params.metron_user, self.__params.profiler_hbase_acl_configured_flag_file, "Setting HBase ACL configured to True for profiler") def create_hbase_tables(self): - Logger.info("Creating HBase Tables for profiler") + Logger.info("Creating HBase table '{0}' for profiler".format(self.__params.profiler_hbase_table)) if self.__params.security_enabled: metron_security.kinit(self.__params.kinit_path_local, self.__params.hbase_keytab_path, @@ -88,12 +95,13 @@ class ProfilerCommands: user=self.__params.hbase_user ) - Logger.info("Done creating HBase Tables for profiler") self.set_hbase_configured() + Logger.info("Done creating HBase Tables for profiler") def init_kafka_acls(self): Logger.info('Creating Kafka ACls for profiler') - metron_service.init_kafka_acls(self.__params, [self.__profiler_topic], ['profiler']) + metron_service.init_kafka_acls(self.__params, self.__get_topics()) + metron_service.init_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) def set_hbase_acls(self): Logger.info("Setting HBase ACLs for profiler") @@ -102,6 +110,7 @@ class ProfilerCommands: self.__params.hbase_keytab_path, self.__params.hbase_principal_name, execute_user=self.__params.hbase_user) + cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n" add_table_acl_cmd = cmd.format(self.__params.metron_user, self.__params.profiler_hbase_table) Execute(add_table_acl_cmd, @@ -112,8 +121,8 @@ class ProfilerCommands: user=self.__params.hbase_user ) - Logger.info("Done setting HBase ACLs for profiler") self.set_hbase_acl_configured() + Logger.info("Done setting HBase ACLs for profiler") def start_profiler_topology(self, env): Logger.info('Starting ' + self.__profiler_topology) @@ -182,3 +191,30 @@ class ProfilerCommands: is_running = topologies[self.__profiler_topology] in ['ACTIVE', 'REBALANCING'] active &= is_running return active + + def service_check(self, env): + """ + Performs a service check for the Profiler. + :param env: Environment + """ + Logger.info('Checking Kafka topics for Profiler') + metron_service.check_kafka_topics(self.__params, [self.__params.profiler_input_topic]) + + Logger.info("Checking HBase table for profiler") + metron_service.check_hbase_table(self.__params, self.__params.profiler_hbase_table) + metron_service.check_hbase_column_family(self.__params, self.__params.profiler_hbase_table, self.__params.profiler_hbase_cf) + + if self.__params.security_enabled: + + Logger.info('Checking Kafka ACLs for Profiler') + metron_service.check_kafka_acls(self.__params, self.__get_topics()) + metron_service.check_kafka_acl_groups(self.__params, self.__get_kafka_acl_groups()) + + Logger.info('Checking Kafka ACLs for Profiler') + metron_service.check_hbase_acls(self.__params, self.__params.profiler_hbase_table) + + Logger.info("Checking for Profiler topology") + if not self.is_topology_active(env): + raise Fail("Profiler topology not running") + + Logger.info("Profiler service check completed successfully") http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py index 09d7106..542fc08 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/rest_commands.py @@ -59,16 +59,22 @@ class RestCommands: def set_acl_configured(self): metron_service.set_configured(self.__params.metron_user, self.__params.rest_acl_configured_flag_file, "Setting REST ACL configured to true") + def __get_topics(self): + return [self.__params.metron_escalation_topic] + def init_kafka_topics(self): Logger.info('Creating Kafka topics for rest') - topics = [self.__params.metron_escalation_topic] - metron_service.init_kafka_topics(self.__params, topics) + metron_service.init_kafka_topics(self.__params, self.__get_topics()) def init_kafka_acls(self): Logger.info('Creating Kafka ACLs for rest') + # The following topics must be permissioned for the rest application list operation - topics = [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic, self.__params.metron_escalation_topic] - metron_service.init_kafka_acls(self.__params, topics, ['metron-rest']) + topics = self.__get_topics() + [self.__params.ambari_kafka_service_check_topic, self.__params.consumer_offsets_topic] + metron_service.init_kafka_acls(self.__params, topics) + + groups = ['metron-rest'] + metron_service.init_kafka_acl_groups(self.__params, groups) def start_rest_application(self): Logger.info('Starting REST application') @@ -151,3 +157,18 @@ class RestCommands: self.stop_rest_application() self.start_rest_application() Logger.info('Done restarting the REST application') + + def service_check(self, env): + """ + Performs a service check for the Metron API. + :param env: Environment + """ + Logger.info('Checking Kafka topics for the REST application') + metron_service.check_kafka_topics(self.__params, self.__get_topics()) + + if self.__params.security_enabled: + + Logger.info('Checking Kafka topic ACL for the REST application') + metron_service.check_kafka_acls(self.__params, self.__get_topics()) + + Logger.info("REST application service check completed successfully") http://git-wip-us.apache.org/repos/asf/metron/blob/fef8833c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py index 7dd9dfb..1aebecb 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/service_check.py @@ -19,22 +19,53 @@ limitations under the License. """ from __future__ import print_function +from resource_management.core.logger import Logger from resource_management.libraries.script import Script -from indexing_commands import IndexingCommands from parser_commands import ParserCommands - +from enrichment_commands import EnrichmentCommands +from indexing_commands import IndexingCommands +from profiler_commands import ProfilerCommands +from rest_commands import RestCommands +from management_ui_commands import ManagementUICommands class ServiceCheck(Script): + def service_check(self, env): from params import params - parsercommands = ParserCommands(params) - indexingcommands = IndexingCommands(params) - all_found = parsercommands.topologies_running(env) and indexingcommands.is_topology_active(env) - if all_found: - exit(0) - else: - exit(1) + + # check the parsers + Logger.info("Performing Parser service check") + parser_cmds = ParserCommands(params) + parser_cmds.service_check(env) + + # check enrichment + Logger.info("Performing Enrichment service check") + enrichment_cmds = EnrichmentCommands(params) + enrichment_cmds.service_check(env) + + # check indexing + Logger.info("Performing Indexing service check") + indexing_cmds = IndexingCommands(params) + indexing_cmds.service_check(env) + + # check the profiler + Logger.info("Performing Profiler service check") + profiler_cmds = ProfilerCommands(params) + profiler_cmds.service_check(env) + + # check the rest api + Logger.info("Performing REST application service check") + rest_cmds = RestCommands(params) + rest_cmds.service_check(env) + + # check the management UI + Logger.info("Performing Management UI service check") + mgmt_cmds = ManagementUICommands(params) + mgmt_cmds.service_check(env) + + Logger.info("Metron service check completed successfully") + exit(0) if __name__ == "__main__":
