anishgirianish opened a new pull request, #60108:
URL: https://github.com/apache/airflow/pull/60108
Tasks waiting in a Celery queue can have their JWT tokens expire before
execution starts. This PR adds a token refresh endpoint that lets the supervisor
refresh expired tokens before task execution.
Changes:
- Add /token/refresh endpoint to Execution API
- Add client-side token refresh logic in supervisor.py
- Add tests for the new endpoint
Fixes: #59553
Summary
This PR implements a token refresh mechanism for the Execution API to fix
issue #59553, where tasks waiting in a Celery queue fail because their JWT
tokens expire before execution starts.
Problem
When using the Celery executor with limited worker concurrency (e.g., GPU
workers), tasks can sit in the queue for extended periods. The JWT tokens issued
at task submission time may expire before a worker picks up the task, causing
authentication failures with "Token has expired" errors.
Solution
Added a token refresh endpoint, plus supervisor logic that detects expired or
soon-to-expire tokens and calls the endpoint to refresh them before task
execution begins.
Changes
New Files
airflow-core/src/airflow/api_fastapi/execution_api/routes/tokens.py
- New /execution/token/refresh endpoint
- Accepts expired tokens: validates the signature but not the expiry
- Verifies that the task instance exists and is in a valid state (QUEUED or RUNNING)
- Issues a new token with a fresh expiration time
- Security: only refreshes tokens for task instances that are still queued or running
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_tokens.py
- Tests for the token refresh endpoint covering valid refresh, invalid tokens,
non-existent tasks, and invalid task states
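The endpoint behavior described above can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not the PR's implementation: it assumes HS256 with a hypothetical shared secret, the task instance id carried in the `sub` claim, a plain dict standing in for the task-instance table, illustrative HTTP status codes, and a hypothetical 600-second lifetime. `sign`, `verify_signature_only`, and `refresh_token` are hypothetical names.

```python
import base64
import hashlib
import hmac
import json
import time

# Assumptions for this sketch: HS256 with a hypothetical shared secret, the
# task instance id in the "sub" claim, and a dict standing in for the
# task-instance table. Status codes are illustrative, not the PR's.
SECRET = b"hypothetical-shared-secret"
REFRESHABLE_STATES = {"queued", "running"}


def _b64url_encode(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    # JWT strips base64url padding; restore it before decoding.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sign(claims: dict) -> str:
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    sig = hmac.new(SECRET, f"{header}.{payload}".encode("ascii"), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url_encode(sig)}"


def verify_signature_only(token: str) -> dict:
    """Return the claims if the signature is valid; `exp` is deliberately not checked."""
    header, payload, sig = token.split(".")
    expected = hmac.new(SECRET, f"{header}.{payload}".encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig)):
        raise PermissionError("invalid token signature")
    return json.loads(_b64url_decode(payload))


def refresh_token(token: str, ti_states: dict, ttl: int = 600) -> tuple:
    """Hypothetical handler body returning (HTTP status, new token or error message)."""
    try:
        claims = verify_signature_only(token)
    except (ValueError, PermissionError):
        return 401, "invalid token"  # malformed, tampered, or signed by another key
    state = ti_states.get(claims.get("sub"))
    if state is None:
        return 404, "task instance not found"
    if state not in REFRESHABLE_STATES:
        return 409, f"task instance is {state}, token not refreshed"
    now = int(time.time())
    # Keep the original claims, but stamp a fresh issue time and expiry.
    return 200, sign({**claims, "iat": now, "exp": now + ttl})
```

The design point is that the signature check runs while the `exp` check does not, so an expired but genuine token is accepted and a forged one is rejected; the state check then limits refreshes to queued or running task instances.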
Modified Files
airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py
- Registers the new token router on the execution API
task-sdk/src/airflow/sdk/execution_time/supervisor.py
- Added `_decode_token_claims()`: decodes the JWT payload without verifying the signature
- Added `_is_token_expired_or_expiring()`: checks whether the token is expired or
expires within a 60-second buffer
- Added `_refresh_token()`: calls the refresh endpoint to get a new token
- The token check runs at supervisor startup, before task execution begins
task-sdk/src/airflow/sdk/api/datamodels/_generated.py
- Auto-generated datamodel updates for the new endpoint
task-sdk/tests/task_sdk/execution_time/test_supervisor.py
- Tests for token decode, expiry detection, and refresh helper functions
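The supervisor-side check can be sketched as follows. This is a hypothetical illustration modeled on the helpers listed for `supervisor.py`, not the PR's code: the names drop the leading underscore, `refresh` is a hypothetical callable standing in for the HTTP call to the refresh endpoint, and treating a token without an `exp` claim as "do not refresh" is an assumption. The 60-second buffer and the two log messages come from this description.

```python
import base64
import json
import logging
import time
from typing import Callable, Optional

log = logging.getLogger(__name__)

# Refresh when the token is expired or expires within this many seconds
# (the PR describes a 60 s buffer).
EXPIRY_BUFFER_SECONDS = 60


def decode_token_claims(token: str) -> dict:
    """Decode the JWT payload WITHOUT verifying the signature.

    The claims only decide *when* to ask for a refresh; the server still
    verifies the signature, so trusting them here grants nothing.
    """
    try:
        payload = token.split(".")[1]
        claims = json.loads(base64.urlsafe_b64decode(payload + "=" * (-len(payload) % 4)))
    except (IndexError, ValueError):
        return {}  # not a well-formed JWT
    return claims if isinstance(claims, dict) else {}


def is_token_expired_or_expiring(
    token: str, buffer: int = EXPIRY_BUFFER_SECONDS, now: Optional[float] = None
) -> bool:
    exp = decode_token_claims(token).get("exp")
    if exp is None:
        return False  # assumption: without an exp claim, leave the decision to the server
    now = time.time() if now is None else now
    return exp - now <= buffer


def ensure_fresh_token(token: str, refresh: Callable[[str], str]) -> str:
    """Run once before the task starts; `refresh` stands in for the HTTP call."""
    if not is_token_expired_or_expiring(token):
        return token
    log.info("Token expired or expiring, refreshing before execution")
    new_token = refresh(token)
    log.info("Token refreshed successfully")
    return new_token
```

Decoding without verification is acceptable here only because the result drives a refresh decision; any token the supervisor then uses is still validated by the server.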
Testing
Environment Variables for Long-Queue Testing
When testing with tasks that queue for extended periods, use these settings:
```bash
# Increase task queued timeout (default: 600s / 10 min)
export AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT=86400 # 24 hours
# Increase stuck-in-queued retries (default: 2)
export AIRFLOW__SCHEDULER__NUM_STUCK_IN_QUEUED_RETRIES=50
# Celery visibility timeout - already defaults to 86400 (24h)
export AIRFLOW__CELERY__VISIBILITY_TIMEOUT=86400
# Worker prefetch multiplier - already defaults to 1
export AIRFLOW__CELERY__WORKER_PREFETCH_MULTIPLIER=1
```
Test Scenario
1. Create a DAG with tasks targeting a limited-concurrency queue (e.g.,
`queue="gpu_queue"`)
2. Trigger multiple DAG runs (e.g., 20 runs)
3. Start only one worker for that queue:
`airflow celery worker -q gpu_queue --concurrency 1`
4. Tasks queue up behind the single worker, and their tokens expire while they wait
5. With this fix, the tokens are refreshed before execution starts
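A back-of-the-envelope model shows why step 4 happens. It is a sketch with hypothetical numbers (a 2-minute task runtime and a 10-minute token lifetime), not values taken from the PR or from Airflow's defaults, and it assumes every token is issued when the runs are triggered and the worker drains the queue in FIFO order. `tasks_with_expired_tokens` is a hypothetical helper.

```python
def tasks_with_expired_tokens(
    n_tasks: int, concurrency: int, task_runtime_s: int, token_ttl_s: int
) -> list:
    """Return 0-based queue positions whose token expires before the task starts.

    Simplifying assumptions: every token is issued at t=0 when the runs are
    triggered, the worker drains the queue FIFO in batches of `concurrency`,
    and every task takes `task_runtime_s` seconds.
    """
    expired = []
    for i in range(n_tasks):
        start_time = (i // concurrency) * task_runtime_s
        if start_time >= token_ttl_s:  # exp has already passed at pickup
            expired.append(i)
    return expired


# 20 runs, 1 worker slot, 2-minute tasks, hypothetical 10-minute token lifetime:
print(len(tasks_with_expired_tokens(20, 1, 120, 600)))  # prints 15
```

With these numbers, every task from the sixth onward would start with an already-expired token, while raising the concurrency to 4 would let all 20 start within the token lifetime.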
Expected Logs
When a token refresh occurs, these messages are logged:
```
Token expired or expiring, refreshing before execution
Token refreshed successfully
```
Security Considerations
- The refresh endpoint validates the JWT signature, which ensures the token
was issued by this server
- Only refreshes tokens for task instances in QUEUED or RUNNING state
- Invalid or tampered tokens are rejected
- The endpoint is registered on the unauthenticated router, because expired
tokens cannot pass normal authentication
closes: #59553