This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ccb5ce934c TrinoHook add authentication via JWT token and 
Impersonation  (#23116)
ccb5ce934c is described below

commit ccb5ce934cd521dc3af74b83623ca0843211be62
Author: Pragya <[email protected]>
AuthorDate: Sat May 7 01:15:33 2022 +0530

    TrinoHook add authentication via JWT token and Impersonation  (#23116)
    
    * added trino authentication via JWT token and impersonation
    
    * added test cases for jwt verification in trino
    
    * added documentation for trino hook
---
 airflow/providers/trino/hooks/trino.py             | 12 ++++-
 .../apache-airflow-providers-trino/connections.rst | 52 ++++++++++++++++++++++
 docs/apache-airflow-providers-trino/index.rst      |  1 +
 tests/providers/trino/hooks/test_trino.py          | 16 +++++++
 4 files changed, 79 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/trino/hooks/trino.py 
b/airflow/providers/trino/hooks/trino.py
index 5e48ea2338..a6295f34ea 100644
--- a/airflow/providers/trino/hooks/trino.py
+++ b/airflow/providers/trino/hooks/trino.py
@@ -96,10 +96,13 @@ class TrinoHook(DbApiHook):
         db = self.get_connection(self.trino_conn_id)  # type: 
ignore[attr-defined]
         extra = db.extra_dejson
         auth = None
+        user = db.login
         if db.password and extra.get('auth') == 'kerberos':
             raise AirflowException("Kerberos authorization doesn't support 
password.")
         elif db.password:
             auth = trino.auth.BasicAuthentication(db.login, db.password)  # 
type: ignore[attr-defined]
+        elif extra.get('auth') == 'jwt':
+            auth = trino.auth.JWTAuthentication(token=extra.get('jwt__token'))
         elif extra.get('auth') == 'kerberos':
             auth = trino.auth.KerberosAuthentication(  # type: 
ignore[attr-defined]
                 config=extra.get('kerberos__config', 
os.environ.get('KRB5_CONFIG')),
@@ -115,18 +118,23 @@ class TrinoHook(DbApiHook):
                 ca_bundle=extra.get('kerberos__ca_bundle'),
             )
 
+        if _boolify(extra.get('impersonate_as_owner', False)):
+            user = os.getenv('AIRFLOW_CTX_DAG_OWNER', None)
+            if user is None:
+                user = db.login
         http_headers = {"X-Trino-Client-Info": generate_trino_client_info()}
         trino_conn = trino.dbapi.connect(
             host=db.host,
             port=db.port,
-            user=db.login,
+            user=user,
             source=extra.get('source', 'airflow'),
             http_scheme=extra.get('protocol', 'http'),
             http_headers=http_headers,
             catalog=extra.get('catalog', 'hive'),
             schema=db.schema,
             auth=auth,
-            isolation_level=self.get_isolation_level(),  # type: 
ignore[func-returns-value]
+            # type: ignore[func-returns-value]
+            isolation_level=self.get_isolation_level(),
             verify=_boolify(extra.get('verify', True)),
         )
 
diff --git a/docs/apache-airflow-providers-trino/connections.rst 
b/docs/apache-airflow-providers-trino/connections.rst
new file mode 100644
index 0000000000..111205b8a9
--- /dev/null
+++ b/docs/apache-airflow-providers-trino/connections.rst
@@ -0,0 +1,52 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Trino Connection
+================
+
+The Trino connection type enables connections to Trino, a distributed SQL query engine designed to query large data sets distributed over one or more heterogeneous data sources.
+
+Default Connection IDs
+----------------------
+
+The Trino hook uses the ``trino_conn_id`` parameter to select the Connection ID; its value is ``trino_default`` by default.
+The Trino hook supports multiple authentication types to ensure that all users of the system are authenticated. Set the ``auth`` extra parameter to enable a specific authentication mechanism; its value is ``None`` by default.
+
+Configuring the Connection
+--------------------------
+Host
+    The hostname of the Trino server to connect to.
+
+Port
+    The port of the Trino server to connect to.
+
+Login
+    The effective user for the connection.
+
+Password
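+    Set a password to enable Basic Authentication. This is an optional parameter and is not required if a different authentication mechanism is used. A password cannot be combined with ``kerberos`` authentication, and if a password is set, Basic Authentication is used even when ``auth`` is ``jwt``.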
+
+Extra (optional, connection parameters)
+    Specify the extra parameters (as a JSON dictionary) that can be used in the Trino connection. In addition to the standard connection fields, the following parameters are supported:
+
+    * ``auth`` - Specifies which type of authentication to enable. The value can be ``kerberos`` or ``jwt``.
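+    * ``impersonate_as_owner`` - Boolean that, when enabled, uses the DAG owner (read from the ``AIRFLOW_CTX_DAG_OWNER`` environment variable) as the user of the connection. If that variable is not set, the connection's ``Login`` is used instead.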
+
+    The following extra parameters can be used to configure authentication:
+
+    * ``jwt__token`` - The JWT token used for authentication when ``auth`` is set to ``jwt``.
+    * ``kerberos__service_name``, ``kerberos__config``, ``kerberos__mutual_authentication``, ``kerberos__force_preemptive``, ``kerberos__hostname_override``, ``kerberos__sanitize_mutual_error_response``, ``kerberos__principal``, ``kerberos__delegate``, ``kerberos__ca_bundle`` - These parameters can be set when enabling ``kerberos`` authentication.
diff --git a/docs/apache-airflow-providers-trino/index.rst 
b/docs/apache-airflow-providers-trino/index.rst
index 239ec03cd8..785f262941 100644
--- a/docs/apache-airflow-providers-trino/index.rst
+++ b/docs/apache-airflow-providers-trino/index.rst
@@ -27,6 +27,7 @@ Content
     :caption: Guides
 
     TrinoTransferOperator types <operators/transfer/gcs_to_trino>
+    Connection types <connections>
 
 .. toctree::
     :maxdepth: 1
diff --git a/tests/providers/trino/hooks/test_trino.py 
b/tests/providers/trino/hooks/test_trino.py
index 883794bf0f..d721706858 100644
--- a/tests/providers/trino/hooks/test_trino.py
+++ b/tests/providers/trino/hooks/test_trino.py
@@ -34,6 +34,7 @@ HOOK_GET_CONNECTION = 
'airflow.providers.trino.hooks.trino.TrinoHook.get_connect
 BASIC_AUTHENTICATION = 
'airflow.providers.trino.hooks.trino.trino.auth.BasicAuthentication'
 KERBEROS_AUTHENTICATION = 
'airflow.providers.trino.hooks.trino.trino.auth.KerberosAuthentication'
 TRINO_DBAPI_CONNECT = 'airflow.providers.trino.hooks.trino.trino.dbapi.connect'
+JWT_AUTHENTICATION = 
'airflow.providers.trino.hooks.trino.trino.auth.JWTAuthentication'
 
 
 class TestTrinoHookConn:
@@ -94,6 +95,21 @@ class TestTrinoHookConn:
         ):
             TrinoHook().get_conn()
 
+    @patch(JWT_AUTHENTICATION)
+    @patch(TRINO_DBAPI_CONNECT)
+    @patch(HOOK_GET_CONNECTION)
+    def test_get_conn_jwt_auth(self, mock_get_connection, mock_connect, 
mock_jwt_auth):
+        extras = {
+            'auth': 'jwt',
+            'jwt__token': 'TEST_JWT_TOKEN',
+        }
+        self.set_get_connection_return_value(
+            mock_get_connection,
+            extra=json.dumps(extras),
+        )
+        TrinoHook().get_conn()
+        self.assert_connection_called_with(mock_connect, auth=mock_jwt_auth)
+
     @patch(KERBEROS_AUTHENTICATION)
     @patch(TRINO_DBAPI_CONNECT)
     @patch(HOOK_GET_CONNECTION)

Reply via email to