seniuts-b2 opened a new issue, #47025:
URL: https://github.com/apache/airflow/issues/47025

   ### Apache Airflow version
   
   2.10.5
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.10.4
   
   ### What happened?
   
   I deployed Airflow on Azure Kubernetes Service (AKS) using the official Airflow Helm chart. For UI authentication I use Azure AD via OAuth2, backed by an Azure App Registration that handles Airflow access through role-based access control (RBAC). That part works as expected.
   The Azure App Registration has the following settings.
   API permissions:
   
![Image](https://github.com/user-attachments/assets/f920db31-62c1-4987-ab01-bb54814be438)
   There is a `Delegated` permission for UI access and an `Application` permission for Airflow API access through the custom API backend. The Application ID URI is used as the `scope` when fetching the access token:
   
![Image](https://github.com/user-attachments/assets/7765768f-1a6d-42ca-a690-6af805db42e2)
   
   So, I use a custom Airflow API backend to access Airflow and manage roles via the endpoint `https://airflow-ui-test.westeurope.cloudapp.azure.com/auth/fab/v1/roles`. In the `env` section of the Helm chart's `values.yaml` file I have:
   ```
   - name: AIRFLOW__API__AUTH_BACKENDS
     value: "airflow_utility.dag_level_access_control.azure_ad_auth_backend,airflow.api.auth.backend.session"
   ```
   
   My custom API backend is the module `airflow_utility.dag_level_access_control.azure_ad_auth_backend`.
   
   And I get this error:
   ```
   {
       "detail": null,
       "status": 403,
       "title": "Forbidden",
       "type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied"
   }
   ```
   
   Here is the custom Airflow API backend:
   ```
   from __future__ import annotations
   
   import os
   import json
   import requests
   import jwt
   import logging
   from jwt.algorithms import RSAAlgorithm
   from flask import Response, request
   from functools import wraps
   from typing import TYPE_CHECKING, Any, Callable, TypeVar, cast
   
   T = TypeVar("T", bound=Callable)  # TypeVar for function decorators
   
   # ------------------------------
   # Logging Configuration
   # ------------------------------
   logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
   logger = logging.getLogger(__name__)
   
   # ------------------------------
   # Azure AD Configuration
   # ------------------------------
   # AAD_TENANT_ID and AAD_CLIENT_ID are passed as environment variables
   # through the webserver section of the values.yaml file
   AAD_TENANT_ID = os.getenv("AAD_TENANT_ID")
   AAD_CLIENT_ID = os.getenv("AAD_CLIENT_ID")
   
   # JSON Web Key Set (JWKS) URL
   JWKS_URL = f"https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys"
   # Issuer (iss) claim in the token
   ISSUER = f"https://sts.windows.net/{AAD_TENANT_ID}/"
   # ISSUER = f"https://login.microsoftonline.com/{AAD_TENANT_ID}/v2.0"
   # Expected audience (should match the Application ID URI)
   AUDIENCE = f"api://{AAD_CLIENT_ID}"
   # Allow 60 seconds of clock drift for `exp`, `nbf`, `iat` validation
   LEEWAY_SECONDS = 60
   
   if not AAD_TENANT_ID or not AAD_CLIENT_ID:
       msg = (
           "Missing required environment variables (should be passed as env variables "
           "through the webserver section of the values.yaml file): AAD_TENANT_ID, AAD_CLIENT_ID"
       )
       logger.error(msg)
       raise ValueError(msg)
   
   # logger.info(f"Using AAD_TENANT_ID: {AAD_TENANT_ID}")
   # logger.info(f"Using CLIENT_ID: {AAD_CLIENT_ID}")
   logger.info(f"Fetching JWKS from: {JWKS_URL}")
   
   # Cache JWKS keys to avoid frequent network requests
   jwks_cache = {}
   
   def get_azure_jwks():
       """
       Fetch and cache Azure AD JWKS (JSON Web Key Set) to verify token signatures.
   
       JWKS is a public key repository published by Azure AD.
       It contains multiple keys that are used to verify JWT signatures.
   
       Azure AD JWKS URL looks like: https://login.microsoftonline.com/{AAD_TENANT_ID}/discovery/v2.0/keys
   
       Azure AD rotates keys only from time to time, so they are fetched once and cached per process:
       https://learn.microsoft.com/en-us/entra/identity-platform/signing-key-rollover
       """
       global jwks_cache
       if not jwks_cache:
           logger.info("Fetching JWKS from Azure AD...")
           response = requests.get(JWKS_URL)
           response.raise_for_status()
           jwks_cache = response.json()
           logger.info("JWKS fetched and cached.")
       return jwks_cache
   
   def validate_azure_token(token: str) -> bool:
       """
       token (str): the Azure AD JWT access token received with the Airflow API request.
   
       JWT (JSON Web Token) has three parts:
       # Header (contains metadata like algorithm and key ID kid)
       # Payload (contains claims like iss, exp, aud, etc.)
       # Signature (ensures integrity)
   
       Validate an Azure AD JWT access token workflow:
       1. Extract the JWT Header → Get the "kid"
       2. Fetch JWKS from Azure → Get the list of public keys
       3. Find the matching kid → Use it to verify the JWT
       4. Decode JWT using the correct key → Validate signature and claims
       5. If everything is valid, grant access
   
       Short workflow: The function fetches the JWKS, finds the correct key, and verifies the token signature.
       """
       try:
           logger.info("Validating Azure AD access token...")
   
           # Decode the JWT header to get the key ID (kid):
           ## The JWT Header contains a "kid" (Key ID) field, which tells which public key was used to sign the JWT.
           header = jwt.get_unverified_header(token)
           logger.info(f"Token: {token}")
           logger.info(f"Token header: {header}")
           logger.info(f"Token key ID (kid): {header['kid']}")
   
           # Fetch JWKS and find the correct key.
           jwks = get_azure_jwks()
           # logger.info(f"JWKS: {jwks}")
           key = next((json.dumps(k) for k in jwks["keys"] if k["kid"] == header["kid"]), None)
           logger.info(f"JWKS key: {key}")
           if not key:
               logger.error("Token key ID (kid) not found in JWKS. Token is invalid.")
               return False
           logger.info("Matching JWKS key found. Verifying token signature...")
   
           # Decode and verify the token:
           public_key = RSAAlgorithm.from_jwk(key)
           ## Verify the jwt token signature and return the token claims: https://pyjwt.readthedocs.io/en/stable/api.html#jwt.decode
           decoded_token = jwt.decode(
               jwt=token,
               key=public_key,
               algorithms=["RS256"],
               # extended decoding and validation options
               audience=AUDIENCE,  # Ensures token is meant for this API
               issuer=ISSUER,  # Ensures token was issued by Azure AD
               # leeway=LEEWAY_SECONDS,  # Allow 60 seconds of clock drift for exp/nbf validation
               options={
                   "verify_signature": False,  # Signature verification is DISABLED (see the note below the code)
                   "require": ["exp", "iat", "nbf"],  # Ensure these claims exist
                   "verify_exp": True,  # Ensure token is not expired
                   "verify_iat": True,  # Ensure issued-at (iat) is valid
                   "verify_nbf": True,  # Ensure not-before (nbf) is valid
                   "verify_aud": True,  # Ensure audience (aud) claim is valid
                   "verify_iss": True,  # Ensure issuer (iss) claim is valid
                   "strict_aud": False  # If True, require aud to be a single value (not a list) that matches audience exactly
               }
           )
           logger.info(f"Decoded token: {decoded_token}")
   
           logger.info("Token is valid and contains required scope. Access granted.")
           return True  # Token is valid
   
       except jwt.ExpiredSignatureError as e:
           logger.error(f"Token has expired. Access denied: {e}")
           return False
       except jwt.InvalidAudienceError as e:
           logger.error(f"Invalid audience. Expected: {AUDIENCE}: {e}")
           return False
       except jwt.InvalidIssuerError as e:
           logger.error(f"Invalid issuer. Expected: {ISSUER}: {e}")
           return False
       except jwt.InvalidTokenError as e:
           logger.error(f"Invalid token error: {e}")
           return False
   
   def requires_authentication(function: T):
       """
       Decorator to enforce authentication on API endpoints using Azure AD token.
       """
   
       @wraps(function)
       def decorated(*args, **kwargs):
           logger.info(" ### Custome Airflow API Backend to validate Azure AD Authentication ###")
           logger.info(f"Executing function: {function.__name__}")  # Extract function name
           logger.info(f"request: {request}")
   
           auth_header = request.headers.get("Authorization")
           logger.info(f"auth_header: {auth_header}")
   
           if not auth_header or not auth_header.startswith("Bearer "):
               logger.error("Missing or malformed Authorization header. Access denied.")
               return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"})
   
           # The token is part of the Authorization header, so extract it by
           # splitting the header (the format is "Bearer <token>")
           token = auth_header.split(" ")[1]
   
           if not validate_azure_token(token):
               logger.error("Invalid or expired token. Access denied.")
               return Response("Unauthorized", 401, {"WWW-Authenticate": "Bearer"})
   
           logger.info("Authentication successful. Processing request.")
           return function(*args, **kwargs)
   
       return cast(T, decorated)
       # return decorated
   
   def init_app(_):
       """
       Initialize the authentication backend (required by Airflow).
       """
   
   ```
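
The header/payload/signature structure described in the `validate_azure_token` docstring can be inspected with the standard library alone, which may help when comparing the token's `kid`, `aud`, `iss` and `ver` against what the backend expects. This is only a debugging sketch: the helper names (`b64url_decode`, `inspect_jwt`, `b64url_encode`) and the demo token are made up for illustration, and nothing here verifies a signature.

```python
from __future__ import annotations

import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def inspect_jwt(token: str) -> tuple[dict, dict]:
    """Return (header, claims) WITHOUT verifying the signature. Debugging only."""
    header_b64, payload_b64, _signature_b64 = token.split(".")
    header = json.loads(b64url_decode(header_b64))
    claims = json.loads(b64url_decode(payload_b64))
    return header, claims


def b64url_encode(data: dict) -> str:
    """Encode a dict as an unpadded base64url JSON segment (demo helper)."""
    raw = json.dumps(data).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Build a fake, unsigned token purely for demonstration (all values hypothetical).
demo_token = ".".join(
    [
        b64url_encode({"typ": "JWT", "alg": "RS256", "kid": "demo-kid"}),
        b64url_encode({"aud": "api://demo-client-id", "roles": ["Admin"], "ver": "1.0"}),
        "signature-placeholder",
    ]
)

header, claims = inspect_jwt(demo_token)
print(header["kid"], claims["aud"], claims["roles"])
```

Because the signature segment is ignored, this must never be used to decide whether a request is allowed.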
   
   I set `"verify_signature": False` because `jwt.decode` otherwise fails every time with an invalid-signature error. If someone knows why and how to fix it, I would be glad to get any recommendations. But this isn't the main question.
   The main question is:
   Why do I get the error above? I expect to have access when a custom API backend validates the token. The token includes the `roles` claim (for mapping App Registration roles to Airflow roles) because I use the `Application` permission type in the App Registration's API permissions.
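
To make the intended use of the `roles` claim concrete, here is a minimal sketch of turning the decoded claims into Airflow role names. The mapping table and the function name `airflow_roles_from_claims` are hypothetical; only the shape of the claim (`'roles': ['Admin']`) comes from the logs in this report. The sketch only derives role names from the token. Whether and how Airflow associates them with the user of the current request depends on the auth backend and is not shown here.

```python
from __future__ import annotations

# Hypothetical mapping from App Registration app roles to Airflow role names.
APP_ROLE_TO_AIRFLOW_ROLE = {
    "Admin": "Admin",
    "DataEngineer": "User",
    "Reader": "Viewer",
}


def airflow_roles_from_claims(
    claims: dict,
    mapping: dict[str, str] = APP_ROLE_TO_AIRFLOW_ROLE,
) -> list[str]:
    """Map the token's `roles` claim to Airflow role names, dropping unknown roles."""
    app_roles = claims.get("roles", [])
    if isinstance(app_roles, str):  # tolerate a single role given as a string
        app_roles = [app_roles]

    airflow_roles: list[str] = []
    for app_role in app_roles:
        mapped = mapping.get(app_role)
        if mapped is not None and mapped not in airflow_roles:
            airflow_roles.append(mapped)
    return airflow_roles


print(airflow_roles_from_claims({"roles": ["Admin"]}))
```

Unknown app roles are dropped instead of raising, so a token carrying only unmapped roles yields an empty list, which the caller can treat as "no access".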
   
   Here are some logs from the API backend:
   ```
   [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:153} INFO -  ### Custome Airflow API Backend to validate Azure AD Authentication ###
   [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:154} INFO - Executing function: Response
   [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:155} INFO - request: <Request 'https://airflow-ui-test.westeurope.cloudapp.azure.com/api/v1/roles/DataEngineer' [GET]>
   [2025-02-24T13:00:04.015+0000] {azure_ad_auth_backend.py:158} INFO - auth_header: Bearer ...{{token}}...
   [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:85} INFO - Validating Azure AD access token...
   [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:90} INFO - Token: ...{token}...
   [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:91} INFO - Token header: {'typ': 'JWT', 'alg': 'RS256', 'x5t': 'imi0Y2z0dYKxBttAqK_Tt5hYBTk', 'kid': '{{ some kid}}'}
   [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:92} INFO - Token key ID (kid): {{ some kid}}
   [2025-02-24T13:00:04.016+0000] {azure_ad_auth_backend.py:98} INFO - JWKS key: {"kty": "RSA", "use": "sig", "kid": "{{some kid}}", "x5t": "{{some values}}", "n": "{{some values}}", "e": "AQAB", "x5c": ["{{some values}}"], "cloud_instance_name": "microsoftonline.com", "issuer": "https://login.microsoftonline.com/{{ tenant_id }}/v2.0"}
   [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:102} INFO - Matching JWKS key found. Verifying token signature...
   [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:107} INFO - !!!! Public key: <cryptography.hazmat.bindings._rust.openssl.rsa.RSAPublicKey object at 0x7f11e134aa50>
   [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:128} INFO - Decoded token: {'aud': 'api://{{ app client id }}', 'iss': 'https://sts.windows.net/{{ tenant_id }}/', 'iat': 1740401432, 'nbf': 1740401432, 'exp': 1740405332, 'aio': '{{some values}}', 'appid': '{{ app client id }}', 'appidacr': '1', 'idp': 'https://sts.windows.net/{{ tenant id }}/', 'oid': '{{some values}}', 'rh': '{{some values}}', 'roles': ['Admin'], 'sub': '{{some values}}', 'tid': '{{some values}}', 'uti': '{{some values}}', 'ver': '1.0'}
   [2025-02-24T13:00:04.017+0000] {azure_ad_auth_backend.py:130} INFO - Token is valid and contains required scope. Access granted.
   [2025-02-24T13:00:04.018+0000] {azure_ad_auth_backend.py:171} INFO - Authentication successful. Processing request.
   10.224.0.199 - - [24/Feb/2025:13:00:04 +0000] "GET /api/v1/roles/DataEngineer HTTP/1.1" 403 186 "-" "PostmanRuntime/7.43.0"
   ```
   
   I get the same error:
   ```
   {
       "detail": null,
       "status": 403,
       "title": "Forbidden",
       "type": "https://airflow.apache.org/docs/apache-airflow/2.10.5/stable-rest-api-ref.html#section/Errors/PermissionDenied"
   }
   ```
   using both Postman and plain Python, via a GET request with `{"Authorization": f"Bearer {access_token}"}` in the headers.
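
For anyone trying to reproduce the plain-Python call, the request can be built with the standard library alone. This is a sketch under assumptions: the report does not say which HTTP client was used, and `build_roles_request`, the base URL and the token value below are placeholders. The path `/api/v1/roles/<name>` is taken from the access log above. The example only constructs the request and does not send it.

```python
from urllib.parse import quote
from urllib.request import Request


def build_roles_request(base_url: str, access_token: str, role_name: str) -> Request:
    """Build (but do not send) a GET request for a single role with a Bearer token."""
    url = f"{base_url.rstrip('/')}/api/v1/roles/{quote(role_name, safe='')}"
    return Request(
        url,
        headers={
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        },
        method="GET",
    )


# Placeholder values; substitute the real host and a freshly fetched token.
req = build_roles_request("https://airflow.example.com/", "<access_token>", "DataEngineer")
print(req.get_method(), req.full_url)

# To actually send it: urllib.request.urlopen(req) (raises HTTPError on a 403).
```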
   
   
   ### What you think should happen instead?
   
   I expect that when a request passes the checks in the custom Airflow API backend, I should have access to the API.
   
   ### How to reproduce
   
   Deploy Airflow on AKS and use a custom API backend that validates an Azure AD token.
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux" VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/"
   
   ### Versions of Apache Airflow Providers
   
   airflow@airflow-webserver-7fdffbcd6b-cznnc:/opt/airflow$ pip freeze | grep apache-airflow-provider
   apache-airflow-providers-amazon==9.2.0
   apache-airflow-providers-celery==3.10.0
   apache-airflow-providers-cncf-kubernetes==10.1.0
   apache-airflow-providers-common-compat==1.3.0
   apache-airflow-providers-common-io==1.5.0
   apache-airflow-providers-common-sql==1.21.0
   apache-airflow-providers-docker==4.0.0
   apache-airflow-providers-elasticsearch==6.0.0
   apache-airflow-providers-fab==1.5.2
   apache-airflow-providers-ftp==3.12.0
   apache-airflow-providers-google==12.0.0
   apache-airflow-providers-grpc==3.7.0
   apache-airflow-providers-hashicorp==4.0.0
   apache-airflow-providers-http==5.0.0
   apache-airflow-providers-imap==3.8.0
   apache-airflow-providers-microsoft-azure==10.3.0
   apache-airflow-providers-mysql==6.0.0
   apache-airflow-providers-odbc==4.9.0
   apache-airflow-providers-openlineage==2.0.0
   apache-airflow-providers-postgres==6.0.0
   apache-airflow-providers-redis==4.0.0
   apache-airflow-providers-sendgrid==4.0.0
   apache-airflow-providers-sftp==5.0.0
   apache-airflow-providers-slack==9.0.0
   apache-airflow-providers-smtp==1.9.0
   apache-airflow-providers-snowflake==6.0.0
   apache-airflow-providers-sqlite==4.0.0
   apache-airflow-providers-ssh==4.0.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   AKS, Official Airflow Helm Chart.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

