GitHub user seniuts-b2 edited a discussion: Airflow API Returns 403 Forbidden 
When Using Azure AD Authentication via Custom API Backend

### Apache Airflow version

2.10.5

### If "Other Airflow 2 version" selected, which one?

2.10.4

### What happened?

I deployed Airflow on Azure K8s (AKS) via the Airflow Official Helm Chart. For 
UI authentication I use Azure AD via OAuth2 for that I have Azure App 
Registration for handling AIrflow access via Role-based access control (RBAC). 
Everything works as expected.
There are a couple of settings in Azure App Registry:
API permissions:
![Image](https://github.com/user-attachments/assets/f920db31-62c1-4987-ab01-bb54814be438)
there are permissions for access to UI through `Delegated` permission type and 
`Application` permission type to Airflow API access via custom API backend 
through Application URI ID (using as a `scope` to fetch access token): 
![Image](https://github.com/user-attachments/assets/7765768f-1a6d-42ca-a690-6af805db42e2)

So, I use a custom Airflow API backend to access Airflow and manage Roles via 
endpoint:` 
https://airflow-ui-test.westeurope.cloudapp.azure.com/auth/fab/v1/roles`. In 
values.yaml Helm chart file in env section I have:
```
- name: AIRFLOW__API__AUTH_BACKENDS
  value: 
"airflow_utility.dag_level_access_control.azure_ad_auth_backend,airflow.api.auth.backend.session"
```

here is my custom API backend: 
`airflow_utility.dag_level_access_control.azure_ad_auth_backend`

And I get error:
```
{
    "detail": null,
    "status": 403,
    "title": "Forbidden",
    "type": 
"https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied";
}
```

There is the custom Airflow API Backend:
```
from __future__ import annotations

import os
import json
import requests
import jwt
import logging
from jwt.algorithms import RSAAlgorithm
from flask import Response, request
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast

T = TypeVar("T", bound=Callable)  # TypeVar for function decorators

# ------------------------------
# Logging Configuration
# ------------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - 
%(message)s")
logger = logging.getLogger(__name__)

# ------------------------------
# Azure AD Configuration
# ------------------------------
# AAD_TENANT_ID and AAD_CLIENT_ID parameters passed as environment variables 
though the values.yaml file webserver section
AAD_TENANT_ID = os.getenv("AAD_TENANT_ID")
AAD_CLIENT_ID = os.getenv("AAD_CLIENT_ID")

JWKS_URL = 
f"https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys"; # JSON 
Web Key Set (JWKS) URL
ISSUER = f"https://sts.windows.net/{AAD_TENANT_ID}/"; 
#f"https://login.microsoftonline.com/{AAD_TENANT_ID}/v2.0";  # Issuer (iss) 
claim in the token
AUDIENCE = f"api://{AAD_CLIENT_ID}"  # Expected audience (should match the 
Application ID URI)
LEEWAY_SECONDS = 60  # Allow 60 seconds of clock drift for `exp`, `nbf`, `iat` 
validation

if not AAD_TENANT_ID or not AAD_CLIENT_ID:
    msg = "Missing required environment variables (should be passed as env 
variable through values.yaml file webserver secrion): AAD_TENANT_ID, 
AAD_CLIENT_ID"
    logger.error(msg)
    raise ValueError(msg)

# logger.info(f"Using AAD_TENANT_ID: {AAD_TENANT_ID}")
# logger.info(f"Using CLIENT_ID: {AAD_CLIENT_ID}")
logger.info(f"Fetching JWKS from: {JWKS_URL}")

# Cache JWKS keys to avoid frequent network requests
jwks_cache = {}

def get_azure_jwks():
    """
    Fetch and cache Azure AD JWKS (JSON Web Key Set) to verify token signatures.

    JWKS is a public key repository published by Azure AD.
    It contains multiple keys that are used to verify JWT signatures.

    Azure AD JWKS URL looks like: 
https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys

    Azure AD rotates keys from time to time, so fetching them once per process 
is enough:
    
https://learn.microsoft.com/en-us/entra/identity-platform/signing-key-rollover#:~:text=metadata%20document.-,For%20security%20purposes%2C%20the%20Microsoft%20identity%20platform%E2%80%99s%20signing%20key%20rolls%20on,a%20key%20rollover%20event%20no%20matter%20how%20frequently%20it%20may%20occur.,-If%20your%20application
    """
    global jwks_cache
    if not jwks_cache:
        logger.info("Fetching JWKS from Azure AD...")
        response = requests.get(JWKS_URL)
        response.raise_for_status()
        jwks_cache = response.json()
        logger.info("JWKS fetched and cached.")
    return jwks_cache

def validate_azure_token(token: str) -> bool:
    """
    token (str): there is received Airflow API request token of Azure AD JWT 
access token.

    JWT (JSON Web Token) has three parts:
    # Header (contains metadata like algorithm and key ID kid)
    # Payload (contains claims like iss, exp, aud, etc.)
    # Signature (ensures integrity)

    Validate an Azure AD JWT access token workflow:
    1. Extract the JWT Header → Get the "kid"
    2. Fetch JWKS from Azure → Get the list of public keys
    3. Find the matching kid → Use it to verify the JWT
    4. Decode JWT using the correct key → Validate signature and claims
    5. If everything is valid, grant access

    Short workflow: The function fetches the JWKS, finds the correct key, and 
verifies the token signature.
    """
    try:
        logger.info("Validating Azure AD access token...")

        # Decode the JWT header to get the key ID (kid):
        ## The JWT Header contains a "kid" (Key ID) field, which tells which 
public key was used to sign the JWT.
        header = jwt.get_unverified_header(token)
        logger.info(f"Token: {token}")
        logger.info(f"Token header: {header}")
        logger.info(f"Token key ID (kid): {header['kid']}")

        # Fetch JWKS and find the correct key.
        jwks = get_azure_jwks()
        # logger.info(f"JWKS: {jwks}")
        key = next((json.dumps(k) for k in jwks["keys"] if k["kid"] == 
header["kid"]), None)
        logger.info(f"JWKS key: {key}")
        if not key:
            logger.error("Token key ID (kid) not found in JWKS. Token is 
invalid.")
            return False
        logger.info("Matching JWKS key found. Verifying token signature...")

        # Decode and verify the token:
        public_key = RSAAlgorithm.from_jwk(key)
        ## Verify the jwt token signature and return the token claims: 
https://pyjwt.readthedocs.io/en/stable/api.html#jwt.decode
        decoded_token = jwt.decode(
            jwt=token,
            key=public_key,
            algorithms=["RS256"],
            # extended decoding and validation options
            audience=AUDIENCE,  # Ensures token is meant for this API
            issuer=ISSUER,  # Ensures token was issued by Azure AD
            # leeway=LEEWAY_SECONDS,  # Allow 60 seconds of clock drift for 
exp/nbf validation
            options={
                "verify_signature": False,  # Ensure signature is verified
                "require": ["exp", "iat", "nbf"],  # Ensure these claims exist
                "verify_exp": True,  # Ensure token is not expired
                "verify_iat": True,  # Ensure issued-at (iat) is valid
                "verify_nbf": True,  # Ensure not-before (nbf) is valid
                "verify_aud": True,  # Ensure audience (aud) claim is valid
                "verify_iss": True,  # Ensure issuer (iss) claim is valid
                "strict_aud": False  # check that the aud claim is a single 
value (not a list), and matches audience exactly
            }
        )
        logger.info(f"Decoded token: {decoded_token}")

        logger.info("Token is valid and contains required scope. Access 
granted.")
        return True  # Token is valid

    except jwt.ExpiredSignatureError as e:
        logger.error(f"Token has expired. Access denied: {e}")
        return False
    except jwt.InvalidAudienceError as e:
        logger.error(f"Invalid audience. Expected: {AUDIENCE}: {e}")
        return False
    except jwt.InvalidIssuerError as e:
        logger.error(f"Invalid issuer. Expected: {ISSUER}: {e}")
        return False
    except jwt.InvalidTokenError as e:
        logger.error(f"Invalid token error: {e}")
        return False

def requires_authentication(function: T):
    """
    Decorator to enforce authentication on API endpoints using Azure AD token.
    """

    @wraps(function)
    def decorated(*args, **kwargs):
        logger.info(" ### Custome Airflow API Backend to validate Azure AD 
Authentication ###")
        logger.info(f"Executing function: {function.__name__}")  # Extract 
function name
        logger.info(f"request: {request}")

        auth_header = request.headers.get("Authorization")
        logger.info(f"auth_header: {auth_header}")

        if not auth_header or not auth_header.startswith("Bearer "):
            logger.error("Missing or malformed Authorization header. Access 
denied.")
            return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"})

        # As the token is the part of Authorization header we can extract it by 
splitting the header (authorization header is in the format "Bearer <token>")
        token = auth_header.split(" ")[1]

        if not validate_azure_token(token):
            logger.error("Invalid or expired token. Access denied.")
            return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"})

        logger.info("Authentication successful. Processing request.")
        return function(*args, **kwargs)

    return cast(T, decorated)
    # return decorated

def init_app(_):
    """
    Initialize the authentication backend (required by Airflow).
    """

```

I use `"verify_signature": False` because using `jwt.decode` function I get an 
every time error that the signature is invalid. If someone knows why and how to 
fix it, I would be glad to get any recommendations. But this isn't the main 
question.
The main question is:
Why do I get the error above? I expect to have access if I use a custom API 
backend to validate the token. The gotten token includes claim `roles` (for 
mapping Airflow roles and App registration roles) as I use `Application` 
permission type in App registry API permissions.

There are a couple of logs from API backend:
```
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:153} INFO -  ### 
Custome Airflow API Backend to validate Azure AD Authentication ###
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:154} INFO - Executing 
function: Response
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:155} INFO - request: 
<Request 
'https://airflow-ui-test.westeurope.cloudapp.azure.com/api/v1/roles/DataEngineer'
 [GET]>
[2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:158} INFO - 
auth_header: Bearer ...{{token}}...
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:85} INFO - Validating 
Azure AD access token...
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:90} INFO - Token: 
...{token}...
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:91} INFO - Token 
header: {'typ': 'JWT', 'alg': 'RS256', 'x5t': 'imi0Y2z0dYKxBttAqK_Tt5hYBTk', 
'kid': '{{ some kid}}'}
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:92} INFO - Token key 
ID (kid): {{ some kid}}
[2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:98} INFO - JWKS key: 
{"kty": "RSA", "use": "sig", "kid": "{{some kid}}", "x5t": "{{some values}}", 
"n": "{{some values}}", "e": "AQAB", "x5c": ["{{some values}}"], 
"cloud_instance_name": "microsoftonline.com", "issuer": 
"https://login.microsoftonline.com/{{ tenant_id }}/v2.0"}
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:102} INFO - Matching 
JWKS key found. Verifying token signature...
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:107} INFO - !!!! 
Public key: <cryptography.hazmat.bindings._rust.openssl.rsa.RSAPublicKey object 
at 0x7f11e134aa50>
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:128} INFO - Decoded 
token: {'aud': 'api://{{ app client id }}', 'iss': 'https://sts.windows.net/{{ 
tenant_id }}/', 'iat': 1740401432, 'nbf': 1740401432, 'exp': 1740405332, 'aio': 
'{{some values}}', 'appid': '{{ app client id }}', 'appidacr': '1', 'idp': 
'https://sts.windows.net/{{ tenant id }}/', 'oid': '{{some values}}', 'rh': 
'{{some values}}', 'roles': ['Admin'], 'sub': '{{some values}}', 'tid': '{{some 
values}}', 'uti': '{{some values}}', 'ver': '1.0'}
[2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:130} INFO - Token is 
valid and contains required scope. Access granted.
[2025-02-24T13:00:04.018+0000] {azure_ad_auth_backend.py:171} INFO - 
Authentication successful. Processing request.
10.224.0.199 - - [24/Feb/2025:13:00:04 +0000] "GET /api/v1/roles/DataEngineer 
HTTP/1.1" 403 186 "-" "PostmanRuntime/7.43.0"
```

I get the same error:
```
{
    "detail": null,
    "status": 403,
    "title": "Forbidden",
    "type": 
"https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied";
}
```
using Postman and just pure python through GET  request  and {"Authorization": 
f"Bearer {access_token}"} in header


### What you think should happen instead?

I expect that if I use the Airflow API backend I should have access to API if 
the backend code was passed.``

### How to reproduce

deploy Airflow on AKS and use custom API backend to use Azure AD token.

### Operating System

PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux" 
VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm ID=debian 
HOME_URL="https://www.debian.org/"; SUPPORT_URL="https://www.debian.org/support"; 
BUG_REPORT_URL="https://bugs.debian.org/";

### Versions of Apache Airflow Providers

airflow@airflow-webserver-7fdffbcd6b-cznnc:/opt/airflow$ pip freeze | grep 
apache-airflow-provider
apache-airflow-providers-amazon==9.2.0
apache-airflow-providers-celery==3.10.0
apache-airflow-providers-cncf-kubernetes==10.1.0
apache-airflow-providers-common-compat==1.3.0
apache-airflow-providers-common-io==1.5.0
apache-airflow-providers-common-sql==1.21.0
apache-airflow-providers-docker==4.0.0
apache-airflow-providers-elasticsearch==6.0.0
apache-airflow-providers-fab==1.5.2
apache-airflow-providers-ftp==3.12.0
apache-airflow-providers-google==12.0.0
apache-airflow-providers-grpc==3.7.0
apache-airflow-providers-hashicorp==4.0.0
apache-airflow-providers-http==5.0.0
apache-airflow-providers-imap==3.8.0
apache-airflow-providers-microsoft-azure==10.3.0
apache-airflow-providers-mysql==6.0.0
apache-airflow-providers-odbc==4.9.0
apache-airflow-providers-openlineage==2.0.0
apache-airflow-providers-postgres==6.0.0
apache-airflow-providers-redis==4.0.0
apache-airflow-providers-sendgrid==4.0.0
apache-airflow-providers-sftp==5.0.0
apache-airflow-providers-slack==9.0.0
apache-airflow-providers-smtp==1.9.0
apache-airflow-providers-snowflake==6.0.0
apache-airflow-providers-sqlite==4.0.0
apache-airflow-providers-ssh==4.0.0

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

AKS, Official Airflow Helm Chart.

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)


GitHub link: https://github.com/apache/airflow/discussions/47029

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to