dondaum opened a new pull request, #66702:
URL: https://github.com/apache/airflow/pull/66702
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
<!--
Thank you for contributing!
Please provide above a brief description of the changes made in this pull
request.
Write a good git commit message following this guide:
http://chris.beams.io/posts/git-commit/
Please make sure that your code changes are covered with tests.
And in case of new features or big changes remember to adjust the
documentation.
Feel free to ping (in general) for the review if you do not see reaction for
a few days
(72 Hours is the minimum reaction time you can expect from volunteers) - we
sometimes miss notifications.
In case of an existing issue, reference it using one of the following:
* closes: #ISSUE
* related: #ISSUE
-->
This enables the use of Callables from the same Dag module for deadline
alert callbacks. Airflow's DAG serialization adjusts the name of the Dag module
during parsing to ensure its
[uniqueness](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/file.py#L184).
Consequently, once the callable is part of the same module, the module path
can no longer be used.
Currently, this works only if you define the Callable in a separate module
and subsequently import it into the Dag module—as described
[here](https://airflow.apache.org/docs/apache-airflow/stable/howto/deadline-alerts.html#using-callbacks).
How to reproduce
```Python
# sync_deadline_test.py
from datetime import datetime, timedelta
from airflow.sdk import SyncCallback, DAG, DeadlineAlert, DeadlineReference,
task
from airflow.providers.standard.operators.empty import EmptyOperator
def run_sync_callback(text: str):
print(text)
with DAG(
dag_id="custom_callback_deadline_alert_sync",
deadline=DeadlineAlert(
reference=DeadlineReference.DAGRUN_QUEUED_AT,
interval=timedelta(seconds=10),
callback=SyncCallback(
run_sync_callback,
kwargs={
"text": "🚨 Dag {{ dag_run.dag_id }} missed deadline at {{
deadline.deadline_time }}. DagRun: {{ dag_run }}"
},
),
),
):
c = EmptyOperator(task_id="example_task")
@task()
def wait():
import time
time.sleep(60)
c >> wait()
```
Logs
```shell
2026-05-05 10:22:17.561532+00:00 [error ] Callback execution failed
[callback_runner] callback_kwargs={'text': '🚨 Dag {{ dag_run.dag_id }} missed
deadline at {{ deadline.deadline_time }}. DagRun: {{ dag_run }}', 'context':
{'dag_run': {'dag_run_id': 'manual__2026-05-05T10:22:06.225020+00:00',
'dag_id': 'custom_callback_deadline_alert_sync', 'logical_date':
'2026-05-05T10:22:06Z', 'queued_at': '2026-05-05T10:22:06.233422Z',
'start_date': '2026-05-05T10:22:06.267176Z', 'end_date': None, 'duration':
None, 'data_interval_start': '2026-05-05T10:22:06Z', 'data_interval_end':
'2026-05-05T10:22:06Z', 'run_after': '2026-05-05T10:22:06.225020Z',
'last_scheduling_decision': '2026-05-05T10:22:16.977390Z', 'run_type':
'manual', 'state': 'running', 'triggered_by': 'ui', 'triggering_user_name':
'admin', 'conf': {}, 'note': None, 'dag_versions': [{'id':
'019df7a8-1f1a-771b-bdab-ed3e0499252d', 'version_number': 3, 'dag_id':
'custom_callback_deadline_alert_sync', 'bundle_name': 'dags-fol
der', 'bundle_version': None, 'created_at': '2026-05-05T10:21:23.610414Z',
'dag_display_name': 'custom_callback_deadline_alert_sync', 'bundle_url':
None}], 'bundle_version': None, 'dag_display_name':
'custom_callback_deadline_alert_sync', 'partition_key': None}, 'deadline':
{'id': '019df7a8-c5ac-787b-938b-4ab1a1b1d669', 'deadline_time':
'2026-05-05T10:22:16.233422Z'}}}
callback_path=unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test.run_sync_callback
error_detail=[{'exc_type': 'ModuleNotFoundError', 'exc_value': "No module
named
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'",
'exc_notes': [], 'syntax_error': None, 'is_cause': False, 'frames':
[{'filename':
'/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py',
'lineno': 111, 'name': 'execute_callback'}, {'filename':
'/usr/python/lib/python3.10/importlib/__init__.py', 'lineno': 126, 'name':
'import_module'}, {'filename': '<frozen importlib._bootstrap>', 'l
ineno': 1050, 'name': '_gcd_import'}, {'filename': '<frozen
importlib._bootstrap>', 'lineno': 1027, 'name': '_find_and_load'}, {'filename':
'<frozen importlib._bootstrap>', 'lineno': 1004, 'name':
'_find_and_load_unlocked'}], 'is_group': False, 'exceptions': []}]
error_msg="Callback execution failed: ModuleNotFoundError: No module named
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'"
loc=callback_supervisor.py:142
2026-05-05 10:22:17.562123+00:00 [error ] Callback failed
[callback_runner] error="Callback execution failed: ModuleNotFoundError: No
module named
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test'"
loc=callback_supervisor.py:218
2026-05-05T10:22:17.566313Z [info ] Workload finished
[callback_supervisor] duration=0.06016514600014489 exit_code=1
loc=callback_supervisor.py:368 workload_id=019df7a8-c5ab-7541-9d7f-c0ba0018c132
workload_type=ExecutorCallback
2026-05-05T10:22:17.566601Z [error ] Workload execution failed.
[airflow.executors.local_executor.LocalExecutor] loc=local_executor.py:110
workload_type=ExecuteCallback
Traceback (most recent call last):
File "/opt/airflow/airflow-core/src/airflow/executors/local_executor.py",
line 102, in _run_worker
BaseExecutor.run_workload(
File "/opt/airflow/airflow-core/src/airflow/executors/base_executor.py",
line 670, in run_workload
return supervise_callback(
File
"/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py",
line 376, in supervise_callback
raise RuntimeError(f"Callback subprocess exited with code {exit_code}")
RuntimeError: Callback subprocess exited with code 1
2026-05-05T10:22:18.504254Z [info ] Received executor event with state
failed for callback 019df7a8-c5ab-7541-9d7f-c0ba0018c132
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1271
2026-05-05T10:22:18.506357Z [error ] Callback
019df7a8-c5ab-7541-9d7f-c0ba0018c132 failed: Execution failed
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1296
2026-05-05T10:22:18.512786Z [info ] Getting all Callbacks
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1118
2026-05-05T10:22:18.513512Z [info ] executor
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1122
2026-05-05T10:22:18.513625Z [info ] failed
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1123
2026-05-05T10:22:18.513704Z [info ] 1
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1124
2026-05-05T10:22:18.513773Z [info ] {'path':
'unusual_prefix_895308c8175461ad72fc679b0fd85850413a13f3_sync_deadline_test.run_sync_callback',
'dag_id': 'custom_callback_deadline_alert_sync', 'kwargs': {'text': '🚨 Dag {{
dag_run.dag_id }} missed deadline at {{ deadline.deadline_time }}. DagRun: {{
dag_run }}'}, 'prefix': 'deadline_alerts', 'executor': None}
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1125
2026-05-05T10:22:18.513858Z [info ] Getting all Callbacks
[airflow.jobs.scheduler_job_runner.SchedulerJobRunner]
loc=scheduler_job_runner.py:1126
2026-05-05T10:22:18.513924Z [info ] No pending callb
```
---
##### Was generative AI tooling used to co-author this PR?
<!--
If generative AI tooling has been used in the process of authoring this PR,
please
change below checkbox to `[X]` followed by the name of the tool, uncomment
the "Generated-by".
-->
- [ ] Yes (please specify the tool below)
<!--
Generated-by: [Tool Name] following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
-->
---
* Read the **[Pull Request
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
for more information. Note: commit author/co-author name and email in commits
become permanently public when merged.
* For fundamental code changes, an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
is needed.
* When adding dependency, check compliance with the [ASF 3rd Party License
Policy](https://www.apache.org/legal/resolved.html#category-x).
* For significant user-facing changes create newsfragment:
`{pr_number}.significant.rst`, in
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
You can add this file in a follow-up commit after the PR is created so you
know the PR number.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]