This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c7e94b6af2a Add PAT authentication support to SnowflakeSqlApiHook
(#62162)
c7e94b6af2a is described below
commit c7e94b6af2aaa36ad47d78972ebae7ffa079bb15
Author: Akshaykumar <[email protected]>
AuthorDate: Wed Mar 11 18:09:56 2026 +0530
Add PAT authentication support to SnowflakeSqlApiHook (#62162)
Co-authored-by: Akshay <[email protected]>
---
docs/spelling_wordlist.txt | 1 +
.../providers/snowflake/hooks/snowflake_sql_api.py | 39 ++++++++++++++++------
.../providers/snowflake/operators/snowflake.py | 11 ++++--
.../unit/snowflake/hooks/test_snowflake_sql_api.py | 39 ++++++++++++++++++++++
4 files changed, 77 insertions(+), 13 deletions(-)
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 359d619e26f..8dc6d26e91e 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1216,6 +1216,7 @@ proc
productionalize
ProductSearchClient
profiler
+Programmatic
programmatically
proj
projectId
diff --git
a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
index 2f52be48b88..ce68a6436ea 100644
---
a/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
+++
b/providers/snowflake/src/airflow/providers/snowflake/hooks/snowflake_sql_api.py
@@ -50,10 +50,15 @@ class SnowflakeSqlApiHook(SnowflakeHook):
In combination with aiohttp, make post request to submit SQL statements
for execution,
poll to check the status of the execution of a statement. Fetch query
results asynchronously.
- This hook requires the snowflake_conn_id connection. This hooks mainly
uses account, schema, database,
- warehouse, and an authentication mechanism from one of below:
- 1. JWT Token generated from private_key_file or private_key_content. Other
inputs can be defined in the connection or hook instantiation.
- 2. OAuth Token generated from the refresh_token, client_id and
client_secret specified in the connection
+ This hook requires the snowflake_conn_id connection. This hook mainly uses
account, schema, database,
+ warehouse, and an authentication mechanism from one of the following:
+
+ 1. JWT Token generated from ``private_key_file`` or
``private_key_content``. Other inputs can be
+ defined in the connection or hook instantiation.
+ 2. OAuth Token generated from the ``refresh_token``, ``client_id`` and
``client_secret`` specified
+ in the connection.
+ 3. PAT (Programmatic Access Token): set ``authenticator`` to
``programmatic_access_token`` in the
+ connection extras and put the PAT value in the connection ``password``
field.
:param snowflake_conn_id: Reference to
:ref:`Snowflake connection id<howto/connection:snowflake>`
@@ -212,7 +217,7 @@ class SnowflakeSqlApiHook(SnowflakeHook):
return self.query_ids
def get_headers(self) -> dict[str, Any]:
- """Form auth headers based on either OAuth token or JWT token from
private key."""
+ """Form auth headers based on OAuth token, PAT, or JWT token from
private key."""
conn_config = self._get_conn_params()
# Use OAuth if refresh_token and client_id and client_secret are
provided
@@ -220,16 +225,31 @@ class SnowflakeSqlApiHook(SnowflakeHook):
[conn_config.get("refresh_token"), conn_config.get("client_id"),
conn_config.get("client_secret")]
):
oauth_token = self.get_oauth_token(conn_config=conn_config)
- headers = {
+ return {
"Content-Type": "application/json",
"Authorization": f"Bearer {oauth_token}",
"Accept": "application/json",
"User-Agent": "snowflakeSQLAPI/1.0",
"X-Snowflake-Authorization-Token-Type": "OAUTH",
}
- return headers
- # Alternatively, get the JWT token from the connection details and the
private key
+ # Use PAT (Programmatic Access Token) when authenticator is set to
programmatic_access_token
+ if conn_config.get("authenticator") == "programmatic_access_token":
+ pat = conn_config.get("password")
+ if not pat:
+ raise AirflowException(
+ "Programmatic Access Token (PAT) authentication requires
the connection password "
+ "field to contain the PAT token value."
+ )
+ return {
+ "Content-Type": "application/json",
+ "Authorization": f"Bearer {pat}",
+ "Accept": "application/json",
+ "User-Agent": "snowflakeSQLAPI/1.0",
+ "X-Snowflake-Authorization-Token-Type":
"PROGRAMMATIC_ACCESS_TOKEN",
+ }
+
+ # Fall back to JWT token from the connection details and the private
key
if not self.private_key:
self.private_key = self.get_private_key()
@@ -241,14 +261,13 @@ class SnowflakeSqlApiHook(SnowflakeHook):
renewal_delay=self.token_renewal_delta,
).get_token()
- headers = {
+ return {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"User-Agent": "snowflakeSQLAPI/1.0",
"X-Snowflake-Authorization-Token-Type": "KEYPAIR_JWT",
}
- return headers
def get_oauth_token(
self,
diff --git
a/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py
b/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py
index 74ee1b4e115..8fa167f3fd3 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/operators/snowflake.py
@@ -292,12 +292,17 @@ class SnowflakeSqlApiOperator(SQLExecuteQueryOperator):
multiple SQL statements in a single request. It make post request to
submit SQL
statements for execution, poll to check the status of the execution of a
statement. Fetch query results
concurrently.
- This Operator currently uses key pair authentication, so you need to
provide private key raw content or
- private key file path in the snowflake connection along with other details
+
+ The operator supports the following authentication methods via the
Snowflake connection:
+
+ - **Key pair**: provide ``private_key_file`` or ``private_key_content`` in
the connection extras.
+ - **OAuth**: provide ``refresh_token``, ``client_id``, and
``client_secret`` in the connection extras.
+ - **Programmatic Access Token (PAT)**: set ``authenticator`` to
``programmatic_access_token`` in
+ the connection extras and put the PAT value in the connection
``password`` field.
.. seealso::
- `Snowflake SQL API key pair Authentication
<https://docs.snowflake.com/en/developer-guide/sql-api/authenticating.html#label-sql-api-authenticating-key-pair>`_
+ `Snowflake SQL API Authentication
<https://docs.snowflake.com/en/developer-guide/sql-api/authenticating>`_
Where can this operator fit in?
- To execute multiple SQL statements in a single request
diff --git
a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
index 32e475fa15b..062bee3a67f 100644
--- a/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
+++ b/providers/snowflake/tests/unit/snowflake/hooks/test_snowflake_sql_api.py
@@ -91,6 +91,20 @@ CONN_PARAMS_OAUTH = {
"warehouse": "af_wh",
}
+CONN_PARAMS_PAT = {
+ "account": "airflow",
+ "application": "AIRFLOW",
+ "authenticator": "programmatic_access_token",
+ "database": "db",
+ "password": "my_pat_token_value",
+ "region": "af_region",
+ "role": "af_role",
+ "schema": "public",
+ "session_parameters": None,
+ "user": "user",
+ "warehouse": "af_wh",
+}
+
HEADERS = {
"Content-Type": "application/json",
"Authorization": "Bearer newT0k3n",
@@ -107,6 +121,14 @@ HEADERS_OAUTH = {
"X-Snowflake-Authorization-Token-Type": "OAUTH",
}
+HEADERS_PAT = {
+ "Content-Type": "application/json",
+ "Authorization": "Bearer my_pat_token_value",
+ "Accept": "application/json",
+ "User-Agent": "snowflakeSQLAPI/1.0",
+ "X-Snowflake-Authorization-Token-Type": "PROGRAMMATIC_ACCESS_TOKEN",
+}
+
GET_RESPONSE = {
"resultSetMetaData": {
@@ -485,6 +507,23 @@ class TestSnowflakeSqlApiHook:
result = hook.get_headers()
assert result == HEADERS_OAUTH
+ @mock.patch(f"{HOOK_PATH}._get_conn_params")
+ def test_get_headers_should_support_pat(self, mock_conn_param):
+ """Test get_headers returns PROGRAMMATIC_ACCESS_TOKEN headers when
authenticator is PAT."""
+ mock_conn_param.return_value = CONN_PARAMS_PAT
+ hook = SnowflakeSqlApiHook(snowflake_conn_id="mock_conn_id")
+ result = hook.get_headers()
+ assert result == HEADERS_PAT
+
+ @mock.patch(f"{HOOK_PATH}._get_conn_params")
+ def test_get_headers_pat_raises_when_password_missing(self,
mock_conn_param):
+ """Test get_headers raises AirflowException when PAT authenticator is
set but password is empty."""
+ conn_params_pat_no_password = {**CONN_PARAMS_PAT, "password": ""}
+ mock_conn_param.return_value = conn_params_pat_no_password
+ hook = SnowflakeSqlApiHook(snowflake_conn_id="mock_conn_id")
+ with pytest.raises(AirflowException, match="Programmatic Access
Token"):
+ hook.get_headers()
+
@mock.patch("airflow.providers.snowflake.hooks.snowflake.HTTPBasicAuth")
@mock.patch("requests.post")
@mock.patch(f"{HOOK_PATH}._get_conn_params")