This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c7e94b6af2a Add PAT authentication support to SnowflakeSqlApiHook 
(#62162)
c7e94b6af2a is described below

commit c7e94b6af2aaa36ad47d78972ebae7ffa079bb15
Author: Akshaykumar <[email protected]>
AuthorDate: Wed Mar 11 18:09:56 2026 +0530

    Add PAT authentication support to SnowflakeSqlApiHook (#62162)
    
    Co-authored-by: Akshay <[email protected]>
---
 docs/spelling_wordlist.txt                         |  1 +
 .../providers/snowflake/hooks/snowflake_sql_api.py | 39 ++++++++++++++++------
 .../providers/snowflake/operators/snowflake.py     | 11 ++++--
 .../unit/snowflake/hooks/test_snowflake_sql_api.py | 39 ++++++++++++++++++++++
 4 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 359d619e26f..8dc6d26e91e 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1216,6 +1216,7 @@ proc
 productionalize
 ProductSearchClient
 profiler
+Programmatic
 programmatically
 proj
 projectId
diff --git 
a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
 
b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
index 2f52be48b88..ce68a6436ea 100644
--- 
a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
+++ 
b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
@@ -50,10 +50,15 @@ class SnowflakeSqlApiHook(SnowflakeHook):
     In combination with aiohttp, make post request to submit SQL statements 
for execution,
     poll to check the status of the execution of a statement. Fetch query 
results asynchronously.
 
-    This hook requires the snowflake_conn_id connection. This hooks mainly 
uses account, schema, database,
-    warehouse, and an authentication mechanism from one of below:
-    1. JWT Token generated from private_key_file or private_key_content. Other 
inputs can be defined in the connection or hook instantiation.
-    2. OAuth Token generated from the refresh_token, client_id and 
client_secret specified in the connection
+    This hook requires the snowflake_conn_id connection. This hook mainly uses 
account, schema, database,
+    warehouse, and an authentication mechanism from one of the following:
+
+    1. JWT Token generated from ``private_key_file`` or 
``private_key_content``. Other inputs can be
+       defined in the connection or hook instantiation.
+    2. OAuth Token generated from the ``refresh_token``, ``client_id`` and 
``client_secret`` specified
+       in the connection.
+    3. PAT (Programmatic Access Token): set ``authenticator`` to 
``programmatic_access_token`` in the
+       connection extras and put the PAT value in the connection ``password`` 
field.
 
     :param snowflake_conn_id: Reference to
         :ref:`Snowflake connection id<howto/connection:snowflake>`
@@ -212,7 +217,7 @@ class SnowflakeSqlApiHook(SnowflakeHook):
         return self.query_ids
 
     def get_headers(self) -> dict[str, Any]:
-        """Form auth headers based on either OAuth token or JWT token from 
private key."""
+        """Form auth headers based on OAuth token, PAT, or JWT token from 
private key."""
         conn_config = self._get_conn_params()
 
         # Use OAuth if refresh_token and client_id and client_secret are 
provided
@@ -220,16 +225,31 @@ class SnowflakeSqlApiHook(SnowflakeHook):
             [conn_config.get("refresh_token"), conn_config.get("client_id"), 
conn_config.get("client_secret")]
         ):
             oauth_token = self.get_oauth_token(conn_config=conn_config)
-            headers = {
+            return {
                 "Content-Type": "application/json",
                 "Authorization": f"Bearer {oauth_token}",
                 "Accept": "application/json",
                 "User-Agent": "snowflakeSQLAPI/1.0",
                 "X-Snowflake-Authorization-Token-Type": "OAUTH",
             }
-            return headers
 
-        # Alternatively, get the JWT token from the connection details and the 
private key
+        # Use PAT (Programmatic Access Token) when authenticator is set to 
programmatic_access_token
+        if conn_config.get("authenticator") == "programmatic_access_token":
+            pat = conn_config.get("password")
+            if not pat:
+                raise AirflowException(
+                    "Programmatic Access Token (PAT) authentication requires 
the connection password "
+                    "field to contain the PAT token value."
+                )
+            return {
+                "Content-Type": "application/json",
+                "Authorization": f"Bearer {pat}",
+                "Accept": "application/json",
+                "User-Agent": "snowflakeSQLAPI/1.0",
+                "X-Snowflake-Authorization-Token-Type": 
"PROGRAMMATIC_ACCESS_TOKEN",
+            }
+
+        # Fall back to JWT token from the connection details and the private 
key
         if not self.private_key:
             self.private_key = self.get_private_key()
 
@@ -241,14 +261,13 @@ class SnowflakeSqlApiHook(SnowflakeHook):
             renewal_delay=self.token_renewal_delta,
         ).get_token()
 
-        headers = {
+        return {
             "Content-Type": "application/json",
             "Authorization": f"Bearer {token}",
             "Accept": "application/json",
             "User-Agent": "snowflakeSQLAPI/1.0",
             "X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
         }
-        return headers
 
     def get_oauth_token(
         self,
diff --git 
a/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py 
b/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py
index 74ee1b4e115..8fa167f3fd3 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py
@@ -292,12 +292,17 @@ class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
     multiple SQL statements in a single request. It make post request to 
submit SQL
     statements for execution, poll to check the status of the execution of a 
statement. Fetch query results
     concurrently.
-    This Operator currently uses key pair authentication, so you need to 
provide private key raw content or
-    private key file path in the snowflake connection along with other details
+
+    The operator supports the following authentication methods via the 
Snowflake connection:
+
+    - **Key pair**: provide ``private_key_file`` or ``private_key_content`` in 
the connection extras.
+    - **OAuth**: provide ``refresh_token``, ``client_id``, and 
``client_secret`` in the connection extras.
+    - **Programmatic Access Token (PAT)**: set ``authenticator`` to 
``programmatic_access_token`` in
+      the connection extras and put the PAT value in the connection 
``password`` field.
 
     .. seealso::
 
-        `Snowflake SQL API key pair Authentication 
<https://docs.snowflake.com/en/developer-guide/sql-api/authenticating.html#label-sql-api-authenticating-key-pair>`_
+        `Snowflake SQL API Authentication 
<https://docs.snowflake.com/en/developer-guide/sql-api/authenticating>`_
 
     Where can this operator fit in?
          - To execute multiple SQL statements in a single request
diff --git 
a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py 
b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
index 32e475fa15b..062bee3a67f 100644
--- a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
+++ b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
@@ -91,6 +91,20 @@ CONN_PARAMS_OAUTH = {
     "warehouse": "af_wh",
 }
 
+CONN_PARAMS_PAT = {
+    "account": "airflow",
+    "application": "AIRFLOW",
+    "authenticator": "programmatic_access_token",
+    "database": "db",
+    "password": "my_pat_token_value",
+    "region": "af_region",
+    "role": "af_role",
+    "schema": "public",
+    "session_parameters": None,
+    "user": "user",
+    "warehouse": "af_wh",
+}
+
 HEADERS = {
     "Content-Type": "application/json",
     "Authorization": "Bearer newT0k3n",
@@ -107,6 +121,14 @@ HEADERS_OAUTH = {
     "X-Snowflake-Authorization-Token-Type": "OAUTH",
 }
 
+HEADERS_PAT = {
+    "Content-Type": "application/json",
+    "Authorization": "Bearer my_pat_token_value",
+    "Accept": "application/json",
+    "User-Agent": "snowflakeSQLAPI/1.0",
+    "X-Snowflake-Authorization-Token-Type": "PROGRAMMATIC_ACCESS_TOKEN",
+}
+
 
 GET_RESPONSE = {
     "resultSetMetaData": {
@@ -485,6 +507,23 @@ class TestSnowflakeSqlApiHook:
         result = hook.get_headers()
         assert result == HEADERS_OAUTH
 
+    @mock.patch(f"{HOOK_PATH}._get_conn_params")
+    def test_get_headers_should_support_pat(self, mock_conn_param):
+        """Test get_headers returns PROGRAMMATIC_ACCESS_TOKEN headers when 
authenticator is PAT."""
+        mock_conn_param.return_value = CONN_PARAMS_PAT
+        hook = SnowflakeSqlApiHook(snowflake_conn_id="mock_conn_id")
+        result = hook.get_headers()
+        assert result == HEADERS_PAT
+
+    @mock.patch(f"{HOOK_PATH}._get_conn_params")
+    def test_get_headers_pat_raises_when_password_missing(self, 
mock_conn_param):
+        """Test get_headers raises AirflowException when PAT authenticator is 
set but password is empty."""
+        conn_params_pat_no_password = {**CONN_PARAMS_PAT, "password": ""}
+        mock_conn_param.return_value = conn_params_pat_no_password
+        hook = SnowflakeSqlApiHook(snowflake_conn_id="mock_conn_id")
+        with pytest.raises(AirflowException, match="Programmatic Access 
Token"):
+            hook.get_headers()
+
     @mock.patch("airflow.providers.snowflake.hooks.snowflake.HTTPBasicAuth")
     @mock.patch("requests.post")
     @mock.patch(f"{HOOK_PATH}._get_conn_params")

Reply via email to