Taragolis commented on code in PR #27866: URL: https://github.com/apache/airflow/pull/27866#discussion_r1036387724
########## airflow/providers/cloudera/hooks/cde_hook.py: ########## @@ -0,0 +1,375 @@ +# Licensed to the Apache Software Foundation (ASF) under one Review Comment: Since Airflow 2.0 and [AIP-21](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-21%3A+Changes+in+import+paths) modules should not contain suffixes `_hook`, `_operator`, `_sensor` ########## airflow/providers/cloudera/operators/cde_operator.py: ########## @@ -0,0 +1,252 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import time +from typing import Any + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.cloudera.hooks.cde_hook import CdeHook, CdeHookException + + +class CdeRunJobOperator(BaseOperator): + """ + Runs a job in a CDE Virtual Cluster. The ``CdeRunJobOperator`` runs the + named job with optional variables and overrides. The job and its resources + must have already been created via the specified virtual cluster jobs API. + + The virtual cluster API endpoint is specified by setting the + ``connection_id`` parameter. The "local" virtual cluster jobs API is the + default and has a special value of ``cde_runtime_api``. 
Authentication to + the API is handled automatically and any jobs in the DAG will run as the + user who submitted the DAG. + + Jobs can be defined in a virtual cluster with variable placeholders, + e.g. ``{{ inputdir }}``. Currently the fields supporting variable expansion + are Spark application name, Spark arguments, and Spark configurations. + Variables can be passed to the operator as a dictionary of key-value string + pairs. In addition to any user variables passed via the ``variables`` + parameter, the following standard Airflow macros are automatically + populated as variables by the operator (see + https://airflow.apache.org/docs/stable/macros-ref): + + * ``ds``: the execution date as ``YYYY-MM-DD`` + * ``ds_nodash``: the execution date as ``YYYYMMDD`` + * ``ts``: execution date in ISO 8601 format + * ``ts_nodash``: execution date in ISO 8601 format without '-', ':' or + timezone information + * ``run_id``: the run_id of the current DAG run + + If a CDE job needs to run with a different configuration, a task can be + configured with runtime overrides. For example, to override the Spark + executor memory and cores for a task and to supply an additional config + parameter, the following dictionary can be supplied to + the ``overrides`` parameter:: + + { + 'spark': { + 'executorMemory': '8g', + 'executorCores': '4', + 'conf': { + 'spark.kubernetes.memoryOverhead': '2048' + } + } + } + + See the CDE Jobs API documentation for the full list of parameters that + can be overridden. + + Via the ``wait`` parameter, jobs can either be submitted asynchronously to + the API (``wait=False``) or the task can wait until the job is complete + before exiting the task (default is ``wait=True``). If ``wait`` is + ``True``, the task exit status will reflect the final status of the + submitted job (or the task will fail on timeout if specified). If ``wait`` + is ``False``, the task status will reflect whether the job was successfully + submitted to the API or not. 
+ + Note: all parameters below can also be provided through the + ``default_args`` field of the DAG. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CdeRunJobOperator` + + :param job_name: the name of the job in the target cluster, required + :param connection_id: the Airflow connection id for the target API + endpoint, default value ``'cde_runtime_api'`` + :param variables: a dictionary of key-value pairs to populate in the + job configuration, default empty dict. + :param overrides: a dictionary of key-value pairs to override in the + job configuration, default empty dict. + :param wait: if set to true, the operator will wait for the job to + complete in the target cluster. The task exit status will reflect the + status of the completed job. Default ``True`` + :param timeout: the maximum time to wait in seconds for the job to + complete if ``wait=True``. If set to ``None``, 0 or a negative number, + the task will never be timed out. Default ``0``. + :param job_poll_interval: the interval in seconds at which the target API + is polled for the job status. Default ``10``. + :param api_retries: the number of times to retry an API request in the event + of a connection failure or non-fatal API error. Default ``9``. 
+ """ + + template_fields = ("variables", "overrides") + ui_color = "#385f70" + ui_fgcolor = "#fff" + + DEFAULT_WAIT = True + DEFAULT_POLL_INTERVAL = 10 + DEFAULT_TIMEOUT = 0 + DEFAULT_RETRIES = 9 + DEFAULT_CONNECTION_ID = "cde_runtime_api" + DEFAULT_API_TIMEOUT = 30 + + def __init__( # pylint: disable=too-many-arguments + self, + job_name: str, + variables: dict[str, Any] | None = None, + overrides: dict[str, Any] | None = None, + connection_id: str = DEFAULT_CONNECTION_ID, + wait: bool = DEFAULT_WAIT, + timeout: int = DEFAULT_TIMEOUT, + job_poll_interval: int = DEFAULT_POLL_INTERVAL, + api_retries: int = DEFAULT_RETRIES, + api_timeout: int = DEFAULT_API_TIMEOUT, + user=None, + **kwargs, + ): + super().__init__(**kwargs) + self.job_name = job_name + self.variables = variables or {} + self.overrides = overrides or {} + self.connection_id = connection_id + self.wait = wait + self.timeout = timeout + self.job_poll_interval = job_poll_interval + if user: + self.log.warning("Proxy user is not yet supported. Setting it to None.") + self.user = None + self.api_retries = api_retries + self.api_timeout = api_timeout + if not self.job_name: + raise ValueError("job_name required") + # Set internal state + self._hook = self.get_hook() Review Comment: Seems like this might add additional Metadabase / Secrets Backend usage. `CdeHook` itself obtain credentials during `__init__` so that mean operator also will get connection during each DagProcessor parsing loop. Better to move this to cached property which only accessible from `execute` method, for example https://github.com/apache/airflow/blob/430e930902792fc37cdd2c517783f7dd544fbebf/airflow/providers/amazon/aws/operators/athena.py#L98-L103 ########## tests/providers/cloudera/model/test_connection.py: ########## @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests related to the CDE connection""" + +from __future__ import annotations + +import json +import unittest + +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from airflow.providers.cloudera.model.connection import CdeConnection + + +class CdeConnectionTest(unittest.TestCase): + Review Comment: We try to use native `pytest`; there exist some old tests which use `unittest.TestCase` for historical reasons (sooner or later we will replace all of them). Some suggestions: - Get rid of the `unittest.TestCase` class and **TestCase.assert*** methods - Convert the class **setUp*** and **tearDown*** methods to [appropriate pytest methods](https://docs.pytest.org/en/6.2.x/xunit_setup.html#classic-xunit-style-setup) ########## airflow/providers/cloudera/operators/cdw_operator.py: ########## @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import re + +from airflow.models import BaseOperator +from airflow.providers.cloudera.hooks.cdw_hook import CdwHook +from airflow.utils.operator_helpers import context_to_airflow_vars + + +class CdwExecuteQueryOperator(BaseOperator): + """ + Executes hql code in CDW. This class inherits behavior + from HiveOperator, and instantiates a CdwHook to do the work. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CdwExecuteQueryOperator` + """ + + template_fields = ("hql", "schema", "hiveconfs") + template_ext = ( + ".hql", + ".sql", + ) + ui_color = "#522a9f" + ui_fgcolor = "#fff" + + def __init__( + self, + hql, + schema="default", + hiveconfs=None, + hiveconf_jinja_translate=False, + cli_conn_id="hive_cli_default", + jdbc_driver=None, + # new CDW args + use_proxy_user=False, # pylint: disable=unused-argument + query_isolation=True, # TODO: implement + *args, Review Comment: Airflow operators as well as sensors support only keyword arguments, so `*args` is redundant ########## airflow/providers/cloudera/hooks/cdw_hook.py: ########## @@ -0,0 +1,388 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import csv +import os +import subprocess +import time +from io import StringIO +from tempfile import NamedTemporaryFile + +from airflow.exceptions import AirflowException +from airflow.providers.apache.hive.hooks.hive import HiveCliHook # type: ignore +from airflow.utils.file import TemporaryDirectory +from airflow.utils.operator_helpers import AIRFLOW_VAR_NAME_FORMAT_MAPPING + +HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"] +JDBC_BACKEND_HIVE = "hive2" +JDBC_BACKEND_IMPALA = "impala" + + +def get_context_from_env_var(): + """ + Extract context from env variable, e.g. dag_id, task_id and execution_date, + so that they can be used inside BashOperator and PythonOperator. + + :return: The context of interest. + """ + return { + format_map["default"]: os.environ.get(format_map["env_var_format"], "") + for format_map in AIRFLOW_VAR_NAME_FORMAT_MAPPING.values() + } + + +class CdwHook(HiveCliHook): + """Simple CDW hive cli hook which extends the functionality of HiveCliHook + in order to conform the parameter needs. + + :param cli_conn_id: airflow connection id to be used. + :param query_isolation: controls whether to use cdw's query isolation feature. + Only hive warehouses support this at the moment. + :jdbc_driver: a safety valve for JDBC driver class. It's not supposed to be changed by default as + CdwHook guesses and uses the correct driver for impala. The environment provides both JDBC 4.1 + and JDBC 4.2 driver. Currently, JDBC 4.1 is used for cdw. 
+ For hive, the driver class is not defined at all in beeline cli. + """ + + def __init__( + self, + cli_conn_id=None, + query_isolation=True, + jdbc_driver="com.cloudera.impala.jdbc41.Driver", + ): + super().__init__(cli_conn_id) + self.conn = self.get_connection(cli_conn_id) + self.query_isolation = query_isolation + self.jdbc_driver = jdbc_driver if jdbc_driver is not None else "com.cloudera.impala.jdbc41.Driver" + self.sub_process = None + + def get_cli_cmd(self): + """This is supposed to be visible for testing.""" + return self._prepare_cli_cmd() + + def _prepare_cli_cmd(self, hide_secrets=False): + """ + This function creates the command list from available information. + :param hide_secrets: whether to mask secrets with asterisk + """ + conn = self.conn + cmd_extra = [] + + hive_bin = "beeline" # only beeline is supported as client while connecting to CDW + jdbc_backend = CdwHook.get_jdbc_backend(conn) + + jdbc_url = f"jdbc:{jdbc_backend}://{conn.host}{CdwHook.get_port_string(conn)}/{conn.schema}" + + # HTTP+SSL is default for CDW, but it can be overwritten in connection extra params if needed + if jdbc_backend == JDBC_BACKEND_IMPALA: + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "AuthMech", "3") + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "transportMode", "http") + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "httpPath", "cliservice") + jdbc_url = self.add_parameter_to_jdbc_url( + conn.extra_dejson, jdbc_url, "ssl", CdwHook.get_ssl_parameter(conn) + ) + + if jdbc_backend == JDBC_BACKEND_IMPALA: + cmd_extra += ["-d", self.jdbc_driver] + + cmd_extra += ["-u", jdbc_url] + if conn.login: + cmd_extra += ["-n", conn.login] + if conn.password: + cmd_extra += ["-p", conn.password if not hide_secrets else "********"] + + self.add_extra_parameters(jdbc_backend, cmd_extra) + + return [hive_bin] + cmd_extra + + def add_extra_parameters(self, jdbc_backend, cmd_extra): + """ + Adds extra 
parameters to the beeline command in addition to the basic, needed ones. + This can be overridden in subclasses in order to change beeline behavior. + """ + # this hive option is supposed to enforce query isolation regardless + # of the initial settings used while creating the virtual warehouse + if self.query_isolation and jdbc_backend == JDBC_BACKEND_HIVE: + cmd_extra += ["--hiveconf", "hive.query.isolation.scan.size.threshold=0B"] + cmd_extra += ["--hiveconf", "hive.query.results.cache.enabled=false"] + cmd_extra += [ + "--hiveconf", + "hive.auto.convert.join.noconditionaltask.size=2505397589", + ] + + @staticmethod + def get_jdbc_backend(conn): + """ + Tries to guess the underlying database from connection host. In CDW, JDBC urls are like below: + hive: + - hs2-lbodor-airflow-hive.env-xkg48s.dwx.dev.cldr.work + impala: + - impala-proxy-lbodor-airflow-impala.env-xkg48s.dwx.dev.cldr.work:443 + - coordinator-lbodor-impala-test.env-xkg48s.dwx.dev.cldr.work:443 + So this method returns the database kind string which can be used in jdbc string: + hive: 'hive2' + impala: 'impala' + """ + return ( + JDBC_BACKEND_IMPALA + if (conn.host.find("coordinator-") == 0 or conn.host.find("impala-proxy") == 0) + else JDBC_BACKEND_HIVE + ) + + @staticmethod + def get_port_string(conn): + """ + hive: '' + impala: ':443' + """ + backend = CdwHook.get_jdbc_backend(conn) + return ":443" if backend == JDBC_BACKEND_IMPALA else "" + + @staticmethod + def get_ssl_parameter(conn): + """ + hive: 'true' + impala: '1' + """ + backend = CdwHook.get_jdbc_backend(conn) + return "1" if backend == JDBC_BACKEND_IMPALA else "true" + + def run_cli(self, hql, schema="default", verbose=True, hive_conf=None): + """Copied from hive hook, but removed unnecessary parts, e.g. 
mapred queue.""" + conn = self.conn + schema = schema or conn.schema + if schema: + hql = f"USE {schema};\n{hql}" + + with TemporaryDirectory(prefix="airflow_hiveop_") as tmp_dir: + with NamedTemporaryFile(dir=tmp_dir) as f: + hql = hql + "\n" + f.write(hql.encode("UTF-8")) + f.flush() + hive_cmd = self._prepare_cli_cmd() + env_context = get_context_from_env_var() + # Only extend the hive_conf if it is defined. + if hive_conf: + env_context.update(hive_conf) + hive_conf_params = self._prepare_hiveconf(env_context) + hive_cmd.extend(hive_conf_params) + hive_cmd.extend(["-f", f.name]) + + if verbose: + self.log.info("%s", " ".join(self._prepare_cli_cmd(hide_secrets=True))) + sub_process = subprocess.Popen( + hive_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=tmp_dir, + close_fds=True, + start_new_session=True, + ) + self.sub_process = sub_process + stdout = "" + while True: + line = sub_process.stdout.readline() + if not line: + break + stdout += line.decode("UTF-8") + if verbose: + self.log.info(line.decode("UTF-8").strip()) + sub_process.wait() + + if sub_process.returncode: + raise AirflowException(stdout) + + return stdout + + def kill(self): + if hasattr(self, "sub_process") and self.sub_process is not None: + if self.sub_process.poll() is None: + print("Killing the Hive job") + self.sub_process.terminate() + time.sleep(60) + self.sub_process.kill() + + @staticmethod + def add_parameter_to_jdbc_url(extra_dejson, jdbc_url, parameter_name, default_value=None): + """ + Appends a parameter to jdbc url if found in connection json extras + or there is a not None default value. 
+ """ + if extra_dejson is None or extra_dejson.get(parameter_name, default_value) is None: + return jdbc_url + + return jdbc_url + f";{parameter_name}={extra_dejson.get(parameter_name, default_value)}" + + +class CdwHiveMetastoreHook(CdwHook): + """A hive metastore hook which should behave the same as HiveMetastoreHook, + but instead of a kerberized, binary thrift connection it uses beeline as the client, + which connects to sys database. + """ + + def __init__(self, cli_conn_id="metastore_default"): + """ + In CdwHiveMetastoreHook this is supposed to be a beeline connection, + pointing to sys schema, so the conn should point to a hive cli wrapper connection in airflow, + similarly to CdwHook's cli_conn_id. + """ + super().__init__(cli_conn_id=cli_conn_id) + self.conn.schema = "sys" # metastore database + + def check_for_partition(self, schema, table, partition): + """ + Checks whether a partition exists + + :param schema: Name of hive schema (database) @table belongs to + :param table: Name of hive table @partition belongs to + :partition: Expression that matches the partitions to check for + :rtype: bool + """ + hql = ( + "select dbs.name as db_name, tbls.tbl_name as tbl_name, partitions.part_name as part_name " + "from partitions " + "left outer join tbls on tbls.tbl_id = partitions.tbl_id " + "left outer join dbs on dbs.db_id = tbls.db_id " + f"where dbs.name = '{schema}' " + f"and tbls.tbl_name = '{table}' " + f"and partitions.part_name = '{partition}';" + ) + + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + results_without_header = CdwHiveMetastoreHook.get_results_without_header( + result_lines, "db_name,tbl_name,part_name" + ) + + self.log.info("partitions: %s", results_without_header) + return len(results_without_header) > 0 + + def check_for_named_partition(self, schema, table, partition): + """ + Checks whether a partition with a given name exists + + :param 
schema: Name of hive schema (database) @table belongs to + :param table: Name of hive table @partition belongs to + :partition: Name of the partitions to check for (eg `a=b/c=d`) + :rtype: bool + """ + raise Exception("TODO IMPLEMENT") Review Comment: ```suggestion raise NotImplementedError ``` ########## airflow/providers/cloudera/hooks/cdw_hook.py: ########## @@ -0,0 +1,388 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import csv +import os +import subprocess +import time +from io import StringIO +from tempfile import NamedTemporaryFile + +from airflow.exceptions import AirflowException +from airflow.providers.apache.hive.hooks.hive import HiveCliHook # type: ignore +from airflow.utils.file import TemporaryDirectory +from airflow.utils.operator_helpers import AIRFLOW_VAR_NAME_FORMAT_MAPPING + +HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"] +JDBC_BACKEND_HIVE = "hive2" +JDBC_BACKEND_IMPALA = "impala" + + +def get_context_from_env_var(): + """ + Extract context from env variable, e.g. dag_id, task_id and execution_date, + so that they can be used inside BashOperator and PythonOperator. + + :return: The context of interest. 
+ """ + return { + format_map["default"]: os.environ.get(format_map["env_var_format"], "") + for format_map in AIRFLOW_VAR_NAME_FORMAT_MAPPING.values() + } + + +class CdwHook(HiveCliHook): + """Simple CDW hive cli hook which extends the functionality of HiveCliHook + in order to conform the parameter needs. + + :param cli_conn_id: airflow connection id to be used. + :param query_isolation: controls whether to use cdw's query isolation feature. + Only hive warehouses support this at the moment. + :jdbc_driver: a safety valve for JDBC driver class. It's not supposed to be changed by default as + CdwHook guesses and uses the correct driver for impala. The environment provides both JDBC 4.1 + and JDBC 4.2 driver. Currently, JDBC 4.1 is used for cdw. + For hive, the driver class is not defined at all in beeline cli. + """ + + def __init__( + self, + cli_conn_id=None, + query_isolation=True, + jdbc_driver="com.cloudera.impala.jdbc41.Driver", + ): + super().__init__(cli_conn_id) + self.conn = self.get_connection(cli_conn_id) + self.query_isolation = query_isolation + self.jdbc_driver = jdbc_driver if jdbc_driver is not None else "com.cloudera.impala.jdbc41.Driver" + self.sub_process = None + + def get_cli_cmd(self): + """This is supposed to be visible for testing.""" + return self._prepare_cli_cmd() + + def _prepare_cli_cmd(self, hide_secrets=False): + """ + This function creates the command list from available information. 
+ :param hide_secrets: whether to mask secrets with asterisk + """ + conn = self.conn + cmd_extra = [] + + hive_bin = "beeline" # only beeline is supported as client while connecting to CDW + jdbc_backend = CdwHook.get_jdbc_backend(conn) + + jdbc_url = f"jdbc:{jdbc_backend}://{conn.host}{CdwHook.get_port_string(conn)}/{conn.schema}" + + # HTTP+SSL is default for CDW, but it can be overwritten in connection extra params if needed + if jdbc_backend == JDBC_BACKEND_IMPALA: + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "AuthMech", "3") + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "transportMode", "http") + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "httpPath", "cliservice") + jdbc_url = self.add_parameter_to_jdbc_url( + conn.extra_dejson, jdbc_url, "ssl", CdwHook.get_ssl_parameter(conn) + ) + + if jdbc_backend == JDBC_BACKEND_IMPALA: + cmd_extra += ["-d", self.jdbc_driver] + + cmd_extra += ["-u", jdbc_url] + if conn.login: + cmd_extra += ["-n", conn.login] + if conn.password: + cmd_extra += ["-p", conn.password if not hide_secrets else "********"] + + self.add_extra_parameters(jdbc_backend, cmd_extra) + + return [hive_bin] + cmd_extra + + def add_extra_parameters(self, jdbc_backend, cmd_extra): + """ + Adds extra parameters to the beeline command in addition to the basic, needed ones. + This can be overridden in subclasses in order to change beeline behavior. 
+ """ + # this hive option is supposed to enforce query isolation regardless + # of the initial settings used while creating the virtual warehouse + if self.query_isolation and jdbc_backend == JDBC_BACKEND_HIVE: + cmd_extra += ["--hiveconf", "hive.query.isolation.scan.size.threshold=0B"] + cmd_extra += ["--hiveconf", "hive.query.results.cache.enabled=false"] + cmd_extra += [ + "--hiveconf", + "hive.auto.convert.join.noconditionaltask.size=2505397589", + ] + + @staticmethod + def get_jdbc_backend(conn): + """ + Tries to guess the underlying database from connection host. In CDW, JDBC urls are like below: + hive: + - hs2-lbodor-airflow-hive.env-xkg48s.dwx.dev.cldr.work + impala: + - impala-proxy-lbodor-airflow-impala.env-xkg48s.dwx.dev.cldr.work:443 + - coordinator-lbodor-impala-test.env-xkg48s.dwx.dev.cldr.work:443 + So this method returns the database kind string which can be used in jdbc string: + hive: 'hive2' + impala: 'impala' + """ + return ( + JDBC_BACKEND_IMPALA + if (conn.host.find("coordinator-") == 0 or conn.host.find("impala-proxy") == 0) + else JDBC_BACKEND_HIVE + ) + + @staticmethod + def get_port_string(conn): + """ + hive: '' + impala: ':443' + """ + backend = CdwHook.get_jdbc_backend(conn) + return ":443" if backend == JDBC_BACKEND_IMPALA else "" + + @staticmethod + def get_ssl_parameter(conn): + """ + hive: 'true' + impala: '1' + """ + backend = CdwHook.get_jdbc_backend(conn) + return "1" if backend == JDBC_BACKEND_IMPALA else "true" + + def run_cli(self, hql, schema="default", verbose=True, hive_conf=None): + """Copied from hive hook, but removed unnecessary parts, e.g. 
mapred queue.""" + conn = self.conn + schema = schema or conn.schema + if schema: + hql = f"USE {schema};\n{hql}" + + with TemporaryDirectory(prefix="airflow_hiveop_") as tmp_dir: + with NamedTemporaryFile(dir=tmp_dir) as f: + hql = hql + "\n" + f.write(hql.encode("UTF-8")) + f.flush() + hive_cmd = self._prepare_cli_cmd() + env_context = get_context_from_env_var() + # Only extend the hive_conf if it is defined. + if hive_conf: + env_context.update(hive_conf) + hive_conf_params = self._prepare_hiveconf(env_context) + hive_cmd.extend(hive_conf_params) + hive_cmd.extend(["-f", f.name]) + + if verbose: + self.log.info("%s", " ".join(self._prepare_cli_cmd(hide_secrets=True))) + sub_process = subprocess.Popen( + hive_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=tmp_dir, + close_fds=True, + start_new_session=True, + ) + self.sub_process = sub_process + stdout = "" + while True: + line = sub_process.stdout.readline() + if not line: + break + stdout += line.decode("UTF-8") + if verbose: + self.log.info(line.decode("UTF-8").strip()) + sub_process.wait() + + if sub_process.returncode: + raise AirflowException(stdout) + + return stdout + + def kill(self): + if hasattr(self, "sub_process") and self.sub_process is not None: + if self.sub_process.poll() is None: + print("Killing the Hive job") + self.sub_process.terminate() + time.sleep(60) + self.sub_process.kill() + + @staticmethod + def add_parameter_to_jdbc_url(extra_dejson, jdbc_url, parameter_name, default_value=None): + """ + Appends a parameter to jdbc url if found in connection json extras + or there is a not None default value. 
+ """ + if extra_dejson is None or extra_dejson.get(parameter_name, default_value) is None: + return jdbc_url + + return jdbc_url + f";{parameter_name}={extra_dejson.get(parameter_name, default_value)}" + + +class CdwHiveMetastoreHook(CdwHook): + """A hive metastore hook which should behave the same as HiveMetastoreHook, + but instead of a kerberized, binary thrift connection it uses beeline as the client, + which connects to sys database. + """ + + def __init__(self, cli_conn_id="metastore_default"): + """ + In CdwHiveMetastoreHook this is supposed to be a beeline connection, + pointing to sys schema, so the conn should point to a hive cli wrapper connection in airflow, + similarly to CdwHook's cli_conn_id. + """ + super().__init__(cli_conn_id=cli_conn_id) + self.conn.schema = "sys" # metastore database + + def check_for_partition(self, schema, table, partition): + """ + Checks whether a partition exists + + :param schema: Name of hive schema (database) @table belongs to + :param table: Name of hive table @partition belongs to + :partition: Expression that matches the partitions to check for + :rtype: bool + """ + hql = ( + "select dbs.name as db_name, tbls.tbl_name as tbl_name, partitions.part_name as part_name " + "from partitions " + "left outer join tbls on tbls.tbl_id = partitions.tbl_id " + "left outer join dbs on dbs.db_id = tbls.db_id " + f"where dbs.name = '{schema}' " + f"and tbls.tbl_name = '{table}' " + f"and partitions.part_name = '{partition}';" + ) + + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + results_without_header = CdwHiveMetastoreHook.get_results_without_header( + result_lines, "db_name,tbl_name,part_name" + ) + + self.log.info("partitions: %s", results_without_header) + return len(results_without_header) > 0 + + def check_for_named_partition(self, schema, table, partition): + """ + Checks whether a partition with a given name exists + + :param 
schema: Name of hive schema (database) @table belongs to + :param table: Name of hive table @partition belongs to + :partition: Name of the partitions to check for (eg `a=b/c=d`) + :rtype: bool + """ + raise Exception("TODO IMPLEMENT") + + def get_table(self, table_name, db="default"): + """Get a metastore table object""" + if db == "default" and "." in table_name: + db, table_name = table_name.split(".")[:2] + hql = ( + "select dbs.name as db_name, tbls.tbl_name as tbl_name" + "from tbls " + "left outer join dbs on dbs.db_id = tbls.db_id " + f"where dbs.name = '{db}' " + f"and tbls.tbl_name = '{table_name}' " + ) + + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + + tables = CdwHiveMetastoreHook.get_results_without_header(result_lines, "db_name,tbl_name") + return tables + + def get_tables(self, db, pattern="*"): + """Get a metastore table object.""" + hql = ( + "select dbs.name as db_name, tbls.tbl_name as tbl_name " + "from tbls " + "left outer join dbs on dbs.db_id = tbls.db_id " + f"where dbs.name = '{db}' " + f"and tbls.tbl_name like '{pattern.replace('*', '%')}' " + ) + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + + tables = CdwHiveMetastoreHook.get_results_without_header(result_lines, "db_name,tbl_name") + + self.log.info("tables: %s", tables) + return len(tables) > 0 + + def get_databases(self, pattern="*"): + """Get a metastore table object.""" + hql = f"select dbs.name from dbs where dbs.name LIKE '{pattern.replace('*', '%')}' " + + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + + databases = CdwHiveMetastoreHook.get_results_without_header(result_lines, "db_name,tbl_name") + + self.log.info("databases: %s", databases) + return databases + + def get_partitions(self, 
schema, table_name, partition_filter=None): + """Returns a list of all partitions in a table.""" + raise Exception("TODO IMPLEMENT") Review Comment: ```suggestion raise NotImplementedError ``` ########## airflow/providers/cloudera/hooks/cde_hook.py: ########## @@ -0,0 +1,375 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Holds airflow hook functionalities for CDE clusters like submitting a CDE job, +checking its status or killing it. 
+""" + +from __future__ import annotations + +import os +from typing import Any + +import requests +import tenacity # type: ignore +from cloudera.cdp.model.cde import VirtualCluster +from cloudera.cdp.security import SecurityError +from cloudera.cdp.security.cde_security import BearerAuth, CdeApiTokenAuth, CdeTokenAuthResponse +from cloudera.cdp.security.cdp_security import CdpAccessKeyCredentials, CdpAccessKeyV2TokenAuth +from cloudera.cdp.security.token_cache import EncryptedFileTokenCacheStrategy + +from airflow.exceptions import AirflowException # type: ignore +from airflow.hooks.base import BaseHook # type: ignore +from airflow.providers.cloudera.hooks import CdpHookException +from airflow.providers.cloudera.model.connection import CdeConnection +from airflow.providers.http.hooks.http import HttpHook # type: ignore + + +class CdeHookException(CdpHookException): + """Root exception for the CdeHook which is used to handle any known exceptions""" + + +class CdeHook(BaseHook): # type: ignore + """A wrapper around the CDE Virtual Cluster REST API.""" + + conn_name_attr = "cde_conn_id" + conn_type = "cloudera_data_engineering" + hook_name = "Cloudera Data Engineering" + + @staticmethod + def get_ui_field_behaviour() -> dict: # pragma: no cover , since it is an Airflow-related feature + """Returns custom field behaviour""" + return { + "hidden_fields": ["schema", "port"], + "relabeling": { + "host": "Virtual Cluster API endpoint", + "login": "CDP Access Key", + "password": "CDP Private Key", + }, + } + + DEFAULT_CONN_ID = "cde_runtime_api" + # Gives a total of at least 2^8+2^7+...2=510 seconds of retry with exponential backoff + DEFAULT_NUM_RETRIES = 9 + DEFAULT_API_TIMEOUT = 30 + + def __init__( + self, + connection_id: str = DEFAULT_CONN_ID, + num_retries: int = DEFAULT_NUM_RETRIES, + api_timeout: int = DEFAULT_API_TIMEOUT, + ) -> None: + """ + Create a new CdeHook. The connection parameters are eagerly validated to highlight + any problems early. 
+ + :param connection_id: The connection name for the target virtual cluster API + (default: {CdeHook.DEFAULT_CONN_ID}). + :param num_retries: The number of times API requests should be retried if a server-side + error or transport error is encountered (default: {CdeHook.DEFAULT_NUM_RETRIES}). + :param api_timeout: The timeout in seconds after which, if no response has been received + from the API, a request should be abandoned and retried + (default: {CdeHook.DEFAULT_API_TIMEOUT}). + """ + super().__init__(connection_id) + self.cde_conn_id = connection_id + airflow_connection = self.get_connection(self.cde_conn_id) + self.connection = CdeConnection.from_airflow_connection(airflow_connection) + self.num_retries = num_retries + self.api_timeout = api_timeout + + def _do_api_call( + self, method: str, endpoint: str, params: dict[str, Any] | None = None + ) -> dict[str, Any] | None: + """ + Execute the API call. Requests are retried for connection errors and server-side errors + using an exponential backoff. + + :param method: HTTP method + :param endpoint: URL path of REST endpoint, excluding the API prefix, e.g "/jobs/myjob/run". 
+ If the endpoint does not start with '/' this will be added + :param params: A dictionary of parameters to send in either HTTP body as a JSON document + or as URL parameters for GET requests + :param body: A dictionary to send in the HTTP body as a JSON document + :return: The API response converted to a Python dictionary + or an AirflowException if the API returns an error + """ + if self.connection.proxy: + self.log.debug("Setting up proxy environment variables") + os.environ["HTTPS_PROXY"] = self.connection.proxy + os.environ["https_proxy"] = self.connection.proxy + + if self.connection.is_external(): + cde_token = self.get_cde_token() + else: + self.log.info("Using internal authentication mechanisms.") + + endpoint = endpoint if endpoint.startswith("/") else f"/{endpoint}" + if self.connection.is_internal(): + endpoint = self.connection.api_base_route + endpoint + + self.log.debug( + "Executing API call: (Method: %s, Endpoint: %s, Parameters: %s, Timeout: %s, Retries: %s)", + method, + endpoint, + params, + self.api_timeout, + self.num_retries, + ) + http = HttpHook(method.upper(), http_conn_id=self.cde_conn_id) + retry_handler = RetryHandler() + + try: + extra_options: dict[str, Any] = dict( + timeout=self.api_timeout, + # we check the response ourselves in RetryHandler + check_response=False, + ) + + if self.connection.insecure: + self.log.debug("Setting session verify to False") + extra_options = {**extra_options, "verify": False} + else: + ca_cert = self.connection.ca_cert_path + self.log.debug("ca_cert is %s", ca_cert) + if ca_cert: + self.log.debug("Setting session verify to %s", ca_cert) + extra_options = {**extra_options, "verify": ca_cert} + else: + # Ensures secure connection by default, it is False in Airflow 1 + extra_options = {**extra_options, "verify": True} + + # Small hack to override the insecure header property passed from the + # extra in HTTPHook, which is a boolean but must be a string to be part + # of the headers + request_extra_headers 
= {"insecure": str(self.connection.insecure)} + + common_kwargs: dict[str, Any] = dict( + _retry_args=dict( + wait=tenacity.wait_exponential(), + stop=tenacity.stop_after_attempt(self.num_retries), + retry=retry_handler, + ), + endpoint=endpoint, + extra_options=extra_options, + headers=request_extra_headers, + ) + + if self.connection.is_external(): + common_kwargs = {**common_kwargs, "auth": BearerAuth(cde_token)} + + if method.upper() == "GET": + response = http.run_with_advanced_retry(data=params, **common_kwargs) + else: + response = http.run_with_advanced_retry(json=params, **common_kwargs) + + if response.content is None: + return None + return response.json() + + except Exception as err: + msg = "API call returned error(s)" + msg = f"{msg}:[{','.join(retry_handler.errors)}]" if retry_handler.errors else msg + self.log.error(msg) + raise CdeHookException(err) from err + + def get_cde_token(self) -> str: + """ + Obtains valid CDE token through CDP access token + + Returns: + cde_token: a valid token for submitting request to the CDE Cluster + """ + self.log.debug("Starting CDE token acquisition") + access_key, private_key = ( + self.connection.access_key, + self.connection.private_key, + ) + vcluster_endpoint = self.connection.get_vcluster_jobs_api_url() + try: + cdp_cred = CdpAccessKeyCredentials(access_key, private_key) + cde_vcluster = VirtualCluster(vcluster_endpoint) + cdp_auth = CdpAccessKeyV2TokenAuth( + cde_vcluster.get_service_id(), + cdp_cred, + region=self.connection.region, + cdp_endpoint=self.connection.cdp_endpoint, + altus_iam_endpoint=self.connection.altus_iam_endpoint, + ) + + cache_mech_extra_kw = {} + cache_dir = self.connection.cache_dir + if cache_dir: + cache_mech_extra_kw = {"cache_dir": cache_dir} + + cache_mech = EncryptedFileTokenCacheStrategy( + CdeTokenAuthResponse, + encryption_key=cdp_auth.get_auth_secret(), + **cache_mech_extra_kw, + ) + + cde_auth = CdeApiTokenAuth( + cde_vcluster, + cdp_auth, + cache_mech, + 
custom_ca_certificate_path=self.connection.ca_cert_path, + insecure=self.connection.insecure, + ) + cde_token = cde_auth.get_cde_authentication_token().access_token + self.log.debug("CDE token successfully acquired") + if not self.connection.region: + # Save region, so that any subsequent calls would not need to infer it again + self.log.debug( + "Saving inferred region %s to connection with connection_id %s", + cdp_auth.region, + self.connection.conn_id, + ) + self.connection.save_region(cdp_auth.region) + self.log.debug( + "Inferred region %s successfully saved to connection with connection_id %s", + cdp_auth.region, + self.connection.conn_id, + ) + + except SecurityError as err: + self.log.error( + "Failed to get the cde auth token for the connection %s, error: %s", + self.cde_conn_id, + err, + ) + raise CdeHookException(err) from err + + return cde_token + + def submit_job( + self, + job_name: str, + variables: dict[str, Any] | None = None, + overrides: dict[str, Any] | None = None, + proxy_user: str | None = None, + ) -> int: + """ + Submit a job run request + + :param job_name: The name of the job definition to run (should already be + defined in the virtual cluster). + :param variables: Runtime variables to pass to job run + :param overrides: Overrides of job parameters for this run + :return: the job run ID for a successful submission or an AirflowException + :rtype: int + """ + if proxy_user: + # Not tested because it should have never been introduced in the first place + self.log.warning("Proxy user is not yet supported. Setting it to None.") # pragma: no cover + + body = dict( + variables=variables, + overrides=overrides, + # Shall be updated to proxy_user when we support this feature + user=None, + ) + response = self._do_api_call("POST", f"/jobs/{job_name}/run", body) + if response is None: + msg = f"Unexpected 'None' response for '{job_name}' job." 
+ self.log.error(msg) + raise CdeHookException(msg=msg) + try: + job_run_id: int = response["id"] + except KeyError as err: + msg = f"Response for '{job_name}' job do not contain 'id'." + raise CdeHookException(err, msg) from err + return job_run_id + + def kill_job_run(self, run_id: int) -> None: + """ + Kill a running job + + :param run_id: the run ID of the job run + """ + self._do_api_call("POST", f"/job-runs/{run_id}/kill") + + def check_job_run_status(self, run_id: int) -> str: + """ + Check and return the status of a job run + + :param run_id: the run ID of the job run + :return: the job run status + :rtype: str + """ + response = self._do_api_call("GET", f"/job-runs/{run_id}") + if response is None: + msg = f"Unexpected 'None' response for '{run_id}' job run." + self.log.error(msg) + raise CdeHookException(msg=msg) + + try: + response_status: str = response["status"] + except KeyError as err: + msg = f"Response for '{run_id}' job run do not contain 'status'." + raise CdeHookException(err, msg) from err + return response_status + + def get_conn(self): # pylint: disable=missing-function-docstring; not required for CdeHook + raise NotImplementedError Review Comment: Seems like it is a good idea to move obtain connection to this method instead of get it in class constructor ```python airflow_connection = self.get_connection(self.cde_conn_id) self.connection = CdeConnection.from_airflow_connection(airflow_connection) ``` ########## airflow/providers/cloudera/operators/cdw_operator.py: ########## @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import re + +from airflow.models import BaseOperator +from airflow.providers.cloudera.hooks.cdw_hook import CdwHook +from airflow.utils.operator_helpers import context_to_airflow_vars + + +class CdwExecuteQueryOperator(BaseOperator): + """ + Executes hql code in CDW. This class inherits behavior + from HiveOperator, and instantiates a CdwHook to do the work. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CdwExecuteQueryOperator` + """ + + template_fields = ("hql", "schema", "hiveconfs") + template_ext = ( + ".hql", + ".sql", + ) + ui_color = "#522a9f" + ui_fgcolor = "#fff" + + def __init__( + self, + hql, + schema="default", + hiveconfs=None, + hiveconf_jinja_translate=False, + cli_conn_id="hive_cli_default", + jdbc_driver=None, + # new CDW args + use_proxy_user=False, # pylint: disable=unused-argument + query_isolation=True, # TODO: implement + *args, + **kwargs, + ): + + super().__init__(*args, **kwargs) + self.hql = hql + self.schema = schema + self.hiveconfs = hiveconfs or {} + self.hiveconf_jinja_translate = hiveconf_jinja_translate + self.run_as = None + self.cli_conn_id = cli_conn_id + self.jdbc_driver = jdbc_driver + self.query_isolation = query_isolation + # assigned lazily - just for consistency we can create the attribute with a + # `None` initial value, later it will be populated by the execute method. + # This also makes `on_kill` implementation consistent since it assumes `self.hook` + # is defined. 
+ self.hook = None Review Comment: This also might move into cached property ########## docs/apache-airflow-providers-cloudera/index.rst: ########## @@ -0,0 +1,98 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-cloudera`` +============================================ + +Content +------- + +.. toctree:: + :maxdepth: 1 + :caption: Guides + + Connection types <connections/index> + Operators <operators/index> + Sensors <sensors/hive_partition> + +.. toctree:: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/cloudera/index> + +.. toctree:: + :hidden: + :caption: System tests + + System Tests <_api/tests/system/providers/cloudera/index> + +.. toctree:: + :maxdepth: 1 + :caption: Resources + + PyPI Repository <https://pypi.org/project/apache-airflow-providers-cloudera/> + Installing from sources <installing-providers-from-sources> + +.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! + + +.. 
toctree:: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits <commits> + + +Package apache-airflow-providers-cloudera +------------------------------------------------------ + +`Cloudera <https://cloudera.com/>`__ + + +Release: 2.0.0 + Review Comment: Should this actually be version 1.0.0? ########## airflow/providers/cloudera/operators/cdw_operator.py: ########## @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import re + +from airflow.models import BaseOperator +from airflow.providers.cloudera.hooks.cdw_hook import CdwHook +from airflow.utils.operator_helpers import context_to_airflow_vars + + +class CdwExecuteQueryOperator(BaseOperator): + """ + Executes hql code in CDW. This class inherits behavior + from HiveOperator, and instantiates a CdwHook to do the work. + + ..
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:CdwExecuteQueryOperator` + """ + + template_fields = ("hql", "schema", "hiveconfs") + template_ext = ( + ".hql", + ".sql", + ) + ui_color = "#522a9f" + ui_fgcolor = "#fff" + + def __init__( + self, + hql, + schema="default", + hiveconfs=None, + hiveconf_jinja_translate=False, + cli_conn_id="hive_cli_default", + jdbc_driver=None, + # new CDW args + use_proxy_user=False, # pylint: disable=unused-argument + query_isolation=True, # TODO: implement Review Comment: Seems like this is not implemented yet — might it be better not to include it? ########## airflow/providers/cloudera/hooks/cdw_hook.py: ########## @@ -0,0 +1,388 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +from __future__ import annotations + +import csv +import os +import subprocess +import time +from io import StringIO +from tempfile import NamedTemporaryFile + +from airflow.exceptions import AirflowException +from airflow.providers.apache.hive.hooks.hive import HiveCliHook # type: ignore +from airflow.utils.file import TemporaryDirectory +from airflow.utils.operator_helpers import AIRFLOW_VAR_NAME_FORMAT_MAPPING + +HIVE_QUEUE_PRIORITIES = ["VERY_HIGH", "HIGH", "NORMAL", "LOW", "VERY_LOW"] +JDBC_BACKEND_HIVE = "hive2" +JDBC_BACKEND_IMPALA = "impala" + + +def get_context_from_env_var(): + """ + Extract context from env variable, e.g. dag_id, task_id and execution_date, + so that they can be used inside BashOperator and PythonOperator. + + :return: The context of interest. + """ + return { + format_map["default"]: os.environ.get(format_map["env_var_format"], "") + for format_map in AIRFLOW_VAR_NAME_FORMAT_MAPPING.values() + } + + +class CdwHook(HiveCliHook): + """Simple CDW hive cli hook which extends the functionality of HiveCliHook + in order to conform the parameter needs. + + :param cli_conn_id: airflow connection id to be used. + :param query_isolation: controls whether to use cdw's query isolation feature. + Only hive warehouses support this at the moment. + :jdbc_driver: a safety valve for JDBC driver class. It's not supposed to be changed by default as + CdwHook guesses and uses the correct driver for impala. The environment provides both JDBC 4.1 + and JDBC 4.2 driver. Currently, JDBC 4.1 is used for cdw. + For hive, the driver class is not defined at all in beeline cli. 
+ """ + + def __init__( + self, + cli_conn_id=None, + query_isolation=True, + jdbc_driver="com.cloudera.impala.jdbc41.Driver", + ): + super().__init__(cli_conn_id) + self.conn = self.get_connection(cli_conn_id) + self.query_isolation = query_isolation + self.jdbc_driver = jdbc_driver if jdbc_driver is not None else "com.cloudera.impala.jdbc41.Driver" + self.sub_process = None + + def get_cli_cmd(self): + """This is supposed to be visible for testing.""" + return self._prepare_cli_cmd() + + def _prepare_cli_cmd(self, hide_secrets=False): + """ + This function creates the command list from available information. + :param hide_secrets: whether to mask secrets with asterisk + """ + conn = self.conn + cmd_extra = [] + + hive_bin = "beeline" # only beeline is supported as client while connecting to CDW + jdbc_backend = CdwHook.get_jdbc_backend(conn) + + jdbc_url = f"jdbc:{jdbc_backend}://{conn.host}{CdwHook.get_port_string(conn)}/{conn.schema}" + + # HTTP+SSL is default for CDW, but it can be overwritten in connection extra params if needed + if jdbc_backend == JDBC_BACKEND_IMPALA: + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "AuthMech", "3") + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "transportMode", "http") + jdbc_url = self.add_parameter_to_jdbc_url(conn.extra_dejson, jdbc_url, "httpPath", "cliservice") + jdbc_url = self.add_parameter_to_jdbc_url( + conn.extra_dejson, jdbc_url, "ssl", CdwHook.get_ssl_parameter(conn) + ) + + if jdbc_backend == JDBC_BACKEND_IMPALA: + cmd_extra += ["-d", self.jdbc_driver] + + cmd_extra += ["-u", jdbc_url] + if conn.login: + cmd_extra += ["-n", conn.login] + if conn.password: + cmd_extra += ["-p", conn.password if not hide_secrets else "********"] + + self.add_extra_parameters(jdbc_backend, cmd_extra) + + return [hive_bin] + cmd_extra + + def add_extra_parameters(self, jdbc_backend, cmd_extra): + """ + Adds extra parameters to the beeline command in addition to the basic, needed 
ones. + This can be overridden in subclasses in order to change beeline behavior. + """ + # this hive option is supposed to enforce query isolation regardless + # of the initial settings used while creating the virtual warehouse + if self.query_isolation and jdbc_backend == JDBC_BACKEND_HIVE: + cmd_extra += ["--hiveconf", "hive.query.isolation.scan.size.threshold=0B"] + cmd_extra += ["--hiveconf", "hive.query.results.cache.enabled=false"] + cmd_extra += [ + "--hiveconf", + "hive.auto.convert.join.noconditionaltask.size=2505397589", + ] + + @staticmethod + def get_jdbc_backend(conn): + """ + Tries to guess the underlying database from connection host. In CDW, JDBC urls are like below: + hive: + - hs2-lbodor-airflow-hive.env-xkg48s.dwx.dev.cldr.work + impala: + - impala-proxy-lbodor-airflow-impala.env-xkg48s.dwx.dev.cldr.work:443 + - coordinator-lbodor-impala-test.env-xkg48s.dwx.dev.cldr.work:443 + So this method returns the database kind string which can be used in jdbc string: + hive: 'hive2' + impala: 'impala' + """ + return ( + JDBC_BACKEND_IMPALA + if (conn.host.find("coordinator-") == 0 or conn.host.find("impala-proxy") == 0) + else JDBC_BACKEND_HIVE + ) + + @staticmethod + def get_port_string(conn): + """ + hive: '' + impala: ':443' + """ + backend = CdwHook.get_jdbc_backend(conn) + return ":443" if backend == JDBC_BACKEND_IMPALA else "" + + @staticmethod + def get_ssl_parameter(conn): + """ + hive: 'true' + impala: '1' + """ + backend = CdwHook.get_jdbc_backend(conn) + return "1" if backend == JDBC_BACKEND_IMPALA else "true" + + def run_cli(self, hql, schema="default", verbose=True, hive_conf=None): + """Copied from hive hook, but removed unnecessary parts, e.g. 
mapred queue.""" + conn = self.conn + schema = schema or conn.schema + if schema: + hql = f"USE {schema};\n{hql}" + + with TemporaryDirectory(prefix="airflow_hiveop_") as tmp_dir: + with NamedTemporaryFile(dir=tmp_dir) as f: + hql = hql + "\n" + f.write(hql.encode("UTF-8")) + f.flush() + hive_cmd = self._prepare_cli_cmd() + env_context = get_context_from_env_var() + # Only extend the hive_conf if it is defined. + if hive_conf: + env_context.update(hive_conf) + hive_conf_params = self._prepare_hiveconf(env_context) + hive_cmd.extend(hive_conf_params) + hive_cmd.extend(["-f", f.name]) + + if verbose: + self.log.info("%s", " ".join(self._prepare_cli_cmd(hide_secrets=True))) + sub_process = subprocess.Popen( + hive_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=tmp_dir, + close_fds=True, + start_new_session=True, + ) + self.sub_process = sub_process + stdout = "" + while True: + line = sub_process.stdout.readline() + if not line: + break + stdout += line.decode("UTF-8") + if verbose: + self.log.info(line.decode("UTF-8").strip()) + sub_process.wait() + + if sub_process.returncode: + raise AirflowException(stdout) + + return stdout + + def kill(self): + if hasattr(self, "sub_process") and self.sub_process is not None: + if self.sub_process.poll() is None: + print("Killing the Hive job") + self.sub_process.terminate() + time.sleep(60) + self.sub_process.kill() + + @staticmethod + def add_parameter_to_jdbc_url(extra_dejson, jdbc_url, parameter_name, default_value=None): + """ + Appends a parameter to jdbc url if found in connection json extras + or there is a not None default value. 
+ """ + if extra_dejson is None or extra_dejson.get(parameter_name, default_value) is None: + return jdbc_url + + return jdbc_url + f";{parameter_name}={extra_dejson.get(parameter_name, default_value)}" + + +class CdwHiveMetastoreHook(CdwHook): + """A hive metastore hook which should behave the same as HiveMetastoreHook, + but instead of a kerberized, binary thrift connection it uses beeline as the client, + which connects to sys database. + """ + + def __init__(self, cli_conn_id="metastore_default"): + """ + In CdwHiveMetastoreHook this is supposed to be a beeline connection, + pointing to sys schema, so the conn should point to a hive cli wrapper connection in airflow, + similarly to CdwHook's cli_conn_id. + """ + super().__init__(cli_conn_id=cli_conn_id) + self.conn.schema = "sys" # metastore database + + def check_for_partition(self, schema, table, partition): + """ + Checks whether a partition exists + + :param schema: Name of hive schema (database) @table belongs to + :param table: Name of hive table @partition belongs to + :partition: Expression that matches the partitions to check for + :rtype: bool + """ + hql = ( + "select dbs.name as db_name, tbls.tbl_name as tbl_name, partitions.part_name as part_name " + "from partitions " + "left outer join tbls on tbls.tbl_id = partitions.tbl_id " + "left outer join dbs on dbs.db_id = tbls.db_id " + f"where dbs.name = '{schema}' " + f"and tbls.tbl_name = '{table}' " + f"and partitions.part_name = '{partition}';" + ) + + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + results_without_header = CdwHiveMetastoreHook.get_results_without_header( + result_lines, "db_name,tbl_name,part_name" + ) + + self.log.info("partitions: %s", results_without_header) + return len(results_without_header) > 0 + + def check_for_named_partition(self, schema, table, partition): + """ + Checks whether a partition with a given name exists + + :param 
schema: Name of hive schema (database) @table belongs to + :param table: Name of hive table @partition belongs to + :partition: Name of the partitions to check for (eg `a=b/c=d`) + :rtype: bool + """ + raise Exception("TODO IMPLEMENT") + + def get_table(self, table_name, db="default"): + """Get a metastore table object""" + if db == "default" and "." in table_name: + db, table_name = table_name.split(".")[:2] + hql = ( + "select dbs.name as db_name, tbls.tbl_name as tbl_name" + "from tbls " + "left outer join dbs on dbs.db_id = tbls.db_id " + f"where dbs.name = '{db}' " + f"and tbls.tbl_name = '{table_name}' " + ) + + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + + tables = CdwHiveMetastoreHook.get_results_without_header(result_lines, "db_name,tbl_name") + return tables + + def get_tables(self, db, pattern="*"): + """Get a metastore table object.""" + hql = ( + "select dbs.name as db_name, tbls.tbl_name as tbl_name " + "from tbls " + "left outer join dbs on dbs.db_id = tbls.db_id " + f"where dbs.name = '{db}' " + f"and tbls.tbl_name like '{pattern.replace('*', '%')}' " + ) + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + + tables = CdwHiveMetastoreHook.get_results_without_header(result_lines, "db_name,tbl_name") + + self.log.info("tables: %s", tables) + return len(tables) > 0 + + def get_databases(self, pattern="*"): + """Get a metastore table object.""" + hql = f"select dbs.name from dbs where dbs.name LIKE '{pattern.replace('*', '%')}' " + + response = self.run_cli(hql, self.conn.schema, verbose=True, hive_conf=None) + result_lines = CdwHiveMetastoreHook.parse_csv_lines(response) + + databases = CdwHiveMetastoreHook.get_results_without_header(result_lines, "db_name,tbl_name") + + self.log.info("databases: %s", databases) + return databases + + def get_partitions(self, 
schema, table_name, partition_filter=None): + """Returns a list of all partitions in a table.""" + raise Exception("TODO IMPLEMENT") + + def max_partition(self, schema, table_name, field=None, filter_map=None): + """ + Returns the maximum value for all partitions with given field in a table. + If only one partition key exist in the table, the key will be used as field. + filter_map should be a partition_key:partition_value map and will be used to + filter out partitions. + + :param schema: schema name. + :param table_name: table name. + :param field: partition key to get max partition from. + :param filter_map: partition_key:partition_value map used for partition filtering. + """ + raise Exception("TODO IMPLEMENT") Review Comment: ```suggestion raise NotImplementedError ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
