[SPOT-213][SPOT-250][OA][API] add kerberos support
Project: http://git-wip-us.apache.org/repos/asf/incubator-spot/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spot/commit/d1f5a67f Tree: http://git-wip-us.apache.org/repos/asf/incubator-spot/tree/d1f5a67f Diff: http://git-wip-us.apache.org/repos/asf/incubator-spot/diff/d1f5a67f Branch: refs/heads/master Commit: d1f5a67f929090e2bad865d53d4389c69b176fc5 Parents: 7376c5e Author: natedogs911 <natedogs...@gmail.com> Authored: Thu Jan 18 11:02:39 2018 -0800 Committer: natedogs911 <natedogs...@gmail.com> Committed: Thu Jan 18 11:02:39 2018 -0800 ---------------------------------------------------------------------- spot-oa/api/resources/configurator.py | 69 +++++++++- spot-oa/api/resources/hdfs_client.py | 201 ++++++++++++++++++++++++---- spot-oa/api/resources/impala_engine.py | 29 +++- 3 files changed, 262 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/configurator.py ---------------------------------------------------------------------- diff --git a/spot-oa/api/resources/configurator.py b/spot-oa/api/resources/configurator.py index 5bda045..017732d 100644 --- a/spot-oa/api/resources/configurator.py +++ b/spot-oa/api/resources/configurator.py @@ -14,35 +14,90 @@ # See the License for the specific language governing permissions and # limitations under the License. # + import ConfigParser -import os +from io import open + def configuration(): - conf_file = "/etc/spot.conf" config = ConfigParser.ConfigParser() - config.readfp(SecHead(open(conf_file))) + + try: + conf = open("/etc/spot.conf", "r") + except (OSError, IOError) as e: + print("Error opening: spot.conf" + " error: " + e.errno) + raise e + + config.readfp(SecHead(conf)) return config + def db(): conf = configuration() - return conf.get('conf', 'DBNAME').replace("'","").replace('"','') + return conf.get('conf', 'DBNAME').replace("'", "").replace('"', '') + def impala(): conf = configuration() - return conf.get('conf', 'IMPALA_DEM'),conf.get('conf', 'IMPALA_PORT') + return conf.get('conf', 'IMPALA_DEM'), conf.get('conf', 'IMPALA_PORT') + def hdfs(): conf = configuration() name_node = conf.get('conf',"NAME_NODE") web_port = conf.get('conf',"WEB_PORT") hdfs_user = conf.get('conf',"HUSER") - hdfs_user = hdfs_user.split("/")[-1].replace("'","").replace('"','') + hdfs_user = hdfs_user.split("/")[-1].replace("'", "").replace('"', '') return name_node,web_port,hdfs_user + def spot(): conf = configuration() - return conf.get('conf',"HUSER").replace("'","").replace('"','') + return conf.get('conf',"HUSER").replace("'", "").replace('"', '') + + +def kerberos_enabled(): + conf = configuration() + enabled = conf.get('conf', 'KERBEROS').replace("'", "").replace('"', '') + if enabled.lower() == 'true': + return True + else: + return False + + +def kerberos(): + conf = configuration() + if kerberos_enabled(): + principal = conf.get('conf', 'PRINCIPAL') + keytab = conf.get('conf', 'KEYTAB') + sasl_mech = conf.get('conf', 'SASL_MECH') + security_proto = conf.get('conf', 'SECURITY_PROTO') + return principal, keytab, sasl_mech, security_proto + else: + raise KeyError + + +def ssl_enabled(): + conf = configuration() + enabled = conf.get('conf', 'SSL') + if enabled.lower() == 'true': + return True + else: + return False + + +def ssl(): + conf = configuration() + if ssl_enabled(): + ssl_verify = conf.get('conf', 'SSL_VERIFY') + ca_location = conf.get('conf', 'CA_LOCATION') + cert = conf.get('conf', 'CERT') + key = conf.get('conf', 'KEY') + return ssl_verify, ca_location, cert, key + else: + raise KeyError + class SecHead(object): def __init__(self, fp): http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/hdfs_client.py ---------------------------------------------------------------------- diff --git a/spot-oa/api/resources/hdfs_client.py b/spot-oa/api/resources/hdfs_client.py index 31c5eba..e7f6bec 100644 --- a/spot-oa/api/resources/hdfs_client.py +++ b/spot-oa/api/resources/hdfs_client.py @@ -14,63 +14,216 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from hdfs import InsecureClient + from hdfs.util import HdfsError +from hdfs import Client +from hdfs.ext.kerberos import KerberosClient +from requests import Session from json import dump -import api.resources.configurator as Config +from threading import Lock +import logging +import configurator as Config +from sys import stderr + + +class Progress(object): + + """Basic progress tracker callback.""" + + def __init__(self, hdfs_path, nbytes): + self._data = {} + self._lock = Lock() + self._hpath = hdfs_path + self._nbytes = nbytes + + def __call__(self): + with self._lock: + if self._nbytes >= 0: + self._data[self._hpath] = self._nbytes + else: + stderr.write('%s\n' % (sum(self._data.values()), )) + + +class SecureKerberosClient(KerberosClient): + + """A new client subclass for handling HTTPS connections with Kerberos. + + :param url: URL to namenode. + :param cert: Local certificate. See `requests` documentation for details + on how to use this. + :param verify: Whether to check the host's certificate. WARNING: non production use only + :param \*\*kwargs: Keyword arguments passed to the default `Client` + constructor. + + """ + + def __init__(self, url, mutual_auth, cert=None, verify='true', **kwargs): + + self._logger = logging.getLogger("SPOT.INGEST.HDFS_client") + session = Session() + + if verify == 'true': + self._logger.info('SSL verification enabled') + session.verify = True + if cert is not None: + self._logger.info('SSL Cert: ' + cert) + if ',' in cert: + session.cert = [path.strip() for path in cert.split(',')] + else: + session.cert = cert + elif verify == 'false': + session.verify = False + + super(SecureKerberosClient, self).__init__(url, mutual_auth, session=session, **kwargs) + +class HdfsException(HdfsError): + def __init__(self, message): + super(HdfsException, self).__init__(message) + self.message = message + + +def get_client(user=None): + # type: (object) -> Client + + logger = logging.getLogger('SPOT.INGEST.HDFS.get_client') + hdfs_nm, hdfs_port, hdfs_user = Config.hdfs() + conf = {'url': '{0}:{1}'.format(hdfs_nm, hdfs_port)} + + if Config.ssl_enabled(): + ssl_verify, ca_location, cert, key = Config.ssl() + conf.update({'verify': ssl_verify.lower()}) + if cert: + conf.update({'cert': cert}) + + if Config.kerberos_enabled(): + krb_conf = {'mutual_auth': 'OPTIONAL'} + conf.update(krb_conf) + + # TODO: possible user parameter + logger.info('Client conf:') + for k,v in conf.iteritems(): + logger.info(k + ': ' + v) + + client = SecureKerberosClient(**conf) -def _get_client(user=None): - hdfs_nm,hdfs_port,hdfs_user = Config.hdfs() - client = InsecureClient('http://{0}:{1}'.format(hdfs_nm,hdfs_port), user= user if user else hdfs_user) return client -def get_file(hdfs_file): - client = _get_client() + +def get_file(hdfs_file, client=None): + if not client: + client = get_client() + with client.read(hdfs_file) as reader: results = reader.read() return results -def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False): - + +def upload_file(hdfs_fp, local_fp, overwrite=False, client=None): + if not client: + client = get_client() + + try: + result = client.upload(hdfs_fp, local_fp, overwrite=overwrite, progress=Progress) + return result + except HdfsError as err: + return err + + +def download_file(hdfs_path, local_path, overwrite=False, client=None): + if not client: + client = get_client() + + try: + client.download(hdfs_path, local_path, overwrite=overwrite) + return True + except HdfsError: + return False + + +def mkdir(hdfs_path, client=None): + if client is not None: + client = get_client() + + try: + client.makedirs(hdfs_path) + return True + except HdfsError: + return False + + +def put_file_csv(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None): + if not client: + client = get_client() + try: - client = _get_client() hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name) with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file) as writer: for item in hdfs_file_content: data = ','.join(str(d) for d in item) writer.write("{0}\n".format(data)) return True - + except HdfsError: return False -def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False): - + +def put_file_json(hdfs_file_content,hdfs_path,hdfs_file_name,append_file=False,overwrite_file=False, client=None): + if not client: + client = get_client() + try: - client = _get_client() hdfs_full_name = "{0}/{1}".format(hdfs_path,hdfs_file_name) with client.write(hdfs_full_name,append=append_file,overwrite=overwrite_file,encoding='utf-8') as writer: - dump(hdfs_file_content, writer) + dump(hdfs_file_content, writer) return True except HdfsError: return False - -def delete_folder(hdfs_file,user=None): - client = _get_client(user) - client.delete(hdfs_file,recursive=True) -def list_dir(hdfs_path): +def delete_folder(hdfs_file, user=None, client=None): + if not client: + client = get_client() + + try: + client.delete(hdfs_file,recursive=True) + except HdfsError: + return False + + +def check_dir(hdfs_path, client=None): + """ + Returns True if directory exists + Returns False if directory does not exist + : param hdfs_path: path to check + : object client: hdfs client object for persistent connection + """ + if not client: + client = get_client() + + result = client.list(hdfs_path) + if None not in result: + return True + else: + return False + + +def list_dir(hdfs_path, client=None): + if not client: + client = get_client() + try: - client = _get_client() return client.list(hdfs_path) except HdfsError: return {} -def file_exists(hdfs_path,file_name): - files = list_dir(hdfs_path) + +def file_exists(hdfs_path, file_name, client=None): + if not client: + client = get_client() + + files = list_dir(client, hdfs_path) if str(file_name) in files: - return True + return True else: return False http://git-wip-us.apache.org/repos/asf/incubator-spot/blob/d1f5a67f/spot-oa/api/resources/impala_engine.py ---------------------------------------------------------------------- diff --git a/spot-oa/api/resources/impala_engine.py b/spot-oa/api/resources/impala_engine.py index b7d0148..542bbd0 100644 --- a/spot-oa/api/resources/impala_engine.py +++ b/spot-oa/api/resources/impala_engine.py @@ -15,15 +15,33 @@ # limitations under the License. # from impala.dbapi import connect -import api.resources.configurator as Config +import api.resources.configurator as config + def create_connection(): - impala_host, impala_port = Config.impala() - db = Config.db() - conn = connect(host=impala_host, port=int(impala_port),database=db) + impala_host, impala_port = config.impala() + conf = {} + + # TODO: if using hive, kerberos service name must be changed, impyla sets 'impala' as default + service_name = {'kerberos_service_name': 'impala'} + + if config.kerberos_enabled(): + principal, keytab, sasl_mech, security_proto = config.kerberos() + conf.update({'auth_mechanism': 'GSSAPI', + }) + + if config.ssl_enabled(): + ssl_verify, ca_location, cert, key = config.ssl() + conf.update({'ca_cert': cert, + 'use_ssl': ssl_verify + }) + + db = config.db() + conn = connect(host=impala_host, port=int(impala_port), database=db, **conf) return conn.cursor() + def execute_query(query,fetch=False): impala_cursor = create_connection() @@ -31,6 +49,7 @@ def execute_query(query,fetch=False): return impala_cursor if not fetch else impala_cursor.fetchall() + def execute_query_as_list(query): query_results = execute_query(query) @@ -46,5 +65,3 @@ def execute_query_as_list(query): row_result = {} return results - -