This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ccb5ce934c TrinoHook add authentication via JWT token and
Impersonation (#23116)
ccb5ce934c is described below
commit ccb5ce934cd521dc3af74b83623ca0843211be62
Author: Pragya <[email protected]>
AuthorDate: Sat May 7 01:15:33 2022 +0530
TrinoHook add authentication via JWT token and Impersonation (#23116)
* added trino authentication via JWT token and impersonation
* added test cases for jwt verification in trino
* added documentation for trino hook
---
airflow/providers/trino/hooks/trino.py | 12 ++++-
.../apache-airflow-providers-trino/connections.rst | 52 ++++++++++++++++++++++
docs/apache-airflow-providers-trino/index.rst | 1 +
tests/providers/trino/hooks/test_trino.py | 16 +++++++
4 files changed, 79 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/trino/hooks/trino.py
b/airflow/providers/trino/hooks/trino.py
index 5e48ea2338..a6295f34ea 100644
--- a/airflow/providers/trino/hooks/trino.py
+++ b/airflow/providers/trino/hooks/trino.py
@@ -96,10 +96,13 @@ class TrinoHook(DbApiHook):
db = self.get_connection(self.trino_conn_id) # type:
ignore[attr-defined]
extra = db.extra_dejson
auth = None
+ user = db.login
if db.password and extra.get('auth') == 'kerberos':
raise AirflowException("Kerberos authorization doesn't support
password.")
elif db.password:
auth = trino.auth.BasicAuthentication(db.login, db.password) #
type: ignore[attr-defined]
+ elif extra.get('auth') == 'jwt':
+ auth = trino.auth.JWTAuthentication(token=extra.get('jwt__token'))
elif extra.get('auth') == 'kerberos':
auth = trino.auth.KerberosAuthentication( # type:
ignore[attr-defined]
config=extra.get('kerberos__config',
os.environ.get('KRB5_CONFIG')),
@@ -115,18 +118,23 @@ class TrinoHook(DbApiHook):
ca_bundle=extra.get('kerberos__ca_bundle'),
)
+ if _boolify(extra.get('impersonate_as_owner', False)):
+ user = os.getenv('AIRFLOW_CTX_DAG_OWNER', None)
+ if user is None:
+ user = db.login
http_headers = {"X-Trino-Client-Info": generate_trino_client_info()}
trino_conn = trino.dbapi.connect(
host=db.host,
port=db.port,
- user=db.login,
+ user=user,
source=extra.get('source', 'airflow'),
http_scheme=extra.get('protocol', 'http'),
http_headers=http_headers,
catalog=extra.get('catalog', 'hive'),
schema=db.schema,
auth=auth,
- isolation_level=self.get_isolation_level(), # type:
ignore[func-returns-value]
+ # type: ignore[func-returns-value]
+ isolation_level=self.get_isolation_level(),
verify=_boolify(extra.get('verify', True)),
)
diff --git a/docs/apache-airflow-providers-trino/connections.rst
b/docs/apache-airflow-providers-trino/connections.rst
new file mode 100644
index 0000000000..111205b8a9
--- /dev/null
+++ b/docs/apache-airflow-providers-trino/connections.rst
@@ -0,0 +1,52 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Trino Connection
+================
+
+The Trino connection type enables connection to Trino, a distributed SQL query engine designed to query large data sets distributed over one or more heterogeneous data sources.
+
+Default Connection IDs
+----------------------
+
+The Trino Hook uses the parameter ``trino_conn_id`` for the connection ID; its default value is ``trino_default``.
+The Trino Hook supports multiple authentication types so that users of the system can be authenticated. The ``auth`` extra parameter selects which authentication type to enable; its value is ``None`` by default.
+
+Configuring the Connection
+--------------------------
+Host
+ The host of the Trino server to connect to.
+
+Port
+ The port of the Trino server to connect to.
+
+Login
+ Effective user for the connection. It is also used as the username for Basic Authentication.
+
+Password
+ The password used to enable Basic Authentication (together with Login). This is an optional parameter and is not required if a different authentication mechanism is used. It cannot be combined with ``kerberos`` authentication.
+
+Extra (optional, connection parameters)
+ Specify the extra parameters (as a JSON dictionary) that can be used in the Trino connection. In addition to the standard parameters, the following parameters are supported:
+
+ * ``auth`` - Specifies which type of authentication to enable. The value can be ``kerberos`` or ``jwt``.
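+ * ``impersonate_as_owner`` - Boolean that, when true, uses the DAG owner (read from the ``AIRFLOW_CTX_DAG_OWNER`` environment variable) as the user of the connection. If that variable is not set, Login is used instead.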
+
+ The following extra parameters can be used to configure authentication:
+
+ * ``jwt__token`` - The JWT token used for authentication when ``auth`` is set to ``jwt``.
+ * ``kerberos__service_name``, ``kerberos__config``, ``kerberos__mutual_authentication``, ``kerberos__force_preemptive``, ``kerberos__hostname_override``, ``kerberos__sanitize_mutual_error_response``, ``kerberos__principal``, ``kerberos__delegate``, ``kerberos__ca_bundle`` - These parameters can be set when enabling ``kerberos`` authentication.
diff --git a/docs/apache-airflow-providers-trino/index.rst
b/docs/apache-airflow-providers-trino/index.rst
index 239ec03cd8..785f262941 100644
--- a/docs/apache-airflow-providers-trino/index.rst
+++ b/docs/apache-airflow-providers-trino/index.rst
@@ -27,6 +27,7 @@ Content
:caption: Guides
TrinoTransferOperator types <operators/transfer/gcs_to_trino>
+ Connection types <connections>
.. toctree::
:maxdepth: 1
diff --git a/tests/providers/trino/hooks/test_trino.py
b/tests/providers/trino/hooks/test_trino.py
index 883794bf0f..d721706858 100644
--- a/tests/providers/trino/hooks/test_trino.py
+++ b/tests/providers/trino/hooks/test_trino.py
@@ -34,6 +34,7 @@ HOOK_GET_CONNECTION =
'airflow.providers.trino.hooks.trino.TrinoHook.get_connect
BASIC_AUTHENTICATION =
'airflow.providers.trino.hooks.trino.trino.auth.BasicAuthentication'
KERBEROS_AUTHENTICATION =
'airflow.providers.trino.hooks.trino.trino.auth.KerberosAuthentication'
TRINO_DBAPI_CONNECT = 'airflow.providers.trino.hooks.trino.trino.dbapi.connect'
+JWT_AUTHENTICATION =
'airflow.providers.trino.hooks.trino.trino.auth.JWTAuthentication'
class TestTrinoHookConn:
@@ -94,6 +95,21 @@ class TestTrinoHookConn:
):
TrinoHook().get_conn()
+ @patch(JWT_AUTHENTICATION)
+ @patch(TRINO_DBAPI_CONNECT)
+ @patch(HOOK_GET_CONNECTION)
+ def test_get_conn_jwt_auth(self, mock_get_connection, mock_connect,
mock_jwt_auth):
+ extras = {
+ 'auth': 'jwt',
+ 'jwt__token': 'TEST_JWT_TOKEN',
+ }
+ self.set_get_connection_return_value(
+ mock_get_connection,
+ extra=json.dumps(extras),
+ )
+ TrinoHook().get_conn()
+ self.assert_connection_called_with(mock_connect, auth=mock_jwt_auth)
+
@patch(KERBEROS_AUTHENTICATION)
@patch(TRINO_DBAPI_CONNECT)
@patch(HOOK_GET_CONNECTION)