seniuts-b2 opened a new issue, #47025: URL: https://github.com/apache/airflow/issues/47025
### Apache Airflow version

2.10.5

### If "Other Airflow 2 version" selected, which one?

2.10.4

### What happened?

I deployed Airflow on Azure Kubernetes Service (AKS) via the official Airflow Helm chart. For UI authentication I use Azure AD via OAuth2, with an Azure App Registration that handles Airflow access through role-based access control (RBAC). Everything works as expected.

There are a couple of settings in the Azure App Registration:

API permissions: access to the UI is granted through the `Delegated` permission type, and access to the Airflow API is granted through the `Application` permission type, via a custom API backend and the Application ID URI (used as the `scope` when fetching the access token).

So, I use a custom Airflow API backend to access Airflow and manage roles via the endpoint `https://airflow-ui-test.westeurope.cloudapp.azure.com/auth/fab/v1/roles`.

In the `env` section of the Helm chart's values.yaml file I have:

```
- name: AIRFLOW__API__AUTH_BACKENDS
  value: "airflow_utility.dag_level_access_control.azure_ad_auth_backend,airflow.api.auth.backend.session"
```

Here `airflow_utility.dag_level_access_control.azure_ad_auth_backend` is my custom API backend.

And I get this error:

```
{
  "detail": null,
  "status": 403,
  "title": "Forbidden",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
```

This is the custom Airflow API backend:

```
from __future__ import annotations

import os
import json
import requests
import jwt
import logging
from jwt.algorithms import RSAAlgorithm
from flask import Response, request
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast

T = TypeVar("T", bound=Callable)  # TypeVar for function decorators

# ------------------------------
# Logging Configuration
# ------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# ------------------------------
# Azure AD Configuration
# ------------------------------
# AAD_TENANT_ID and AAD_CLIENT_ID are passed as environment variables through the values.yaml file webserver section
AAD_TENANT_ID = os.getenv("AAD_TENANT_ID")
AAD_CLIENT_ID = os.getenv("AAD_CLIENT_ID")
JWKS_URL = f"https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys"  # JSON Web Key Set (JWKS) URL
ISSUER = f"https://sts.windows.net/{AAD_TENANT_ID}/"  # f"https://login.microsoftonline.com/{AAD_TENANT_ID}/v2.0"  # Issuer (iss) claim in the token
AUDIENCE = f"api://{AAD_CLIENT_ID}"  # Expected audience (should match the Application ID URI)
LEEWAY_SECONDS = 60  # Allow 60 seconds of clock drift for `exp`, `nbf`, `iat` validation

if not AAD_TENANT_ID or not AAD_CLIENT_ID:
    msg = "Missing required environment variables (should be passed as env variable through values.yaml file webserver section): AAD_TENANT_ID, AAD_CLIENT_ID"
    logger.error(msg)
    raise ValueError(msg)

# logger.info(f"Using AAD_TENANT_ID: {AAD_TENANT_ID}")
# logger.info(f"Using CLIENT_ID: {AAD_CLIENT_ID}")
logger.info(f"Fetching JWKS from: {JWKS_URL}")

# Cache JWKS keys to avoid frequent network requests
jwks_cache = {}


def get_azure_jwks():
    """
    Fetch and cache Azure AD JWKS (JSON Web Key Set) to verify token signatures.

    JWKS is a public key repository published by Azure AD.
    It contains multiple keys that are used to verify JWT signatures.
    Azure AD JWKS URL looks like: https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys

    Azure AD rotates signing keys periodically, so keys fetched once per process can go stale;
    the cache should be refreshed when a token's kid is no longer found:
    https://learn.microsoft.com/en-us/entra/identity-platform/signing-key-rollover#:~:text=metadata%20document.-,For%20security%20purposes%2C%20the%20Microsoft%20identity%20platform%E2%80%99s%20signing%20key%20rolls%20on,a%20key%20rollover%20event%20no%20matter%20how%20frequently%20it%20may%20occur.,-If%20your%20application
    """
    global jwks_cache
    if not jwks_cache:
        logger.info("Fetching JWKS from Azure AD...")
        response = requests.get(JWKS_URL)
        response.raise_for_status()
        jwks_cache = response.json()
        logger.info("JWKS fetched and cached.")
    return jwks_cache


def validate_azure_token(token: str) -> bool:
    """
    token (str): the Azure AD JWT access token received with the Airflow API request.

    JWT (JSON Web Token) has three parts:
    # Header (contains metadata like algorithm and key ID kid)
    # Payload (contains claims like iss, exp, aud, etc.)
    # Signature (ensures integrity)

    Validate an Azure AD JWT access token workflow:
    1. Extract the JWT Header → Get the "kid"
    2. Fetch JWKS from Azure → Get the list of public keys
    3. Find the matching kid → Use it to verify the JWT
    4. Decode JWT using the correct key → Validate signature and claims
    5. If everything is valid, grant access

    Short workflow: The function fetches the JWKS, finds the correct key, and verifies the token signature.
    """
    try:
        logger.info("Validating Azure AD access token...")

        # Decode the JWT header to get the key ID (kid):
        ## The JWT Header contains a "kid" (Key ID) field, which tells which public key was used to sign the JWT.
        header = jwt.get_unverified_header(token)
        logger.info(f"Token: {token}")
        logger.info(f"Token header: {header}")
        logger.info(f"Token key ID (kid): {header['kid']}")

        # Fetch JWKS and find the correct key.
        jwks = get_azure_jwks()
        # logger.info(f"JWKS: {jwks}")
        key = next((json.dumps(k) for k in jwks["keys"] if k["kid"] == header["kid"]), None)
        logger.info(f"JWKS key: {key}")
        if not key:
            logger.error("Token key ID (kid) not found in JWKS. Token is invalid.")
            return False

        logger.info("Matching JWKS key found. Verifying token signature...")

        # Decode and verify the token:
        public_key = RSAAlgorithm.from_jwk(key)
        ## Verify the jwt token signature and return the token claims: https://pyjwt.readthedocs.io/en/stable/api.html#jwt.decode
        decoded_token = jwt.decode(
            jwt=token,
            key=public_key,
            algorithms=["RS256"],
            # extended decoding and validation options
            audience=AUDIENCE,  # Ensures token is meant for this API
            issuer=ISSUER,  # Ensures token was issued by Azure AD
            # leeway=LEEWAY_SECONDS,  # Allow 60 seconds of clock drift for exp/nbf validation
            options={
                "verify_signature": False,  # Signature check disabled as a workaround; enabling it fails with an invalid-signature error
                "require": ["exp", "iat", "nbf"],  # Ensure these claims exist
                "verify_exp": True,  # Ensure token is not expired
                "verify_iat": True,  # Ensure issued-at (iat) is valid
                "verify_nbf": True,  # Ensure not-before (nbf) is valid
                "verify_aud": True,  # Ensure audience (aud) claim is valid
                "verify_iss": True,  # Ensure issuer (iss) claim is valid
                "strict_aud": False  # check that the aud claim is a single value (not a list), and matches audience exactly
            }
        )
        logger.info(f"Decoded token: {decoded_token}")
        logger.info("Token is valid and contains required scope. Access granted.")
        return True  # Token is valid
    except jwt.ExpiredSignatureError as e:
        logger.error(f"Token has expired. Access denied: {e}")
        return False
    except jwt.InvalidAudienceError as e:
        logger.error(f"Invalid audience. Expected: {AUDIENCE}: {e}")
        return False
    except jwt.InvalidIssuerError as e:
        logger.error(f"Invalid issuer. Expected: {ISSUER}: {e}")
        return False
    except jwt.InvalidTokenError as e:
        logger.error(f"Invalid token error: {e}")
        return False


def requires_authentication(function: T):
    """
    Decorator to enforce authentication on API endpoints using Azure AD token.
    """
    @wraps(function)
    def decorated(*args, **kwargs):
        logger.info(" ### Custome Airflow API Backend to validate Azure AD Authentication ###")
        logger.info(f"Executing function: {function.__name__}")  # Extract function name
        logger.info(f"request: {request}")

        auth_header = request.headers.get("Authorization")
        logger.info(f"auth_header: {auth_header}")
        if not auth_header or not auth_header.startswith("Bearer "):
            logger.error("Missing or malformed Authorization header. Access denied.")
            return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"})

        # The token is part of the Authorization header, so we can extract it by splitting the header
        # (the authorization header has the format "Bearer <token>")
        token = auth_header.split(" ")[1]
        if not validate_azure_token(token):
            logger.error("Invalid or expired token. Access denied.")
            return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"})

        logger.info("Authentication successful. Processing request.")
        return function(*args, **kwargs)

    return cast(T, decorated)
    # return decorated


def init_app(_):
    """
    Initialize the authentication backend (required by Airflow).
    """
```

I use `"verify_signature": False` because with signature verification enabled, `jwt.decode` fails every time with an error saying the signature is invalid. If someone knows why and how to fix it, I would be glad to get any recommendations. But this isn't the main question.

The main question is: why do I get the error above? I expect to have access if I use a custom API backend to validate the token. The token I obtain includes the `roles` claim (for mapping Airflow roles to App Registration roles), since I use the `Application` permission type in the App Registration API permissions.
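When comparing what the token actually carries (`kid`, `aud`, `iss`, `ver`, `roles`) against the values the backend expects, it can help to look at the header and payload without involving PyJWT at all. The following is a minimal, standard-library-only sketch for debugging; it does not verify anything and is not a fix for the invalid-signature error. The helper names `b64url_decode`, `b64url_encode`, and `peek_jwt`, as well as the sample token, are hypothetical and only shaped like the claims reported in this issue.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def b64url_encode(data: dict) -> str:
    """Encode a dict as an unpadded base64url JSON segment (sample data only)."""
    raw = json.dumps(data).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def peek_jwt(token: str) -> tuple:
    """Return (header, payload) WITHOUT verifying the signature. Debugging only."""
    header_b64, payload_b64, _signature_b64 = token.split(".")
    header = json.loads(b64url_decode(header_b64))
    payload = json.loads(b64url_decode(payload_b64))
    return header, payload


# Hypothetical, unsigned sample token; all values are placeholders.
sample_token = ".".join([
    b64url_encode({"typ": "JWT", "alg": "RS256", "kid": "example-kid"}),
    b64url_encode({
        "aud": "api://example-client-id",
        "iss": "https://sts.windows.net/example-tenant/",
        "roles": ["Admin"],
        "ver": "1.0",
    }),
    "fake-signature",
])

header, claims = peek_jwt(sample_token)
print("kid:", header["kid"])
print("aud:", claims["aud"])
print("iss:", claims["iss"])
print("ver:", claims["ver"])
print("roles:", claims["roles"])
```

Running this against a real token (instead of `sample_token`) shows at a glance whether `aud` and `iss` match `AUDIENCE` and `ISSUER`, and which `kid` has to be present in the JWKS.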
Here are some logs from the API backend:

```
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:153} INFO - ### Custome Airflow API Backend to validate Azure AD Authentication ###
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:154} INFO - Executing function: Response
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:155} INFO - request: <Request 'https://airflow-ui-test.westeurope.cloudapp.azure.com/api/v1/roles/DataEngineer' [GET]>
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:158} INFO - auth_header: Bearer ...{{token}}...
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:85} INFO - Validating Azure AD access token...
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:90} INFO - Token: ...{token}...
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:91} INFO - Token header: {'typ': 'JWT', 'alg': 'RS256', 'x5t': 'imi0Y2z0dYKxBttAqK_Tt5hYBTk', 'kid': '{{ some kid}}'}
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:92} INFO - Token key ID (kid): {{ some kid}}
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:98} INFO - JWKS key: {"kty": "RSA", "use": "sig", "kid": "{{some kid}}", "x5t": "{{some values}}", "n": "{{some values}}", "e": "AQAB", "x5c": ["{{some values}}"], "cloud_instance_name": "microsoftonline.com", "issuer": "https://login.microsoftonline.com/{{ tenant_id }}/v2.0"}
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:102} INFO - Matching JWKS key found. Verifying token signature...
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:107} INFO - !!!! Public key: <cryptography.hazmat.bindings._rust.openssl.rsa.RSAPublicKey object at 0x7f11e134aa50>
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:128} INFO - Decoded token: {'aud': 'api://{{ app client id }}', 'iss': 'https://sts.windows.net/{{ tenant_id }}/', 'iat': 1740401432, 'nbf': 1740401432, 'exp': 1740405332, 'aio': '{{some values}}', 'appid': '{{ app client id }}', 'appidacr': '1', 'idp': 'https://sts.windows.net/{{ tenant id }}/', 'oid': '{{some values}}', 'rh': '{{some values}}', 'roles': ['Admin'], 'sub': '{{some values}}', 'tid': '{{some values}}', 'uti': '{{some values}}', 'ver': '1.0'}
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:130} INFO - Token is valid and contains required scope. Access granted.
[2025-02-24T13:00:04.018+0000] {azure_ad_auth_backend.py:171} INFO - Authentication successful. Processing request.
10.224.0.199 - - [24/Feb/2025:13:00:04 +0000] "GET /api/v1/roles/DataEngineer HTTP/1.1" 403 186 "-" "PostmanRuntime/7.43.0"
```

I get the same error:

```
{
  "detail": null,
  "status": 403,
  "title": "Forbidden",
  "type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied"
}
```

both from Postman and from plain Python, sending a GET request with `{"Authorization": f"Bearer {access_token}"}` in the headers.

### What you think should happen instead?

I expect that if a request passes the checks in my custom API auth backend, it should be granted access to the API.

### How to reproduce

Deploy Airflow on AKS and use a custom API backend that validates Azure AD tokens.
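For anyone trying to reproduce this outside Postman, the two requests involved can be sketched with the standard library alone. This is only an illustration under stated assumptions: it assumes the token is fetched with the OAuth2 client-credentials grant from the Microsoft identity platform v2.0 token endpoint, using `api://<client id>/.default` as the scope (the Application ID URI plus `/.default`). The constants and the helper names `build_token_request` and `build_roles_request` are hypothetical placeholders, and nothing is actually sent over the network.

```python
import urllib.parse
import urllib.request

# Hypothetical placeholders; substitute real values before sending anything.
TENANT_ID = "example-tenant-id"
CLIENT_ID = "example-client-id"
CLIENT_SECRET = "example-secret"
AIRFLOW_BASE_URL = "https://airflow-ui-test.westeurope.cloudapp.azure.com"


def build_token_request(tenant_id: str, client_id: str, client_secret: str) -> urllib.request.Request:
    """Build a client-credentials token request (assumed flow, see lead-in)."""
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": f"api://{client_id}/.default",  # Application ID URI + /.default
    }).encode()
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    return urllib.request.Request(url, data=form, method="POST")


def build_roles_request(base_url: str, access_token: str, role_name: str) -> urllib.request.Request:
    """Build the GET request that returns 403 in the logs of this issue."""
    url = f"{base_url}/api/v1/roles/{urllib.parse.quote(role_name)}"
    headers = {"Authorization": f"Bearer {access_token}"}
    return urllib.request.Request(url, headers=headers, method="GET")


token_request = build_token_request(TENANT_ID, CLIENT_ID, CLIENT_SECRET)
roles_request = build_roles_request(AIRFLOW_BASE_URL, "example-access-token", "DataEngineer")

print(token_request.get_method(), token_request.full_url)
print(roles_request.get_method(), roles_request.full_url)
# Sending is left to the caller, e.g. urllib.request.urlopen(roles_request)
```

With real values, the `access_token` field of the JSON returned by the first request would be passed to `build_roles_request`; against the deployment described here, the second request is the one answered with `403 Forbidden`.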
### Operating System

PRETTY_NAME="Debian GNU/Linux 12 (bookworm)"
NAME="Debian GNU/Linux"
VERSION_ID="12"
VERSION="12 (bookworm)"
VERSION_CODENAME=bookworm
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

### Versions of Apache Airflow Providers

airflow@airflow-webserver-7fdffbcd6b-cznnc:/opt/airflow$ pip freeze | grep apache-airflow-provider
apache-airflow-providers-amazon==9.2.0
apache-airflow-providers-celery==3.10.0
apache-airflow-providers-cncf-kubernetes==10.1.0
apache-airflow-providers-common-compat==1.3.0
apache-airflow-providers-common-io==1.5.0
apache-airflow-providers-common-sql==1.21.0
apache-airflow-providers-docker==4.0.0
apache-airflow-providers-elasticsearch==6.0.0
apache-airflow-providers-fab==1.5.2
apache-airflow-providers-ftp==3.12.0
apache-airflow-providers-google==12.0.0
apache-airflow-providers-grpc==3.7.0
apache-airflow-providers-hashicorp==4.0.0
apache-airflow-providers-http==5.0.0
apache-airflow-providers-imap==3.8.0
apache-airflow-providers-microsoft-azure==10.3.0
apache-airflow-providers-mysql==6.0.0
apache-airflow-providers-odbc==4.9.0
apache-airflow-providers-openlineage==2.0.0
apache-airflow-providers-postgres==6.0.0
apache-airflow-providers-redis==4.0.0
apache-airflow-providers-sendgrid==4.0.0
apache-airflow-providers-sftp==5.0.0
apache-airflow-providers-slack==9.0.0
apache-airflow-providers-smtp==1.9.0
apache-airflow-providers-snowflake==6.0.0
apache-airflow-providers-sqlite==4.0.0
apache-airflow-providers-ssh==4.0.0

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

AKS, Official Airflow Helm Chart.

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
