anishgirianish opened a new pull request, #60108:
URL: https://github.com/apache/airflow/pull/60108

   Tasks waiting in a Celery queue may have their JWT tokens expire before 
execution starts. This adds a token refresh endpoint that lets the supervisor 
refresh an expired token before task execution.
   
   Changes:
   - Add /token/refresh endpoint to Execution API
   - Add client-side token refresh logic in supervisor.py
   - Add tests for the new endpoint
   
   Fixes: #59553
   
   ---
     ---
     Summary
   
     This PR implements a token refresh mechanism for the Execution API to fix 
issue #59553, where tasks waiting in a Celery queue fail because their JWT 
tokens expire before execution starts.
   
     Problem
   
     When using the Celery executor with limited worker concurrency (e.g., GPU 
workers), tasks can stay queued for extended periods. The JWT token issued at 
task submission time may expire before a worker picks up the task, causing 
authentication to fail with "Token has expired" errors.
   
     Solution
   
     Added a token refresh endpoint that lets the supervisor detect expired or 
soon-to-expire tokens and refresh them before task execution begins.
   
     Changes
   
     New Files
   
     airflow-core/src/airflow/api_fastapi/execution_api/routes/tokens.py
     - New /execution/token/refresh endpoint
     - Accepts expired tokens and validates signature (not expiry)
     - Verifies task instance exists and is in valid state (QUEUED or RUNNING)
     - Issues new token with fresh expiration time
     - Security: Only refreshes tokens for tasks that are legitimately waiting
   
     
     airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_tokens.py
     - Comprehensive tests for the token refresh endpoint
     - Tests for valid refresh, invalid tokens, non-existent tasks, and invalid 
task states
   
     Modified Files
   
     airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py
     - Registers the new token router on the execution API
   
     task-sdk/src/airflow/sdk/execution_time/supervisor.py
     - Added _decode_token_claims() - Decodes JWT payload without verification
     - Added _is_token_expired_or_expiring() - Checks if token is expired or 
expiring within buffer (60s)
     - Added _refresh_token() - Calls the refresh endpoint to get a new token
     - Token check happens at supervisor startup before task execution begins
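
     As a rough illustration of the client-side helpers described above, the 
expiry check could look something like the sketch below. The function names 
mirror the helpers in the list but the bodies are assumptions, not the code 
added to supervisor.py; ensure_fresh_token and its refresh callable are 
hypothetical, standing in for the HTTP call to the refresh endpoint.

```python
import base64
import json
import time
from typing import Callable, Optional

EXPIRY_BUFFER_SECONDS = 60  # the buffer mentioned in the description


def decode_token_claims(token: str) -> Optional[dict]:
    """Return the JWT payload without verifying the signature, or None if malformed."""
    try:
        payload_b64 = token.split(".")[1]
        padded = payload_b64 + "=" * (-len(payload_b64) % 4)
        claims = json.loads(base64.urlsafe_b64decode(padded))
    except (IndexError, ValueError):
        return None
    return claims if isinstance(claims, dict) else None


def is_token_expired_or_expiring(
    token: str,
    now: Optional[float] = None,
    buffer: int = EXPIRY_BUFFER_SECONDS,
) -> bool:
    """True if "exp" is in the past or falls within `buffer` seconds from now."""
    claims = decode_token_claims(token)
    if claims is None or not isinstance(claims.get("exp"), (int, float)):
        # Choice made for this sketch: if expiry cannot be determined,
        # do not refresh and let the server reject the token if needed.
        return False
    now = time.time() if now is None else now
    return claims["exp"] <= now + buffer


def ensure_fresh_token(
    token: str,
    refresh: Callable[[str], str],
    now: Optional[float] = None,
) -> str:
    """Call `refresh` only when the token is expired or about to expire."""
    if is_token_expired_or_expiring(token, now=now):
        return refresh(token)
    return token
```

     Decoding without verification is enough on the client because the 
supervisor only needs the "exp" claim to decide whether to ask for a new 
token; the server remains responsible for validating the signature.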
   
     task-sdk/src/airflow/sdk/api/datamodels/_generated.py
     - Auto-generated datamodel updates for the new endpoint
   
     task-sdk/tests/task_sdk/execution_time/test_supervisor.py
     - Tests for token decode, expiry detection, and refresh helper functions
   
     Testing
   
     Environment Variables for Long-Queue Testing
   
     When testing with tasks that queue for extended periods, use these 
settings:
   
     # Increase task queued timeout (default: 600s / 10 min)
     export AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT=86400  # 24 hours
   
     # Increase stuck-in-queued retries (default: 2)
     export AIRFLOW__SCHEDULER__NUM_STUCK_IN_QUEUED_RETRIES=50
   
     # Celery visibility timeout - already defaults to 86400 (24h)
     export AIRFLOW__CELERY__VISIBILITY_TIMEOUT=86400
   
     # Worker prefetch multiplier - already defaults to 1
     export AIRFLOW__CELERY__WORKER_PREFETCH_MULTIPLIER=1
   
     Test Scenario
   
     1. Create a DAG with tasks targeting a limited-concurrency queue (e.g., 
queue="gpu_queue")
     2. Trigger multiple DAG runs (e.g., 20 runs)
     3. Start only 1 worker for that queue: airflow celery worker -q gpu_queue 
--concurrency 1
     4. Tasks will queue and their tokens will expire
     5. With this fix, tokens are refreshed before execution starts
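
     To see why step 4 happens, a back-of-the-envelope calculation helps. The 
sketch below is illustrative only: it assumes every token is issued at the 
same moment, every task takes the same time, and the token lifetime and task 
duration values are made up for the example rather than taken from this PR.

```python
def runs_with_expired_tokens(num_runs, task_seconds, token_lifetime_seconds, concurrency=1):
    """Return the 1-based queue positions whose token has expired by the time they start."""
    expired = []
    for position in range(1, num_runs + 1):
        # Tasks start in batches of `concurrency`; each earlier batch adds one task duration.
        wait = ((position - 1) // concurrency) * task_seconds
        if wait >= token_lifetime_seconds:
            expired.append(position)
    return expired


# 20 runs, one worker slot, 2-minute tasks, and an assumed 10-minute token lifetime.
affected = runs_with_expired_tokens(num_runs=20, task_seconds=120, token_lifetime_seconds=600)
print(affected)
```

     Under these assumptions the sixth task onward starts with an expired 
token, which is the situation the refresh at supervisor startup is meant to 
handle.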
   
     Expected Logs
   
     When token refresh occurs:
     Token expired or expiring, refreshing before execution
     Token refreshed successfully
   
     Security Considerations
   
     - The refresh endpoint validates the JWT signature (ensures token was 
issued by this server)
     - Only refreshes tokens for task instances in QUEUED or RUNNING state
     - Invalid or tampered tokens are rejected
     - The endpoint is registered on the unauthenticated router since expired 
tokens cannot pass normal auth
   
     closes: #59553
   
   

