GitHub user seniuts-b2 edited a discussion: Airflow API Returns 403 Forbidden When Using Azure AD Authentication via Custom API Backend
### Apache Airflow version 2.10.5 ### If "Other Airflow 2 version" selected, which one? 2.10.4 ### What happened? I deployed Airflow on Azure K8s (AKS) via the Airflow Official Helm Chart. For UI authentication I use Azure AD via OAuth2 for that I have Azure App Registration for handling AIrflow access via Role-based access control (RBAC). Everything works as expected. There are a couple of settings in Azure App Registry: API permissions:  there are permissions for access to UI through `Delegated` permission type and `Application` permission type to Airflow API access via custom API backend through Application URI ID (using as a `scope` to fetch access token):  So, I use a custom Airflow API backend to access Airflow and manage Roles via endpoint:` https://airflow-ui-test.westeurope.cloudapp.azure.com/auth/fab/v1/roles`. In values.yaml Helm chart file in env section I have: ``` - name: AIRFLOW__API__AUTH_BACKENDS value: "airflow_utility.dag_level_access_control.azure_ad_auth_backend,airflow.api.auth.backend.session" ``` here is my custom API backend: `airflow_utility.dag_level_access_control.azure_ad_auth_backend` And I get error: ``` { "detail": null, "status": 403, "title": "Forbidden", "type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied" } ``` There is the custom Airflow API Backend: ``` from __future__ import annotations import os import json import requests import jwt import logging from jwt.algorithms import RSAAlgorithm from flask import Response, request from functools import wraps from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast T = TypeVar("T", bound=Callable) # TypeVar for function decorators # ------------------------------ # Logging Configuration # ------------------------------ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # ------------------------------ # Azure AD Configuration # ------------------------------ # AAD_TENANT_ID and AAD_CLIENT_ID parameters passed as environment variables though the values.yaml file webserver section AAD_TENANT_ID = os.getenv("AAD_TENANT_ID") AAD_CLIENT_ID = os.getenv("AAD_CLIENT_ID") JWKS_URL = f"https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys" # JSON Web Key Set (JWKS) URL ISSUER = f"https://sts.windows.net/{AAD_TENANT_ID}/" #f"https://login.microsoftonline.com/{AAD_TENANT_ID}/v2.0" # Issuer (iss) claim in the token AUDIENCE = f"api://{AAD_CLIENT_ID}" # Expected audience (should match the Application ID URI) LEEWAY_SECONDS = 60 # Allow 60 seconds of clock drift for `exp`, `nbf`, `iat` validation if not AAD_TENANT_ID or not AAD_CLIENT_ID: msg = "Missing required environment variables (should be passed as env variable through values.yaml file webserver secrion): AAD_TENANT_ID, AAD_CLIENT_ID" logger.error(msg) raise ValueError(msg) # logger.info(f"Using AAD_TENANT_ID: {AAD_TENANT_ID}") # logger.info(f"Using CLIENT_ID: {AAD_CLIENT_ID}") logger.info(f"Fetching JWKS from: {JWKS_URL}") # Cache JWKS keys to avoid frequent network requests jwks_cache = {} def get_azure_jwks(): """ Fetch and cache Azure AD JWKS (JSON Web Key Set) to verify token signatures. JWKS is a public key repository published by Azure AD. It contains multiple keys that are used to verify JWT signatures. Azure AD JWKS URL looks like: https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys Azure AD rotates keys from time to time, so fetching them once per process is enough: https://learn.microsoft.com/en-us/entra/identity-platform/signing-key-rollover#:~:text=metadata%20document.-,For%20security%20purposes%2C%20the%20Microsoft%20identity%20platform%E2%80%99s%20signing%20key%20rolls%20on,a%20key%20rollover%20event%20no%20matter%20how%20frequently%20it%20may%20occur.,-If%20your%20application """ global jwks_cache if not jwks_cache: logger.info("Fetching JWKS from Azure AD...") response = requests.get(JWKS_URL) response.raise_for_status() jwks_cache = response.json() logger.info("JWKS fetched and cached.") return jwks_cache def validate_azure_token(token: str) -> bool: """ token (str): there is received Airflow API request token of Azure AD JWT access token. JWT (JSON Web Token) has three parts: # Header (contains metadata like algorithm and key ID kid) # Payload (contains claims like iss, exp, aud, etc.) # Signature (ensures integrity) Validate an Azure AD JWT access token workflow: 1. Extract the JWT Header → Get the "kid" 2. Fetch JWKS from Azure → Get the list of public keys 3. Find the matching kid → Use it to verify the JWT 4. Decode JWT using the correct key → Validate signature and claims 5. If everything is valid, grant access Short workflow: The function fetches the JWKS, finds the correct key, and verifies the token signature. """ try: logger.info("Validating Azure AD access token...") # Decode the JWT header to get the key ID (kid): ## The JWT Header contains a "kid" (Key ID) field, which tells which public key was used to sign the JWT. header = jwt.get_unverified_header(token) logger.info(f"Token: {token}") logger.info(f"Token header: {header}") logger.info(f"Token key ID (kid): {header['kid']}") # Fetch JWKS and find the correct key. jwks = get_azure_jwks() # logger.info(f"JWKS: {jwks}") key = next((json.dumps(k) for k in jwks["keys"] if k["kid"] == header["kid"]), None) logger.info(f"JWKS key: {key}") if not key: logger.error("Token key ID (kid) not found in JWKS. Token is invalid.") return False logger.info("Matching JWKS key found. Verifying token signature...") # Decode and verify the token: public_key = RSAAlgorithm.from_jwk(key) ## Verify the jwt token signature and return the token claims: https://pyjwt.readthedocs.io/en/stable/api.html#jwt.decode decoded_token = jwt.decode( jwt=token, key=public_key, algorithms=["RS256"], # extended decoding and validation options audience=AUDIENCE, # Ensures token is meant for this API issuer=ISSUER, # Ensures token was issued by Azure AD # leeway=LEEWAY_SECONDS, # Allow 60 seconds of clock drift for exp/nbf validation options={ "verify_signature": False, # Ensure signature is verified "require": ["exp", "iat", "nbf"], # Ensure these claims exist "verify_exp": True, # Ensure token is not expired "verify_iat": True, # Ensure issued-at (iat) is valid "verify_nbf": True, # Ensure not-before (nbf) is valid "verify_aud": True, # Ensure audience (aud) claim is valid "verify_iss": True, # Ensure issuer (iss) claim is valid "strict_aud": False # check that the aud claim is a single value (not a list), and matches audience exactly } ) logger.info(f"Decoded token: {decoded_token}") logger.info("Token is valid and contains required scope. Access granted.") return True # Token is valid except jwt.ExpiredSignatureError as e: logger.error(f"Token has expired. Access denied: {e}") return False except jwt.InvalidAudienceError as e: logger.error(f"Invalid audience. Expected: {AUDIENCE}: {e}") return False except jwt.InvalidIssuerError as e: logger.error(f"Invalid issuer. Expected: {ISSUER}: {e}") return False except jwt.InvalidTokenError as e: logger.error(f"Invalid token error: {e}") return False def requires_authentication(function: T): """ Decorator to enforce authentication on API endpoints using Azure AD token. """ @wraps(function) def decorated(*args, **kwargs): logger.info(" ### Custome Airflow API Backend to validate Azure AD Authentication ###") logger.info(f"Executing function: {function.__name__}") # Extract function name logger.info(f"request: {request}") auth_header = request.headers.get("Authorization") logger.info(f"auth_header: {auth_header}") if not auth_header or not auth_header.startswith("Bearer "): logger.error("Missing or malformed Authorization header. Access denied.") return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"}) # As the token is the part of Authorization header we can extract it by splitting the header (authorization header is in the format "Bearer <token>") token = auth_header.split(" ")[1] if not validate_azure_token(token): logger.error("Invalid or expired token. Access denied.") return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"}) logger.info("Authentication successful. Processing request.") return function(*args, **kwargs) return cast(T, decorated) # return decorated def init_app(_): """ Initialize the authentication backend (required by Airflow). """ ``` I use `"verify_signature": False` because using `jwt.decode` function I get an every time error that the signature is invalid. If someone knows why and how to fix it, I would be glad to get any recommendations. But this isn't the main question. The main question is: Why do I get the error above? I expect to have access if I use a custom API backend to validate the token. The gotten token includes claim `roles` (for mapping Airflow roles and App registration roles) as I use `Application` permission type in App registry API permissions. There are a couple of logs from API backend: ``` [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:153} INFO - ### Custome Airflow API Backend to validate Azure AD Authentication ### [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:154} INFO - Executing function: Response [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:155} INFO - request: <Request 'https://airflow-ui-test.westeurope.cloudapp.azure.com/api/v1/roles/DataEngineer' [GET]> [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:158} INFO - auth_header: Bearer ...{{token}}... [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:85} INFO - Validating Azure AD access token... [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:90} INFO - Token: ...{token}... [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:91} INFO - Token header: {'typ': 'JWT', 'alg': 'RS256', 'x5t': 'imi0Y2z0dYKxBttAqK_Tt5hYBTk', 'kid': '{{ some kid}}'} [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:92} INFO - Token key ID (kid): {{ some kid}} [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:98} INFO - JWKS key: {"kty": "RSA", "use": "sig", "kid": "{{some kid}}", "x5t": "{{some values}}", "n": "{{some values}}", "e": "AQAB", "x5c": ["{{some values}}"], "cloud_instance_name": "microsoftonline.com", "issuer": "https://login.microsoftonline.com/{{ tenant_id }}/v2.0"} [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:102} INFO - Matching JWKS key found. Verifying token signature... [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:107} INFO - !!!! Public key: <cryptography.hazmat.bindings._rust.openssl.rsa.RSAPublicKey object at 0x7f11e134aa50> [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:128} INFO - Decoded token: {'aud': 'api://{{ app client id }}', 'iss': 'https://sts.windows.net/{{ tenant_id }}/', 'iat': 1740401432, 'nbf': 1740401432, 'exp': 1740405332, 'aio': '{{some values}}', 'appid': '{{ app client id }}', 'appidacr': '1', 'idp': 'https://sts.windows.net/{{ tenant id }}/', 'oid': '{{some values}}', 'rh': '{{some values}}', 'roles': ['Admin'], 'sub': '{{some values}}', 'tid': '{{some values}}', 'uti': '{{some values}}', 'ver': '1.0'} [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:130} INFO - Token is valid and contains required scope. Access granted. [2025-02-24T13:00:04.018+0000] {azure_ad_auth_backend.py:171} INFO - Authentication successful. Processing request. 10.224.0.199 - - [24/Feb/2025:13:00:04 +0000] "GET /api/v1/roles/DataEngineer HTTP/1.1" 403 186 "-" "PostmanRuntime/7.43.0" ``` I get the same error: ``` { "detail": null, "status": 403, "title": "Forbidden", "type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied" } ``` using Postman and just pure python through GET request and {"Authorization": f"Bearer {access_token}"} in header ### What you think should happen instead? I expect that if I use the Airflow API backend I should have access to API if the backend code was passed.`` ### How to reproduce deploy Airflow on AKS and use custom API backend to use Azure AD token. ### Operating System PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux" VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/" ### Versions of Apache Airflow Providers airflow@airflow-webserver-7fdffbcd6b-cznnc:/opt/airflow$ pip freeze | grep apache-airflow-provider apache-airflow-providers-amazon==9.2.0 apache-airflow-providers-celery==3.10.0 apache-airflow-providers-cncf-kubernetes==10.1.0 apache-airflow-providers-common-compat==1.3.0 apache-airflow-providers-common-io==1.5.0 apache-airflow-providers-common-sql==1.21.0 apache-airflow-providers-docker==4.0.0 apache-airflow-providers-elasticsearch==6.0.0 apache-airflow-providers-fab==1.5.2 apache-airflow-providers-ftp==3.12.0 apache-airflow-providers-google==12.0.0 apache-airflow-providers-grpc==3.7.0 apache-airflow-providers-hashicorp==4.0.0 apache-airflow-providers-http==5.0.0 apache-airflow-providers-imap==3.8.0 apache-airflow-providers-microsoft-azure==10.3.0 apache-airflow-providers-mysql==6.0.0 apache-airflow-providers-odbc==4.9.0 apache-airflow-providers-openlineage==2.0.0 apache-airflow-providers-postgres==6.0.0 apache-airflow-providers-redis==4.0.0 apache-airflow-providers-sendgrid==4.0.0 apache-airflow-providers-sftp==5.0.0 apache-airflow-providers-slack==9.0.0 apache-airflow-providers-smtp==1.9.0 apache-airflow-providers-snowflake==6.0.0 apache-airflow-providers-sqlite==4.0.0 apache-airflow-providers-ssh==4.0.0 ### Deployment Official Apache Airflow Helm Chart ### Deployment details AKS, Official Airflow Helm Chart. ### Anything else? _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) GitHub link: https://github.com/apache/airflow/discussions/47029 ---- This is an automatically sent email for [email protected]. To unsubscribe, please send an email to: [email protected]
